python: Back-ported the new request handling to Python2.
authorFredrik Tolf <fredrik@dolda2000.com>
Mon, 6 Jan 2014 02:53:35 +0000 (03:53 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Mon, 6 Jan 2014 02:53:35 +0000 (03:53 +0100)
python/ashd-wsgi
python/ashd/serve.py
python/scgi-wsgi

index 01c158f..f1f40b2 100755 (executable)
@@ -1,19 +1,19 @@
 #!/usr/bin/python
 
-import sys, os, getopt, threading, logging, time
-import ashd.proto, ashd.util, ashd.perf, ashd.serve
+import sys, os, getopt, socket, logging, time
+import ashd.util, ashd.serve
 try:
     import pdm.srv
 except:
     pdm = None
 
 def usage(out):
-    out.write("usage: ashd-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-l REQLIMIT] HANDLER-MODULE [ARGS...]\n")
+    out.write("usage: ashd-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-l REQLIMIT] [-t REQUEST-HANDLER[:PAR[=VAL](,PAR[=VAL])...]] HANDLER-MODULE [ARGS...]\n")
 
-reqlimit = 0
+hspec = "free", {}
 modwsgi_compat = False
 setlog = True
-opts, args = getopt.getopt(sys.argv[1:], "+hAp:l:m:")
+opts, args = getopt.getopt(sys.argv[1:], "+hALp:t:l:m:")
 for o, a in opts:
     if o == "-h":
         usage(sys.stdout)
@@ -25,7 +25,9 @@ for o, a in opts:
     elif o == "-A":
         modwsgi_compat = True
     elif o == "-l":
-        reqlimit = int(a)
+        hspec = "free", {"max": a, "abort": "10"}
+    elif o == "-t":
+        hspec = ashd.serve.parsehspec(a)
     elif o == "-m":
         if pdm is not None:
             pdm.srv.listen(a)
@@ -145,56 +147,52 @@ def mkenv(req):
     env["wsgi.run_once"] = False
     return env
 
-if reqlimit != 0:
-    guard = ashd.serve.abortlimiter(reqlimit).call
-else:
-    guard = lambda fun: fun()
+class request(ashd.serve.wsgirequest):
+    def __init__(self, bkreq, **kw):
+        super(request, self).__init__(**kw)
+        self.bkreq = bkreq.dup()
+
+    def mkenv(self):
+        return mkenv(self.bkreq)
 
-class reqthread(ashd.serve.wsgithread):
-    def __init__(self, req):
-        super(reqthread, self).__init__()
-        self.req = req.dup()
-    
-    def handlewsgi(self):
-        return handler(self.env, self.startreq)
+    def handlewsgi(self, env, startreq):
+        return handler(env, startreq)
+
+    def fileno(self):
+        return self.bkreq.bsk.fileno()
 
     def writehead(self, status, headers):
-        try:
-            self.req.sk.write("HTTP/1.1 %s\n" % status)
-            for nm, val in headers:
-                self.req.sk.write("%s: %s\n" % (nm, val))
-            self.req.sk.write("\n")
-        except IOError:
-            raise ashd.serve.closed()
+        w = self.buffer.extend
+        w("HTTP/1.1 %s\n" % status)
+        for nm, val in headers:
+            w("%s: %s\n" % (nm, val))
+        w("\n")
 
-    def writedata(self, data):
+    def flush(self):
         try:
-            self.req.sk.write(data)
-            self.req.sk.flush()
+            ret = self.bkreq.bsk.send(self.buffer, socket.MSG_DONTWAIT)
+            self.buffer[:ret] = ""
         except IOError:
             raise ashd.serve.closed()
 
-    def handle(self):
-        self.env = mkenv(self.req)
-        reqevent = ashd.perf.request(self.env)
-        exc = (None, None, None)
-        try:
-            super(reqthread, self).handle()
-            if self.status:
-                reqevent.response([self.status, self.headers])
-        except:
-            exc = sys.exc_info()
-            raise
-        finally:
-            reqevent.__exit__(*exc)
-
-    def run(self):
-        try:
-            guard(super(reqthread, self).run)
-        finally:
-            self.req.close()
-    
+    def close(self):
+        self.bkreq.close()
+
 def handle(req):
-    reqthread(req).start()
+    reqhandler.handle(request(bkreq=req, handler=reqhandler))
 
-ashd.util.serveloop(handle)
+if hspec[0] not in ashd.serve.names:
+    sys.stderr.write("ashd-wsgi: no such request handler: %s\n" % hspec[0])
+    sys.exit(1)
+hclass = ashd.serve.names[hspec[0]]
+try:
+    hargs = hclass.parseargs(**hspec[1])
+except ValueError as exc:
+    sys.stderr.write("ashd-wsgi: %s\n" % exc)
+    sys.exit(1)
+
+reqhandler = hclass(**hargs)
+try:
+    ashd.util.serveloop(handle)
+finally:
+    reqhandler.close()
index 14170d7..0197d47 100644 (file)
@@ -1,4 +1,5 @@
-import os, threading, time, logging
+import sys, os, threading, time, logging, select, Queue
+import perf
 
 log = logging.getLogger("ashd.serve")
 seq = 1
@@ -6,52 +7,41 @@ seqlk = threading.Lock()
 
 def reqseq():
     global seq
-    seqlk.acquire()
-    try:
+    with seqlk:
         s = seq
         seq += 1
         return s
-    finally:
-        seqlk.release()
-
-class reqthread(threading.Thread):
-    def __init__(self, name=None):
-        if name is None:
-            name = "Request handler %i" % reqseq()
-        super(reqthread, self).__init__(name=name)
-
-    def handle(self):
-        raise Exception()
-
-    def run(self):
-        try:
-            self.handle()
-        except:
-            log.error("exception occurred when handling request", exc_info=True)
 
 class closed(IOError):
     def __init__(self):
         super(closed, self).__init__("The client has closed the connection.")
 
-class wsgithread(reqthread):
-    def __init__(self, **kwargs):
-        super(wsgithread, self).__init__(**kwargs)
+class reqthread(threading.Thread):
+    def __init__(self, name=None, **kw):
+        if name is None:
+            name = "Request handler %i" % reqseq()
+        super(reqthread, self).__init__(name=name, **kw)
+
+class wsgirequest(object):
+    def __init__(self, handler):
         self.status = None
         self.headers = []
         self.respsent = False
+        self.handler = handler
+        self.buffer = bytearray()
 
     def handlewsgi(self):
         raise Exception()
+    def fileno(self):
+        raise Exception()
     def writehead(self, status, headers):
         raise Exception()
-    def writedata(self, data):
+    def flush(self):
         raise Exception()
-
-    def write(self, data):
-        if not data:
-            return
-        self.flushreq()
-        self.writedata(data)
+    def close(self):
+        pass
+    def writedata(self, data):
+        self.buffer.extend(data)
 
     def flushreq(self):
         if not self.respsent:
@@ -60,73 +50,313 @@ class wsgithread(reqthread):
             self.respsent = True
             self.writehead(self.status, self.headers)
 
+    def write(self, data):
+        if not data:
+            return
+        self.flushreq()
+        self.writedata(data)
+        self.handler.ckflush(self)
+
     def startreq(self, status, headers, exc_info=None):
         if self.status:
-            if exc_info:                # Nice calling convetion ^^
+            if exc_info:
                 try:
                     if self.respsent:
-                        raise exc_info[0], exc_info[1], exc_info[2]
+                        raise exc_info[1]
                 finally:
-                    exc_info = None     # CPython GC bug?
+                    exc_info = None
             else:
                 raise Exception("Can only start responding once.")
         self.status = status
         self.headers = headers
         return self.write
-    def handle(self):
+
+class handler(object):
+    def handle(self, request):
+        raise Exception()
+    def ckflush(self, req):
+        while len(req.buffer) > 0:
+            rls, wls, els = select.select([], [req], [req])
+            req.flush()
+    def close(self):
+        pass
+
+    @classmethod
+    def parseargs(cls, **args):
+        if len(args) > 0:
+            raise ValueError("unknown handler argument: " + iter(args).next())
+        return {}
+
+class single(handler):
+    cname = "single"
+
+    def handle(self, req):
         try:
-            respiter = self.handlewsgi()
-            try:
+            env = req.mkenv()
+            with perf.request(env) as reqevent:
+                respiter = req.handlewsgi(env, req.startreq)
                 for data in respiter:
-                    self.write(data)
-                if self.status:
-                    self.flushreq()
-            finally:
-                if hasattr(respiter, "close"):
-                    respiter.close()
+                    req.write(data)
+                if req.status:
+                    reqevent.response([req.status, req.headers])
+                    req.flushreq()
+                self.ckflush(req)
         except closed:
             pass
+        except:
+            log.error("exception occurred when handling request", exc_info=True)
+        finally:
+            req.close()
+
+class freethread(handler):
+    cname = "free"
 
-class calllimiter(object):
-    def __init__(self, limit):
-        self.limit = limit
-        self.lock = threading.Condition()
-        self.inflight = 0
+    def __init__(self, max=None, timeout=None, **kw):
+        super(freethread, self).__init__(**kw)
+        self.current = set()
+        self.lk = threading.Lock()
+        self.tcond = threading.Condition(self.lk)
+        self.max = max
+        self.timeout = timeout
 
-    def waited(self, time):
-        if time > 10:
-            raise RuntimeError("Waited too long")
+    @classmethod
+    def parseargs(cls, max=None, abort=None, **args):
+        ret = super(freethread, cls).parseargs(**args)
+        if max:
+            ret["max"] = int(max)
+        if abort:
+            ret["timeout"] = int(abort)
+        return ret
 
-    def __enter__(self):
-        self.lock.acquire()
+    def handle(self, req):
+        with self.lk:
+            if self.max is not None:
+                if self.timeout is not None:
+                    now = start = time.time()
+                    while len(self.current) >= self.max:
+                        self.tcond.wait(start + self.timeout - now)
+                        now = time.time()
+                        if now - start > self.timeout:
+                            os.abort()
+                else:
+                    while len(self.current) >= self.max:
+                        self.tcond.wait()
+            th = reqthread(target=self.run, args=[req])
+            th.start()
+            while th.is_alive() and th not in self.current:
+                self.tcond.wait()
+
+    def run(self, req):
         try:
-            start = time.time()
-            while self.inflight >= self.limit:
-                self.lock.wait(10)
-                self.waited(time.time() - start)
-            self.inflight += 1
-            return self
+            th = threading.current_thread()
+            with self.lk:
+                self.current.add(th)
+                self.tcond.notify_all()
+            try:
+                env = req.mkenv()
+                with perf.request(env) as reqevent:
+                    respiter = req.handlewsgi(env, req.startreq)
+                    for data in respiter:
+                        req.write(data)
+                    if req.status:
+                        reqevent.response([req.status, req.headers])
+                        req.flushreq()
+                    self.ckflush(req)
+            except closed:
+                pass
+            except:
+                log.error("exception occurred when handling request", exc_info=True)
+            finally:
+                with self.lk:
+                    self.current.remove(th)
+                    self.tcond.notify_all()
         finally:
-            self.lock.release()
+            req.close()
+
+    def close(self):
+        while True:
+            with self.lk:
+                if len(self.current) > 0:
+                    th = iter(self.current).next()
+                else:
+                    return
+            th.join()
+
+class resplex(handler):
+    cname = "rplex"
+
+    def __init__(self, max=None, **kw):
+        super(resplex, self).__init__(**kw)
+        self.current = set()
+        self.lk = threading.Lock()
+        self.tcond = threading.Condition(self.lk)
+        self.max = max
+        self.cqueue = Queue.Queue(5)
+        self.cnpipe = os.pipe()
+        self.rthread = reqthread(name="Response thread", target=self.handle2)
+        self.rthread.start()
+
+    @classmethod
+    def parseargs(cls, max=None, **args):
+        ret = super(resplex, cls).parseargs(**args)
+        if max:
+            ret["max"] = int(max)
+        return ret
+
+    def ckflush(self, req):
+        raise Exception("resplex handler does not support the write() function")
 
-    def __exit__(self, *excinfo):
-        self.lock.acquire()
+    def handle(self, req):
+        with self.lk:
+            if self.max is not None:
+                while len(self.current) >= self.max:
+                    self.tcond.wait()
+            th = reqthread(target=self.handle1, args=[req])
+            th.start()
+            while th.is_alive() and th not in self.current:
+                self.tcond.wait()
+
+    def handle1(self, req):
         try:
-            self.inflight -= 1
-            self.lock.notify()
+            th = threading.current_thread()
+            with self.lk:
+                self.current.add(th)
+                self.tcond.notify_all()
+            try:
+                env = req.mkenv()
+                respobj = req.handlewsgi(env, req.startreq)
+                respiter = iter(respobj)
+                if not req.status:
+                    log.error("request handler returned without calling start_request")
+                    if hasattr(respiter, "close"):
+                        respiter.close()
+                    return
+                else:
+                    self.cqueue.put((req, respiter))
+                    os.write(self.cnpipe[1], " ")
+                    req = None
+            finally:
+                with self.lk:
+                    self.current.remove(th)
+                    self.tcond.notify_all()
+        except closed:
+            pass
+        except:
+            log.error("exception occurred when handling request", exc_info=True)
         finally:
-            self.lock.release()
-        return False
+            if req is not None:
+                req.close()
 
-    def call(self, target):
-        self.__enter__()
+    def handle2(self):
         try:
-            return target()
-        finally:
-            self.__exit__()
+            rp = self.cnpipe[0]
+            current = {}
 
-class abortlimiter(calllimiter):
-    def waited(self, time):
-        if time > 10:
+            def closereq(req):
+                respiter = current[req]
+                try:
+                    if respiter is not None and hasattr(respiter, "close"):
+                        respiter.close()
+                except:
+                    log.error("exception occurred when closing iterator", exc_info=True)
+                try:
+                    req.close()
+                except:
+                    log.error("exception occurred when closing request", exc_info=True)
+                del current[req]
+            def ckiter(req):
+                respiter = current[req]
+                if respiter is not None:
+                    rem = False
+                    try:
+                        data = respiter.next()
+                    except StopIteration:
+                        rem = True
+                        try:
+                            req.flushreq()
+                        except:
+                            log.error("exception occurred when handling response data", exc_info=True)
+                    except:
+                        rem = True
+                        log.error("exception occurred when iterating response", exc_info=True)
+                    if not rem:
+                        if data:
+                            try:
+                                req.flushreq()
+                                req.writedata(data)
+                            except:
+                                log.error("exception occurred when handling response data", exc_info=True)
+                                rem = True
+                    if rem:
+                        current[req] = None
+                        try:
+                            if hasattr(respiter, "close"):
+                                respiter.close()
+                        except:
+                            log.error("exception occurred when closing iterator", exc_info=True)
+                        respiter = None
+                if respiter is None and not req.buffer:
+                    closereq(req)
+
+            while True:
+                bufl = list(req for req in current.iterkeys() if req.buffer)
+                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
+                if rp in rls:
+                    ret = os.read(rp, 1024)
+                    if not ret:
+                        os.close(rp)
+                        return
+                    try:
+                        while True:
+                            req, respiter = self.cqueue.get(False)
+                            current[req] = respiter
+                            ckiter(req)
+                    except Queue.Empty:
+                        pass
+                for req in wls:
+                    try:
+                        req.flush()
+                    except closed:
+                        closereq(req)
+                    except:
+                        log.error("exception occurred when writing response", exc_info=True)
+                        closereq(req)
+                    else:
+                        if len(req.buffer) < 65536:
+                            ckiter(req)
+        except:
+            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
             os.abort()
+
+    def close(self):
+        while True:
+            with self.lk:
+                if len(self.current) > 0:
+                    th = iter(self.current).next()
+                else:
+                    break
+            th.join()
+        os.close(self.cnpipe[1])
+        self.rthread.join()
+
+names = dict((cls.cname, cls) for cls in globals().itervalues() if
+             isinstance(cls, type) and
+             issubclass(cls, handler) and
+             hasattr(cls, "cname"))
+
+def parsehspec(spec):
+    if ":" not in spec:
+        return spec, {}
+    nm, spec = spec.split(":", 1)
+    args = {}
+    while spec:
+        if "," in spec:
+            part, spec = spec.split(",", 1)
+        else:
+            part, spec = spec, None
+        if "=" in part:
+            key, val = part.split("=", 1)
+        else:
+            key, val = part, ""
+        args[key] = val
+    return nm, args
index 2cf715b..6e04f20 100755 (executable)
@@ -1,20 +1,21 @@
 #!/usr/bin/python
 
-import sys, os, getopt, logging
+import sys, os, getopt, logging, platform
 import socket
-import ashd.scgi, ashd.perf, ashd.serve
+import ashd.scgi, ashd.serve
 try:
     import pdm.srv
 except:
     pdm = None
 
 def usage(out):
-    out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
+    out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-t REQUEST-HANDLER[:PAR[=VAL](,PAR[=VAL])...]] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
 
 sk = None
