| 1 | import time, sys, io |
| 2 | |
def htmlquote(text):
    """Return *text* with HTML-special characters replaced by entities.

    The characters ``&``, ``<``, ``>`` and ``"`` are replaced by
    ``&amp;``, ``&lt;``, ``&gt;`` and ``&quot;`` respectively; every
    other character (including the single quote) is passed through
    unchanged.

    text -- the string to quote.
    """
    # The ampersand must be replaced first; otherwise the ampersands
    # introduced by the other entities would themselves be re-escaped.
    return (text.replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;")
                .replace('"', "&quot;"))
| 17 | |
def simpleerror(env, startreq, code, title, msg):
    """Produce a minimal XHTML error page as a WSGI response.

    Calls *startreq* with a status line built from *code* and *title*
    and with Content-Type and Content-Length headers, then returns the
    response body as a single-element list of bytes.

    env      -- the WSGI environment; not used by this function.
    startreq -- the WSGI start_response callable.
    code     -- the numeric HTTP status code.
    title    -- short status text; used verbatim in the status line and,
                HTML-quoted, as the page title and heading.
    msg      -- the explanatory message; HTML-quoted into the page body.
    """
    # The title ends up in markup just like the message does, so it has
    # to be quoted as well; the status line still gets the raw title.
    qtitle = htmlquote(title)
    buf = """<?xml version="1.0" encoding="US-ASCII"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en-US">
<head>
<title>%s</title>
</head>
<body>
<h1>%s</h1>
<p>%s</p>
</body>
</html>""" % (qtitle, qtitle, htmlquote(msg))
    # The document declares US-ASCII; emit numeric character references
    # for anything outside that range instead of raising
    # UnicodeEncodeError while trying to report an error.
    buf = buf.encode("ascii", "xmlcharrefreplace")
    startreq("%i %s" % (code, title), [("Content-Type", "text/html"), ("Content-Length", str(len(buf)))])
    return [buf]
| 33 | |
def httpdate(ts):
    """Format the POSIX timestamp *ts* as a date string in UTC.

    The result has the form ``Thu, 01 Jan 1970 00:00:00 +0000``. Day
    and month names are always the English abbreviations, regardless of
    the process locale (``time.strftime`` with ``%a``/``%b`` would
    follow the current LC_TIME setting).

    ts -- seconds since the epoch, as accepted by ``time.gmtime``.
    """
    days = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    tm = time.gmtime(ts)
    # tm_wday counts from Monday == 0; tm_mon counts from 1.
    return "%s, %02i %s %04i %02i:%02i:%02i +0000" % (
        days[tm.tm_wday], tm.tm_mday, months[tm.tm_mon - 1], tm.tm_year,
        tm.tm_hour, tm.tm_min, tm.tm_sec)
| 36 | |
def phttpdate(dstr):
    """Parse a date string with a numeric UTC offset into a POSIX timestamp.

    *dstr* is expected to look like ``Thu, 01 Jan 1970 00:00:00 +0000``:
    a date in the format ``%a, %d %b %Y %H:%M:%S`` followed by a space,
    a sign and a four-digit ``HHMM`` offset.

    Returns the timestamp as a float, or None if the trailing six
    characters are not a well-formed offset. Raises ValueError (from
    ``time.strptime``) if the date part does not match the format.
    """
    # Imported locally so that this function does not depend on the
    # module-level import list.
    import calendar
    tz = dstr[-6:]
    dstr = dstr[:-6]
    if len(tz) != 6 or tz[0] != " " or tz[1] not in "+-" or not tz[2:].isdigit():
        return None
    # Split hours and minutes textually and apply the sign afterwards;
    # integer division/modulo on a signed HHMM value gives wrong results
    # for negative offsets, and true division yields fractional hours.
    offset = (int(tz[2:4]) * 60 + int(tz[4:6])) * 60
    if tz[1] == "-":
        offset = -offset
    # timegm interprets the broken-down time as UTC, so the result does
    # not depend on the local timezone or on whether DST is in effect.
    parsed = time.strptime(dstr, "%a, %d %b %Y %H:%M:%S")
    return float(calendar.timegm(parsed) - offset)
