#!/usr/bin/python
-import sys, os, getopt, threading, logging, time
-import ashd.proto, ashd.util, ashd.perf, ashd.serve
+import sys, os, getopt, socket, logging, time
+import ashd.util, ashd.serve
try:
import pdm.srv
except:
pdm = None
def usage(out):
- out.write("usage: ashd-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-l REQLIMIT] HANDLER-MODULE [ARGS...]\n")
+ out.write("usage: ashd-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-l REQLIMIT] [-t REQUEST-HANDLER[:PAR[=VAL](,PAR[=VAL])...]] HANDLER-MODULE [ARGS...]\n")
-reqlimit = 0
+hspec = "free", {}
modwsgi_compat = False
setlog = True
-opts, args = getopt.getopt(sys.argv[1:], "+hAp:l:m:")
+opts, args = getopt.getopt(sys.argv[1:], "+hALp:t:l:m:")
for o, a in opts:
if o == "-h":
usage(sys.stdout)
elif o == "-A":
modwsgi_compat = True
elif o == "-l":
- reqlimit = int(a)
+ hspec = "free", {"max": a, "abort": "10"}
+ elif o == "-t":
+ hspec = ashd.serve.parsehspec(a)
elif o == "-m":
if pdm is not None:
pdm.srv.listen(a)
env["wsgi.run_once"] = False
return env
-if reqlimit != 0:
- guard = ashd.serve.abortlimiter(reqlimit).call
-else:
- guard = lambda fun: fun()
+class request(ashd.serve.wsgirequest):
+ def __init__(self, bkreq, **kw):
+ super(request, self).__init__(**kw)
+ self.bkreq = bkreq.dup()
+
+ def mkenv(self):
+ return mkenv(self.bkreq)
-class reqthread(ashd.serve.wsgithread):
- def __init__(self, req):
- super(reqthread, self).__init__()
- self.req = req.dup()
-
- def handlewsgi(self):
- return handler(self.env, self.startreq)
+ def handlewsgi(self, env, startreq):
+ return handler(env, startreq)
+
+ def fileno(self):
+ return self.bkreq.bsk.fileno()
def writehead(self, status, headers):
- try:
- self.req.sk.write("HTTP/1.1 %s\n" % status)
- for nm, val in headers:
- self.req.sk.write("%s: %s\n" % (nm, val))
- self.req.sk.write("\n")
- except IOError:
- raise ashd.serve.closed()
+ w = self.buffer.extend
+ w("HTTP/1.1 %s\n" % status)
+ for nm, val in headers:
+ w("%s: %s\n" % (nm, val))
+ w("\n")
- def writedata(self, data):
+ def flush(self):
try:
- self.req.sk.write(data)
- self.req.sk.flush()
+ ret = self.bkreq.bsk.send(self.buffer, socket.MSG_DONTWAIT)
+ self.buffer[:ret] = ""
except IOError:
raise ashd.serve.closed()
- def handle(self):
- self.env = mkenv(self.req)
- reqevent = ashd.perf.request(self.env)
- exc = (None, None, None)
- try:
- super(reqthread, self).handle()
- if self.status:
- reqevent.response([self.status, self.headers])
- except:
- exc = sys.exc_info()
- raise
- finally:
- reqevent.__exit__(*exc)
-
- def run(self):
- try:
- guard(super(reqthread, self).run)
- finally:
- self.req.close()
-
+ def close(self):
+ self.bkreq.close()
+
def handle(req):
- reqthread(req).start()
+ reqhandler.handle(request(bkreq=req, handler=reqhandler))
-ashd.util.serveloop(handle)
+if hspec[0] not in ashd.serve.names:
+ sys.stderr.write("ashd-wsgi: no such request handler: %s\n" % hspec[0])
+ sys.exit(1)
+hclass = ashd.serve.names[hspec[0]]
+try:
+ hargs = hclass.parseargs(**hspec[1])
+except ValueError as exc:
+ sys.stderr.write("ashd-wsgi: %s\n" % exc)
+ sys.exit(1)
+
+reqhandler = hclass(**hargs)
+try:
+ ashd.util.serveloop(handle)
+finally:
+ reqhandler.close()
-import os, threading, time, logging
+import sys, os, threading, time, logging, select, Queue
+import perf
log = logging.getLogger("ashd.serve")
seq = 1
def reqseq():
global seq
- seqlk.acquire()
- try:
+ with seqlk:
s = seq
seq += 1
return s
- finally:
- seqlk.release()
-
-class reqthread(threading.Thread):
- def __init__(self, name=None):
- if name is None:
- name = "Request handler %i" % reqseq()
- super(reqthread, self).__init__(name=name)
-
- def handle(self):
- raise Exception()
-
- def run(self):
- try:
- self.handle()
- except:
- log.error("exception occurred when handling request", exc_info=True)
class closed(IOError):
def __init__(self):
super(closed, self).__init__("The client has closed the connection.")
-class wsgithread(reqthread):
- def __init__(self, **kwargs):
- super(wsgithread, self).__init__(**kwargs)
+class reqthread(threading.Thread):
+ def __init__(self, name=None, **kw):
+ if name is None:
+ name = "Request handler %i" % reqseq()
+ super(reqthread, self).__init__(name=name, **kw)
+
+class wsgirequest(object):
+ def __init__(self, handler):
self.status = None
self.headers = []
self.respsent = False
+ self.handler = handler
+ self.buffer = bytearray()
def handlewsgi(self):
raise Exception()
+ def fileno(self):
+ raise Exception()
def writehead(self, status, headers):
raise Exception()
- def writedata(self, data):
+ def flush(self):
raise Exception()
-
- def write(self, data):
- if not data:
- return
- self.flushreq()
- self.writedata(data)
+ def close(self):
+ pass
+ def writedata(self, data):
+ self.buffer.extend(data)
def flushreq(self):
if not self.respsent:
self.respsent = True
self.writehead(self.status, self.headers)
+ def write(self, data):
+ if not data:
+ return
+ self.flushreq()
+ self.writedata(data)
+ self.handler.ckflush(self)
+
def startreq(self, status, headers, exc_info=None):
if self.status:
- if exc_info: # Nice calling convetion ^^
+ if exc_info:
try:
if self.respsent:
- raise exc_info[0], exc_info[1], exc_info[2]
+ raise exc_info[1]
finally:
- exc_info = None # CPython GC bug?
+ exc_info = None
else:
raise Exception("Can only start responding once.")
self.status = status
self.headers = headers
return self.write
-
- def handle(self):
+
+class handler(object):
+ def handle(self, request):
+ raise Exception()
+ def ckflush(self, req):
+ while len(req.buffer) > 0:
+ rls, wls, els = select.select([], [req], [req])
+ req.flush()
+ def close(self):
+ pass
+
+ @classmethod
+ def parseargs(cls, **args):
+ if len(args) > 0:
+ raise ValueError("unknown handler argument: " + iter(args).next())
+ return {}
+
+class single(handler):
+ cname = "single"
+
+ def handle(self, req):
try:
- respiter = self.handlewsgi()
- try:
+ env = req.mkenv()
+ with perf.request(env) as reqevent:
+ respiter = req.handlewsgi(env, req.startreq)
for data in respiter:
- self.write(data)
- if self.status:
- self.flushreq()
- finally:
- if hasattr(respiter, "close"):
- respiter.close()
+ req.write(data)
+ if req.status:
+ reqevent.response([req.status, req.headers])
+ req.flushreq()
+ self.ckflush(req)
except closed:
pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+ finally:
+ req.close()
+
+class freethread(handler):
+ cname = "free"
-class calllimiter(object):
- def __init__(self, limit):
- self.limit = limit
- self.lock = threading.Condition()
- self.inflight = 0
+ def __init__(self, max=None, timeout=None, **kw):
+ super(freethread, self).__init__(**kw)
+ self.current = set()
+ self.lk = threading.Lock()
+ self.tcond = threading.Condition(self.lk)
+ self.max = max
+ self.timeout = timeout
- def waited(self, time):
- if time > 10:
- raise RuntimeError("Waited too long")
+ @classmethod
+ def parseargs(cls, max=None, abort=None, **args):
+ ret = super(freethread, cls).parseargs(**args)
+ if max:
+ ret["max"] = int(max)
+ if abort:
+ ret["timeout"] = int(abort)
+ return ret
- def __enter__(self):
- self.lock.acquire()
+ def handle(self, req):
+ with self.lk:
+ if self.max is not None:
+ if self.timeout is not None:
+ now = start = time.time()
+ while len(self.current) >= self.max:
+ self.tcond.wait(start + self.timeout - now)
+ now = time.time()
+ if now - start > self.timeout:
+ os.abort()
+ else:
+ while len(self.current) >= self.max:
+ self.tcond.wait()
+ th = reqthread(target=self.run, args=[req])
+ th.start()
+ while th.is_alive() and th not in self.current:
+ self.tcond.wait()
+
+ def run(self, req):
try:
- start = time.time()
- while self.inflight >= self.limit:
- self.lock.wait(10)
- self.waited(time.time() - start)
- self.inflight += 1
- return self
+ th = threading.current_thread()
+ with self.lk:
+ self.current.add(th)
+ self.tcond.notify_all()
+ try:
+ env = req.mkenv()
+ with perf.request(env) as reqevent:
+ respiter = req.handlewsgi(env, req.startreq)
+ for data in respiter:
+ req.write(data)
+ if req.status:
+ reqevent.response([req.status, req.headers])
+ req.flushreq()
+ self.ckflush(req)
+ except closed:
+ pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+ finally:
+ with self.lk:
+ self.current.remove(th)
+ self.tcond.notify_all()
finally:
- self.lock.release()
+ req.close()
+
+ def close(self):
+ while True:
+ with self.lk:
+ if len(self.current) > 0:
+ th = iter(self.current).next()
+ else:
+ return
+ th.join()
+
+class resplex(handler):
+ cname = "rplex"
+
+ def __init__(self, max=None, **kw):
+ super(resplex, self).__init__(**kw)
+ self.current = set()
+ self.lk = threading.Lock()
+ self.tcond = threading.Condition(self.lk)
+ self.max = max
+ self.cqueue = Queue.Queue(5)
+ self.cnpipe = os.pipe()
+ self.rthread = reqthread(name="Response thread", target=self.handle2)
+ self.rthread.start()
+
+ @classmethod
+ def parseargs(cls, max=None, **args):
+ ret = super(resplex, cls).parseargs(**args)
+ if max:
+ ret["max"] = int(max)
+ return ret
+
+ def ckflush(self, req):
+ raise Exception("resplex handler does not support the write() function")
- def __exit__(self, *excinfo):
- self.lock.acquire()
+ def handle(self, req):
+ with self.lk:
+ if self.max is not None:
+ while len(self.current) >= self.max:
+ self.tcond.wait()
+ th = reqthread(target=self.handle1, args=[req])
+ th.start()
+ while th.is_alive() and th not in self.current:
+ self.tcond.wait()
+
+ def handle1(self, req):
try:
- self.inflight -= 1
- self.lock.notify()
+ th = threading.current_thread()
+ with self.lk:
+ self.current.add(th)
+ self.tcond.notify_all()
+ try:
+ env = req.mkenv()
+ respobj = req.handlewsgi(env, req.startreq)
+ respiter = iter(respobj)
+ if not req.status:
+ log.error("request handler returned without calling start_request")
+ if hasattr(respiter, "close"):
+ respiter.close()
+ return
+ else:
+ self.cqueue.put((req, respiter))
+ os.write(self.cnpipe[1], " ")
+ req = None
+ finally:
+ with self.lk:
+ self.current.remove(th)
+ self.tcond.notify_all()
+ except closed:
+ pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
finally:
- self.lock.release()
- return False
+ if req is not None:
+ req.close()
- def call(self, target):
- self.__enter__()
+ def handle2(self):
try:
- return target()
- finally:
- self.__exit__()
+ rp = self.cnpipe[0]
+ current = {}
-class abortlimiter(calllimiter):
- def waited(self, time):
- if time > 10:
+ def closereq(req):
+ respiter = current[req]
+ try:
+ if respiter is not None and hasattr(respiter, "close"):
+ respiter.close()
+ except:
+ log.error("exception occurred when closing iterator", exc_info=True)
+ try:
+ req.close()
+ except:
+ log.error("exception occurred when closing request", exc_info=True)
+ del current[req]
+ def ckiter(req):
+ respiter = current[req]
+ if respiter is not None:
+ rem = False
+ try:
+ data = respiter.next()
+ except StopIteration:
+ rem = True
+ try:
+ req.flushreq()
+ except:
+ log.error("exception occurred when handling response data", exc_info=True)
+ except:
+ rem = True
+ log.error("exception occurred when iterating response", exc_info=True)
+ if not rem:
+ if data:
+ try:
+ req.flushreq()
+ req.writedata(data)
+ except:
+ log.error("exception occurred when handling response data", exc_info=True)
+ rem = True
+ if rem:
+ current[req] = None
+ try:
+ if hasattr(respiter, "close"):
+ respiter.close()
+ except:
+ log.error("exception occurred when closing iterator", exc_info=True)
+ respiter = None
+ if respiter is None and not req.buffer:
+ closereq(req)
+
+ while True:
+ bufl = list(req for req in current.iterkeys() if req.buffer)
+ rls, wls, els = select.select([rp], bufl, [rp] + bufl)
+ if rp in rls:
+ ret = os.read(rp, 1024)
+ if not ret:
+ os.close(rp)
+ return
+ try:
+ while True:
+ req, respiter = self.cqueue.get(False)
+ current[req] = respiter
+ ckiter(req)
+ except Queue.Empty:
+ pass
+ for req in wls:
+ try:
+ req.flush()
+ except closed:
+ closereq(req)
+ except:
+ log.error("exception occurred when writing response", exc_info=True)
+ closereq(req)
+ else:
+ if len(req.buffer) < 65536:
+ ckiter(req)
+ except:
+ log.critical("unexpected exception occurred in response handler thread", exc_info=True)
os.abort()
+
+ def close(self):
+ while True:
+ with self.lk:
+ if len(self.current) > 0:
+ th = iter(self.current).next()
+ else:
+ break
+ th.join()
+ os.close(self.cnpipe[1])
+ self.rthread.join()
+
+names = dict((cls.cname, cls) for cls in globals().itervalues() if
+ isinstance(cls, type) and
+ issubclass(cls, handler) and
+ hasattr(cls, "cname"))
+
+def parsehspec(spec):
+ if ":" not in spec:
+ return spec, {}
+ nm, spec = spec.split(":", 1)
+ args = {}
+ while spec:
+ if "," in spec:
+ part, spec = spec.split(",", 1)
+ else:
+ part, spec = spec, None
+ if "=" in part:
+ key, val = part.split("=", 1)
+ else:
+ key, val = part, ""
+ args[key] = val
+ return nm, args
#!/usr/bin/python
-import sys, os, getopt, logging
+import sys, os, getopt, logging, platform
import socket
-import ashd.scgi, ashd.perf, ashd.serve
+import ashd.scgi, ashd.serve
try:
import pdm.srv
except:
pdm = None
def usage(out):
- out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
+ out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-t REQUEST-HANDLER[:PAR[=VAL](,PAR[=VAL])...]] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
sk = None
+hspec = "free", {}
modwsgi_compat = False
setlog = True
-opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:")
+opts, args = getopt.getopt(sys.argv[1:], "+hALp:t:T:m:")
for o, a in opts:
if o == "-h":
usage(sys.stdout)
elif o == "-m":
if pdm is not None:
pdm.srv.listen(a)
+ elif o == "-t":
+ hspec = ashd.serve.parsehspec(a)
if len(args) < 1:
usage(sys.stderr)
sys.exit(1)
env["wsgi.run_once"] = False
return env
-class reqthread(ashd.serve.wsgithread):
- def __init__(self, sk):
- super(reqthread, self).__init__()
+class request(ashd.serve.wsgirequest):
+ def __init__(self, sk, **kw):
+ super(request, self).__init__(**kw)
self.bsk = sk.dup()
self.sk = self.bsk.makefile("r+")
- def handlewsgi(self):
- return handler(self.env, self.startreq)
+ def mkenv(self):
+ return mkenv(ashd.scgi.readhead(self.sk), self.sk)
+
+ def handlewsgi(self, env, startreq):
+ return handler(env, startreq)
+
+ _onjython = None
+ @staticmethod
+ def onjython():
+ if request._onjython is None:
+ request._onjython = ("java" in platform.system().lower())
+ return request._onjython
+
+ def fileno(self):
+ if request.onjython():
+ self.bsk.setblocking(False)
+ return self.bsk.fileno()
def writehead(self, status, headers):
- try:
- self.sk.write("Status: %s\n" % status)
- for nm, val in headers:
- self.sk.write("%s: %s\n" % (nm, val))
- self.sk.write("\n")
- except IOError:
- raise ashd.serve.closed()
+ w = self.buffer.extend
+ w("Status: %s\n" % status)
+ for nm, val in headers:
+ w("%s: %s\n" % (nm, val))
+ w("\n")
- def writedata(self, data):
+ def flush(self):
try:
- self.sk.write(data)
- self.sk.flush()
+ if not request.onjython():
+ ret = self.bsk.send(self.buffer, socket.MSG_DONTWAIT)
+ else:
+ ret = self.bsk.send(self.buffer)
+ self.buffer[:ret] = ""
except IOError:
raise ashd.serve.closed()
- def handle(self):
- head = ashd.scgi.readhead(self.sk)
- self.env = mkenv(head, self.sk)
- reqevent = ashd.perf.request(self.env)
- exc = (None, None, None)
- try:
- super(reqthread, self).handle()
- if self.status:
- reqevent.response([self.status, self.headers])
- except:
- exc = sys.exc_info()
- raise
- finally:
- reqevent.__exit__(*exc)
+ def close(self):
+ self.sk.close()
+ self.bsk.close()
+
+if hspec[0] not in ashd.serve.names:
+ sys.stderr.write("scgi-wsgi: no such request handler: %s\n" % hspec[0])
+ sys.exit(1)
+hclass = ashd.serve.names[hspec[0]]
+try:
+ hargs = hclass.parseargs(**hspec[1])
+except ValueError as exc:
+ sys.stderr.write("scgi-wsgi: %s\n" % exc)
+ sys.exit(1)
- def run(self):
+reqhandler = hclass(**hargs)
+try:
+ while True:
+ nsk, addr = sk.accept()
try:
- super(reqthread, self).run()
+ reqhandler.handle(request(sk=nsk, handler=reqhandler))
finally:
- self.sk.close()
- self.bsk.close()
-
-while True:
- nsk, addr = sk.accept()
- try:
- reqthread(nsk).start()
- finally:
- nsk.close()
+ nsk.close()
+finally:
+ reqhandler.close()