+hspec = "free", {}
 modwsgi_compat = False
 setlog = True
-opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:")
+opts, args = getopt.getopt(sys.argv[1:], "+hALp:t:T:m:")
 for o, a in opts:
     if o == "-h":
         usage(sys.stdout)
@@ -40,6 +41,8 @@ for o, a in opts:
     elif o == "-m":
         if pdm is not None:
             pdm.srv.listen(a)
+    elif o == "-t":
+        hspec = ashd.serve.parsehspec(a)
 if len(args) < 1:
     usage(sys.stderr)
     sys.exit(1)
@@ -84,56 +87,68 @@ def mkenv(head, sk):
     env["wsgi.run_once"] = False
     return env
 
-class reqthread(ashd.serve.wsgithread):
-    def __init__(self, sk):
-        super(reqthread, self).__init__()
+class request(ashd.serve.wsgirequest):
+    def __init__(self, sk, **kw):
+        super(request, self).__init__(**kw)
         self.bsk = sk.dup()
         self.sk = self.bsk.makefile("r+")
 
-    def handlewsgi(self):
-        return handler(self.env, self.startreq)
+    def mkenv(self):
+        return mkenv(ashd.scgi.readhead(self.sk), self.sk)
+
+    def handlewsgi(self, env, startreq):
+        return handler(env, startreq)
+
+    _onjython = None
+    @staticmethod
+    def onjython():
+        if request._onjython is None:
+            request._onjython = ("java" in platform.system().lower())
+        return request._onjython
+
+    def fileno(self):
+        if request.onjython():
+            self.bsk.setblocking(False)
+        return self.bsk.fileno()
 
     def writehead(self, status, headers):
