Improved process event structure.
[pdm.git] / pdm / srv.py
CommitLineData
7f97a47e
FT
1"""Management for daemon processes
2
3This module contains a utility to listen for management commands on a
4socket, lending itself to managing daemon processes.
5"""
6
7import os, sys, socket, threading, grp, select
8import types, pprint, traceback
9import pickle, struct
10
11__all__ = ["listener", "unixlistener", "tcplistener", "listen"]
12
13protocols = {}
14
15class repl(object):
16 def __init__(self, cl):
17 self.cl = cl
18 self.mod = types.ModuleType("repl")
19 self.mod.echo = self.echo
20 self.printer = pprint.PrettyPrinter(indent = 4, depth = 6)
21 cl.send("+REPL\n")
22
23 def sendlines(self, text):
24 for line in text.split("\n"):
25 self.cl.send(" " + line + "\n")
26
27 def echo(self, ob):
28 self.sendlines(self.printer.pformat(ob))
29
30 def command(self, cmd):
31 try:
32 try:
33 ccode = compile(cmd, "PDM Input", "eval")
34 except SyntaxError:
35 ccode = compile(cmd, "PDM Input", "exec")
36 exec ccode in self.mod.__dict__
37 self.cl.send("+OK\n")
38 else:
39 self.echo(eval(ccode, self.mod.__dict__))
40 self.cl.send("+OK\n")
41 except:
42 for line in traceback.format_exception(*sys.exc_info()):
43 self.cl.send(" " + line)
44 self.cl.send("+EXC\n")
45
46 def handle(self, buf):
47 p = buf.find("\n\n")
48 if p < 0:
49 return buf
50 cmd = buf[:p + 1]
51 self.command(cmd)
52 return buf[p + 2:]
53protocols["repl"] = repl
54
55class perf(object):
56 def __init__(self, cl):
57 self.cl = cl
58 self.odtab = {}
59 cl.send("+PERF1\n")
60 self.buf = ""
61 self.lock = threading.Lock()
62 self.subscribed = {}
63
64 def closed(self):
65 for id, recv in self.subscribed.iteritems():
66 ob = self.odtab[id]
67 if ob is None: continue
68 ob, protos = ob
69 try:
70 ob.unsubscribe(recv)
71 except: pass
72
73 def send(self, *args):
74 self.lock.acquire()
75 try:
76 buf = pickle.dumps(args)
77 buf = struct.pack(">l", len(buf)) + buf
78 self.cl.send(buf)
79 finally:
80 self.lock.release()
81
82 def bindob(self, id, ob):
83 if not hasattr(ob, "pdm_protocols"):
84 raise ValueError("Object does not support PDM introspection")
85 try:
86 proto = ob.pdm_protocols()
87 except Exception, exc:
88 raise ValueError("PDM introspection failed", exc)
89 self.odtab[id] = ob, proto
90 return proto
91
92 def bind(self, id, module, obnm):
93 resmod = sys.modules.get(module)
94 if resmod is None:
95 self.send("-", ImportError("No such module: %s" % module))
96 return
97 try:
98 ob = getattr(resmod, obnm)
99 except AttributeError:
100 self.send("-", AttributeError("No such object: %s" % obnm))
101 return
102 try:
103 proto = self.bindob(id, ob)
104 except Exception, exc:
105 self.send("-", exc)
106 return
107 self.send("+", proto)
108
109 def getob(self, id, proto):
110 ob = self.odtab.get(id)
111 if ob is None:
112 self.send("-", ValueError("No such bound ID: %r" % id))
113 return None
114 ob, protos = ob
115 if proto not in protos:
116 self.send("-", ValueError("Object does not support that protocol"))
117 return None
118 return ob
119
120 def lookup(self, tgtid, srcid, obnm):
121 src = self.getob(srcid, "dir")
122 if src is None:
123 return
124 try:
125 ob = src.lookup(obnm)
126 except KeyError, exc:
127 self.send("-", exc)
128 return
129 try:
130 proto = self.bindob(tgtid, ob)
131 except Exception, exc:
132 self.send("-", exc)
133 return
134 self.send("+", proto)
135
136 def unbind(self, id):
137 ob = self.odtab.get(id)
138 if ob is None:
139 self.send("-", KeyError("No such name bound: %r" % id))
140 return
141 ob, protos = ob
142 del self.odtab[id]
143 recv = self.subscribed.get(id)
144 if recv is not None:
145 ob.unsubscribe(recv)
146 del self.subscribed[id]
147 self.send("+")
148
149 def listdir(self, id):
150 ob = self.getob(id, "dir")
151 if ob is None:
152 return
153 self.send("+", ob.listdir())
154
155 def readattr(self, id):
156 ob = self.getob(id, "attr")
157 if ob is None:
158 return
159 try:
160 ret = ob.readattr()
161 except Exception, exc:
162 self.send("-", Exception("Could not read attribute"))
163 return
164 self.send("+", ret)
165
166 def attrinfo(self, id):
167 ob = self.getob(id, "attr")
168 if ob is None:
169 return
170 self.send("+", ob.attrinfo())
171
172 def invoke(self, id, method, args, kwargs):
173 ob = self.getob(id, "invoke")
174 if ob is None:
175 return
176 try:
177 self.send("+", ob.invoke(method, *args, **kwargs))
178 except Exception, exc:
179 self.send("-", exc)
180
181 def event(self, id, ob, ev):
182 self.send("*", id, ev)
183
184 def subscribe(self, id):
185 ob = self.getob(id, "event")
186 if ob is None:
187 return
188 if id in self.subscribed:
189 self.send("-", ValueError("Already subscribed"))
190 def recv(ev):
191 self.event(id, ob, ev)
192 ob.subscribe(recv)
193 self.subscribed[id] = recv
194 self.send("+")
195
196 def unsubscribe(self, id):
197 ob = self.getob(id, "event")
198 if ob is None:
199 return
200 recv = self.subscribed.get(id)
201 if recv is None:
202 self.send("-", ValueError("Not subscribed"))
203 ob.unsubscribe(recv)
204 del self.subscribed[id]
205 self.send("+")
206
207 def command(self, data):
208 cmd = data[0]
209 if cmd == "bind":
210 self.bind(*data[1:])
211 elif cmd == "unbind":
212 self.unbind(*data[1:])
213 elif cmd == "lookup":
214 self.lookup(*data[1:])
215 elif cmd == "ls":
216 self.listdir(*data[1:])
217 elif cmd == "readattr":
218 self.readattr(*data[1:])
219 elif cmd == "attrinfo":
220 self.attrinfo(*data[1:])
221 elif cmd == "invoke":
222 self.invoke(*data[1:])
223 elif cmd == "subs":
224 self.subscribe(*data[1:])
225 elif cmd == "unsubs":
226 self.unsubscribe(*data[1:])
227 else:
228 self.send("-", Exception("Unknown command: %r" % (cmd,)))
229
230 def handle(self, buf):
231 if len(buf) < 4:
232 return buf
233 dlen = struct.unpack(">l", buf[:4])[0]
234 if len(buf) < dlen + 4:
235 return buf
236 data = pickle.loads(buf[4:dlen + 4])
237 self.command(data)
238 return buf[dlen + 4:]
239
240protocols["perf"] = perf
241
242class client(threading.Thread):
243 def __init__(self, sk):
244 super(client, self).__init__(name = "Management client")
245 self.setDaemon(True)
246 self.sk = sk
247 self.handler = self
248
249 def send(self, data):
250 return self.sk.send(data)
251
252 def choose(self, proto):
253 if proto in protocols:
254 self.handler = protocols[proto](self)
255 else:
256 self.send("-ERR Unknown protocol: %s\n" % proto)
257 raise Exception()
258
259 def handle(self, buf):
260 p = buf.find("\n")
261 if p >= 0:
262 proto = buf[:p]
263 buf = buf[p + 1:]
264 self.choose(proto)
265 return buf
266
267 def run(self):
268 try:
269 buf = ""
270 self.send("+PDM1\n")
271 while True:
272 ret = self.sk.recv(1024)
273 if ret == "":
274 return
275 buf += ret
276 while True:
277 try:
278 nbuf = self.handler.handle(buf)
279 except:
280 return
281 if nbuf == buf:
282 break
283 buf = nbuf
284 finally:
285 #for line in traceback.format_exception(*sys.exc_info()):
286 # print line
287 try:
288 self.sk.close()
289 finally:
290 if hasattr(self.handler, "closed"):
291 self.handler.closed()
292
293
294class listener(threading.Thread):
295 def __init__(self):
296 super(listener, self).__init__(name = "Management listener")
297 self.setDaemon(True)
298
299 def listen(self, sk):
300 self.running = True
301 while self.running:
302 rfd, wfd, efd = select.select([sk], [], [sk], 1)
303 for fd in rfd:
304 if fd == sk:
305 nsk, addr = sk.accept()
306 self.accept(nsk, addr)
307
308 def stop(self):
309 self.running = False
310 self.join()
311
312 def accept(self, sk, addr):
313 cl = client(sk)
314 cl.start()
315
316class unixlistener(listener):
317 def __init__(self, name, mode = 0600, group = None):
318 super(unixlistener, self).__init__()
319 self.name = name
320 self.mode = mode
321 self.group = group
322
323 def run(self):
324 sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
325 ul = False
326 try:
327 if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode):
328 os.unlink(self.name)
329 sk.bind(self.name)
330 ul = True
331 os.chmod(self.name, self.mode)
332 if self.group is not None:
333 os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid)
334 sk.listen(16)
335 self.listen(sk)
336 finally:
337 sk.close()
338 if ul:
339 os.unlink(self.name)
340
341class tcplistener(listener):
342 def __init__(self, port, bindaddr = "127.0.0.1"):
343 super(tcplistener, self).__init__()
344 self.port = port
345 self.bindaddr = bindaddr
346
347 def run(self):
348 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
349 try:
350 sk.bind((self.bindaddr, self.port))
351 sk.listen(16)
352 self.listen(sk)
353 finally:
354 sk.close()
355
356def listen(spec):
357 if ":" in spec:
358 first = spec[:spec.index(":")]
359 last = spec[spec.rindex(":") + 1:]
360 else:
361 first = spec
362 last = spec
363 if "/" in first:
364 parts = spec.split(":")
365 mode = 0600
366 group = None
367 if len(parts) > 1:
368 mode = int(parts[1], 0)
369 if len(parts) > 2:
370 group = parts[2]
371 ret = unixlistener(parts[0], mode = mode, group = group)
372 ret.start()
373 return ret
374 if last.isdigit():
375 p = spec.rindex(":")
376 host = spec[:p]
377 port = int(spec[p + 1:])
378 ret = tcplistener(port, bindaddr = host)
379 ret.start()
380 return ret
381 raise ValueError("Unparsable listener specification: %r" % spec)
382
383import pdm.perf