Improved process event structure.
[pdm.git] / pdm / srv.py
1 """Management for daemon processes
2
3 This module contains a utility to listen for management commands on a
4 socket, lending itself to managing daemon processes.
5 """
6
7 import os, sys, socket, threading, grp, select
8 import types, pprint, traceback
9 import pickle, struct
10
11 __all__ = ["listener", "unixlistener", "tcplistener", "listen"]
12
13 protocols = {}
14
15 class repl(object):
16     def __init__(self, cl):
17         self.cl = cl
18         self.mod = types.ModuleType("repl")
19         self.mod.echo = self.echo
20         self.printer = pprint.PrettyPrinter(indent = 4, depth = 6)
21         cl.send("+REPL\n")
22
23     def sendlines(self, text):
24         for line in text.split("\n"):
25             self.cl.send(" " + line + "\n")
26
27     def echo(self, ob):
28         self.sendlines(self.printer.pformat(ob))
29
30     def command(self, cmd):
31         try:
32             try:
33                 ccode = compile(cmd, "PDM Input", "eval")
34             except SyntaxError:
35                 ccode = compile(cmd, "PDM Input", "exec")
36                 exec ccode in self.mod.__dict__
37                 self.cl.send("+OK\n")
38             else:
39                 self.echo(eval(ccode, self.mod.__dict__))
40                 self.cl.send("+OK\n")
41         except:
42             for line in traceback.format_exception(*sys.exc_info()):
43                 self.cl.send(" " + line)
44             self.cl.send("+EXC\n")
45
46     def handle(self, buf):
47         p = buf.find("\n\n")
48         if p < 0:
49             return buf
50         cmd = buf[:p + 1]
51         self.command(cmd)
52         return buf[p + 2:]
53 protocols["repl"] = repl
54
55 class perf(object):
56     def __init__(self, cl):
57         self.cl = cl
58         self.odtab = {}
59         cl.send("+PERF1\n")
60         self.buf = ""
61         self.lock = threading.Lock()
62         self.subscribed = {}
63
64     def closed(self):
65         for id, recv in self.subscribed.iteritems():
66             ob = self.odtab[id]
67             if ob is None: continue
68             ob, protos = ob
69             try:
70                 ob.unsubscribe(recv)
71             except: pass
72
73     def send(self, *args):
74         self.lock.acquire()
75         try:
76             buf = pickle.dumps(args)
77             buf = struct.pack(">l", len(buf)) + buf
78             self.cl.send(buf)
79         finally:
80             self.lock.release()
81
82     def bindob(self, id, ob):
83         if not hasattr(ob, "pdm_protocols"):
84             raise ValueError("Object does not support PDM introspection")
85         try:
86             proto = ob.pdm_protocols()
87         except Exception, exc:
88             raise ValueError("PDM introspection failed", exc)
89         self.odtab[id] = ob, proto
90         return proto
91
92     def bind(self, id, module, obnm):
93         resmod = sys.modules.get(module)
94         if resmod is None:
95             self.send("-", ImportError("No such module: %s" % module))
96             return
97         try:
98             ob = getattr(resmod, obnm)
99         except AttributeError:
100             self.send("-", AttributeError("No such object: %s" % obnm))
101             return
102         try:
103             proto = self.bindob(id, ob)
104         except Exception, exc:
105             self.send("-", exc)
106             return
107         self.send("+", proto)
108
109     def getob(self, id, proto):
110         ob = self.odtab.get(id)
111         if ob is None:
112             self.send("-", ValueError("No such bound ID: %r" % id))
113             return None
114         ob, protos = ob
115         if proto not in protos:
116             self.send("-", ValueError("Object does not support that protocol"))
117             return None
118         return ob
119
120     def lookup(self, tgtid, srcid, obnm):
121         src = self.getob(srcid, "dir")
122         if src is None:
123             return
124         try:
125             ob = src.lookup(obnm)
126         except KeyError, exc:
127             self.send("-", exc)
128             return
129         try:
130             proto = self.bindob(tgtid, ob)
131         except Exception, exc:
132             self.send("-", exc)
133             return
134         self.send("+", proto)
135
136     def unbind(self, id):
137         ob = self.odtab.get(id)
138         if ob is None:
139             self.send("-", KeyError("No such name bound: %r" % id))
140             return
141         ob, protos = ob
142         del self.odtab[id]
143         recv = self.subscribed.get(id)
144         if recv is not None:
145             ob.unsubscribe(recv)
146             del self.subscribed[id]
147         self.send("+")
148
149     def listdir(self, id):
150         ob = self.getob(id, "dir")
151         if ob is None:
152             return
153         self.send("+", ob.listdir())
154
155     def readattr(self, id):
156         ob = self.getob(id, "attr")
157         if ob is None:
158             return
159         try:
160             ret = ob.readattr()
161         except Exception, exc:
162             self.send("-", Exception("Could not read attribute"))
163             return
164         self.send("+", ret)
165
166     def attrinfo(self, id):
167         ob = self.getob(id, "attr")
168         if ob is None:
169             return
170         self.send("+", ob.attrinfo())
171
172     def invoke(self, id, method, args, kwargs):
173         ob = self.getob(id, "invoke")
174         if ob is None:
175             return
176         try:
177             self.send("+", ob.invoke(method, *args, **kwargs))
178         except Exception, exc:
179             self.send("-", exc)
180
181     def event(self, id, ob, ev):
182         self.send("*", id, ev)
183
184     def subscribe(self, id):
185         ob = self.getob(id, "event")
186         if ob is None:
187             return
188         if id in self.subscribed:
189             self.send("-", ValueError("Already subscribed"))
190         def recv(ev):
191             self.event(id, ob, ev)
192         ob.subscribe(recv)
193         self.subscribed[id] = recv
194         self.send("+")
195
196     def unsubscribe(self, id):
197         ob = self.getob(id, "event")
198         if ob is None:
199             return
200         recv = self.subscribed.get(id)
201         if recv is None:
202             self.send("-", ValueError("Not subscribed"))
203         ob.unsubscribe(recv)
204         del self.subscribed[id]
205         self.send("+")
206
207     def command(self, data):
208         cmd = data[0]
209         if cmd == "bind":
210             self.bind(*data[1:])
211         elif cmd == "unbind":
212             self.unbind(*data[1:])
213         elif cmd == "lookup":
214             self.lookup(*data[1:])
215         elif cmd == "ls":
216             self.listdir(*data[1:])
217         elif cmd == "readattr":
218             self.readattr(*data[1:])
219         elif cmd == "attrinfo":
220             self.attrinfo(*data[1:])
221         elif cmd == "invoke":
222             self.invoke(*data[1:])
223         elif cmd == "subs":
224             self.subscribe(*data[1:])
225         elif cmd == "unsubs":
226             self.unsubscribe(*data[1:])
227         else:
228             self.send("-", Exception("Unknown command: %r" % (cmd,)))
229
230     def handle(self, buf):
231         if len(buf) < 4:
232             return buf
233         dlen = struct.unpack(">l", buf[:4])[0]
234         if len(buf) < dlen + 4:
235             return buf
236         data = pickle.loads(buf[4:dlen + 4])
237         self.command(data)
238         return buf[dlen + 4:]
239         
240 protocols["perf"] = perf
241
242 class client(threading.Thread):
243     def __init__(self, sk):
244         super(client, self).__init__(name = "Management client")
245         self.setDaemon(True)
246         self.sk = sk
247         self.handler = self
248
249     def send(self, data):
250         return self.sk.send(data)
251
252     def choose(self, proto):
253         if proto in protocols:
254             self.handler = protocols[proto](self)
255         else:
256             self.send("-ERR Unknown protocol: %s\n" % proto)
257             raise Exception()
258
259     def handle(self, buf):
260         p = buf.find("\n")
261         if p >= 0:
262             proto = buf[:p]
263             buf = buf[p + 1:]
264             self.choose(proto)
265         return buf
266
267     def run(self):
268         try:
269             buf = ""
270             self.send("+PDM1\n")
271             while True:
272                 ret = self.sk.recv(1024)
273                 if ret == "":
274                     return
275                 buf += ret
276                 while True:
277                     try:
278                         nbuf = self.handler.handle(buf)
279                     except:
280                         return
281                     if nbuf == buf:
282                         break
283                     buf = nbuf
284         finally:
285             #for line in traceback.format_exception(*sys.exc_info()):
286             #    print line
287             try:
288                 self.sk.close()
289             finally:
290                 if hasattr(self.handler, "closed"):
291                     self.handler.closed()
292             
293
294 class listener(threading.Thread):
295     def __init__(self):
296         super(listener, self).__init__(name = "Management listener")
297         self.setDaemon(True)
298
299     def listen(self, sk):
300         self.running = True
301         while self.running:
302             rfd, wfd, efd = select.select([sk], [], [sk], 1)
303             for fd in rfd:
304                 if fd == sk:
305                     nsk, addr = sk.accept()
306                     self.accept(nsk, addr)
307
308     def stop(self):
309         self.running = False
310         self.join()
311
312     def accept(self, sk, addr):
313         cl = client(sk)
314         cl.start()
315
316 class unixlistener(listener):
317     def __init__(self, name, mode = 0600, group = None):
318         super(unixlistener, self).__init__()
319         self.name = name
320         self.mode = mode
321         self.group = group
322
323     def run(self):
324         sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
325         ul = False
326         try:
327             if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode):
328                 os.unlink(self.name)
329             sk.bind(self.name)
330             ul = True
331             os.chmod(self.name, self.mode)
332             if self.group is not None:
333                 os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid)
334             sk.listen(16)
335             self.listen(sk)
336         finally:
337             sk.close()
338             if ul:
339                 os.unlink(self.name)
340
341 class tcplistener(listener):
342     def __init__(self, port, bindaddr = "127.0.0.1"):
343         super(tcplistener, self).__init__()
344         self.port = port
345         self.bindaddr = bindaddr
346
347     def run(self):
348         sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
349         try:
350             sk.bind((self.bindaddr, self.port))
351             sk.listen(16)
352             self.listen(sk)
353         finally:
354             sk.close()
355
356 def listen(spec):
357     if ":" in spec:
358         first = spec[:spec.index(":")]
359         last = spec[spec.rindex(":") + 1:]
360     else:
361         first = spec
362         last = spec
363     if "/" in first:
364         parts = spec.split(":")
365         mode = 0600
366         group = None
367         if len(parts) > 1:
368             mode = int(parts[1], 0)
369         if len(parts) > 2:
370             group = parts[2]
371         ret = unixlistener(parts[0], mode = mode, group = group)
372         ret.start()
373         return ret
374     if last.isdigit():
375         p = spec.rindex(":")
376         host = spec[:p]
377         port = int(spec[p + 1:])
378         ret = tcplistener(port, bindaddr = host)
379         ret.start()
380         return ret
381     raise ValueError("Unparsable listener specification: %r" % spec)
382
383 import pdm.perf