-        try:
-            self.sk.write("Status: %s\n" % status)
-            for nm, val in headers:
-                self.sk.write("%s: %s\n" % (nm, val))
-            self.sk.write("\n")
-        except IOError:
-            raise ashd.serve.closed()
+        w = self.buffer.extend
+        w("Status: %s\n" % status)
+        for nm, val in headers:
+            w("%s: %s\n" % (nm, val))
+        w("\n")
 
-    def writedata(self, data):
+    def flush(self):
         try:
-            self.sk.write(data)
-            self.sk.flush()
+            if not request.onjython():
+                ret = self.bsk.send(self.buffer, socket.MSG_DONTWAIT)
+            else:
+                ret = self.bsk.send(self.buffer)
+            self.buffer[:ret] = ""
         except IOError:
             raise ashd.serve.closed()
 
-    def handle(self):
-        head = ashd.scgi.readhead(self.sk)
-        self.env = mkenv(head, self.sk)
-        reqevent = ashd.perf.request(self.env)
-        exc = (None, None, None)
-        try:
-            super(reqthread, self).handle()
-            if self.status:
-                reqevent.response([self.status, self.headers])
-        except:
-            exc = sys.exc_info()
-            raise
-        finally:
-            reqevent.__exit__(*exc)
+    def close(self):
+        self.sk.close()
+        self.bsk.close()
+
+if hspec[0] not in ashd.serve.names:
+    sys.stderr.write("scgi-wsgi: no such request handler: %s\n" % hspec[0])
+    sys.exit(1)
+hclass = ashd.serve.names[hspec[0]]
+try:
+    hargs = hclass.parseargs(**hspec[1])
+except ValueError as exc:
+    sys.stderr.write("scgi-wsgi: %s\n" % exc)
+    sys.exit(1)
 
-    def run(self):
+reqhandler = hclass(**hargs)
+try:
+    while True:
+        nsk, addr = sk.accept()
         try:
-            super(reqthread, self).run()
+            reqhandler.handle(request(sk=nsk, handler=reqhandler))
         finally:
-            self.sk.close()
-            self.bsk.close()
-
-while True:
-    nsk, addr = sk.accept()
-    try:
-        reqthread(nsk).start()
-    finally:
-        nsk.close()
+            nsk.close()
+finally:
+    reqhandler.close()