--- /dev/null
+"""Management for daemon processes
+
+This module provides some client support for the daemon management
+provided in the pdm.srv module.
+"""
+
+import socket, pickle, struct, select, threading
+
+__all__ = ["client", "replclient"]
+
+class protoerr(Exception):
+ pass
+
+def resolve(spec):
+ if isinstance(spec, socket.socket):
+ return spec
+ sk = None
+ try:
+ if "/" in spec:
+ sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sk.connect(spec)
+ elif spec.isdigit():
+ sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sk.connect(("localhost", int(spec)))
+ elif ":" in spec:
+ sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ p = spec.rindex(":")
+ sk.connect((spec[:p], int(spec[p + 1:])))
+ else:
+ raise Exception("Unknown target specification %r" % spec)
+ rv = sk
+ sk = None
+ finally:
+ if sk is not None: sk.close()
+ return rv
+
+class client(object):
+ def __init__(self, sk, proto = None):
+ self.sk = resolve(sk)
+ self.buf = ""
+ line = self.readline()
+ if line != "+PDM1":
+ raise protoerr("Illegal protocol signature")
+ if proto is not None:
+ self.select(proto)
+
+ def close(self):
+ self.sk.close()
+
+ def readline(self):
+ while True:
+ p = self.buf.find("\n")
+ if p >= 0:
+ ret = self.buf[:p]
+ self.buf = self.buf[p + 1:]
+ return ret
+ ret = self.sk.recv(1024)
+ if ret == "":
+ return None
+ self.buf += ret
+
+ def select(self, proto):
+ if "\n" in proto:
+ raise Exception("Illegal protocol specified: %r" % proto)
+ self.sk.send(proto + "\n")
+ rep = self.readline()
+ if len(rep) < 1 or rep[0] != "+":
+ raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:]))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *excinfo):
+ self.close()
+ return False
+
+class replclient(client):
+ def __init__(self, sk):
+ super(replclient, self).__init__(sk, "repl")
+
+ def run(self, code):
+ while True:
+ ncode = code.replace("\n\n", "\n")
+ if ncode == code: break
+ code = ncode
+ while len(code) > 0 and code[-1] == "\n":
+ code = code[:-1]
+ self.sk.send(code + "\n\n")
+ buf = ""
+ while True:
+ ln = self.readline()
+ if ln[0] == " ":
+ buf += ln[1:] + "\n"
+ elif ln[0] == "+":
+ return buf
+ elif ln[0] == "-":
+ raise protoerr("Error reply: %s" % ln[1:])
+ else:
+ raise protoerr("Illegal reply: %s" % ln)
+
+class perfproxy(object):
+ def __init__(self, cl, id, proto):
+ self.cl = cl
+ self.id = id
+ self.proto = proto
+ self.subscribers = set()
+
+ def lookup(self, name):
+ self.cl.lock.acquire()
+ try:
+ id = self.cl.nextid
+ self.cl.nextid += 1
+ finally:
+ self.cl.lock.release()
+ (proto,) = self.cl.run("lookup", id, self.id, name)
+ proxy = perfproxy(self.cl, id, proto)
+ self.cl.proxies[id] = proxy
+ return proxy
+
+ def listdir(self):
+ return self.cl.run("ls", self.id)[0]
+
+ def readattr(self):
+ return self.cl.run("readattr", self.id)[0]
+
+ def attrinfo(self):
+ return self.cl.run("attrinfo", self.id)[0]
+
+ def invoke(self, method, *args, **kwargs):
+ return self.cl.run("invoke", self.id, method, args, kwargs)[0]
+
+ def subscribe(self, cb):
+ if cb in self.subscribers:
+ raise ValueError("Already subscribed")
+ if len(self.subscribers) == 0:
+ self.cl.run("subs", self.id)
+ self.subscribers.add(cb)
+
+ def unsubscribe(self):
+ if cb not in self.subscribers:
+ raise ValueError("Not subscribed")
+ self.subscribers.remove(cb)
+ if len(self.subscribers) == 0:
+ self.cl.run("unsubs", self.id)
+
+ def notify(self, ev):
+ for cb in self.subscribers:
+ try:
+ cb(ev)
+ except: pass
+
+ def close(self):
+ self.cl.run("unbind", self.id)
+ del self.cl.proxies[self.id]
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *excinfo):
+ self.close()
+ return False
+
+class perfclient(client):
+ def __init__(self, sk):
+ super(perfclient, self).__init__(sk, "perf")
+ self.nextid = 0
+ self.lock = threading.Lock()
+ self.proxies = {}
+ self.names = {}
+
+ def send(self, ob):
+ buf = pickle.dumps(ob)
+ buf = struct.pack(">l", len(buf)) + buf
+ self.sk.send(buf)
+
+ def recvb(self, num):
+ buf = ""
+ while len(buf) < num:
+ data = self.sk.recv(num - len(buf))
+ if data == "":
+ raise EOFError()
+ buf += data
+ return buf
+
+ def recv(self):
+ return pickle.loads(self.recvb(struct.unpack(">l", self.recvb(4))[0]))
+
+ def event(self, id, ev):
+ proxy = self.proxies.get(id)
+ if proxy is None: return
+ proxy.notify(ev)
+
+ def dispatch(self, timeout = None):
+ rfd, wfd, efd = select.select([self.sk], [], [], timeout)
+ if self.sk in rfd:
+ msg = self.recv()
+ if msg[0] == "*":
+ self.event(msg[1], msg[2])
+ else:
+ raise ValueError("Unexpected non-event message: %r" % msg[0])
+
+ def recvreply(self):
+ while True:
+ reply = self.recv()
+ if reply[0] in ("+", "-"):
+ return reply
+ elif reply[0] == "*":
+ self.event(reply[1], reply[2])
+ else:
+ raise ValueError("Illegal reply header: %r" % reply[0])
+
+ def run(self, cmd, *args):
+ self.lock.acquire()
+ try:
+ self.send((cmd,) + args)
+ reply = self.recvreply()
+ if reply[0] == "+":
+ return reply[1:]
+ else:
+ raise reply[1]
+ finally:
+ self.lock.release()
+
+ def lookup(self, module, obnm):
+ self.lock.acquire()
+ try:
+ id = self.nextid
+ self.nextid += 1
+ finally:
+ self.lock.release()
+ (proto,) = self.run("bind", id, module, obnm)
+ proxy = perfproxy(self, id, proto)
+ self.proxies[id] = proxy
+ return proxy
+
+ def find(self, name):
+ ret = self.names.get(name)
+ if ret is None:
+ if "/" in name:
+ p = name.rindex("/")
+ ret = self.find(name[:p]).lookup(name[p + 1:])
+ else:
+ p = name.rindex(".")
+ ret = self.lookup(name[:p], name[p + 1:])
+ self.names[name] = ret
+ return ret
--- /dev/null
+import os, sys, resource, time, socket
+
+class attrinfo(object):
+ def __init__(self, desc = None):
+ self.desc = desc
+
+class perfobj(object):
+ def __init__(self, *args, **kwargs):
+ super(perfobj, self).__init__()
+
+ def pdm_protocols(self):
+ return []
+
+class simpleattr(perfobj):
+ def __init__(self, func, info = None, *args, **kwargs):
+ super(simpleattr, self).__init__(*args, **kwargs)
+ self.func = func
+ if info is None:
+ info = attrinfo()
+ self.info = info
+
+ def readattr(self):
+ return self.func()
+
+ def attrinfo(self):
+ return self.info
+
+ def pdm_protocols(self):
+ return super(simpleattr, self).pdm_protocols() + ["attr"]
+
+class valueattr(perfobj):
+ def __init__(self, init, info = None, *args, **kwargs):
+ super(valueattr, self).__init__(*args, **kwargs)
+ self.value = init
+ if info is None:
+ info = attrinfo()
+ self.info = info
+
+ def readattr(self):
+ return self.value
+
+ def attrinfo(self):
+ return self.info
+
+ def pdm_protocols(self):
+ return super(valueattr, self).pdm_protocols() + ["attr"]
+
+
+class eventobj(perfobj):
+ def __init__(self, *args, **kwargs):
+ super(eventobj, self).__init__(*args, **kwargs)
+ self.subscribers = set()
+
+ def subscribe(self, cb):
+ if cb in self.subscribers:
+ raise ValueError("Already subscribed")
+ self.subscribers.add(cb)
+
+ def unsubscribe(self, cb):
+ self.subscribers.remove(cb)
+
+ def notify(self, event):
+ for cb in self.subscribers:
+ try:
+ cb(event)
+ except: pass
+
+ def pdm_protocols(self):
+ return super(eventobj, self).pdm_protocols() + ["event"]
+
+class staticdir(perfobj):
+ def __init__(self, *args, **kwargs):
+ super(staticdir, self).__init__(*args, **kwargs)
+ self.map = {}
+
+ def __setitem__(self, name, ob):
+ self.map[name] = ob
+
+ def __delitem__(self, name):
+ del self.map[name]
+
+ def __getitem__(self, name):
+ return self.map[name]
+
+ def get(self, name, default = None):
+ return self.map.get(name, default)
+
+ def listdir(self):
+ return self.map.keys()
+
+ def lookup(self, name):
+ return self.map[name]
+
+ def pdm_protocols(self):
+ return super(staticdir, self).pdm_protocols() + ["dir"]
+
+sysres = staticdir()
+itime = time.time()
+ires = resource.getrusage(resource.RUSAGE_SELF)
+def ct():
+ ru = resource.getrusage(resource.RUSAGE_SELF)
+ return (ru.ru_utime - ires.ru_utime) + (ru.ru_stime - ires.ru_stime)
+sysres["realtime"] = simpleattr(func = lambda: time.time() - itime)
+sysres["cputime"] = simpleattr(func = ct)
+sysres["utime"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_utime - ires.ru_utime)
+sysres["stime"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_stime - ires.ru_stime)
+sysres["maxrss"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
+sysres["rusage"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF))
+
+sysinfo = staticdir()
+sysinfo["pid"] = simpleattr(func = os.getpid)
+sysinfo["uname"] = simpleattr(func = os.uname)
+sysinfo["hostname"] = simpleattr(func = socket.gethostname)
+sysinfo["platform"] = valueattr(init = sys.platform)
--- /dev/null
+"""Management for daemon processes
+
+This module contains a utility to listen for management commands on a
+socket, lending itself to managing daemon processes.
+"""
+
+import os, sys, socket, threading, grp, select
+import types, pprint, traceback
+import pickle, struct
+
+__all__ = ["listener", "unixlistener", "tcplistener", "listen"]
+
+protocols = {}
+
+class repl(object):
+ def __init__(self, cl):
+ self.cl = cl
+ self.mod = types.ModuleType("repl")
+ self.mod.echo = self.echo
+ self.printer = pprint.PrettyPrinter(indent = 4, depth = 6)
+ cl.send("+REPL\n")
+
+ def sendlines(self, text):
+ for line in text.split("\n"):
+ self.cl.send(" " + line + "\n")
+
+ def echo(self, ob):
+ self.sendlines(self.printer.pformat(ob))
+
+ def command(self, cmd):
+ try:
+ try:
+ ccode = compile(cmd, "PDM Input", "eval")
+ except SyntaxError:
+ ccode = compile(cmd, "PDM Input", "exec")
+ exec ccode in self.mod.__dict__
+ self.cl.send("+OK\n")
+ else:
+ self.echo(eval(ccode, self.mod.__dict__))
+ self.cl.send("+OK\n")
+ except:
+ for line in traceback.format_exception(*sys.exc_info()):
+ self.cl.send(" " + line)
+ self.cl.send("+EXC\n")
+
+ def handle(self, buf):
+ p = buf.find("\n\n")
+ if p < 0:
+ return buf
+ cmd = buf[:p + 1]
+ self.command(cmd)
+ return buf[p + 2:]
+protocols["repl"] = repl
+
+class perf(object):
+ def __init__(self, cl):
+ self.cl = cl
+ self.odtab = {}
+ cl.send("+PERF1\n")
+ self.buf = ""
+ self.lock = threading.Lock()
+ self.subscribed = {}
+
+ def closed(self):
+ for id, recv in self.subscribed.iteritems():
+ ob = self.odtab[id]
+ if ob is None: continue
+ ob, protos = ob
+ try:
+ ob.unsubscribe(recv)
+ except: pass
+
+ def send(self, *args):
+ self.lock.acquire()
+ try:
+ buf = pickle.dumps(args)
+ buf = struct.pack(">l", len(buf)) + buf
+ self.cl.send(buf)
+ finally:
+ self.lock.release()
+
+ def bindob(self, id, ob):
+ if not hasattr(ob, "pdm_protocols"):
+ raise ValueError("Object does not support PDM introspection")
+ try:
+ proto = ob.pdm_protocols()
+ except Exception, exc:
+ raise ValueError("PDM introspection failed", exc)
+ self.odtab[id] = ob, proto
+ return proto
+
+ def bind(self, id, module, obnm):
+ resmod = sys.modules.get(module)
+ if resmod is None:
+ self.send("-", ImportError("No such module: %s" % module))
+ return
+ try:
+ ob = getattr(resmod, obnm)
+ except AttributeError:
+ self.send("-", AttributeError("No such object: %s" % obnm))
+ return
+ try:
+ proto = self.bindob(id, ob)
+ except Exception, exc:
+ self.send("-", exc)
+ return
+ self.send("+", proto)
+
+ def getob(self, id, proto):
+ ob = self.odtab.get(id)
+ if ob is None:
+ self.send("-", ValueError("No such bound ID: %r" % id))
+ return None
+ ob, protos = ob
+ if proto not in protos:
+ self.send("-", ValueError("Object does not support that protocol"))
+ return None
+ return ob
+
+ def lookup(self, tgtid, srcid, obnm):
+ src = self.getob(srcid, "dir")
+ if src is None:
+ return
+ try:
+ ob = src.lookup(obnm)
+ except KeyError, exc:
+ self.send("-", exc)
+ return
+ try:
+ proto = self.bindob(tgtid, ob)
+ except Exception, exc:
+ self.send("-", exc)
+ return
+ self.send("+", proto)
+
+ def unbind(self, id):
+ ob = self.odtab.get(id)
+ if ob is None:
+ self.send("-", KeyError("No such name bound: %r" % id))
+ return
+ ob, protos = ob
+ del self.odtab[id]
+ recv = self.subscribed.get(id)
+ if recv is not None:
+ ob.unsubscribe(recv)
+ del self.subscribed[id]
+ self.send("+")
+
+ def listdir(self, id):
+ ob = self.getob(id, "dir")
+ if ob is None:
+ return
+ self.send("+", ob.listdir())
+
+ def readattr(self, id):
+ ob = self.getob(id, "attr")
+ if ob is None:
+ return
+ try:
+ ret = ob.readattr()
+ except Exception, exc:
+ self.send("-", Exception("Could not read attribute"))
+ return
+ self.send("+", ret)
+
+ def attrinfo(self, id):
+ ob = self.getob(id, "attr")
+ if ob is None:
+ return
+ self.send("+", ob.attrinfo())
+
+ def invoke(self, id, method, args, kwargs):
+ ob = self.getob(id, "invoke")
+ if ob is None:
+ return
+ try:
+ self.send("+", ob.invoke(method, *args, **kwargs))
+ except Exception, exc:
+ self.send("-", exc)
+
+ def event(self, id, ob, ev):
+ self.send("*", id, ev)
+
+ def subscribe(self, id):
+ ob = self.getob(id, "event")
+ if ob is None:
+ return
+ if id in self.subscribed:
+ self.send("-", ValueError("Already subscribed"))
+ def recv(ev):
+ self.event(id, ob, ev)
+ ob.subscribe(recv)
+ self.subscribed[id] = recv
+ self.send("+")
+
+ def unsubscribe(self, id):
+ ob = self.getob(id, "event")
+ if ob is None:
+ return
+ recv = self.subscribed.get(id)
+ if recv is None:
+ self.send("-", ValueError("Not subscribed"))
+ ob.unsubscribe(recv)
+ del self.subscribed[id]
+ self.send("+")
+
+ def command(self, data):
+ cmd = data[0]
+ if cmd == "bind":
+ self.bind(*data[1:])
+ elif cmd == "unbind":
+ self.unbind(*data[1:])
+ elif cmd == "lookup":
+ self.lookup(*data[1:])
+ elif cmd == "ls":
+ self.listdir(*data[1:])
+ elif cmd == "readattr":
+ self.readattr(*data[1:])
+ elif cmd == "attrinfo":
+ self.attrinfo(*data[1:])
+ elif cmd == "invoke":
+ self.invoke(*data[1:])
+ elif cmd == "subs":
+ self.subscribe(*data[1:])
+ elif cmd == "unsubs":
+ self.unsubscribe(*data[1:])
+ else:
+ self.send("-", Exception("Unknown command: %r" % (cmd,)))
+
+ def handle(self, buf):
+ if len(buf) < 4:
+ return buf
+ dlen = struct.unpack(">l", buf[:4])[0]
+ if len(buf) < dlen + 4:
+ return buf
+ data = pickle.loads(buf[4:dlen + 4])
+ self.command(data)
+ return buf[dlen + 4:]
+
+protocols["perf"] = perf
+
+class client(threading.Thread):
+ def __init__(self, sk):
+ super(client, self).__init__(name = "Management client")
+ self.setDaemon(True)
+ self.sk = sk
+ self.handler = self
+
+ def send(self, data):
+ return self.sk.send(data)
+
+ def choose(self, proto):
+ if proto in protocols:
+ self.handler = protocols[proto](self)
+ else:
+ self.send("-ERR Unknown protocol: %s\n" % proto)
+ raise Exception()
+
+ def handle(self, buf):
+ p = buf.find("\n")
+ if p >= 0:
+ proto = buf[:p]
+ buf = buf[p + 1:]
+ self.choose(proto)
+ return buf
+
+ def run(self):
+ try:
+ buf = ""
+ self.send("+PDM1\n")
+ while True:
+ ret = self.sk.recv(1024)
+ if ret == "":
+ return
+ buf += ret
+ while True:
+ try:
+ nbuf = self.handler.handle(buf)
+ except:
+ return
+ if nbuf == buf:
+ break
+ buf = nbuf
+ finally:
+ #for line in traceback.format_exception(*sys.exc_info()):
+ # print line
+ try:
+ self.sk.close()
+ finally:
+ if hasattr(self.handler, "closed"):
+ self.handler.closed()
+
+
+class listener(threading.Thread):
+ def __init__(self):
+ super(listener, self).__init__(name = "Management listener")
+ self.setDaemon(True)
+
+ def listen(self, sk):
+ self.running = True
+ while self.running:
+ rfd, wfd, efd = select.select([sk], [], [sk], 1)
+ for fd in rfd:
+ if fd == sk:
+ nsk, addr = sk.accept()
+ self.accept(nsk, addr)
+
+ def stop(self):
+ self.running = False
+ self.join()
+
+ def accept(self, sk, addr):
+ cl = client(sk)
+ cl.start()
+
+class unixlistener(listener):
+ def __init__(self, name, mode = 0600, group = None):
+ super(unixlistener, self).__init__()
+ self.name = name
+ self.mode = mode
+ self.group = group
+
+ def run(self):
+ sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ ul = False
+ try:
+ if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode):
+ os.unlink(self.name)
+ sk.bind(self.name)
+ ul = True
+ os.chmod(self.name, self.mode)
+ if self.group is not None:
+ os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid)
+ sk.listen(16)
+ self.listen(sk)
+ finally:
+ sk.close()
+ if ul:
+ os.unlink(self.name)
+
+class tcplistener(listener):
+ def __init__(self, port, bindaddr = "127.0.0.1"):
+ super(tcplistener, self).__init__()
+ self.port = port
+ self.bindaddr = bindaddr
+
+ def run(self):
+ sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ sk.bind((self.bindaddr, self.port))
+ sk.listen(16)
+ self.listen(sk)
+ finally:
+ sk.close()
+
+def listen(spec):
+ if ":" in spec:
+ first = spec[:spec.index(":")]
+ last = spec[spec.rindex(":") + 1:]
+ else:
+ first = spec
+ last = spec
+ if "/" in first:
+ parts = spec.split(":")
+ mode = 0600
+ group = None
+ if len(parts) > 1:
+ mode = int(parts[1], 0)
+ if len(parts) > 2:
+ group = parts[2]
+ ret = unixlistener(parts[0], mode = mode, group = group)
+ ret.start()
+ return ret
+ if last.isdigit():
+ p = spec.rindex(":")
+ host = spec[:p]
+ port = int(spec[p + 1:])
+ ret = tcplistener(port, bindaddr = host)
+ ret.start()
+ return ret
+ raise ValueError("Unparsable listener specification: %r" % spec)
+
+import pdm.perf