Merge branch 'master' of fludd.seatribe.se:/usr/local/src/ashd
[ashd.git] / python / ashd / serve.py
1 import sys, os, threading, time, logging, select, Queue
2 import perf
3
# Logger shared by every handler defined in this module.
log = logging.getLogger("ashd.serve")

# Next sequence number to be handed out by reqseq(), and the lock that
# serializes access to it.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The module-level counter ``seq`` is read and incremented while
    holding ``seqlk``, so concurrent callers each receive a distinct
    value.  The first value returned is the initial value of ``seq``.
    """
    global seq
    seqlk.acquire()
    try:
        current = seq
        seq = current + 1
    finally:
        seqlk.release()
    return current
14
class closed(IOError):
    """Raised to signal that the client has closed the connection.

    The exception takes no arguments; its message is fixed.
    """

    def __init__(self):
        message = "The client has closed the connection."
        super(closed, self).__init__(message)
18
class reqthread(threading.Thread):
    """Thread subclass that gives unnamed threads a numbered default name.

    When no ``name`` is supplied, the thread is called
    "Request handler N", where N comes from reqseq().  All other keyword
    arguments are passed to threading.Thread unchanged.
    """

    def __init__(self, name=None, **kw):
        # Only draw a sequence number when the caller did not pick a name.
        threadname = name if name is not None else "Request handler %i" % reqseq()
        super(reqthread, self).__init__(name=threadname, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of one WSGI request.

    Attributes:
      status   -- status passed to startreq(), or None before it is called.
      headers  -- headers passed to startreq().
      respsent -- True once writehead() has been invoked via flushreq().
      handler  -- the handler object; its ckflush() is called after
                  every write().
      buffer   -- bytearray of response data queued by writedata().

    Subclasses must implement handlewsgi(), fileno(), writehead() and
    flush(); the versions here only raise NotImplementedError.
    """

    def __init__(self, handler):
        self.status = None
        self.headers = []
        self.respsent = False
        self.handler = handler
        self.buffer = bytearray()

    def handlewsgi(self):
        """Invoke the WSGI application; must be overridden.

        NOTE(review): the handlers in this module call this as
        handlewsgi(env, start_response), so overrides presumably take
        those two arguments -- confirm against the subclasses.
        """
        raise NotImplementedError("handlewsgi() must be implemented by a subclass")

    def fileno(self):
        """Return the descriptor used when this object is passed to select()."""
        raise NotImplementedError("fileno() must be implemented by a subclass")

    def writehead(self, status, headers):
        """Emit the response head; must be overridden."""
        raise NotImplementedError("writehead() must be implemented by a subclass")

    def flush(self):
        """Write out buffered data; must be overridden."""
        raise NotImplementedError("flush() must be implemented by a subclass")

    def close(self):
        """Release per-request resources; a no-op in the base class."""
        pass

    def writedata(self, data):
        """Append `data` to the output buffer without flushing it."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head if it has not been sent yet.

        Raises Exception if startreq() has not set a status.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Mark the head as sent before calling writehead(), so that a
        # failure there does not cause a second attempt later.
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """WSGI write() callable: queue `data` and let the handler flush it.

        Empty data is ignored, so it never forces the head to be sent.
        """
        if not data:
            return
        self.flushreq()
        self.writedata(data)
        self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """WSGI start_response() callable.

        Records `status` and `headers` and returns the write() callable.
        A second call is only accepted when `exc_info` is given; in that
        case exc_info[1] is re-raised if the head has already been sent,
        otherwise the earlier status and headers are replaced.

        Raises Exception when called twice without `exc_info`.
        """
        if self.status:
            if exc_info:
                try:
                    if self.respsent:
                        raise exc_info[1]
                finally:
                    # Drop the reference so the traceback is not kept
                    # alive through this frame.
                    exc_info = None
            else:
                raise Exception("Can only start responding once.")
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request-handling strategies.

    Subclasses implement handle(); those carrying a ``cname`` class
    attribute are picked up by the module-level ``names`` registry.
    """

    def handle(self, request):
        """Process `request`; must be overridden."""
        raise NotImplementedError("handle() must be implemented by a subclass")

    def ckflush(self, req):
        """Block until everything in req.buffer has been flushed.

        Waits with select() for `req` to become writable (or report an
        exceptional condition) and then calls req.flush(), repeating
        for as long as the buffer is non-empty.
        """
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def close(self):
        """Shut the handler down; a no-op in the base class."""
        pass

    @classmethod
    def parseargs(cls, **args):
        """Turn string handler arguments into constructor keyword arguments.

        The base class accepts no arguments.  Subclasses strip the ones
        they understand and pass the rest here, so anything that
        arrives is unknown.

        Raises ValueError naming one of the unknown arguments.
        """
        if args:
            # The builtin next() works on Python 2.6+ and 3, unlike the
            # iterator's .next() method.
            raise ValueError("unknown handler argument: " + next(iter(args)))
        return {}
