super().__init__(name=name, **kw)
class wsgirequest(object):
- def __init__(self, handler):
+ def __init__(self, *, handler):
self.status = None
self.headers = []
self.respsent = False
return {}
class single(handler):
+ cname = "single"
+
def handle(self, req):
try:
env = req.mkenv()
finally:
req.close()
+def dbg(*a):
+ f = True
+ for o in a:
+ if not f:
+ sys.stderr.write(" ")
+ sys.stderr.write(str(a))
+ f = False
+ sys.stderr.write("\n")
+ sys.stderr.flush()
+
class freethread(handler):
+ cname = "free"
+
def __init__(self, *, max=None, timeout=None, **kw):
super().__init__(**kw)
self.current = set()
while len(self.current) >= self.max:
self.tcond.wait()
th = reqthread(target=self.run, args=[req])
+ th.registered = False
th.start()
- while th.is_alive() and th not in self.current:
+ while not th.registered:
self.tcond.wait()
def run(self, req):
th = threading.current_thread()
with self.lk:
self.current.add(th)
+ th.registered = True
self.tcond.notify_all()
try:
env = req.mkenv()
return
th.join()
-class threadpool(handler):
- def __init__(self, *, min=0, max=20, live=300, **kw):
+class resplex(handler):
+ cname = "rplex"
+
+ def __init__(self, *, max=None, **kw):
super().__init__(**kw)
self.current = set()
- self.free = set()
- self.lk = threading.RLock()
- self.pcond = threading.Condition(self.lk)
- self.rcond = threading.Condition(self.lk)
- self.wreq = None
- self.min = min
+ self.lk = threading.Lock()
+ self.tcond = threading.Condition(self.lk)
self.max = max
- self.live = live
- for i in range(self.min):
- self.newthread()
+ self.cqueue = queue.Queue(5)
+ self.cnpipe = os.pipe()
+ self.rthread = reqthread(name="Response thread", target=self.handle2)
+ self.rthread.start()
@classmethod
- def parseargs(cls, *, min=None, max=None, live=None, **args):
+ def parseargs(cls, *, max=None, **args):
ret = super().parseargs(**args)
- if min:
- ret["min"] = int(min)
if max:
ret["max"] = int(max)
- if live:
- ret["live"] = int(live)
return ret
- def newthread(self):
- with self.lk:
- th = reqthread(target=self.loop)
- th.start()
- while not th in self.current:
- self.pcond.wait()
-
- def _handle(self, req):
- try:
- env = req.mkenv()
- with perf.request(env) as reqevent:
- respiter = req.handlewsgi(env, req.startreq)
- for data in respiter:
- req.write(data)
- if req.status:
- reqevent.response([req.status, req.headers])
- req.flushreq()
- self.ckflush(req)
- except closed:
- pass
- except:
- log.error("exception occurred when handling request", exc_info=True)
- finally:
- req.close()
-
- def loop(self):
- th = threading.current_thread()
- with self.lk:
- self.current.add(th)
- try:
- while True:
- with self.lk:
- self.free.add(th)
- try:
- self.pcond.notify_all()
- now = start = time.time()
- while self.wreq is None:
- self.rcond.wait(start + self.live - now)
- now = time.time()
- if now - start > self.live:
- if len(self.current) > self.min:
- self.current.remove(th)
- return
- else:
- start = now
- req, self.wreq = self.wreq, None
- self.pcond.notify_all()
- finally:
- self.free.remove(th)
- self._handle(req)
- req = None
- finally:
- with self.lk:
- try:
- self.current.remove(th)
- except KeyError:
- pass
- self.pcond.notify_all()
-
- def handle(self, req):
- while True:
- with self.lk:
- if len(self.free) < 1 and len(self.current) < self.max:
- self.newthread()
- while self.wreq is not None:
- self.pcond.wait()
- if self.wreq is None:
- self.wreq = req
- self.rcond.notify(1)
- return
-
- def close(self):
- self.live = 0
- self.min = 0
- with self.lk:
- while len(self.current) > 0:
- self.rcond.notify_all()
- self.pcond.wait(1)
-
-class resplex(handler):
- def __init__(self, **kw):
- super().__init__(**kw)
- self.current = set()
- self.lk = threading.Lock()
- self.cqueue = queue.Queue(5)
- self.cnpipe = os.pipe()
- self.rthread = reqthread(name="Response thread", target=self.handle2)
- self.rthread.start()
-
def ckflush(self, req):
raise Exception("resplex handler does not support the write() function")
def handle(self, req):
- reqthread(target=self.handle1, args=[req]).start()
+ with self.lk:
+ if self.max is not None:
+ while len(self.current) >= self.max:
+ self.tcond.wait()
+ th = reqthread(target=self.handle1, args=[req])
+ th.registered = False
+ th.start()
+ while not th.registered:
+ self.tcond.wait()
def handle1(self, req):
try:
th = threading.current_thread()
with self.lk:
self.current.add(th)
+ th.registered = True
+ self.tcond.notify_all()
try:
env = req.mkenv()
respobj = req.handlewsgi(env, req.startreq)
os.write(self.cnpipe[1], b" ")
req = None
finally:
- self.current.remove(th)
+ with self.lk:
+ self.current.remove(th)
+ self.tcond.notify_all()
except closed:
pass
except:
data = next(respiter)
except StopIteration:
rem = True
- req.flushreq()
+ try:
+ req.flushreq()
+ except:
+ log.error("exception occurred when handling response data", exc_info=True)
except:
rem = True
log.error("exception occurred when iterating response", exc_info=True)
if not rem:
if data:
- req.flushreq()
- req.writedata(data)
- else:
+ try:
+ req.flushreq()
+ req.writedata(data)
+ except:
+ log.error("exception occurred when handling response data", exc_info=True)
+ rem = True
+ if rem:
current[req] = None
try:
if hasattr(respiter, "close"):
ckiter(req)
except:
log.critical("unexpected exception occurred in response handler thread", exc_info=True)
- sys.exit(1)
+ os.abort()
def close(self):
while True:
os.close(self.cnpipe[1])
self.rthread.join()
-names = {"single": single,
- "free": freethread,
- "pool": threadpool,
- "rplex": resplex}
+names = {cls.cname: cls for cls in globals().values() if
+ isinstance(cls, type) and
+ issubclass(cls, handler) and
+ hasattr(cls, "cname")}
def parsehspec(spec):
if ":" not in spec: