+ with self.lk:
+ self.current.remove(th)
+ self.tcond.notify_all()
+ finally:
+ req.close()
+
+ def close(self):
+ while True:
+ with self.lk:
+ if len(self.current) > 0:
+ th = next(iter(self.current))
+ else:
+ return
+ th.join()
+
+class threadpool(handler):
+ def __init__(self, *, min=0, max=20, live=300, **kw):
+ super().__init__(**kw)
+ self.current = set()
+ self.free = set()
+ self.lk = threading.RLock()
+ self.pcond = threading.Condition(self.lk)
+ self.rcond = threading.Condition(self.lk)
+ self.wreq = None
+ self.min = min
+ self.max = max
+ self.live = live
+ for i in range(self.min):
+ self.newthread()
+
+ @classmethod
+ def parseargs(cls, *, min=None, max=None, live=None, **args):
+ ret = super().parseargs(**args)
+ if min:
+ ret["min"] = int(min)
+ if max:
+ ret["max"] = int(max)
+ if live:
+ ret["live"] = int(live)
+ return ret
+
+ def newthread(self):
+ with self.lk:
+ th = reqthread(target=self.loop)
+ th.start()
+ while not th in self.current:
+ self.pcond.wait()
+
+ def ckflush(self, req):
+ while len(req.buffer) > 0:
+ rls, wls, els = select.select([], [req], [req])
+ req.flush()
+
+ def _handle(self, req):
+ try:
+ env = req.mkenv()
+ with perf.request(env) as reqevent:
+ respiter = req.handlewsgi(env, req.startreq)
+ for data in respiter:
+ req.write(data)
+ if req.status:
+ reqevent.response([req.status, req.headers])
+ req.flushreq()
+ self.ckflush(req)