#include <ctype.h>
#include <glob.h>
#include <libgen.h>
+#include <sys/socket.h>
#include <errno.h>
#ifdef HAVE_CONFIG_H
static int stdhandle(struct child *ch, struct hthead *req, int fd, void (*chinit)(void *), void *idata)
{
struct stdchild *i = ch->pdata;
+ int serr;
if(i->type == CH_SOCKET) {
if(i->fd < 0)
i->fd = stdmkchild(i->argv, chinit, idata);
- if(sendreq(i->fd, req, fd)) {
- if((errno == EPIPE) || (errno == ECONNRESET)) {
+ if(sendreq2(i->fd, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT)) {
+ serr = errno;
+ if((serr == EPIPE) || (serr == ECONNRESET)) {
/* Assume that the child has crashed and restart it. */
close(i->fd);
i->fd = stdmkchild(i->argv, chinit, idata);
- if(!sendreq(i->fd, req, fd))
+ if(!sendreq2(i->fd, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT))
return(0);
}
- flog(LOG_ERR, "could not pass on request to child %s: %s", ch->name, strerror(errno));
- close(i->fd);
- i->fd = -1;
+ flog(LOG_ERR, "could not pass on request to child %s: %s", ch->name, strerror(serr));
+ if(serr != EAGAIN) {
+ close(i->fd);
+ i->fd = -1;
+ }
return(-1);
}
} else if(i->type == CH_FORK) {
int sendfd(int sock, int fd, char *data, size_t datalen)
{
- return(sendfd2(sock, fd, data, datalen, MSG_NOSIGNAL | MSG_DONTWAIT));
+ return(sendfd2(sock, fd, data, datalen, MSG_NOSIGNAL));
}
int recvfd(int sock, char **data, size_t *datalen)
int sendreq(int sock, struct hthead *req, int fd)
{
- return(sendreq2(sock, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT));
+ return(sendreq2(sock, req, fd, MSG_NOSIGNAL));
}
int recvreq(int sock, struct hthead **reqp)
#!/usr/bin/python
import sys, os, getopt, threading, logging, time
-import ashd.proto, ashd.util, ashd.perf
+import ashd.proto, ashd.util, ashd.perf, ashd.serve
try:
import pdm.srv
except:
sys.exit(1)
handler = handlermod.application
-class closed(IOError):
- def __init__(self):
- super(closed, self).__init__("The client has closed the connection.")
-
cwd = os.getcwd()
def absolutify(path):
if path[0] != '/':
buf += c
return buf
-def dowsgi(req):
+def mkenv(req):
env = {}
env["wsgi.version"] = 1, 0
for key, val in req.headers:
env["wsgi.multithread"] = True
env["wsgi.multiprocess"] = False
env["wsgi.run_once"] = False
+ return env
- resp = []
- respsent = []
+if reqlimit != 0:
+ guard = ashd.serve.abortlimiter(reqlimit).call
+else:
+ guard = lambda fun: fun()
- def flushreq():
- if not respsent:
- if not resp:
- raise Exception, "Trying to write data before starting response."
- status, headers = resp
- respsent[:] = [True]
- try:
- req.sk.write("HTTP/1.1 %s\n" % status)
- for nm, val in headers:
- req.sk.write("%s: %s\n" % (nm, val))
- req.sk.write("\n")
- except IOError:
- raise closed()
+class reqthread(ashd.serve.wsgithread):
+ def __init__(self, req):
+ super(reqthread, self).__init__()
+ self.req = req.dup()
+
+ def handlewsgi(self):
+ return handler(self.env, self.startreq)
- def write(data):
- if not data:
- return
- flushreq()
+ def writehead(self, status, headers):
try:
- req.sk.write(data)
- req.sk.flush()
+ self.req.sk.write("HTTP/1.1 %s\n" % status)
+ for nm, val in headers:
+ self.req.sk.write("%s: %s\n" % (nm, val))
+ self.req.sk.write("\n")
except IOError:
- raise closed()
+ raise ashd.serve.closed()
- def startreq(status, headers, exc_info = None):
- if resp:
- if exc_info: # Interesting, this...
- try:
- if respsent:
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # CPython GC bug?
- else:
- raise Exception, "Can only start responding once."
- resp[:] = status, headers
- return write
-
- reqevent = ashd.perf.request(env)
- exc = (None, None, None)
- try:
+ def writedata(self, data):
try:
- respiter = handler(env, startreq)
- try:
- for data in respiter:
- write(data)
- if resp:
- flushreq()
- finally:
- if hasattr(respiter, "close"):
- respiter.close()
- except closed:
- pass
- if resp:
- reqevent.response(resp)
- except:
- exc = sys.exc_info()
- raise
- finally:
- reqevent.__exit__(*exc)
+ self.req.sk.write(data)
+ self.req.sk.flush()
+ except IOError:
+ raise ashd.serve.closed()
-flightlock = threading.Condition()
-inflight = 0
+ def handle(self):
+ self.env = mkenv(self.req)
+ reqevent = ashd.perf.request(self.env)
+ exc = (None, None, None)
+ try:
+ super(reqthread, self).handle()
+ if self.status:
+ reqevent.response([self.status, self.headers])
+ except:
+ exc = sys.exc_info()
+ raise
+ finally:
+ reqevent.__exit__(*exc)
-class reqthread(threading.Thread):
- def __init__(self, req):
- super(reqthread, self).__init__(name = "Request handler")
- self.req = req.dup()
-
def run(self):
- global inflight
try:
- flightlock.acquire()
- try:
- if reqlimit != 0:
- start = time.time()
- while inflight >= reqlimit:
- flightlock.wait(10)
- if time.time() - start > 10:
- os.abort()
- inflight += 1
- finally:
- flightlock.release()
- try:
- dowsgi(self.req)
- finally:
- flightlock.acquire()
- try:
- inflight -= 1
- flightlock.notify()
- finally:
- flightlock.release()
- except:
- log.error("exception occurred in handler thread", exc_info=True)
+ guard(super(reqthread, self).run)
finally:
self.req.close()
-import sys
-import threading
-
class protoerr(Exception):
pass
-class closed(IOError):
- def __init__(self):
- super(closed, self).__init__("The client has closed the connection.")
-
def readns(sk):
hln = 0
while True:
ret[parts[i]] = parts[i + 1]
i += 2
return ret
-
-class reqthread(threading.Thread):
- def __init__(self, sk, handler):
- super(reqthread, self).__init__(name = "SCGI request handler")
- self.bsk = sk.dup()
- self.sk = self.bsk.makefile("r+")
- self.handler = handler
-
- def run(self):
- try:
- head = readhead(self.sk)
- self.handler(head, self.sk)
- finally:
- self.sk.close()
- self.bsk.close()
-
-def handlescgi(sk, handler):
- t = reqthread(sk, handler)
- t.start()
-
-def servescgi(socket, handler):
- while True:
- nsk, addr = socket.accept()
- try:
- handlescgi(nsk, handler)
- finally:
- nsk.close()
-
-def wrapwsgi(handler):
- def handle(head, sk):
- env = dict(head)
- env["wsgi.version"] = 1, 0
- if "HTTP_X_ASH_PROTOCOL" in env:
- env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"]
- elif "HTTPS" in env:
- env["wsgi.url_scheme"] = "https"
- else:
- env["wsgi.url_scheme"] = "http"
- env["wsgi.input"] = sk
- env["wsgi.errors"] = sys.stderr
- env["wsgi.multithread"] = True
- env["wsgi.multiprocess"] = False
- env["wsgi.run_once"] = False
-
- resp = []
- respsent = []
-
- def flushreq():
- if not respsent:
- if not resp:
- raise Exception, "Trying to write data before starting response."
- status, headers = resp
- respsent[:] = [True]
- try:
- sk.write("Status: %s\n" % status)
- for nm, val in headers:
- sk.write("%s: %s\n" % (nm, val))
- sk.write("\n")
- except IOError:
- raise closed()
-
- def write(data):
- if not data:
- return
- flushreq()
- try:
- sk.write(data)
- sk.flush()
- except IOError:
- raise closed()
-
- def startreq(status, headers, exc_info = None):
- if resp:
- if exc_info: # Interesting, this...
- try:
- if respsent:
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # CPython GC bug?
- else:
- raise Exception, "Can only start responding once."
- resp[:] = status, headers
- return write
-
- respiter = handler(env, startreq)
- try:
- try:
- for data in respiter:
- write(data)
- if resp:
- flushreq()
- except closed:
- pass
- finally:
- if hasattr(respiter, "close"):
- respiter.close()
- return handle
--- /dev/null
+import os, threading, time, logging
+
+log = logging.getLogger("ashd.serve")
+seq = 1
+seqlk = threading.Lock()
+
+def reqseq():
+ global seq
+ seqlk.acquire()
+ try:
+ s = seq
+ seq += 1
+ return s
+ finally:
+ seqlk.release()
+
+class reqthread(threading.Thread):
+ def __init__(self, name=None):
+ if name is None:
+ name = "Request handler %i" % reqseq()
+ super(reqthread, self).__init__(name=name)
+
+ def handle(self):
+ raise Exception()
+
+ def run(self):
+ try:
+ self.handle()
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+
+class closed(IOError):
+ def __init__(self):
+ super(closed, self).__init__("The client has closed the connection.")
+
+class wsgithread(reqthread):
+ def __init__(self, **kwargs):
+ super(wsgithread, self).__init__(**kwargs)
+ self.status = None
+ self.headers = []
+ self.respsent = False
+
+ def handlewsgi(self):
+ raise Exception()
+ def writehead(self, status, headers):
+ raise Exception()
+ def writedata(self, data):
+ raise Exception()
+
+ def write(self, data):
+ if not data:
+ return
+ self.flushreq()
+ self.writedata(data)
+
+ def flushreq(self):
+ if not self.respsent:
+ if not self.status:
+ raise Exception("Cannot send response body before starting response.")
+ self.respsent = True
+ self.writehead(self.status, self.headers)
+
+ def startreq(self, status, headers, exc_info=None):
+ if self.status:
+ if exc_info: # Nice calling convetion ^^
+ try:
+ if self.respsent:
+ raise exc_info[0], exc_info[1], exc_info[2]
+ finally:
+ exc_info = None # CPython GC bug?
+ else:
+ raise Exception("Can only start responding once.")
+ self.status = status
+ self.headers = headers
+ return self.write
+
+ def handle(self):
+ try:
+ respiter = self.handlewsgi()
+ try:
+ for data in respiter:
+ self.write(data)
+ if self.status:
+ self.flushreq()
+ finally:
+ if hasattr(respiter, "close"):
+ respiter.close()
+ except closed:
+ pass
+
+class calllimiter(object):
+ def __init__(self, limit):
+ self.limit = limit
+ self.lock = threading.Condition()
+ self.inflight = 0
+
+ def waited(self, time):
+ if time > 10:
+ raise RuntimeError("Waited too long")
+
+ def __enter__(self):
+ self.lock.acquire()
+ try:
+ start = time.time()
+ while self.inflight >= self.limit:
+ self.lock.wait(10)
+ self.waited(time.time() - start)
+ self.inflight += 1
+ return self
+ finally:
+ self.lock.release()
+
+ def __exit__(self, *excinfo):
+ self.lock.acquire()
+ try:
+ self.inflight -= 1
+ self.lock.notify()
+ finally:
+ self.lock.release()
+ return False
+
+ def call(self, target):
+ self.__enter__()
+ try:
+ return target()
+ finally:
+ self.__exit__()
+
+class abortlimiter(calllimiter):
+ def waited(self, time):
+ if time > 10:
+ os.abort()
SYNOPSIS
--------
-*scgi-wsgi* [*-hA*] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...]
+*scgi-wsgi* [*-hA*] [*-m* 'PDM-SPEC'] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...]
DESCRIPTION
-----------
address listening for connections on 'PORT' instead. If 'HOST'
is not given, `localhost` is used by default.
+*-m* 'PDM-SPEC'::
+
+ If the PDM library is installed on the system, create a
+ listening socket for connection PDM clients according to
+ 'PDM-SPEC'.
+
AUTHOR
------
Fredrik Tolf <fredrik@dolda2000.com>
import sys, os, getopt, logging
import socket
-import ashd.scgi
+import ashd.scgi, ashd.perf, ashd.serve
+try:
+ import pdm.srv
+except:
+ pdm = None
def usage(out):
- out.write("usage: scgi-wsgi [-hAL] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
+ out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
sk = None
modwsgi_compat = False
setlog = True
-opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:")
+opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:")
for o, a in opts:
if o == "-h":
usage(sys.stdout)
sk.listen(32)
elif o == "-A":
modwsgi_compat = True
+ elif o == "-m":
+ if pdm is not None:
+ pdm.srv.listen(a)
if len(args) < 1:
usage(sys.stderr)
sys.exit(1)
sys.exit(1)
handler = handlermod.application
-ashd.scgi.servescgi(sk, ashd.scgi.wrapwsgi(handler))
+def mkenv(head, sk):
+ env = dict(head)
+ env["wsgi.version"] = 1, 0
+ if "HTTP_X_ASH_PROTOCOL" in env:
+ env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"]
+ elif "HTTPS" in env:
+ env["wsgi.url_scheme"] = "https"
+ else:
+ env["wsgi.url_scheme"] = "http"
+ env["wsgi.input"] = sk
+ env["wsgi.errors"] = sys.stderr
+ env["wsgi.multithread"] = True
+ env["wsgi.multiprocess"] = False
+ env["wsgi.run_once"] = False
+ return env
+
+class reqthread(ashd.serve.wsgithread):
+ def __init__(self, sk):
+ super(reqthread, self).__init__()
+ self.bsk = sk.dup()
+ self.sk = self.bsk.makefile("r+")
+
+ def handlewsgi(self):
+ return handler(self.env, self.startreq)
+
+ def writehead(self, status, headers):
+ try:
+ self.sk.write("Status: %s\n" % status)
+ for nm, val in headers:
+ self.sk.write("%s: %s\n" % (nm, val))
+ self.sk.write("\n")
+ except IOError:
+ raise ashd.serve.closed()
+
+ def writedata(self, data):
+ try:
+ self.sk.write(data)
+ self.sk.flush()
+ except IOError:
+ raise ashd.serve.closed()
+
+ def handle(self):
+ head = ashd.scgi.readhead(self.sk)
+ self.env = mkenv(head, self.sk)
+ reqevent = ashd.perf.request(self.env)
+ exc = (None, None, None)
+ try:
+ super(reqthread, self).handle()
+ if self.status:
+ reqevent.response([self.status, self.headers])
+ except:
+ exc = sys.exc_info()
+ raise
+ finally:
+ reqevent.__exit__(*exc)
+
+ def run(self):
+ try:
+ super(reqthread, self).run()
+ finally:
+ self.sk.close()
+ self.bsk.close()
+
+while True:
+ nsk, addr = sk.accept()
+ try:
+ reqthread(nsk).start()
+ finally:
+ nsk.close()
#!/usr/bin/python3
import sys, os, getopt, threading, logging, time, locale, collections
-import ashd.proto, ashd.util, ashd.perf
+import ashd.proto, ashd.util, ashd.perf, ashd.serve
try:
import pdm.srv
except:
sys.exit(1)
handler = handlermod.application
-class closed(IOError):
- def __init__(self):
- super().__init__("The client has closed the connection.")
-
cwd = os.getcwd()
def absolutify(path):
if path[0] != '/':
buf.append(c)
return buf
-def dowsgi(req):
+def mkenv(req):
env = {}
env["wsgi.version"] = 1, 0
for key, val in req.headers:
env["wsgi.multithread"] = True
env["wsgi.multiprocess"] = False
env["wsgi.run_once"] = False
+ return env
- resp = []
- respsent = []
+if reqlimit != 0:
+ guard = ashd.serve.abortlimiter(reqlimit).call
+else:
+ guard = lambda fun: fun()
- def recode(thing):
- if isinstance(thing, collections.ByteString):
- return thing
- else:
- return str(thing).encode("latin-1")
+def recode(thing):
+ if isinstance(thing, collections.ByteString):
+ return thing
+ else:
+ return str(thing).encode("latin-1")
+
+class reqthread(ashd.serve.wsgithread):
+ def __init__(self, req):
+ super().__init__()
+ self.req = req.dup()
- def flushreq():
- if not respsent:
- if not resp:
- raise Exception("Trying to write data before starting response.")
- status, headers = resp
- respsent[:] = [True]
- buf = bytearray()
- buf += b"HTTP/1.1 " + recode(status) + b"\n"
- for nm, val in headers:
- buf += recode(nm) + b": " + recode(val) + b"\n"
- buf += b"\n"
- try:
- req.sk.write(buf)
- except IOError:
- raise closed()
+ def handlewsgi(self):
+ return handler(self.env, self.startreq)
- def write(data):
- if not data:
- return
- flushreq()
+ def writehead(self, status, headers):
+ buf = bytearray()
+ buf += b"HTTP/1.1 " + recode(status) + b"\n"
+ for nm, val in headers:
+ buf += recode(nm) + b": " + recode(val) + b"\n"
+ buf += b"\n"
try:
- req.sk.write(data)
- req.sk.flush()
+ self.req.sk.write(buf)
except IOError:
- raise closed()
+ raise ashd.serve.closed()
- def startreq(status, headers, exc_info = None):
- if resp:
- if exc_info: # Interesting, this...
- try:
- if respsent:
- raise exc_info[1]
- finally:
- exc_info = None # CPython GC bug?
- else:
- raise Exception("Can only start responding once.")
- resp[:] = status, headers
- return write
-
- with ashd.perf.request(env) as reqevent:
+ def writedata(self, data):
try:
- respiter = handler(env, startreq)
- try:
- for data in respiter:
- write(data)
- if resp:
- flushreq()
- finally:
- if hasattr(respiter, "close"):
- respiter.close()
- except closed:
- pass
- if resp:
- reqevent.response(resp)
-
-flightlock = threading.Condition()
-inflight = 0
-
-class reqthread(threading.Thread):
- def __init__(self, req):
- super().__init__(name = "Request handler")
- self.req = req.dup()
+ self.req.sk.write(data)
+ self.req.sk.flush()
+ except IOError:
+ raise ashd.serve.closed()
+
+ def handle(self):
+ self.env = mkenv(self.req)
+ with ashd.perf.request(self.env) as reqevent:
+ super().handle()
+ if self.status:
+ reqevent.response([self.status, self.headers])
def run(self):
- global inflight
try:
- with flightlock:
- if reqlimit != 0:
- start = time.time()
- while inflight >= reqlimit:
- flightlock.wait(10)
- if time.time() - start > 10:
- os.abort()
- inflight += 1
- try:
- dowsgi(self.req)
- finally:
- with flightlock:
- inflight -= 1
- flightlock.notify()
- except:
- log.error("exception occurred in handler thread", exc_info=True)
+ guard(super().run)
finally:
self.req.close()
sys.stderr.flush()
-import sys, collections
-import threading
-
class protoerr(Exception):
pass
-class closed(IOError):
- def __init__(self):
- super(closed, self).__init__("The client has closed the connection.")
-
def readns(sk):
hln = 0
while True:
i += 2
return ret
-class reqthread(threading.Thread):
- def __init__(self, sk, handler):
- super(reqthread, self).__init__(name = "SCGI request handler")
- self.bsk = sk.dup()
- self.sk = self.bsk.makefile("rwb")
- self.handler = handler
-
- def run(self):
- try:
- head = readhead(self.sk)
- self.handler(head, self.sk)
- finally:
- self.sk.close()
- self.bsk.close()
-
-def handlescgi(sk, handler):
- t = reqthread(sk, handler)
- t.start()
-
-def servescgi(socket, handler):
- while True:
- nsk, addr = socket.accept()
- try:
- handlescgi(nsk, handler)
- finally:
- nsk.close()
-
def decodehead(head, coding):
return {k.decode(coding): v.decode(coding) for k, v in head.items()}
-
-def wrapwsgi(handler):
- def handle(head, sk):
- try:
- env = decodehead(head, "utf-8")
- env["wsgi.uri_encoding"] = "utf-8"
- except UnicodeError:
- env = decodehead(head, "latin-1")
- env["wsgi.uri_encoding"] = "latin-1"
- env["wsgi.version"] = 1, 0
- if "HTTP_X_ASH_PROTOCOL" in env:
- env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"]
- elif "HTTPS" in env:
- env["wsgi.url_scheme"] = "https"
- else:
- env["wsgi.url_scheme"] = "http"
- env["wsgi.input"] = sk
- env["wsgi.errors"] = sys.stderr
- env["wsgi.multithread"] = True
- env["wsgi.multiprocess"] = False
- env["wsgi.run_once"] = False
-
- resp = []
- respsent = []
-
- def recode(thing):
- if isinstance(thing, collections.ByteString):
- return thing
- else:
- return str(thing).encode("latin-1")
-
- def flushreq():
- if not respsent:
- if not resp:
- raise Exception("Trying to write data before starting response.")
- status, headers = resp
- respsent[:] = [True]
- buf = bytearray()
- buf += b"Status: " + recode(status) + b"\n"
- for nm, val in headers:
- buf += recode(nm) + b": " + recode(val) + b"\n"
- buf += b"\n"
- try:
- sk.write(buf)
- except IOError:
- raise closed()
-
- def write(data):
- if not data:
- return
- flushreq()
- try:
- sk.write(data)
- sk.flush()
- except IOError:
- raise closed()
-
- def startreq(status, headers, exc_info = None):
- if resp:
- if exc_info: # Interesting, this...
- try:
- if respsent:
- raise exc_info[1]
- finally:
- exc_info = None # CPython GC bug?
- else:
- raise Exception("Can only start responding once.")
- resp[:] = status, headers
- return write
-
- respiter = handler(env, startreq)
- try:
- try:
- for data in respiter:
- write(data)
- if resp:
- flushreq()
- except closed:
- pass
- finally:
- if hasattr(respiter, "close"):
- respiter.close()
- return handle
--- /dev/null
+import os, threading, time, logging
+
+log = logging.getLogger("ashd.serve")
+seq = 1
+seqlk = threading.Lock()
+
+def reqseq():
+ global seq
+ with seqlk:
+ s = seq
+ seq += 1
+ return s
+
+class reqthread(threading.Thread):
+ def __init__(self, name=None):
+ if name is None:
+ name = "Request handler %i" % reqseq()
+ super().__init__(name=name)
+
+ def handle(self):
+ raise Exception()
+
+ def run(self):
+ try:
+ self.handle()
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+
+class closed(IOError):
+ def __init__(self):
+ super().__init__("The client has closed the connection.")
+
+class wsgithread(reqthread):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.status = None
+ self.headers = []
+ self.respsent = False
+
+ def handlewsgi(self):
+ raise Exception()
+ def writehead(self, status, headers):
+ raise Exception()
+ def writedata(self, data):
+ raise Exception()
+
+ def write(self, data):
+ if not data:
+ return
+ self.flushreq()
+ self.writedata(data)
+
+ def flushreq(self):
+ if not self.respsent:
+ if not self.status:
+ raise Exception("Cannot send response body before starting response.")
+ self.respsent = True
+ self.writehead(self.status, self.headers)
+
+ def startreq(self, status, headers, exc_info=None):
+ if self.status:
+ if exc_info: # Nice calling convetion ^^
+ try:
+ if self.respsent:
+ raise exc_info[1]
+ finally:
+ exc_info = None # CPython GC bug?
+ else:
+ raise Exception("Can only start responding once.")
+ self.status = status
+ self.headers = headers
+ return self.write
+
+ def handle(self):
+ try:
+ respiter = self.handlewsgi()
+ try:
+ for data in respiter:
+ self.write(data)
+ if self.status:
+ self.flushreq()
+ finally:
+ if hasattr(respiter, "close"):
+ respiter.close()
+ except closed:
+ pass
+
+class calllimiter(object):
+ def __init__(self, limit):
+ self.limit = limit
+ self.lock = threading.Condition()
+ self.inflight = 0
+
+ def waited(self, time):
+ if time > 10:
+ raise RuntimeError("Waited too long")
+
+ def __enter__(self):
+ with self.lock:
+ start = time.time()
+ while self.inflight >= self.limit:
+ self.lock.wait(10)
+ self.waited(time.time() - start)
+ self.inflight += 1
+ return self
+
+ def __exit__(self, *excinfo):
+ with self.lock:
+ self.inflight -= 1
+ self.lock.notify()
+ return False
+
+ def call(self, target):
+ with self:
+ return target()
+
+class abortlimiter(calllimiter):
+ def waited(self, time):
+ if time > 10:
+ os.abort()
SYNOPSIS
--------
-*scgi-wsgi3* [*-hA*] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...]
+*scgi-wsgi3* [*-hA*] [*-m* 'PDM-SPEC'] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...]
DESCRIPTION
-----------
address listening for connections on 'PORT' instead. If 'HOST'
is not given, `localhost` is used by default.
+*-m* 'PDM-SPEC'::
+
+ If the PDM library is installed on the system, create a
+ listening socket for connection PDM clients according to
+ 'PDM-SPEC'.
+
AUTHOR
------
Fredrik Tolf <fredrik@dolda2000.com>
#!/usr/bin/python3
-import sys, os, getopt, logging
+import sys, os, getopt, logging, collections
import socket
-import ashd.scgi
+import ashd.scgi, ashd.perf, ashd.serve
+try:
+ import pdm.srv
+except:
+ pdm = None
def usage(out):
- out.write("usage: scgi-wsgi3 [-hAL] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
+ out.write("usage: scgi-wsgi3 [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
sk = None
modwsgi_compat = False
setlog = True
-opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:")
+opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:")
for o, a in opts:
if o == "-h":
usage(sys.stdout)
sk.listen(32)
elif o == "-A":
modwsgi_compat = True
+ elif o == "-m":
+ if pdm is not None:
+ pdm.srv.listen(a)
if len(args) < 1:
usage(sys.stderr)
sys.exit(1)
sys.exit(1)
handler = handlermod.application
-ashd.scgi.servescgi(sk, ashd.scgi.wrapwsgi(handler))
+def mkenv(head, sk):
+ try:
+ env = ashd.scgi.decodehead(head, "utf-8")
+ env["wsgi.uri_encoding"] = "utf-8"
+ except UnicodeError:
+ env = ashd.scgi.decodehead(head, "latin-1")
+ env["wsgi.uri_encoding"] = "latin-1"
+ env["wsgi.version"] = 1, 0
+ if "HTTP_X_ASH_PROTOCOL" in env:
+ env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"]
+ elif "HTTPS" in env:
+ env["wsgi.url_scheme"] = "https"
+ else:
+ env["wsgi.url_scheme"] = "http"
+ env["wsgi.input"] = sk
+ env["wsgi.errors"] = sys.stderr
+ env["wsgi.multithread"] = True
+ env["wsgi.multiprocess"] = False
+ env["wsgi.run_once"] = False
+ return env
+
+def recode(thing):
+ if isinstance(thing, collections.ByteString):
+ return thing
+ else:
+ return str(thing).encode("latin-1")
+
+class reqthread(ashd.serve.wsgithread):
+ def __init__(self, sk):
+ super().__init__()
+ self.bsk = sk.dup()
+ self.sk = self.bsk.makefile("rwb")
+
+ def handlewsgi(self):
+ return handler(self.env, self.startreq)
+
+ def writehead(self, status, headers):
+ buf = bytearray()
+ buf += b"Status: " + recode(status) + b"\n"
+ for nm, val in headers:
+ buf += recode(nm) + b": " + recode(val) + b"\n"
+ buf += b"\n"
+ try:
+ self.sk.write(buf)
+ except IOError:
+ raise ashd.serve.closed()
+
+ def writedata(self, data):
+ try:
+ self.sk.write(data)
+ self.sk.flush()
+ except IOError:
+ raise ashd.serve.closed()
+
+ def handle(self):
+ head = ashd.scgi.readhead(self.sk)
+ self.env = mkenv(head, self.sk)
+ with ashd.perf.request(self.env) as reqevent:
+ super().handle()
+ if self.status:
+ reqevent.response([self.status, self.headers])
+
+ def run(self):
+ try:
+ super().run()
+ finally:
+ self.sk.close()
+ self.bsk.close()
+
+while True:
+ nsk, addr = sk.accept()
+ try:
+ reqthread(nsk).start()
+ finally:
+ nsk.close()
static void serve2(struct user *usr, struct hthead *req, int fd)
{
+ int serr;
+
if(usr->fd < 0)
usr->fd = forkchild(usr->name, req, fd);
- if(sendreq(usr->fd, req, fd)) {
- if((errno == EPIPE) || (errno == ECONNRESET)) {
+ if(sendreq2(usr->fd, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT)) {
+ serr = errno;
+ if((serr == EPIPE) || (serr == ECONNRESET)) {
/* Assume that the child has crashed and restart it. */
close(usr->fd);
usr->fd = forkchild(usr->name, req, fd);
- if(!sendreq(usr->fd, req, fd))
+ if(!sendreq2(usr->fd, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT))
return;
}
- flog(LOG_ERR, "could not pass on request to user `%s': %s", usr->name, strerror(errno));
- close(usr->fd);
- usr->fd = -1;
+ flog(LOG_ERR, "could not pass on request to user `%s': %s", usr->name, strerror(serr));
+ if(serr != EAGAIN) {
+ close(usr->fd);
+ usr->fd = -1;
+ }
}
}