- def newthread(self):
- with self.lk:
- th = reqthread(target=self.loop)
- th.start()
- while not th in self.current:
- self.pcond.wait()
-
- def _handle(self, req):
- try:
- env = req.mkenv()
- with perf.request(env) as reqevent:
- respiter = req.handlewsgi(env, req.startreq)
- for data in respiter:
- req.write(data)
- if req.status:
- reqevent.response([req.status, req.headers])
- req.flushreq()
- self.ckflush(req)
- except closed:
- pass
- except:
- log.error("exception occurred when handling request", exc_info=True)
- finally:
- req.close()
-
- def loop(self):
- th = threading.current_thread()
- with self.lk:
- self.current.add(th)
- try:
- while True:
- with self.lk:
- self.free.add(th)
- try:
- self.pcond.notify_all()
- now = start = time.time()
- while self.wreq is None:
- self.rcond.wait(start + self.live - now)
- now = time.time()
- if now - start > self.live:
- if len(self.current) > self.min:
- self.current.remove(th)
- return
- else:
- start = now
- req, self.wreq = self.wreq, None
- self.pcond.notify_all()
- finally:
- self.free.remove(th)
- self._handle(req)
- req = None
- finally:
- with self.lk:
- try:
- self.current.remove(th)
- except KeyError:
- pass
- self.pcond.notify_all()
-
- def handle(self, req):
- while True:
- with self.lk:
- if len(self.free) < 1 and len(self.current) < self.max:
- self.newthread()
- while self.wreq is not None:
- self.pcond.wait()
- if self.wreq is None:
- self.wreq = req
- self.rcond.notify(1)
- return
-
- def close(self):
- self.live = 0
- self.min = 0
- with self.lk:
- while len(self.current) > 0:
- self.rcond.notify_all()
- self.pcond.wait(1)
-
-class resplex(handler):
- def __init__(self, **kw):
- super().__init__(**kw)
- self.current = set()
- self.lk = threading.Lock()
- self.cqueue = queue.Queue(5)
- self.cnpipe = os.pipe()
- self.rthread = reqthread(name="Response thread", target=self.handle2)
- self.rthread.start()
-