Made the finishevent's aborted parameter optional.
[pdm.git] / pdm / srv.py
1 """Python Daemon Management -- Server functions
2
3 This module implements the server part of the PDM protocols. The
4 primary object of interest herein is the listen() function, which is
5 the most generic way to create PDM listeners based on user
6 configuration, and the documentation for the repl and perf classes,
7 which describes the functioning of the REPL and PERF protocols.
8 """
9
10 import os, sys, socket, threading, grp, select
11 import types, pprint, traceback
12 import pickle, struct
13
14 __all__ = ["repl", "perf", "listener", "unixlistener", "tcplistener", "listen"]
15
16 protocols = {}
17
18 class repl(object):
19     """REPL protocol handler
20     
21     Provides a read-eval-print loop. The primary client-side interface
22     is the pdm.cli.replclient class. Clients can send arbitrary code,
23     which is compiled and run on its own thread in the server process,
24     and output responses that are echoed back to the client.
25
26     Each client is provided with its own module, in which the code
27     runs. The module is prepared with a function named `echo', which
28     takes a single object and pretty-prints it as part of the command
29     response. If a command can be parsed as an expression, the value
30     it evaluates to is automatically echoed to the client. If the
31     evalution of the command terminates with an exception, its
32     traceback is echoed to the client.
33
34     The REPL protocol is only intended for interactive usage. In order
35     to interact programmatically with the server process, see the PERF
36     protocol instead.
37     """
38     def __init__(self, cl):
39         self.cl = cl
40         self.mod = types.ModuleType("repl")
41         self.mod.echo = self.echo
42         self.printer = pprint.PrettyPrinter(indent = 4, depth = 6)
43         cl.send("+REPL\n")
44
45     def sendlines(self, text):
46         for line in text.split("\n"):
47             self.cl.send(" " + line + "\n")
48
49     def echo(self, ob):
50         self.sendlines(self.printer.pformat(ob))
51
52     def command(self, cmd):
53         try:
54             try:
55                 ccode = compile(cmd, "PDM Input", "eval")
56             except SyntaxError:
57                 ccode = compile(cmd, "PDM Input", "exec")
58                 exec ccode in self.mod.__dict__
59                 self.cl.send("+OK\n")
60             else:
61                 self.echo(eval(ccode, self.mod.__dict__))
62                 self.cl.send("+OK\n")
63         except:
64             for line in traceback.format_exception(*sys.exc_info()):
65                 self.cl.send(" " + line)
66             self.cl.send("+EXC\n")
67
68     def handle(self, buf):
69         p = buf.find("\n\n")
70         if p < 0:
71             return buf
72         cmd = buf[:p + 1]
73         self.command(cmd)
74         return buf[p + 2:]
75 protocols["repl"] = repl
76
77 class perf(object):
78     """PERF protocol handler
79     
80     The PERF protocol provides an interface for program interaction
81     with the server process. It allows limited remote interactions
82     with Python objects over a few defined interfaces.
83
84     All objects that wish to be available for interaction need to
85     implement a method named `pdm_protocols' which, when called with
86     no arguments, should return a list of strings, each indicating a
87     PERF interface that the object implements. For each such
88     interface, the object must implement additional methods as
89     described below.
90
91     A client can find PERF objects to interact with either by
92     specifying the name of such an object in an existing module, or by
93     using the `dir' interface, described below. Thus, to make a PERF
94     object available for clients, it needs only be bound to a global
95     variable in a module and implement the `pdm_protocols'
96     method. When requesting an object from a module, the module must
97     already be imported. PDM will not import new modules for clients;
98     rather, the daemon process needs to import all modules that
99     clients should be able to interact with. PDM itself always imports
100     the pdm.perf module, which contains a few basic PERF objects. See
101     its documentation for details.
102
103     The following interfaces are currently known to PERF.
104
105      * attr:
106        An object that implements the `attr' interface models an
107        attribute that can be read by clients. The attribute can be
108        anything, as long as its representation can be
109        pickled. Examples of attributes could be such things as the CPU
110        time consumed by the server process, or the number of active
111        connections to whatever clients the program serves. To
112        implement the `attr' interface, an object must implement
113        methods called `readattr' and `attrinfo'. `readattr' is called
114        with no arguments to read the current value of the attribute,
115        and `attrinfo' is called with no arguments to read a
116        description of the attribute. Both should be
117        idempotent. `readattr' can return any pickleable object, and
118        `attrinfo' should return either None to indicate that it has no
119        description, or an instance of the pdm.perf.attrinfo class.
120
121      * dir:
122        The `dir' interface models a directory of other PERF
123        objects. An object implementing it must implement methods
124        called `lookup' and `listdir'. `lookup' is called with a single
125        string argument that names an object, and should either return
126        another PERF object based on the name, or raise KeyError if it
127        does not recognize the name. `listdir' is called with no
128        arguments, and should return a list of known names that can be
129        used as argument to `lookup', but the list is not required to
130        be exhaustive and may also be empty.
131
132      * invoke:
133        The `invoke' interface allows a more arbitrary form of method
134        calls to objects implementing it. Such objects must implement a
135        method called `invoke', which is called with one positional
136        argument naming a method to be called (which it is free to
137        interpret however it wishes), and with any additional
138        positional and keyword arguments that the client wishes to pass
139        to it. Whatever `invoke' returns is pickled and sent back to
140        the client. In case the method name is not recognized, `invoke'
141        should raise an AttributeError.
142
143      * event:
144        The `event' interface allows PERF objects to notify clients of
145        events asynchronously. Objects implementing it must implement
146        methods called `subscribe' and `unsubscribe'. `subscribe' will
147        be called with a single argument, which is a callable of one
148        argument, which should be registered to be called when an event
149        pertaining to the `event' object in question occurs. The
150        `event' object should then call all such registered callables
151        with a single argument describing the event. The argument could
152        be any object that can be pickled, but should be an instance of
153        a subclass of the pdm.perf.event class. If `subscribe' is
154        called with a callback object that it has already registered,
155        it should raise a ValueError. `unsubscribe' is called with a
156        single argument, which is a previously registered callback
157        object, which should then be unregistered to that it is no
158        longer called when an event occurs. If the given callback
159        object is not, in fact, registered, a ValueError should be
160        raised.
161
162     The pdm.perf module contains a few convenience classes which
163     implements the interfaces, but PERF objects are not required to be
164     instances of them. Any object can implement a PERF interface, as
165     long as it does so as described above.
166
167     The pdm.cli.perfclient class is the client-side implementation.
168     """
169     def __init__(self, cl):
170         self.cl = cl
171         self.odtab = {}
172         cl.send("+PERF1\n")
173         self.buf = ""
174         self.lock = threading.Lock()
175         self.subscribed = {}
176
177     def closed(self):
178         for id, recv in self.subscribed.iteritems():
179             ob = self.odtab[id]
180             if ob is None: continue
181             ob, protos = ob
182             try:
183                 ob.unsubscribe(recv)
184             except: pass
185
186     def send(self, *args):
187         self.lock.acquire()
188         try:
189             buf = pickle.dumps(args)
190             buf = struct.pack(">l", len(buf)) + buf
191             self.cl.send(buf)
192         finally:
193             self.lock.release()
194
195     def bindob(self, id, ob):
196         if not hasattr(ob, "pdm_protocols"):
197             raise ValueError("Object does not support PDM introspection")
198         try:
199             proto = ob.pdm_protocols()
200         except Exception, exc:
201             raise ValueError("PDM introspection failed", exc)
202         self.odtab[id] = ob, proto
203         return proto
204
205     def bind(self, id, module, obnm):
206         resmod = sys.modules.get(module)
207         if resmod is None:
208             self.send("-", ImportError("No such module: %s" % module))
209             return
210         try:
211             ob = getattr(resmod, obnm)
212         except AttributeError:
213             self.send("-", AttributeError("No such object: %s" % obnm))
214             return
215         try:
216             proto = self.bindob(id, ob)
217         except Exception, exc:
218             self.send("-", exc)
219             return
220         self.send("+", proto)
221
222     def getob(self, id, proto):
223         ob = self.odtab.get(id)
224         if ob is None:
225             self.send("-", ValueError("No such bound ID: %r" % id))
226             return None
227         ob, protos = ob
228         if proto not in protos:
229             self.send("-", ValueError("Object does not support that protocol"))
230             return None
231         return ob
232
233     def lookup(self, tgtid, srcid, obnm):
234         src = self.getob(srcid, "dir")
235         if src is None:
236             return
237         try:
238             ob = src.lookup(obnm)
239         except KeyError, exc:
240             self.send("-", exc)
241             return
242         try:
243             proto = self.bindob(tgtid, ob)
244         except Exception, exc:
245             self.send("-", exc)
246             return
247         self.send("+", proto)
248
249     def unbind(self, id):
250         ob = self.odtab.get(id)
251         if ob is None:
252             self.send("-", KeyError("No such name bound: %r" % id))
253             return
254         ob, protos = ob
255         del self.odtab[id]
256         recv = self.subscribed.get(id)
257         if recv is not None:
258             ob.unsubscribe(recv)
259             del self.subscribed[id]
260         self.send("+")
261
262     def listdir(self, id):
263         ob = self.getob(id, "dir")
264         if ob is None:
265             return
266         self.send("+", ob.listdir())
267
268     def readattr(self, id):
269         ob = self.getob(id, "attr")
270         if ob is None:
271             return
272         try:
273             ret = ob.readattr()
274         except Exception, exc:
275             self.send("-", Exception("Could not read attribute"))
276             return
277         self.send("+", ret)
278
279     def attrinfo(self, id):
280         ob = self.getob(id, "attr")
281         if ob is None:
282             return
283         self.send("+", ob.attrinfo())
284
285     def invoke(self, id, method, args, kwargs):
286         ob = self.getob(id, "invoke")
287         if ob is None:
288             return
289         try:
290             self.send("+", ob.invoke(method, *args, **kwargs))
291         except Exception, exc:
292             self.send("-", exc)
293
294     def event(self, id, ob, ev):
295         self.send("*", id, ev)
296
297     def subscribe(self, id):
298         ob = self.getob(id, "event")
299         if ob is None:
300             return
301         if id in self.subscribed:
302             self.send("-", ValueError("Already subscribed"))
303         def recv(ev):
304             self.event(id, ob, ev)
305         ob.subscribe(recv)
306         self.subscribed[id] = recv
307         self.send("+")
308
309     def unsubscribe(self, id):
310         ob = self.getob(id, "event")
311         if ob is None:
312             return
313         recv = self.subscribed.get(id)
314         if recv is None:
315             self.send("-", ValueError("Not subscribed"))
316         ob.unsubscribe(recv)
317         del self.subscribed[id]
318         self.send("+")
319
320     def command(self, data):
321         cmd = data[0]
322         if cmd == "bind":
323             self.bind(*data[1:])
324         elif cmd == "unbind":
325             self.unbind(*data[1:])
326         elif cmd == "lookup":
327             self.lookup(*data[1:])
328         elif cmd == "ls":
329             self.listdir(*data[1:])
330         elif cmd == "readattr":
331             self.readattr(*data[1:])
332         elif cmd == "attrinfo":
333             self.attrinfo(*data[1:])
334         elif cmd == "invoke":
335             self.invoke(*data[1:])
336         elif cmd == "subs":
337             self.subscribe(*data[1:])
338         elif cmd == "unsubs":
339             self.unsubscribe(*data[1:])
340         else:
341             self.send("-", Exception("Unknown command: %r" % (cmd,)))
342
343     def handle(self, buf):
344         if len(buf) < 4:
345             return buf
346         dlen = struct.unpack(">l", buf[:4])[0]
347         if len(buf) < dlen + 4:
348             return buf
349         data = pickle.loads(buf[4:dlen + 4])
350         self.command(data)
351         return buf[dlen + 4:]
352         
353 protocols["perf"] = perf
354
355 class client(threading.Thread):
356     def __init__(self, sk):
357         super(client, self).__init__(name = "Management client")
358         self.setDaemon(True)
359         self.sk = sk
360         self.handler = self
361
362     def send(self, data):
363         return self.sk.send(data)
364
365     def choose(self, proto):
366         if proto in protocols:
367             self.handler = protocols[proto](self)
368         else:
369             self.send("-ERR Unknown protocol: %s\n" % proto)
370             raise Exception()
371
372     def handle(self, buf):
373         p = buf.find("\n")
374         if p >= 0:
375             proto = buf[:p]
376             buf = buf[p + 1:]
377             self.choose(proto)
378         return buf
379
380     def run(self):
381         try:
382             buf = ""
383             self.send("+PDM1\n")
384             while True:
385                 ret = self.sk.recv(1024)
386                 if ret == "":
387                     return
388                 buf += ret
389                 while True:
390                     try:
391                         nbuf = self.handler.handle(buf)
392                     except:
393                         return
394                     if nbuf == buf:
395                         break
396                     buf = nbuf
397         finally:
398             #for line in traceback.format_exception(*sys.exc_info()):
399             #    print line
400             try:
401                 self.sk.close()
402             finally:
403                 if hasattr(self.handler, "closed"):
404                     self.handler.closed()
405             
406
407 class listener(threading.Thread):
408     """PDM listener
409
410     This subclass of a thread listens to PDM connections and handles
411     client connections properly. It is intended to be subclassed by
412     providers of specific domains, such as unixlistener and
413     tcplistener.
414     """
415     def __init__(self):
416         super(listener, self).__init__(name = "Management listener")
417         self.setDaemon(True)
418
419     def listen(self, sk):
420         """Listen for and accept connections."""
421         self.running = True
422         while self.running:
423             rfd, wfd, efd = select.select([sk], [], [sk], 1)
424             for fd in rfd:
425                 if fd == sk:
426                     nsk, addr = sk.accept()
427                     self.accept(nsk, addr)
428
429     def stop(self):
430         """Stop listening for client connections
431
432         Tells the listener thread to stop listening, and then waits
433         for it to terminate.
434         """
435         self.running = False
436         self.join()
437
438     def accept(self, sk, addr):
439         cl = client(sk)
440         cl.start()
441
442 class unixlistener(listener):
443     """Unix socket listener"""
444     def __init__(self, name, mode = 0600, group = None):
445         """Create a listener that will bind to the Unix socket named
446         by `name'. The socket will not actually be bound until the
447         listener is started. The socket will be chmodded to `mode',
448         and if `group' is given, the named group will be set as the
449         owner of the socket.
450         """
451         super(unixlistener, self).__init__()
452         self.name = name
453         self.mode = mode
454         self.group = group
455
456     def run(self):
457         sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
458         ul = False
459         try:
460             if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode):
461                 os.unlink(self.name)
462             sk.bind(self.name)
463             ul = True
464             os.chmod(self.name, self.mode)
465             if self.group is not None:
466                 os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid)
467             sk.listen(16)
468             self.listen(sk)
469         finally:
470             sk.close()
471             if ul:
472                 os.unlink(self.name)
473
474 class tcplistener(listener):
475     """TCP socket listener"""
476     def __init__(self, port, bindaddr = "127.0.0.1"):
477         """Create a listener that will bind to the given TCP port, and
478         the given local interface. The socket will not actually be
479         bound until the listener is started.
480         """
481         super(tcplistener, self).__init__()
482         self.port = port
483         self.bindaddr = bindaddr
484
485     def run(self):
486         sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
487         try:
488             sk.bind((self.bindaddr, self.port))
489             sk.listen(16)
490             self.listen(sk)
491         finally:
492             sk.close()
493
494 def listen(spec):
495     """Create and start a listener according to a string
496     specification. The string specifications can easily be passed from
497     command-line options, user configuration or the like. Currently,
498     the two following specification formats are recognized:
499
500     PATH[:MODE[:GROUP]] -- PATH must contain at least one slash. A
501     Unix socket listener will be created listening to that path, and
502     the socket will be chmodded to MODE and owned by GROUP. If MODE is
503     not given, it defaults to 0600, and if GROUP is not given, the
504     process' default group is used.
505
506     ADDRESS:PORT -- PORT must be entirely numeric. A TCP socket
507     listener will be created listening to that port, bound to the
508     given local interface address. Since PDM has no authentication
509     support, ADDRESS should probably be localhost.
510     """
511     if ":" in spec:
512         first = spec[:spec.index(":")]
513         last = spec[spec.rindex(":") + 1:]
514     else:
515         first = spec
516         last = spec
517     if "/" in first:
518         parts = spec.split(":")
519         mode = 0600
520         group = None
521         if len(parts) > 1:
522             mode = int(parts[1], 8)
523         if len(parts) > 2:
524             group = parts[2]
525         ret = unixlistener(parts[0], mode = mode, group = group)
526         ret.start()
527         return ret
528     if last.isdigit():
529         p = spec.rindex(":")
530         host = spec[:p]
531         port = int(spec[p + 1:])
532         ret = tcplistener(port, bindaddr = host)
533         ret.start()
534         return ret
535     raise ValueError("Unparsable listener specification: %r" % spec)
536
537 import pdm.perf