1 import sys, os, threading, time, logging, select
4 log = logging.getLogger("ashd.serve")
6 seqlk = threading.Lock()
15 class closed(IOError):
17 super().__init__("The client has closed the connection.")
19 class reqthread(threading.Thread):
20 def __init__(self, *, name=None, **kw):
22 name = "Request handler %i" % reqseq()
23 super().__init__(name=name, **kw)
25 class wsgirequest(object):
26 def __init__(self, handler):
30 self.handler = handler
31 self.buffer = bytearray()
37 def writehead(self, status, headers):
43 def writedata(self, data):
44 self.buffer.extend(data)
49 raise Exception("Cannot send response body before starting response.")
51 self.writehead(self.status, self.headers)
53 def write(self, data):
58 self.handler.ckflush(self)
60 def startreq(self, status, headers, exc_info=None):
69 raise Exception("Can only start responding once.")
71 self.headers = headers
74 class handler(object):
75 def handle(self, request):
77 def ckflush(self, req):
83 def parseargs(cls, **args):
85 raise ValueError("unknown handler argument: " + next(iter(args)))
88 class freethread(handler):
89 def __init__(self, *, max=None, timeout=None, **kw):
90 super().__init__(**kw)
92 self.lk = threading.Lock()
93 self.tcond = threading.Condition(self.lk)
95 self.timeout = timeout
98 def parseargs(cls, *, max=None, abort=None, **args):
99 ret = super().parseargs(**args)
101 ret["max"] = int(max)
103 ret["timeout"] = int(abort)
106 def handle(self, req):
108 if self.max is not None:
109 if self.timeout is not None:
110 now = start = time.time()
111 while len(self.current) >= self.max:
112 self.tcond.wait(start + self.timeout - now)
114 if now - start > self.timeout:
117 while len(self.current) >= self.max:
119 th = reqthread(target=self.run, args=[req])
121 while th.is_alive() and th not in self.current:
124 def ckflush(self, req):
125 while len(req.buffer) > 0:
126 rls, wls, els = select.select([], [req], [req])
131 th = threading.current_thread()
134 self.tcond.notify_all()
137 with perf.request(env) as reqevent:
138 respiter = req.handlewsgi(env, req.startreq)
139 for data in respiter:
142 reqevent.response([req.status, req.headers])
148 log.error("exception occurred when handling request", exc_info=True)
151 self.current.remove(th)
152 self.tcond.notify_all()
159 if len(self.current) > 0:
160 th = next(iter(self.current))
165 class threadpool(handler):
166 def __init__(self, *, min=0, max=20, live=300, **kw):
167 super().__init__(**kw)
170 self.lk = threading.RLock()
171 self.pcond = threading.Condition(self.lk)
172 self.rcond = threading.Condition(self.lk)
177 for i in range(self.min):
181 def parseargs(cls, *, min=None, max=None, live=None, **args):
182 ret = super().parseargs(**args)
184 ret["min"] = int(min)
186 ret["max"] = int(max)
188 ret["live"] = int(live)
193 th = reqthread(target=self.loop)
195 while not th in self.current:
198 def ckflush(self, req):
199 while len(req.buffer) > 0:
200 rls, wls, els = select.select([], [req], [req])
203 def _handle(self, req):
206 with perf.request(env) as reqevent:
207 respiter = req.handlewsgi(env, req.startreq)
208 for data in respiter:
211 reqevent.response([req.status, req.headers])
217 log.error("exception occurred when handling request", exc_info=True)
222 th = threading.current_thread()
230 self.pcond.notify_all()
231 now = start = time.time()
232 while self.wreq is None:
233 self.rcond.wait(start + self.live - now)
235 if now - start > self.live:
236 if len(self.current) > self.min:
237 self.current.remove(th)
241 req, self.wreq = self.wreq, None
242 self.pcond.notify_all()
250 self.current.remove(th)
253 self.pcond.notify_all()
255 def handle(self, req):
258 if len(self.free) < 1 and len(self.current) < self.max:
260 while self.wreq is not None:
262 if self.wreq is None:
271 while len(self.current) > 0:
272 self.rcond.notify_all()
275 names = {"free": freethread,
278 def parsehspec(spec):
281 nm, spec = spec.split(":", 1)
285 part, spec = spec.split(",", 1)
287 part, spec = spec, None
289 key, val = part.split("=", 1)