1 """Management for daemon processes
3 This module contains a utility to listen for management commands on a
4 socket, lending itself to managing daemon processes.
7 import os, sys, socket, threading, grp, select
8 import types, pprint, traceback
11 __all__ = ["listener", "unixlistener", "tcplistener", "listen"]
16 def __init__(self, cl):
18 self.mod = types.ModuleType("repl")
19 self.mod.echo = self.echo
20 self.printer = pprint.PrettyPrinter(indent = 4, depth = 6)
def sendlines(self, text):
    """Send text to the client one line at a time.

    The text is split on newline characters; every resulting piece is
    passed to self.cl.send() with a single leading space and a
    terminating newline.
    """
    client = self.cl
    for row in text.split("\n"):
        client.send("".join((" ", row, "\n")))
28 self.sendlines(self.printer.pformat(ob))
30 def command(self, cmd):
33 ccode = compile(cmd, "PDM Input", "eval")
35 ccode = compile(cmd, "PDM Input", "exec")
36 exec ccode in self.mod.__dict__
39 self.echo(eval(ccode, self.mod.__dict__))
42 for line in traceback.format_exception(*sys.exc_info()):
43 self.cl.send(" " + line)
44 self.cl.send("+EXC\n")
46 def handle(self, buf):
53 protocols["repl"] = repl
56 def __init__(self, cl):
61 self.lock = threading.Lock()
65 for id, recv in self.subscribed.iteritems():
67 if ob is None: continue
73 def send(self, *args):
76 buf = pickle.dumps(args)
77 buf = struct.pack(">l", len(buf)) + buf
82 def bindob(self, id, ob):
83 if not hasattr(ob, "pdm_protocols"):
84 raise ValueError("Object does not support PDM introspection")
86 proto = ob.pdm_protocols()
87 except Exception, exc:
88 raise ValueError("PDM introspection failed", exc)
89 self.odtab[id] = ob, proto
92 def bind(self, id, module, obnm):
93 resmod = sys.modules.get(module)
95 self.send("-", ImportError("No such module: %s" % module))
98 ob = getattr(resmod, obnm)
99 except AttributeError:
100 self.send("-", AttributeError("No such object: %s" % obnm))
103 proto = self.bindob(id, ob)
104 except Exception, exc:
107 self.send("+", proto)
109 def getob(self, id, proto):
110 ob = self.odtab.get(id)
112 self.send("-", ValueError("No such bound ID: %r" % id))
115 if proto not in protos:
116 self.send("-", ValueError("Object does not support that protocol"))
120 def lookup(self, tgtid, srcid, obnm):
121 src = self.getob(srcid, "dir")
125 ob = src.lookup(obnm)
126 except KeyError, exc:
130 proto = self.bindob(tgtid, ob)
131 except Exception, exc:
134 self.send("+", proto)
136 def unbind(self, id):
137 ob = self.odtab.get(id)
139 self.send("-", KeyError("No such name bound: %r" % id))
143 recv = self.subscribed.get(id)
146 del self.subscribed[id]
149 def listdir(self, id):
150 ob = self.getob(id, "dir")
153 self.send("+", ob.listdir())
155 def readattr(self, id):
156 ob = self.getob(id, "attr")
161 except Exception, exc:
162 self.send("-", Exception("Could not read attribute"))
166 def attrinfo(self, id):
167 ob = self.getob(id, "attr")
170 self.send("+", ob.attrinfo())
172 def invoke(self, id, method, args, kwargs):
173 ob = self.getob(id, "invoke")
177 self.send("+", ob.invoke(method, *args, **kwargs))
178 except Exception, exc:
def event(self, id, ob, ev):
    """Report an event to the client.

    Hands the marker "*", the id and the event ev to self.send().
    The ob argument is accepted but not used here.
    """
    message = ("*", id, ev)
    self.send(*message)
184 def subscribe(self, id):
185 ob = self.getob(id, "event")
188 if id in self.subscribed:
189 self.send("-", ValueError("Already subscribed"))
191 self.event(id, ob, ev)
193 self.subscribed[id] = recv
196 def unsubscribe(self, id):
197 ob = self.getob(id, "event")
200 recv = self.subscribed.get(id)
202 self.send("-", ValueError("Not subscribed"))
204 del self.subscribed[id]
207 def command(self, data):
211 elif cmd == "unbind":
212 self.unbind(*data[1:])
213 elif cmd == "lookup":
214 self.lookup(*data[1:])
216 self.listdir(*data[1:])
217 elif cmd == "readattr":
218 self.readattr(*data[1:])
219 elif cmd == "attrinfo":
220 self.attrinfo(*data[1:])
221 elif cmd == "invoke":
222 self.invoke(*data[1:])
224 self.subscribe(*data[1:])
225 elif cmd == "unsubs":
226 self.unsubscribe(*data[1:])
228 self.send("-", Exception("Unknown command: %r" % (cmd,)))
230 def handle(self, buf):
233 dlen = struct.unpack(">l", buf[:4])[0]
234 if len(buf) < dlen + 4:
236 data = pickle.loads(buf[4:dlen + 4])
238 return buf[dlen + 4:]
240 protocols["perf"] = perf
242 class client(threading.Thread):
243 def __init__(self, sk):
244 super(client, self).__init__(name = "Management client")
def send(self, data):
    """Send all of data to the client socket.

    socket.send() may transmit only part of its argument, and the
    callers visible in this module discard its return value, so a short
    write would silently truncate a reply.  sendall() keeps writing
    until everything has been sent, or raises on error.

    Returns the number of bytes written, i.e. len(data), which is what
    a complete socket.send() call would have returned.
    """
    self.sk.sendall(data)
    return len(data)
252 def choose(self, proto):
253 if proto in protocols:
254 self.handler = protocols[proto](self)
256 self.send("-ERR Unknown protocol: %s\n" % proto)
259 def handle(self, buf):
272 ret = self.sk.recv(1024)
278 nbuf = self.handler.handle(buf)
285 #for line in traceback.format_exception(*sys.exc_info()):
290 if hasattr(self.handler, "closed"):
291 self.handler.closed()
294 class listener(threading.Thread):
296 super(listener, self).__init__(name = "Management listener")
299 def listen(self, sk):
302 rfd, wfd, efd = select.select([sk], [], [sk], 1)
305 nsk, addr = sk.accept()
306 self.accept(nsk, addr)
312 def accept(self, sk, addr):
316 class unixlistener(listener):
317 def __init__(self, name, mode = 0600, group = None):
318 super(unixlistener, self).__init__()
324 sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
327 if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode):
331 os.chmod(self.name, self.mode)
332 if self.group is not None:
333 os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid)
341 class tcplistener(listener):
342 def __init__(self, port, bindaddr = "127.0.0.1"):
343 super(tcplistener, self).__init__()
345 self.bindaddr = bindaddr
348 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
350 sk.bind((self.bindaddr, self.port))
358 first = spec[:spec.index(":")]
359 last = spec[spec.rindex(":") + 1:]
364 parts = spec.split(":")
368 mode = int(parts[1], 0)
371 ret = unixlistener(parts[0], mode = mode, group = group)
377 port = int(spec[p + 1:])
378 ret = tcplistener(port, bindaddr = host)
381 raise ValueError("Unparsable listener specification: %r" % spec)