-import sys, os, threading, time, logging, select, queue
+import sys, os, threading, time, logging, select, queue, collections
from . import perf
log = logging.getLogger("ashd.serve")
return
th.join()
+class threadpool(handler):
+ cname = "pool"
+
+ def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
+ super().__init__(**kw)
+ self.current = set()
+ self.clk = threading.Lock()
+ self.ccond = threading.Condition(self.clk)
+ self.queue = collections.deque()
+ self.waiting = set()
+ self.qlk = threading.Lock()
+ self.qcond = threading.Condition(self.qlk)
+ self.max = max
+ self.qsz = qsz
+ self.timeout = timeout
+
+ @classmethod
+ def parseargs(cls, *, max=None, queue=None, abort=None, **args):
+ ret = super().parseargs(**args)
+ if max:
+ ret["max"] = int(max)
+ if queue:
+ ret["qsz"] = int(queue)
+ if abort:
+ ret["timeout"] = int(abort)
+ return ret
+
+ def handle(self, req):
+ start = False
+ with self.qlk:
+ if self.timeout is not None:
+ now = start = time.time()
+ while len(self.queue) >= self.qsz:
+ self.qcond.wait(start + self.timeout - now)
+ now = time.time()
+ if now - start > self.timeout:
+ os.abort()
+ else:
+ while len(self.current) >= self.qsz:
+ self.qcond.wait()
+ self.queue.append(req)
+ self.qcond.notify()
+ if len(self.waiting) < 1:
+ start = True
+ if start:
+ with self.clk:
+ if len(self.current) < self.max:
+ th = reqthread(target=self.run)
+ th.registered = False
+ th.start()
+ while not th.registered:
+ self.ccond.wait()
+
+ def handle1(self, req):
+ try:
+ env = req.mkenv()
+ with perf.request(env) as reqevent:
+ respiter = req.handlewsgi(env, req.startreq)
+ for data in respiter:
+ req.write(data)
+ if req.status:
+ reqevent.response([req.status, req.headers])
+ req.flushreq()
+ self.ckflush(req)
+ except closed:
+ pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+
+ def run(self):
+ timeout = 10.0
+ th = threading.current_thread()
+ with self.clk:
+ self.current.add(th)
+ th.registered = True
+ self.ccond.notify_all()
+ try:
+ while True:
+ start = now = time.time()
+ with self.qlk:
+ while len(self.queue) < 1:
+ self.waiting.add(th)
+ self.qcond.wait(start + timeout - now)
+ self.waiting.remove(th)
+ now = time.time()
+ if now - start > timeout:
+ return
+ req = self.queue.popleft()
+ try:
+ self.handle1(req)
+ finally:
+ req.close()
+ finally:
+ with self.clk:
+ self.current.remove(th)
+
+ def close(self):
+ while True:
+ with self.lk:
+ if len(self.current) > 0:
+ th = next(iter(self.current))
+ else:
+ return
+ th.join()
+
class resplex(handler):
cname = "rplex"