89
class single(handler):
    """Handler that processes each request in the calling thread."""

    cname = "single"

    def handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        The response iterable is written chunk by chunk through
        req.write(); the head is sent even if the body turned out empty,
        and any remaining buffered data is flushed.  A `closed`
        exception is ignored, any other exception is logged, and
        req.close() is always called.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head goes out even when the
                        # application produced no body data.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # WSGI requires close() to be called on the response
                    # iterable whether or not iteration completed.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
110
class freethread(handler):
    """Handler that serves every request on a newly started thread.

    Attributes:
      current -- set of request threads that are currently running.
      lk      -- lock protecting `current`.
      tcond   -- condition on `lk`, notified whenever a thread registers
                 in or removes itself from `current`.
      max     -- maximum number of concurrent request threads, or None
                 for no limit.
      timeout -- number of seconds handle() may wait for a free slot
                 before the process is aborted, or None to wait forever.
    """

    cname = "free"

    def __init__(self, max=None, timeout=None, **kw):
        super(freethread, self).__init__(**kw)
        self.current = set()
        self.lk = threading.Lock()
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, max=None, abort=None, **args):
        """Convert the string arguments ``max`` and ``abort`` to ints.

        ``abort`` is returned under the key "timeout".  Empty or missing
        values are left out; unknown arguments are rejected by the
        parent class.
        """
        ret = super(freethread, cls).parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for `req`, waiting for a free slot if limited.

        With both `max` and `timeout` set, the process is aborted with
        os.abort() if no slot becomes available within `timeout`
        seconds.  The method returns only once the new thread has
        registered itself in `current`.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.registered = False
            th.start()
            # Wait for run() to add the thread to `current`, so that the
            # limit check above and close() both see it.
            while not th.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register, serve `req`, then deregister.

        A `closed` exception is ignored, any other exception is logged,
        and req.close() is always called.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            # Send the head even if there was no body.
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # WSGI requires close() to be called on the
                        # response iterable whether or not iteration
                        # completed.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Wait until every running request thread has finished."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock: the thread needs it to deregister.
            th.join()
