1 import sys, os, threading, time, logging, select, queue
# Logger for this module; the handler classes below report request and
# response failures through it.
4 log = logging.getLogger("ashd.serve")
# Lock created once at import time.
# NOTE(review): nothing visible here acquires it -- presumably it guards the
# counter behind reqseq(); confirm against the full file.
6 seqlk = threading.Lock()
# Exception derived from IOError whose message states that the client has
# closed the connection.
15 class closed(IOError):
# The message is fixed and is handed to the parent constructor.
17 super().__init__("The client has closed the connection.")
# Thread subclass; the freethread and resplex handlers below create their
# worker threads from it.
19 class reqthread(threading.Thread):
# `name` is keyword-only and optional; every other keyword argument is
# forwarded unchanged to threading.Thread.
20 def __init__(self, *, name=None, **kw):
# Generated name of the form "Request handler <n>", where <n> comes from
# reqseq().
# NOTE(review): presumably applied only when no name was passed -- confirm.
22 name = "Request handler %i" % reqseq()
23 super().__init__(name=name, **kw)
# State of one request being answered: the owning handler plus a byte buffer
# of response data.
25 class wsgirequest(object):
# `handler` is keyword-only; it is the object whose ckflush() is called by
# write().
26 def __init__(self, *, handler):
30 self.handler = handler
# Byte buffer: writedata() appends to it, and handler.ckflush() loops for as
# long as it is non-empty.
31 self.buffer = bytearray()
# Takes the response status and headers; it is invoked below with
# self.status and self.headers.
37 def writehead(self, status, headers):
# Queue `data` by appending it to self.buffer.
43 def writedata(self, data):
44 self.buffer.extend(data)
# Error for attempting to send body data before a response has been started
# (per its message).
# NOTE(review): the enclosing method header and the guarding condition are
# not visible here.
49 raise Exception("Cannot send response body before starting response.")
# Pass the stored status and headers to writehead().
51 self.writehead(self.status, self.headers)
# Takes body data; the visible step hands this request to the handler's
# ckflush().
53 def write(self, data):
58 self.handler.ckflush(self)
# Takes (status, headers, exc_info=None). The handlers pass this bound method
# as the second argument to handlewsgi().
# NOTE(review): the signature looks like WSGI's start_response -- confirm.
60 def startreq(self, status, headers, exc_info=None):
# Error for starting a response more than once (per its message).
# NOTE(review): the condition guarding this raise is not visible here.
69 raise Exception("Can only start responding once.")
# Remember the headers; they are later read back as self.headers.
71 self.headers = headers
# Base class of the request handlers defined below (single, freethread and
# resplex derive from it).
74 class handler(object):
# Entry point taking one request; the subclasses define their own handle().
75 def handle(self, request):
# Loop for as long as the request's buffer holds data, blocking in select()
# until `req` is reported writable or in an exceptional state. select()
# requires `req` to provide a fileno() method.
77 def ckflush(self, req):
78 while len(req.buffer) > 0:
79 rls, wls, els = select.select([], [req], [req])
# Takes handler arguments as keywords.
# NOTE(review): the first parameter is `cls`, so this is presumably a
# classmethod whose decorator is not visible here -- confirm.
85 def parseargs(cls, **args):
# The error message names one of the unrecognised arguments.
87 raise ValueError("unknown handler argument: " + next(iter(args)))
# Handler subclass.
# NOTE(review): the name suggests requests are processed one at a time in the
# calling thread -- confirm.
90 class single(handler):
93 def handle(self, req):
# The request is processed inside a perf.request() context; `reqevent` is
# whatever that context manager yields.
# NOTE(review): where `env` comes from is not visible here.
96 with perf.request(env) as reqevent:
# req.startreq is passed as the callback argument; the return value is kept
# as the response iterable.
97 respiter = req.handlewsgi(env, req.startreq)
# Hand the request's status and headers to reqevent.response().
101 reqevent.response([req.status, req.headers])
# Failures are logged together with the current traceback (exc_info=True).
107 log.error("exception occurred when handling request", exc_info=True)
# Handler subclass that creates a reqthread (target self.run) for each
# request passed to handle().
111 class freethread(handler):
# `max` and `timeout` are keyword-only and optional; remaining keywords go to
# the parent constructor.
114 def __init__(self, *, max=None, timeout=None, **kw):
115 super().__init__(**kw)
# One lock, and a condition variable built on that same lock.
117 self.lk = threading.Lock()
118 self.tcond = threading.Condition(self.lk)
120 self.timeout = timeout
# Converts handler arguments into constructor keywords: the remaining
# arguments go to the parent's parseargs(), `max` is stored as an int, and
# the `abort` argument is stored as an int under the key "timeout".
# NOTE(review): the conditions guarding the two assignments are not visible.
123 def parseargs(cls, *, max=None, abort=None, **args):
124 ret = super().parseargs(**args)
126 ret["max"] = int(max)
128 ret["timeout"] = int(abort)
# Accept one request. When `max` is set, wait while the number of entries in
# self.current is at least `max`.
131 def handle(self, req):
133 if self.max is not None:
134 if self.timeout is not None:
# With a timeout configured, the wait is measured from `start`.
135 now = start = time.time()
136 while len(self.current) >= self.max:
# Wait on the condition for at most the time left until start + timeout.
137 self.tcond.wait(start + self.timeout - now)
# Check whether the configured timeout has elapsed.
# NOTE(review): what happens in that case is not visible here.
139 if now - start > self.timeout:
# Same capacity check for the case without a timeout.
142 while len(self.current) >= self.max:
# The request is handed to a new reqthread running self.run(req).
144 th = reqthread(target=self.run, args=[req])
# Loop until the new thread shows up in self.current or is no longer alive.
146 while th.is_alive() and th not in self.current:
# NOTE(review): from here on the lines presumably belong to run(), whose
# header is not visible -- confirm.
151 th = threading.current_thread()
# Wake every thread waiting on the condition.
154 self.tcond.notify_all()
157 with perf.request(env) as reqevent:
# req.startreq is passed as the callback argument; the result is iterated
# below.
158 respiter = req.handlewsgi(env, req.startreq)
159 for data in respiter:
# Hand the request's status and headers to reqevent.response().
162 reqevent.response([req.status, req.headers])
# Failures are logged together with the current traceback (exc_info=True).
168 log.error("exception occurred when handling request", exc_info=True)
# Drop this thread from self.current and wake the waiters, e.g. a handle()
# call blocked on the `max` limit.
171 self.current.remove(th)
172 self.tcond.notify_all()
# While self.current is non-empty, pick an arbitrary thread from it.
# NOTE(review): the enclosing method and what is done with `th` are not
# visible here.
179 if len(self.current) > 0:
180 th = next(iter(self.current))
# Handler subclass that creates a reqthread (target self.handle1) for each
# request and queues the resulting response iterator for a separate thread
# named "Response thread" (target self.handle2).
185 class resplex(handler):
# `max` is keyword-only and optional; remaining keywords go to the parent
# constructor.
188 def __init__(self, *, max=None, **kw):
189 super().__init__(**kw)
# One lock, and a condition variable built on that same lock.
191 self.lk = threading.Lock()
192 self.tcond = threading.Condition(self.lk)
# Queue with capacity 5; handle1() puts (req, respiter) pairs on it and the
# response loop fetches them without blocking.
194 self.cqueue = queue.Queue(5)
# Pipe pair; handle1() writes one byte to its write end after queueing.
195 self.cnpipe = os.pipe()
196 self.rthread = reqthread(name="Response thread", target=self.handle2)
# Converts handler arguments into constructor keywords: the remaining
# arguments go to the parent's parseargs() and `max` is stored as an int.
# NOTE(review): the condition guarding the assignment is not visible here.
200 def parseargs(cls, *, max=None, **args):
201 ret = super().parseargs(**args)
203 ret["max"] = int(max)
# Overrides the base ckflush() to always raise: per the message, write() is
# not supported by this handler.
206 def ckflush(self, req):
207 raise Exception("resplex handler does not support the write() function")
# Accept one request. When `max` is set, the visible loop runs while the
# number of entries in self.current is at least `max`.
209 def handle(self, req):
211 if self.max is not None:
212 while len(self.current) >= self.max:
# The request is handed to a new reqthread running self.handle1(req).
214 th = reqthread(target=self.handle1, args=[req])
# Loop until the new thread shows up in self.current or is no longer alive.
216 while th.is_alive() and th not in self.current:
# Thread target created by handle(): obtains the response iterator and
# queues it.
219 def handle1(self, req):
221 th = threading.current_thread()
# Wake every thread waiting on the condition.
224 self.tcond.notify_all()
# req.startreq is passed as the callback argument; the returned object is
# turned into an iterator.
227 respobj = req.handlewsgi(env, req.startreq)
228 respiter = iter(respobj)
# Logged when the application returned without starting a response (per the
# message); the visible follow-up checks whether the iterator has close().
230 log.error("request handler returned without calling start_request")
231 if hasattr(respiter, "close"):
# Queue the pair, then write one byte to the pipe's write end.
# NOTE(review): presumably this wakes the select() in the response loop --
# confirm that `rp` there is the read end of cnpipe.
235 self.cqueue.put((req, respiter))
236 os.write(self.cnpipe[1], b" ")
# Drop this thread from self.current and wake the waiters.
240 self.current.remove(th)
241 self.tcond.notify_all()
# Failures are logged together with the current traceback (exc_info=True).
245 log.error("exception occurred when handling request", exc_info=True)
# NOTE(review): the lines from here to the log.critical() call presumably
# belong to handle2() and helpers nested in it; their headers and try/except
# structure are not visible here. `current` maps each request to its response
# iterator (or None).
256 respiter = current[req]
# Only iterators that provide close() are considered here.
258 if respiter is not None and hasattr(respiter, "close"):
261 log.error("exception occurred when closing iterator", exc_info=True)
265 log.error("exception occurred when closing request", exc_info=True)
268 respiter = current[req]
269 if respiter is not None:
# Fetch the next chunk; StopIteration is handled separately from the other
# failures logged below.
272 data = next(respiter)
273 except StopIteration:
278 log.error("exception occurred when handling response data", exc_info=True)
281 log.error("exception occurred when iterating response", exc_info=True)
288 log.error("exception occurred when handling response data", exc_info=True)
293 if hasattr(respiter, "close"):
296 log.error("exception occurred when closing iterator", exc_info=True)
# Condition: no iterator remains for the request and its buffer is empty.
298 if respiter is None and not req.buffer:
# Requests that currently hold buffered data.
302 bufl = list(req for req in current.keys() if req.buffer)
# Wait until `rp` is readable, one of those requests is writable, or any of
# them is in an exceptional state.
303 rls, wls, els = select.select([rp], bufl, [rp] + bufl)
# Read up to 1024 bytes from `rp`.
305 ret = os.read(rp, 1024)
# Non-blocking fetch of a queued (req, respiter) pair, which is then recorded
# in `current`.
311 req, respiter = self.cqueue.get(False)
312 current[req] = respiter
322 log.error("exception occurred when writing response", exc_info=True)
# Check for less than 65536 bytes buffered for this request.
# NOTE(review): presumably a refill threshold -- confirm.
325 if len(req.buffer) < 65536:
328 log.critical("unexpected exception occurred in response handler thread", exc_info=True)
# While self.current is non-empty, pick an arbitrary thread from it.
# NOTE(review): the enclosing method and what is done with `th` are not
# visible here.
334 if len(self.current) > 0:
335 th = next(iter(self.current))
# Close the write end of the pipe.
339 os.close(self.cnpipe[1])
# Registry mapping each handler's `cname` attribute to its class. It is built
# from every module-level global that is a class, is a subclass of `handler`,
# and has a `cname` attribute. The comprehension keeps `cls` out of the
# module namespace while globals() is being iterated.
342 names = {cls.cname: cls for cls in globals().values() if
343 isinstance(cls, type) and
344 issubclass(cls, handler) and
345 hasattr(cls, "cname")}
347 def parsehspec(spec):
350 nm, spec = spec.split(":", 1)
354 part, spec = spec.split(",", 1)
356 part, spec = spec, None
358 key, val = part.split("=", 1)