Commit | Line | Data |
---|---|---|
7f97a47e FT |
1 | """Management for daemon processes |
2 | ||
3 | This module contains a utility to listen for management commands on a | |
4 | socket, lending itself to managing daemon processes. | |
5 | """ | |
6 | ||
7 | import os, sys, socket, threading, grp, select | |
8 | import types, pprint, traceback | |
9 | import pickle, struct | |
10 | ||
11 | __all__ = ["listener", "unixlistener", "tcplistener", "listen"] | |
12 | ||
13 | protocols = {} | |
14 | ||
15 | class repl(object): | |
16 | def __init__(self, cl): | |
17 | self.cl = cl | |
18 | self.mod = types.ModuleType("repl") | |
19 | self.mod.echo = self.echo | |
20 | self.printer = pprint.PrettyPrinter(indent = 4, depth = 6) | |
21 | cl.send("+REPL\n") | |
22 | ||
23 | def sendlines(self, text): | |
24 | for line in text.split("\n"): | |
25 | self.cl.send(" " + line + "\n") | |
26 | ||
27 | def echo(self, ob): | |
28 | self.sendlines(self.printer.pformat(ob)) | |
29 | ||
30 | def command(self, cmd): | |
31 | try: | |
32 | try: | |
33 | ccode = compile(cmd, "PDM Input", "eval") | |
34 | except SyntaxError: | |
35 | ccode = compile(cmd, "PDM Input", "exec") | |
afd9f04c | 36 | exec(ccode, self.mod.__dict__) |
7f97a47e FT |
37 | self.cl.send("+OK\n") |
38 | else: | |
39 | self.echo(eval(ccode, self.mod.__dict__)) | |
40 | self.cl.send("+OK\n") | |
41 | except: | |
42 | for line in traceback.format_exception(*sys.exc_info()): | |
43 | self.cl.send(" " + line) | |
44 | self.cl.send("+EXC\n") | |
45 | ||
46 | def handle(self, buf): | |
47 | p = buf.find("\n\n") | |
48 | if p < 0: | |
49 | return buf | |
50 | cmd = buf[:p + 1] | |
51 | self.command(cmd) | |
52 | return buf[p + 2:] | |
53 | protocols["repl"] = repl | |
54 | ||
55 | class perf(object): | |
56 | def __init__(self, cl): | |
57 | self.cl = cl | |
58 | self.odtab = {} | |
59 | cl.send("+PERF1\n") | |
60 | self.buf = "" | |
61 | self.lock = threading.Lock() | |
62 | self.subscribed = {} | |
63 | ||
64 | def closed(self): | |
afd9f04c | 65 | for id, recv in self.subscribed.items(): |
7f97a47e FT |
66 | ob = self.odtab[id] |
67 | if ob is None: continue | |
68 | ob, protos = ob | |
69 | try: | |
70 | ob.unsubscribe(recv) | |
71 | except: pass | |
72 | ||
73 | def send(self, *args): | |
74 | self.lock.acquire() | |
75 | try: | |
76 | buf = pickle.dumps(args) | |
77 | buf = struct.pack(">l", len(buf)) + buf | |
78 | self.cl.send(buf) | |
79 | finally: | |
80 | self.lock.release() | |
81 | ||
82 | def bindob(self, id, ob): | |
83 | if not hasattr(ob, "pdm_protocols"): | |
84 | raise ValueError("Object does not support PDM introspection") | |
85 | try: | |
86 | proto = ob.pdm_protocols() | |
afd9f04c | 87 | except Exception as exc: |
7f97a47e FT |
88 | raise ValueError("PDM introspection failed", exc) |
89 | self.odtab[id] = ob, proto | |
90 | return proto | |
91 | ||
92 | def bind(self, id, module, obnm): | |
93 | resmod = sys.modules.get(module) | |
94 | if resmod is None: | |
95 | self.send("-", ImportError("No such module: %s" % module)) | |
96 | return | |
97 | try: | |
98 | ob = getattr(resmod, obnm) | |
99 | except AttributeError: | |
100 | self.send("-", AttributeError("No such object: %s" % obnm)) | |
101 | return | |
102 | try: | |
103 | proto = self.bindob(id, ob) | |
afd9f04c | 104 | except Exception as exc: |
7f97a47e FT |
105 | self.send("-", exc) |
106 | return | |
107 | self.send("+", proto) | |
108 | ||
109 | def getob(self, id, proto): | |
110 | ob = self.odtab.get(id) | |
111 | if ob is None: | |
112 | self.send("-", ValueError("No such bound ID: %r" % id)) | |
113 | return None | |
114 | ob, protos = ob | |
115 | if proto not in protos: | |
116 | self.send("-", ValueError("Object does not support that protocol")) | |
117 | return None | |
118 | return ob | |
119 | ||
120 | def lookup(self, tgtid, srcid, obnm): | |
121 | src = self.getob(srcid, "dir") | |
122 | if src is None: | |
123 | return | |
124 | try: | |
125 | ob = src.lookup(obnm) | |
afd9f04c | 126 | except KeyError as exc: |
7f97a47e FT |
127 | self.send("-", exc) |
128 | return | |
129 | try: | |
130 | proto = self.bindob(tgtid, ob) | |
afd9f04c | 131 | except Exception as exc: |
7f97a47e FT |
132 | self.send("-", exc) |
133 | return | |
134 | self.send("+", proto) | |
135 | ||
136 | def unbind(self, id): | |
137 | ob = self.odtab.get(id) | |
138 | if ob is None: | |
139 | self.send("-", KeyError("No such name bound: %r" % id)) | |
140 | return | |
141 | ob, protos = ob | |
142 | del self.odtab[id] | |
143 | recv = self.subscribed.get(id) | |
144 | if recv is not None: | |
145 | ob.unsubscribe(recv) | |
146 | del self.subscribed[id] | |
147 | self.send("+") | |
148 | ||
149 | def listdir(self, id): | |
150 | ob = self.getob(id, "dir") | |
151 | if ob is None: | |
152 | return | |
153 | self.send("+", ob.listdir()) | |
154 | ||
155 | def readattr(self, id): | |
156 | ob = self.getob(id, "attr") | |
157 | if ob is None: | |
158 | return | |
159 | try: | |
160 | ret = ob.readattr() | |
afd9f04c | 161 | except Exception as exc: |
7f97a47e FT |
162 | self.send("-", Exception("Could not read attribute")) |
163 | return | |
164 | self.send("+", ret) | |
165 | ||
166 | def attrinfo(self, id): | |
167 | ob = self.getob(id, "attr") | |
168 | if ob is None: | |
169 | return | |
170 | self.send("+", ob.attrinfo()) | |
171 | ||
172 | def invoke(self, id, method, args, kwargs): | |
173 | ob = self.getob(id, "invoke") | |
174 | if ob is None: | |
175 | return | |
176 | try: | |
177 | self.send("+", ob.invoke(method, *args, **kwargs)) | |
afd9f04c | 178 | except Exception as exc: |
7f97a47e FT |
179 | self.send("-", exc) |
180 | ||
181 | def event(self, id, ob, ev): | |
182 | self.send("*", id, ev) | |
183 | ||
184 | def subscribe(self, id): | |
185 | ob = self.getob(id, "event") | |
186 | if ob is None: | |
187 | return | |
188 | if id in self.subscribed: | |
189 | self.send("-", ValueError("Already subscribed")) | |
190 | def recv(ev): | |
191 | self.event(id, ob, ev) | |
192 | ob.subscribe(recv) | |
193 | self.subscribed[id] = recv | |
194 | self.send("+") | |
195 | ||
196 | def unsubscribe(self, id): | |
197 | ob = self.getob(id, "event") | |
198 | if ob is None: | |
199 | return | |
200 | recv = self.subscribed.get(id) | |
201 | if recv is None: | |
202 | self.send("-", ValueError("Not subscribed")) | |
203 | ob.unsubscribe(recv) | |
204 | del self.subscribed[id] | |
205 | self.send("+") | |
206 | ||
207 | def command(self, data): | |
208 | cmd = data[0] | |
209 | if cmd == "bind": | |
210 | self.bind(*data[1:]) | |
211 | elif cmd == "unbind": | |
212 | self.unbind(*data[1:]) | |
213 | elif cmd == "lookup": | |
214 | self.lookup(*data[1:]) | |
215 | elif cmd == "ls": | |
216 | self.listdir(*data[1:]) | |
217 | elif cmd == "readattr": | |
218 | self.readattr(*data[1:]) | |
219 | elif cmd == "attrinfo": | |
220 | self.attrinfo(*data[1:]) | |
221 | elif cmd == "invoke": | |
222 | self.invoke(*data[1:]) | |
223 | elif cmd == "subs": | |
224 | self.subscribe(*data[1:]) | |
225 | elif cmd == "unsubs": | |
226 | self.unsubscribe(*data[1:]) | |
227 | else: | |
228 | self.send("-", Exception("Unknown command: %r" % (cmd,))) | |
229 | ||
230 | def handle(self, buf): | |
231 | if len(buf) < 4: | |
232 | return buf | |
233 | dlen = struct.unpack(">l", buf[:4])[0] | |
234 | if len(buf) < dlen + 4: | |
235 | return buf | |
236 | data = pickle.loads(buf[4:dlen + 4]) | |
237 | self.command(data) | |
238 | return buf[dlen + 4:] | |
239 | ||
240 | protocols["perf"] = perf | |
241 | ||
242 | class client(threading.Thread): | |
243 | def __init__(self, sk): | |
244 | super(client, self).__init__(name = "Management client") | |
245 | self.setDaemon(True) | |
246 | self.sk = sk | |
247 | self.handler = self | |
248 | ||
249 | def send(self, data): | |
250 | return self.sk.send(data) | |
251 | ||
252 | def choose(self, proto): | |
253 | if proto in protocols: | |
254 | self.handler = protocols[proto](self) | |
255 | else: | |
256 | self.send("-ERR Unknown protocol: %s\n" % proto) | |
257 | raise Exception() | |
258 | ||
259 | def handle(self, buf): | |
260 | p = buf.find("\n") | |
261 | if p >= 0: | |
262 | proto = buf[:p] | |
263 | buf = buf[p + 1:] | |
264 | self.choose(proto) | |
265 | return buf | |
266 | ||
267 | def run(self): | |
268 | try: | |
269 | buf = "" | |
270 | self.send("+PDM1\n") | |
271 | while True: | |
272 | ret = self.sk.recv(1024) | |
273 | if ret == "": | |
274 | return | |
275 | buf += ret | |
276 | while True: | |
277 | try: | |
278 | nbuf = self.handler.handle(buf) | |
279 | except: | |
280 | return | |
281 | if nbuf == buf: | |
282 | break | |
283 | buf = nbuf | |
284 | finally: | |
285 | #for line in traceback.format_exception(*sys.exc_info()): | |
286 | # print line | |
287 | try: | |
288 | self.sk.close() | |
289 | finally: | |
290 | if hasattr(self.handler, "closed"): | |
291 | self.handler.closed() | |
292 | ||
293 | ||
294 | class listener(threading.Thread): | |
295 | def __init__(self): | |
296 | super(listener, self).__init__(name = "Management listener") | |
297 | self.setDaemon(True) | |
298 | ||
299 | def listen(self, sk): | |
300 | self.running = True | |
301 | while self.running: | |
302 | rfd, wfd, efd = select.select([sk], [], [sk], 1) | |
303 | for fd in rfd: | |
304 | if fd == sk: | |
305 | nsk, addr = sk.accept() | |
306 | self.accept(nsk, addr) | |
307 | ||
308 | def stop(self): | |
309 | self.running = False | |
310 | self.join() | |
311 | ||
312 | def accept(self, sk, addr): | |
313 | cl = client(sk) | |
314 | cl.start() | |
315 | ||
316 | class unixlistener(listener): | |
afd9f04c | 317 | def __init__(self, name, mode = 0o600, group = None): |
7f97a47e FT |
318 | super(unixlistener, self).__init__() |
319 | self.name = name | |
320 | self.mode = mode | |
321 | self.group = group | |
322 | ||
323 | def run(self): | |
324 | sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
325 | ul = False | |
326 | try: | |
327 | if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode): | |
328 | os.unlink(self.name) | |
329 | sk.bind(self.name) | |
330 | ul = True | |
331 | os.chmod(self.name, self.mode) | |
332 | if self.group is not None: | |
333 | os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid) | |
334 | sk.listen(16) | |
335 | self.listen(sk) | |
336 | finally: | |
337 | sk.close() | |
338 | if ul: | |
339 | os.unlink(self.name) | |
340 | ||
341 | class tcplistener(listener): | |
342 | def __init__(self, port, bindaddr = "127.0.0.1"): | |
343 | super(tcplistener, self).__init__() | |
344 | self.port = port | |
345 | self.bindaddr = bindaddr | |
346 | ||
347 | def run(self): | |
348 | sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
349 | try: | |
350 | sk.bind((self.bindaddr, self.port)) | |
351 | sk.listen(16) | |
352 | self.listen(sk) | |
353 | finally: | |
354 | sk.close() | |
355 | ||
356 | def listen(spec): | |
357 | if ":" in spec: | |
358 | first = spec[:spec.index(":")] | |
359 | last = spec[spec.rindex(":") + 1:] | |
360 | else: | |
361 | first = spec | |
362 | last = spec | |
363 | if "/" in first: | |
364 | parts = spec.split(":") | |
afd9f04c | 365 | mode = 0o600 |
7f97a47e FT |
366 | group = None |
367 | if len(parts) > 1: | |
368 | mode = int(parts[1], 0) | |
369 | if len(parts) > 2: | |
370 | group = parts[2] | |
371 | ret = unixlistener(parts[0], mode = mode, group = group) | |
372 | ret.start() | |
373 | return ret | |
374 | if last.isdigit(): | |
375 | p = spec.rindex(":") | |
376 | host = spec[:p] | |
377 | port = int(spec[p + 1:]) | |
378 | ret = tcplistener(port, bindaddr = host) | |
379 | ret.start() | |
380 | return ret | |
381 | raise ValueError("Unparsable listener specification: %r" % spec) | |
382 | ||
383 | import pdm.perf |