1 import sys, os, threading, time, logging, select, Queue
# Module-level logger, obtained under the name "ashd.serve"; the log.error /
# log.critical calls further down in this file go through it.
4 log = logging.getLogger("ashd.serve")
# A plain mutex.  NOTE(review): presumably serialises the sequence counter
# behind reqseq() (used when naming request threads); the code that takes
# this lock is not visible in this excerpt -- confirm.
6 seqlk = threading.Lock()
# Exception type derived from IOError.
15 class closed(IOError):
# Initialises the IOError base with a fixed message stating that the client
# closed the connection.  NOTE(review): the header of the method containing
# this call is not visible in this excerpt.
17 super(closed, self).__init__("The client has closed the connection.")
# threading.Thread subclass used for the worker threads created in this file.
19 class reqthread(threading.Thread):
# Accepts an optional thread name; all other keyword arguments are passed on
# unchanged to threading.Thread.__init__.
20 def __init__(self, name=None, **kw):
# Builds a name of the form "Request handler <n>", with <n> taken from
# reqseq() (defined outside this excerpt).  NOTE(review): the condition
# guarding this assignment (presumably "name is None") is not visible --
# confirm.
22 name = "Request handler %i" % reqseq()
23 super(reqthread, self).__init__(name=name, **kw)
# Per-request state object: holds the owning handler, a pending-output buffer
# and the response status/headers.  NOTE(review): several methods and
# attribute initialisations of this class are not visible in this excerpt.
25 class wsgirequest(object):
# handler: the object whose ckflush() is invoked from write() below.
26 def __init__(self, handler):
30 self.handler = handler
# Pending response bytes; appended to by writedata().
31 self.buffer = bytearray()
# Takes the response status and headers.  NOTE(review): body not visible;
# presumably meant to be supplied by a subclass -- confirm.
37 def writehead(self, status, headers):
# Appends data to the pending-output buffer; performs no I/O itself.
43 def writedata(self, data):
44 self.buffer.extend(data)
# Raised when body output is attempted before a response has been started.
# NOTE(review): the enclosing method header and the condition guarding this
# raise are not visible in this excerpt.
49 raise Exception("Cannot send response body before starting response.")
# Emits the response head using the status and headers stored on the request.
51 self.writehead(self.status, self.headers)
# Accepts a chunk of response body data.
53 def write(self, data):
# Hands the request to the handler's ckflush().  NOTE(review): the statements
# preceding this call are not visible in this excerpt.
58 self.handler.ckflush(self)
# Same parameter list as the WSGI start_response callable (status, headers,
# optional exc_info).
60 def startreq(self, status, headers, exc_info=None):
# Rejects an attempt to start a response a second time.  NOTE(review): the
# guarding conditions (and any exc_info handling) are not visible here.
69 raise Exception("Can only start responding once.")
71 self.headers = headers
# Base class of the request handlers in this file (single, freethread and
# resplex derive from it).
74 class handler(object):
# Entry point taking one request.  NOTE(review): body not visible in this
# excerpt.
75 def handle(self, request):
# Called from wsgirequest.write() with the request as argument.
77 def ckflush(self, req):
# Registers the request for POLLOUT events on p.  NOTE(review): p is
# presumably a select.poll() object created on a line not visible here;
# passing req itself implies it provides fileno() -- confirm.
79 p.register(req, select.POLLOUT)
# Loops for as long as the request still has buffered output.
80 while len(req.buffer) > 0:
# Receives handler arguments as keyword arguments.  NOTE(review): cls as
# first parameter suggests a @classmethod decorator on a line not visible
# here -- confirm.
87 def parseargs(cls, **args):
# Reports one of the argument names as unknown.  iter(...).next() is
# Python 2 iterator syntax.  NOTE(review): the condition guarding this raise
# is not visible in this excerpt.
89 raise ValueError("unknown handler argument: " + iter(args).next())
# Handler subclass.  NOTE(review): the name suggests requests are processed
# one at a time in the calling thread; no thread creation is visible in this
# excerpt, but lines are missing -- confirm.
92 class single(handler):
95 def handle(self, req):
# perf.request(env) is used as a context manager yielding reqevent.
# NOTE(review): perf and env are defined on lines not visible in this excerpt.
98 with perf.request(env) as reqevent:
# Invokes the WSGI application through the request object, passing
# req.startreq as the start_response callable.
99 respiter = req.handlewsgi(env, req.startreq)
# Iterates over the chunks of the response.  NOTE(review): loop body not
# visible.
100 for data in respiter:
# Reports the response status and headers to the perf event.
103 reqevent.response([req.status, req.headers])
# Logs the failure together with the active traceback (exc_info=True).
# NOTE(review): the matching try/except lines are not visible.
109 log.error("exception occurred when handling request", exc_info=True)
# Handler subclass that starts a reqthread per request (see handle below),
# with an optional cap on concurrent threads and an optional wait timeout.
# NOTE(review): many lines of this class are not visible in this excerpt.
113 class freethread(handler):
# max: compared against len(self.current) in handle().
# timeout: number of seconds, used in the timed wait in handle().
# Remaining keyword arguments go to the base-class initialiser.
116 def __init__(self, max=None, timeout=None, **kw):
117 super(freethread, self).__init__(**kw)
# One mutex, shared with the condition variable created from it.
119 self.lk = threading.Lock()
120 self.tcond = threading.Condition(self.lk)
122 self.timeout = timeout
# Converts string-valued handler arguments into constructor keywords;
# unrecognised ones are passed to the base-class parseargs.
125 def parseargs(cls, max=None, abort=None, **args):
126 ret = super(freethread, cls).parseargs(**args)
128 ret["max"] = int(max)
# The "abort" argument is stored under the "timeout" key.
130 ret["timeout"] = int(abort)
# Accepts one request and starts a thread for it.
133 def handle(self, req):
# With a limit configured, wait until the number of current threads drops
# below it.
135 if self.max is not None:
136 if self.timeout is not None:
137 now = start = time.time()
138 while len(self.current) >= self.max:
# Waits on the condition for the time remaining until start + timeout.
# NOTE(review): the line refreshing "now" after the wait is not visible.
139 self.tcond.wait(start + self.timeout - now)
# True once more than self.timeout seconds have passed since start.
# NOTE(review): the action taken in that case is not visible -- confirm.
141 if now - start > self.timeout:
# Variant without a timeout.  NOTE(review): loop body not visible.
144 while len(self.current) >= self.max:
# Creates the worker thread running self.run with the request as argument.
146 th = reqthread(target=self.run, args=[req])
# Flag polled below.  NOTE(review): presumably set to True by the new thread
# once it has added itself to self.current -- confirm.
147 th.registered = False
149 while not th.registered:
# Executed in the worker thread: obtains its own thread object.
# NOTE(review): the enclosing method header (presumably run) is not visible.
154 th = threading.current_thread()
# Wakes every thread waiting on the condition.
158 self.tcond.notify_all()
# Same request-processing sequence as in single.handle.  NOTE(review): perf
# and env are defined on lines not visible in this excerpt.
161 with perf.request(env) as reqevent:
162 respiter = req.handlewsgi(env, req.startreq)
163 for data in respiter:
166 reqevent.response([req.status, req.headers])
# Logs the failure together with the active traceback.
172 log.error("exception occurred when handling request", exc_info=True)
# Removes this thread from the current collection and wakes the waiters.
175 self.current.remove(th)
176 self.tcond.notify_all()
# Picks an arbitrary remaining thread while any exist (Python 2 .next()).
# NOTE(review): the enclosing method header and what is done with th are not
# visible -- presumably a shutdown path that joins the thread; confirm.
183 if len(self.current) > 0:
184 th = iter(self.current).next()
# Handler subclass that runs the WSGI application in a per-request thread
# (handle1) and passes the resulting iterator through a queue to one shared
# "Response thread" (handle2).  NOTE(review): large parts of this class,
# including the header of the response-thread method, are not visible in this
# excerpt.
189 class resplex(handler):
# max: compared against len(self.current) in handle().
192 def __init__(self, max=None, **kw):
193 super(resplex, self).__init__(**kw)
# One mutex, shared with the condition variable created from it.
195 self.lk = threading.Lock()
196 self.tcond = threading.Condition(self.lk)
# Bounded queue (capacity 5) carrying (req, respiter) pairs to the response
# thread.
198 self.cqueue = Queue.Queue(5)
# Pipe pair; index 1 is written to in handle1 after queuing a pair.
199 self.cnpipe = os.pipe()
# The thread that runs self.handle2.  NOTE(review): the line starting it is
# not visible in this excerpt.
200 self.rthread = reqthread(name="Response thread", target=self.handle2)
# Converts the string-valued "max" handler argument; others go to the base
# class.
204 def parseargs(cls, max=None, **args):
205 ret = super(resplex, cls).parseargs(**args)
207 ret["max"] = int(max)
# Overrides handler.ckflush: this handler always rejects the call, which
# wsgirequest.write() makes.
210 def ckflush(self, req):
211 raise Exception("resplex handler does not support the write() function")
# Accepts one request and starts a handle1 thread for it.
213 def handle(self, req):
# With a limit configured, loop while the number of current threads is at or
# above it.  NOTE(review): loop body (presumably a condition wait) not
# visible.
215 if self.max is not None:
216 while len(self.current) >= self.max:
218 th = reqthread(target=self.handle1, args=[req])
# Flag polled below.  NOTE(review): presumably set to True by the new thread
# -- confirm.
219 th.registered = False
221 while not th.registered:
# Runs in the per-request thread.
224 def handle1(self, req):
226 th = threading.current_thread()
# Wakes every thread waiting on the condition.
230 self.tcond.notify_all()
# Invokes the WSGI application and obtains an iterator over its result.
# NOTE(review): env is defined on a line not visible in this excerpt.
233 respobj = req.handlewsgi(env, req.startreq)
234 respiter = iter(respobj)
# Error path for an application that never called the start callable.
# NOTE(review): the guarding condition is not visible.
236 log.error("request handler returned without calling start_request")
# The iterator's close() is optional, hence the hasattr check.
237 if hasattr(respiter, "close"):
# Hands the pair to the response thread; blocks while the queue is full.
241 self.cqueue.put((req, respiter))
# Writes one byte to the pipe.  NOTE(review): presumably wakes the select()
# call in the response thread -- confirm.
242 os.write(self.cnpipe[1], " ")
# Removes this thread from the current collection and wakes the waiters.
246 self.current.remove(th)
247 self.tcond.notify_all()
251 log.error("exception occurred when handling request", exc_info=True)
# NOTE(review): from here to the critical-level log call below, the visible
# lines appear to belong to the response-thread method (presumably handle2)
# and a nested helper; their headers are not visible.  "current" here is a
# local mapping from request to response iterator, distinct from
# self.current.
262 respiter = current[req]
# Closes the iterator when it exists and offers close().
264 if respiter is not None and hasattr(respiter, "close"):
267 log.error("exception occurred when closing iterator", exc_info=True)
271 log.error("exception occurred when closing request", exc_info=True)
274 respiter = current[req]
# A None entry means there is no iterator left to pull from for this request.
275 if respiter is not None:
# Pulls the next chunk from the response iterator (Python 2 .next()).
278 data = respiter.next()
# The iterator is exhausted.
279 except StopIteration:
284 log.error("exception occurred when handling response data", exc_info=True)
287 log.error("exception occurred when iterating response", exc_info=True)
294 log.error("exception occurred when handling response data", exc_info=True)
299 if hasattr(respiter, "close"):
302 log.error("exception occurred when closing iterator", exc_info=True)
# True when the iterator is gone and no buffered output remains.
# NOTE(review): the action taken (presumably closing the request) is not
# visible -- confirm.
304 if respiter is None and not req.buffer:
# Requests that currently have buffered output.
308 bufl = list(req for req in current.iterkeys() if req.buffer)
# Waits until rp is readable or a request with pending output is writable.
# NOTE(review): rp is presumably self.cnpipe[0]; its assignment is not
# visible.
309 rls, wls, els = select.select([rp], bufl, [rp] + bufl)
# Reads up to 1024 bytes from rp.
311 ret = os.read(rp, 1024)
# Non-blocking fetch of a queued pair, recorded in the local mapping.
317 req, respiter = self.cqueue.get(False)
318 current[req] = respiter
328 log.error("exception occurred when writing response", exc_info=True)
# Tests whether less than 64 KiB is buffered for the request.
# NOTE(review): presumably decides whether to pull more data from the
# iterator -- confirm.
331 if len(req.buffer) < 65536:
334 log.critical("unexpected exception occurred in response handler thread", exc_info=True)
# Picks an arbitrary remaining thread while any exist (Python 2 .next()).
# NOTE(review): the enclosing method header and what is done with th are not
# visible -- presumably a shutdown path; confirm.
340 if len(self.current) > 0:
341 th = iter(self.current).next()
# Closes the write end of the pipe.
345 os.close(self.cnpipe[1])
# Registry mapping each handler's cname attribute to its class.  It is built
# by scanning the module globals for classes that subclass handler and define
# cname.  itervalues() is Python 2 dict API.  NOTE(review): the cname
# assignments on the handler classes are not visible in this excerpt.
348 names = dict((cls.cname, cls) for cls in globals().itervalues() if
349 isinstance(cls, type) and
350 issubclass(cls, handler) and
351 hasattr(cls, "cname"))
# Parses a handler specification string.  Judging by the visible splits, the
# format is "name:key=val,key=val,...".  NOTE(review): the branches for a spec
# without ":" or a part without "=", and the return value, are not visible in
# this excerpt.
353 def parsehspec(spec):
# Splits off the handler name at the first ":".
356 nm, spec = spec.split(":", 1)
# Takes the next comma-separated part and keeps the remainder for later.
360 part, spec = spec.split(",", 1)
# Last part: nothing remains afterwards.
362 part, spec = spec, None
# Splits a part into key and value at the first "=".
364 key, val = part.split("=", 1)