| 45 | |
| 46 | def testenviron(uri, qs="", pi="", method=None, filename=None, host="localhost", data=None, ctype=None, head={}): |
| 47 | if method is None: |
| 48 | method = "GET" if data is None else "POST" |
| 49 | if ctype is None and data is not None: |
| 50 | ctype = "application/x-www-form-urlencoded" |
| 51 | ret = {} |
| 52 | ret["wsgi.version"] = 1, 0 |
| 53 | ret["SERVER_SOFTWARE"] = "ashd-test/1" |
| 54 | ret["GATEWAY_INTERFACE"] = "CGI/1.1" |
| 55 | ret["SERVER_PROTOCOL"] = "HTTP/1.1" |
| 56 | ret["REQUEST_METHOD"] = method |
| 57 | ret["wsgi.uri_encoding"] = "utf-8" |
| 58 | ret["SCRIPT_NAME"] = uri |
| 59 | ret["PATH_INFO"] = pi |
| 60 | ret["QUERY_STRING"] = qs |
| 61 | full = uri + pi |
| 62 | if qs: |
| 63 | full = full + "?" + qs |
| 64 | ret["REQUEST_URI"] = full |
| 65 | if filename is not None: |
| 66 | ret["SCRIPT_FILENAME"] = filename |
| 67 | ret["HTTP_HOST"] = ret["SERVER_NAME"] = host |
| 68 | ret["wsgi.url_scheme"] = "http" |
| 69 | ret["SERVER_ADDR"] = "127.0.0.1" |
| 70 | ret["SERVER_PORT"] = "80" |
| 71 | ret["REMOTE_ADDR"] = "127.0.0.1" |
| 72 | ret["REMOTE_PORT"] = "12345" |
| 73 | if data is not None: |
| 74 | ret["CONTENT_TYPE"] = ctype |
| 75 | ret["CONTENT_LENGTH"] = len(data) |
| 76 | ret["wsgi.input"] = io.BytesIO(data) |
| 77 | else: |
| 78 | ret["wsgi.input"] = io.BytesIO(b"") |
| 79 | ret["wsgi.errors"] = sys.stderr |
| 80 | ret["wsgi.multithread"] = True |
| 81 | ret["wsgi.multiprocess"] = False |
| 82 | ret["wsgi.run_once"] = False |
| 83 | for key, val in head.items(): |
| 84 | ret["HTTP_" + key.upper().replace("-", "_")] = val |
| 85 | return ret |
| 86 | |
class testrequest(object):
    """A start_response callable that records what it is given.

    An instance can be passed to a WSGI application in place of the
    server's start_response function. After the call, the status line
    and header list are available in the `status` and `headers`
    attributes, and anything written through the returned write
    callable is collected in the `wbuf` buffer.
    """

    def __init__(self):
        # Collects data written through the callable returned by __call__.
        self.wbuf = io.BytesIO()
        # Both stay None until the instance has been called.
        self.headers = None
        self.status = None

    def __call__(self, status, headers, exc_info=None):
        """Record *status* and *headers* and return the write callable.

        *exc_info* is accepted so that applications which pass the
        optional third start_response argument from an error handler
        can be tested; it is otherwise ignored, and the stored status
        and headers are simply replaced.
        """
        self.status = status
        self.headers = headers
        return self.wbuf.write

    def __repr__(self):
        if self.headers is None:
            hdrs = "None"
        else:
            hdrs = "[%i]" % len(self.headers)
        body = "(with data)" if self.wbuf.getvalue() else "(no data)"
        return "<ashd.wsgiutil.testrequest %r %s %s>" % (self.status, hdrs, body)

    def __str__(self):
        return repr(self)