186
class resplex(handler):
    """Handler that multiplexes all response writing onto one thread.

    Each request gets its own thread (handle1) only for calling the
    WSGI application.  The resulting response iterator is then handed
    over, through `cqueue`, to a single response thread (handle2) that
    iterates and writes all responses using select().

    Attributes:
      current -- set of threads currently running handle1().
      lk      -- lock protecting `current`.
      tcond   -- condition on `lk`, notified on changes to `current`.
      max     -- maximum number of concurrent handle1() threads, or
                 None for no limit.
      cqueue  -- queue of (request, response iterator) pairs awaiting
                 pickup by the response thread.
      cnpipe  -- pipe used to wake the response thread; closing its
                 write end tells that thread to exit.
      rthread -- the response thread.
    """

    cname = "rplex"

    def __init__(self, max=None, **kw):
        super(resplex, self).__init__(**kw)
        self.current = set()
        self.lk = threading.Lock()
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.cqueue = Queue.Queue(5)
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, max=None, **args):
        """Convert the string argument ``max`` to an int.

        An empty or missing value is left out; unknown arguments are
        rejected by the parent class.
        """
        ret = super(resplex, cls).parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        """Always raises: write() is not supported by this handler."""
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a handle1() thread for `req`, waiting for a free slot.

        Returns only once the new thread has registered itself in
        `current`.
        """
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.registered = False
            th.start()
            # Wait for handle1() to add the thread to `current`.
            while not th.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Thread body: call the WSGI application and hand off the response.

        If the application returned without having started the
        response, an error is logged and the request is closed here.
        Otherwise the request and its iterator are queued for the
        response thread, which takes over responsibility for closing
        the request.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread.  A bytes literal is the
                    # same object type as " " on Python 2 and is what
                    # os.write() requires on Python 3.
                    os.write(self.cnpipe[1], b" ")
                    # Ownership has passed to the response thread; do
                    # not close the request in the finally clause below.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: iterate and write all queued responses.

        Runs until the write end of `cnpipe` is closed.  Any exception
        escaping the loop is logged and the process is aborted with
        os.abort().
        """
        try:
            rp = self.cnpipe[0]
            # Maps each active request to its response iterator, or to
            # None once the iterator is finished and only buffered data
            # remains to be written.
            current = {}

            def closereq(req):
                # Close the iterator (if still open) and the request,
                # logging rather than propagating failures, then forget
                # the request.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]

            def ckiter(req):
                # Pull the next chunk from the request's iterator into
                # its buffer, retiring the iterator when it ends or
                # fails, and closing the request once nothing is left.
                respiter = current[req]
                if respiter is not None:
                    rem = False
                    data = None
                    try:
                        # Skip empty chunks.  A request is only polled
                        # again after a write to it, and it is only
                        # selected for writing while it has buffered
                        # data, so stopping at an empty chunk could
                        # leave the request stalled forever.
                        while not data:
                            data = next(respiter)
                    except StopIteration:
                        rem = True
                        try:
                            # Send the head even if there was no body.
                            req.flushreq()
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        try:
                            req.flushreq()
                            req.writedata(data)
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                            rem = True
                    if rem:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Only requests with pending output are watched for
                # writability.
                bufl = [r for r in current if r.buffer]
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # Write end closed by close(): shut down.
                        os.close(rp)
                        return
                    try:
                        # Drain everything queued so far.
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except Queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Fetch more data only while the buffer is
                        # reasonably small.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Wait for all handle1() threads, then stop the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            # Join outside the lock: the thread needs it to deregister.
            th.join()
        # Closing the write end makes handle2() see end-of-file and exit.
        os.close(self.cnpipe[1])
        self.rthread.join()
345
# Registry mapping each handler's "cname" to its class, built from every
# handler subclass in this module that defines one.  values() is copied
# into a list so the lookup works on both Python 2 and 3 and iterates
# over a stable snapshot of the module namespace.
names = dict((cls.cname, cls) for cls in list(globals().values()) if
             isinstance(cls, type) and
             issubclass(cls, handler) and
             hasattr(cls, "cname"))
350
def parsehspec(spec):
    """Split a handler specification into its name and argument dict.

    The format is ``NAME[:KEY[=VALUE][,KEY[=VALUE]]...]``.  A key given
    without ``=`` maps to the empty string.  Values are returned as
    strings, and a later occurrence of a key overrides an earlier one.

    Returns a (name, args) tuple.
    """
    nm, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    args = {}
    while rest:
        # Peel off one comma-separated item; `rest` becomes empty when
        # there is nothing left after it.
        part, _, rest = rest.partition(",")
        key, _, val = part.partition("=")
        args[key] = val
    return nm, args