1 """Management for daemon processes
3 This module contains a utility to listen for management commands on a
4 socket, lending itself to managing daemon processes.
7 import os, sys, socket, threading, grp, select
8 import types, pprint, traceback
11 __all__ = ["listener", "unixlistener", "tcplistener", "listen"]
16 def __init__(self, cl):
18 self.mod = types.ModuleType("repl")
19 self.mod.echo = self.echo
20 self.printer = pprint.PrettyPrinter(indent = 4, depth = 6)
def sendlines(self, text):
    """Write `text` to the client one line at a time.

    The text is split on newline characters; every resulting piece is
    UTF-8 encoded, prefixed with a single space and terminated with a
    newline, then handed to the client's `send` as its own message.
    """
    push = self.cl.send
    for row in text.split("\n"):
        payload = b"".join((b" ", row.encode("utf-8"), b"\n"))
        push(payload)
28 self.sendlines(self.printer.pformat(ob))
30 def command(self, cmd):
31 cmd = cmd.decode("utf-8")
34 ccode = compile(cmd, "PDM Input", "eval")
36 ccode = compile(cmd, "PDM Input", "exec")
37 exec(ccode, self.mod.__dict__)
38 self.cl.send(b"+OK\n")
40 self.echo(eval(ccode, self.mod.__dict__))
41 self.cl.send(b"+OK\n")
43 for line in traceback.format_exception(*sys.exc_info()):
44 self.cl.send(b" " + line.encode("utf-8"))
45 self.cl.send(b"+EXC\n")
47 def handle(self, buf):
54 protocols["repl"] = repl
57 def __init__(self, cl):
62 self.lock = threading.Lock()
66 for id, recv in self.subscribed.items():
68 if ob is None: continue
74 def send(self, *args):
77 buf = pickle.dumps(args)
78 buf = struct.pack(">l", len(buf)) + buf
83 def bindob(self, id, ob):
84 if not hasattr(ob, "pdm_protocols"):
85 raise ValueError("Object does not support PDM introspection")
87 proto = ob.pdm_protocols()
88 except Exception as exc:
89 raise ValueError("PDM introspection failed", exc)
90 self.odtab[id] = ob, proto
93 def bind(self, id, module, obnm):
94 resmod = sys.modules.get(module)
96 self.send("-", ImportError("No such module: %s" % module))
99 ob = getattr(resmod, obnm)
100 except AttributeError:
101 self.send("-", AttributeError("No such object: %s" % obnm))
104 proto = self.bindob(id, ob)
105 except Exception as exc:
108 self.send("+", proto)
110 def getob(self, id, proto):
111 ob = self.odtab.get(id)
113 self.send("-", ValueError("No such bound ID: %r" % id))
116 if proto not in protos:
117 self.send("-", ValueError("Object does not support that protocol"))
121 def lookup(self, tgtid, srcid, obnm):
122 src = self.getob(srcid, "dir")
126 ob = src.lookup(obnm)
127 except KeyError as exc:
131 proto = self.bindob(tgtid, ob)
132 except Exception as exc:
135 self.send("+", proto)
137 def unbind(self, id):
138 ob = self.odtab.get(id)
140 self.send("-", KeyError("No such name bound: %r" % id))
144 recv = self.subscribed.get(id)
147 del self.subscribed[id]
150 def listdir(self, id):
151 ob = self.getob(id, "dir")
154 self.send("+", ob.listdir())
156 def readattr(self, id):
157 ob = self.getob(id, "attr")
162 except Exception as exc:
163 self.send("-", Exception("Could not read attribute"))
167 def attrinfo(self, id):
168 ob = self.getob(id, "attr")
171 self.send("+", ob.attrinfo())
173 def invoke(self, id, method, args, kwargs):
174 ob = self.getob(id, "invoke")
178 self.send("+", ob.invoke(method, *args, **kwargs))
179 except Exception as exc:
def event(self, id, ob, ev):
    """Relay an event to the peer.

    Emits a "*" message carrying `id` and `ev` through `self.send`.
    `ob` is accepted as part of the signature but is not used here.
    """
    notification = ("*", id, ev)
    self.send(*notification)
185 def subscribe(self, id):
186 ob = self.getob(id, "event")
189 if id in self.subscribed:
190 self.send("-", ValueError("Already subscribed"))
192 self.event(id, ob, ev)
194 self.subscribed[id] = recv
197 def unsubscribe(self, id):
198 ob = self.getob(id, "event")
201 recv = self.subscribed.get(id)
203 self.send("-", ValueError("Not subscribed"))
205 del self.subscribed[id]
208 def command(self, data):
212 elif cmd == "unbind":
213 self.unbind(*data[1:])
214 elif cmd == "lookup":
215 self.lookup(*data[1:])
217 self.listdir(*data[1:])
218 elif cmd == "readattr":
219 self.readattr(*data[1:])
220 elif cmd == "attrinfo":
221 self.attrinfo(*data[1:])
222 elif cmd == "invoke":
223 self.invoke(*data[1:])
225 self.subscribe(*data[1:])
226 elif cmd == "unsubs":
227 self.unsubscribe(*data[1:])
229 self.send("-", Exception("Unknown command: %r" % (cmd,)))
231 def handle(self, buf):
234 dlen = struct.unpack(">l", buf[:4])[0]
235 if len(buf) < dlen + 4:
237 data = pickle.loads(buf[4:dlen + 4])
239 return buf[dlen + 4:]
241 protocols["perf"] = perf
243 class client(threading.Thread):
244 def __init__(self, sk):
245 super(client, self).__init__(name = "Management client")
def send(self, data):
    """Transmit `data` to the connected peer in full.

    `data` may be a bytes-like object or a `str`.  Text is encoded as
    UTF-8 before sending, because the socket only accepts bytes and the
    unknown-protocol error reply in this class is built with
    `%`-formatting, which yields a `str`.

    Returns the number of bytes written, which is always the full length
    of the (encoded) payload.  Socket errors propagate to the caller.
    """
    if isinstance(data, str):
        data = data.encode("utf-8")
    # sendall() keeps writing until the whole buffer is out; a bare send()
    # may stop short and silently drop the tail of a reply.
    self.sk.sendall(data)
    return len(data)
253 def choose(self, proto):
255 proto = proto.decode("ascii")
258 if proto in protocols:
259 self.handler = protocols[proto](self)
261 self.send("-ERR Unknown protocol: %s\n" % proto)
264 def handle(self, buf):
275 self.send(b"+PDM1\n")
277 ret = self.sk.recv(1024)
283 nbuf = self.handler.handle(buf)
285 #for line in traceback.format_exception(*sys.exc_info()):
295 if hasattr(self.handler, "closed"):
296 self.handler.closed()
299 class listener(threading.Thread):
301 super(listener, self).__init__(name = "Management listener")
304 def listen(self, sk):
307 rfd, wfd, efd = select.select([sk], [], [sk], 1)
310 nsk, addr = sk.accept()
311 self.accept(nsk, addr)
317 def accept(self, sk, addr):
321 class unixlistener(listener):
322 def __init__(self, name, mode = 0o600, group = None):
323 super(unixlistener, self).__init__()
329 sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
332 if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode):
336 os.chmod(self.name, self.mode)
337 if self.group is not None:
338 os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid)
346 class tcplistener(listener):
347 def __init__(self, port, bindaddr = "127.0.0.1"):
348 super(tcplistener, self).__init__()
350 self.bindaddr = bindaddr
353 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
355 sk.bind((self.bindaddr, self.port))
363 first = spec[:spec.index(":")]
364 last = spec[spec.rindex(":") + 1:]
369 parts = spec.split(":")
373 mode = int(parts[1], 8)
376 ret = unixlistener(parts[0], mode = mode, group = group)
382 port = int(spec[p + 1:])
383 ret = tcplistener(port, bindaddr = host)
386 raise ValueError("Unparsable listener specification: %r" % spec)