da6bcc53d42d27fa9c37b05e46c0aa7e1ca176cd
[pdm.git] / pdm / cli.py
1 """Management for daemon processes
2
3 This module provides some client support for the daemon management
4 provided in the pdm.srv module.
5 """
6
7 import socket, pickle, struct, select, threading
8
9 __all__ = ["client", "replclient"]
10
11 class protoerr(Exception):
12     pass
13
14 def resolve(spec):
15     if isinstance(spec, socket.socket):
16         return spec
17     sk = None
18     try:
19         if "/" in spec:
20             sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
21             sk.connect(spec)
22         elif spec.isdigit():
23             sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
24             sk.connect(("localhost", int(spec)))
25         elif ":" in spec:
26             sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
27             p = spec.rindex(":")
28             sk.connect((spec[:p], int(spec[p + 1:])))
29         else:
30             raise Exception("Unknown target specification %r" % spec)
31         rv = sk
32         sk = None
33     finally:
34         if sk is not None: sk.close()
35     return rv
36
37 class client(object):
38     def __init__(self, sk, proto = None):
39         self.sk = resolve(sk)
40         self.buf = ""
41         line = self.readline()
42         if line != "+PDM1":
43             raise protoerr("Illegal protocol signature")
44         if proto is not None:
45             self.select(proto)
46
47     def close(self):
48         self.sk.close()
49
50     def readline(self):
51         while True:
52             p = self.buf.find("\n")
53             if p >= 0:
54                 ret = self.buf[:p]
55                 self.buf = self.buf[p + 1:]
56                 return ret
57             ret = self.sk.recv(1024)
58             if ret == "":
59                 return None
60             self.buf += ret
61
62     def select(self, proto):
63         if "\n" in proto:
64             raise Exception("Illegal protocol specified: %r" % proto)
65         self.sk.send(proto + "\n")
66         rep = self.readline()
67         if len(rep) < 1 or rep[0] != "+":
68             raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:]))
69
70     def __enter__(self):
71         return self
72
73     def __exit__(self, *excinfo):
74         self.close()
75         return False
76
77 class replclient(client):
78     def __init__(self, sk):
79         super(replclient, self).__init__(sk, "repl")
80
81     def run(self, code):
82         while True:
83             ncode = code.replace("\n\n", "\n")
84             if ncode == code: break
85             code = ncode
86         while len(code) > 0 and code[-1] == "\n":
87             code = code[:-1]
88         self.sk.send(code + "\n\n")
89         buf = ""
90         while True:
91             ln = self.readline()
92             if ln[0] == " ":
93                 buf += ln[1:] + "\n"
94             elif ln[0] == "+":
95                 return buf
96             elif ln[0] == "-":
97                 raise protoerr("Error reply: %s" % ln[1:])
98             else:
99                 raise protoerr("Illegal reply: %s" % ln)
100
101 class perfproxy(object):
102     def __init__(self, cl, id, proto):
103         self.cl = cl
104         self.id = id
105         self.proto = proto
106         self.subscribers = set()
107
108     def lookup(self, name):
109         self.cl.lock.acquire()
110         try:
111             id = self.cl.nextid
112             self.cl.nextid += 1
113         finally:
114             self.cl.lock.release()
115         (proto,) = self.cl.run("lookup", id, self.id, name)
116         proxy = perfproxy(self.cl, id, proto)
117         self.cl.proxies[id] = proxy
118         return proxy
119
120     def listdir(self):
121         return self.cl.run("ls", self.id)[0]
122
123     def readattr(self):
124         return self.cl.run("readattr", self.id)[0]
125
126     def attrinfo(self):
127         return self.cl.run("attrinfo", self.id)[0]
128
129     def invoke(self, method, *args, **kwargs):
130         return self.cl.run("invoke", self.id, method, args, kwargs)[0]
131
132     def subscribe(self, cb):
133         if cb in self.subscribers:
134             raise ValueError("Already subscribed")
135         if len(self.subscribers) == 0:
136             self.cl.run("subs", self.id)
137         self.subscribers.add(cb)
138
139     def unsubscribe(self):
140         if cb not in self.subscribers:
141             raise ValueError("Not subscribed")
142         self.subscribers.remove(cb)
143         if len(self.subscribers) == 0:
144             self.cl.run("unsubs", self.id)
145
146     def notify(self, ev):
147         for cb in self.subscribers:
148             try:
149                 cb(ev)
150             except: pass
151
152     def close(self):
153         self.cl.run("unbind", self.id)
154         del self.cl.proxies[self.id]
155
156     def __enter__(self):
157         return self
158
159     def __exit__(self, *excinfo):
160         self.close()
161         return False
162
163 class perfclient(client):
164     def __init__(self, sk):
165         super(perfclient, self).__init__(sk, "perf")
166         self.nextid = 0
167         self.lock = threading.Lock()
168         self.proxies = {}
169         self.names = {}
170
171     def send(self, ob):
172         buf = pickle.dumps(ob)
173         buf = struct.pack(">l", len(buf)) + buf
174         self.sk.send(buf)
175
176     def recvb(self, num):
177         buf = ""
178         while len(buf) < num:
179             data = self.sk.recv(num - len(buf))
180             if data == "":
181                 raise EOFError()
182             buf += data
183         return buf
184
185     def recv(self):
186         return pickle.loads(self.recvb(struct.unpack(">l", self.recvb(4))[0]))
187
188     def event(self, id, ev):
189         proxy = self.proxies.get(id)
190         if proxy is None: return
191         proxy.notify(ev)
192
193     def dispatch(self, timeout = None):
194         rfd, wfd, efd = select.select([self.sk], [], [], timeout)
195         if self.sk in rfd:
196             msg = self.recv()
197             if msg[0] == "*":
198                 self.event(msg[1], msg[2])
199             else:
200                 raise ValueError("Unexpected non-event message: %r" % msg[0])
201
202     def recvreply(self):
203         while True:
204             reply = self.recv()
205             if reply[0] in ("+", "-"):
206                 return reply
207             elif reply[0] == "*":
208                 self.event(reply[1], reply[2])
209             else:
210                 raise ValueError("Illegal reply header: %r" % reply[0])
211
212     def run(self, cmd, *args):
213         self.lock.acquire()
214         try:
215             self.send((cmd,) + args)
216             reply = self.recvreply()
217             if reply[0] == "+":
218                 return reply[1:]
219             else:
220                 raise reply[1]
221         finally:
222             self.lock.release()
223
224     def lookup(self, module, obnm):
225         self.lock.acquire()
226         try:
227             id = self.nextid
228             self.nextid += 1
229         finally:
230             self.lock.release()
231         (proto,) = self.run("bind", id, module, obnm)
232         proxy = perfproxy(self, id, proto)
233         self.proxies[id] = proxy
234         return proxy
235
236     def find(self, name):
237         ret = self.names.get(name)
238         if ret is None:
239             if "/" in name:
240                 p = name.rindex("/")
241                 ret = self.find(name[:p]).lookup(name[p + 1:])
242             else:
243                 p = name.rindex(".")
244                 ret = self.lookup(name[:p], name[p + 1:])
245             self.names[name] = ret
246         return ret