From f1b49ff649a64081573cf03822973320b2b1d603 Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Sat, 13 Nov 2021 01:09:25 +0100 Subject: [PATCH] acmecert: Reorganized a bit. --- acmecert | 298 ++++++++++++++++++++++++++++++++++----------------------------- 1 file changed, 159 insertions(+), 139 deletions(-) diff --git a/acmecert b/acmecert index 12e1847..4c56ad8 100755 --- a/acmecert +++ b/acmecert @@ -1,22 +1,17 @@ #!/usr/bin/python3 +#### ACME client (only http-01 challenges supported thus far) + import sys, os, getopt, binascii, json, pprint, signal, time, threading import urllib.request import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5 +### General utilities + class msgerror(Exception): def report(self, out): out.write("acmecert: undefined error\n") -service = "https://acme-v02.api.letsencrypt.org/directory" -_directory = None -def directory(): - global _directory - if _directory is None: - with req(service) as resp: - _directory = json.load(resp) - return _directory - def base64url(dat): return binascii.b2a_base64(dat).decode("us-ascii").translate({43: 45, 47: 95, 61: None}).strip() @@ -25,81 +20,29 @@ def ebignum(num): if len(h) % 2 == 1: h = "0" + h return base64url(binascii.a2b_hex(h)) -def getnonce(): - with urllib.request.urlopen(directory()["newNonce"]) as resp: - resp.read() - return resp.headers["Replay-Nonce"] - -def req(url, data=None, ctype=None, headers={}, method=None, **kws): - if data is not None and not isinstance(data, bytes): - data = json.dumps(data).encode("utf-8") - ctype = "application/jose+json" - req = urllib.request.Request(url, data=data, method=method) - for hnam, hval in headers.items(): - req.add_header(hnam, hval) - if ctype is not None: - req.add_header("Content-Type", ctype) - return urllib.request.urlopen(req) - -class problem(msgerror): - def __init__(self, code, data, *args, url=None, **kw): - super().__init__(*args, **kw) - self.code = code - self.data = data - self.url = url - if not isinstance(data, dict): - raise ValueError("unexpected problem object type: %r" % (data,)) - - @property - def type(self): - return self.data.get("type", "about:blank") - @property - def title(self): - return self.data.get("title") - @property - def detail(self): - return self.data.get("detail") - - def report(self, out): - extra = None - if self.title is None: - msg = self.detail - if "\n" in msg: - extra, msg = msg, None +class maybeopen(object): + def __init__(self, name, mode): + if name == "-": + self.opened = False + if mode == "r": + self.fp = sys.stdin + elif mode == "w": + self.fp = sys.stdout + else: + raise ValueError(mode) else: - msg = self.title - extra = self.detail - if msg is None: - msg = self.data.get("type") - if msg is not None: - out.write("acemcert: %s: %s\n" % ( - ("remote service error" if self.url is None else self.url), - ("unspecified error" if msg is None else msg))) - if extra is not None: - out.write("%s\n" % (extra,)) + self.opened = True + self.fp = open(name, mode) - @classmethod - def read(cls, err, **kw): - self = cls(err.code, json.load(err), **kw) - return self + def __enter__(self): + return self.fp -def jreq(url, data, auth): - authdata = {"alg": "RS256", "url": url, "nonce": getnonce()} - authdata.update(auth.authdata()) - authdata = base64url(json.dumps(authdata).encode("us-ascii")) - if data is None: - data = "" - else: - data = base64url(json.dumps(data).encode("us-ascii")) - seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii"))) - enc = {"protected": authdata, "payload": data, "signature": seal} - try: - with req(url, data=enc) as resp: - return json.load(resp), resp.headers - except urllib.error.HTTPError as exc: - if exc.headers["Content-Type"] == "application/problem+json": - raise problem.read(exc, url=url) - raise + def __exit__(self, *excinfo): + if self.opened: + self.fp.close() + return False + +### Crypto utilities class certificate(object): @property @@ -174,6 +117,86 @@ class signreq(object): self.data = fp.read() return self +### Somewhat general request utilities + +def getnonce(): + with urllib.request.urlopen(directory()["newNonce"]) as resp: + resp.read() + return resp.headers["Replay-Nonce"] + +def req(url, data=None, ctype=None, headers={}, method=None, **kws): + if data is not None and not isinstance(data, bytes): + data = json.dumps(data).encode("utf-8") + ctype = "application/jose+json" + req = urllib.request.Request(url, data=data, method=method) + for hnam, hval in headers.items(): + req.add_header(hnam, hval) + if ctype is not None: + req.add_header("Content-Type", ctype) + return urllib.request.urlopen(req) + +class problem(msgerror): + def __init__(self, code, data, *args, url=None, **kw): + super().__init__(*args, **kw) + self.code = code + self.data = data + self.url = url + if not isinstance(data, dict): + raise ValueError("unexpected problem object type: %r" % (data,)) + + @property + def type(self): + return self.data.get("type", "about:blank") + @property + def title(self): + return self.data.get("title") + @property + def detail(self): + return self.data.get("detail") + + def report(self, out): + extra = None + if self.title is None: + msg = self.detail + if "\n" in msg: + extra, msg = msg, None + else: + msg = self.title + extra = self.detail + if msg is None: + msg = self.data.get("type") + if msg is not None: + out.write("acemcert: %s: %s\n" % ( + ("remote service error" if self.url is None else self.url), + ("unspecified error" if msg is None else msg))) + if extra is not None: + out.write("%s\n" % (extra,)) + + @classmethod + def read(cls, err, **kw): + self = cls(err.code, json.load(err), **kw) + return self + +def jreq(url, data, auth): + authdata = {"alg": "RS256", "url": url, "nonce": getnonce()} + authdata.update(auth.authdata()) + authdata = base64url(json.dumps(authdata).encode("us-ascii")) + if data is None: + data = "" + else: + data = base64url(json.dumps(data).encode("us-ascii")) + seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii"))) + enc = {"protected": authdata, "payload": data, "signature": seal} + try: + with req(url, data=enc) as resp: + return json.load(resp), resp.headers + except urllib.error.HTTPError as exc: + if exc.headers["Content-Type"] == "application/problem+json": + raise problem.read(exc, url=url) + raise + +## Authentication + class jwkauth(object): def __init__(self, key): self.key = key @@ -221,22 +244,16 @@ class account(object): key = Crypto.PublicKey.RSA.importKey(fp.read()) return cls(uri, key) -class htconfig(object): - def __init__(self): - self.roots = {} +### ACME protocol - @classmethod - def read(cls, fp): - self = cls() - for ln in fp: - words = ln.split() - if len(words) < 1 or ln[0] == '#': - continue - if words[0] == "root": - self.roots[words[1]] = words[2] - else: - sys.stderr.write("acmecert: warning: unknown htconfig directive: %s\n" % (words[0])) - return self +service = "https://acme-v02.api.letsencrypt.org/directory" +_directory = None +def directory(): + global _directory + if _directory is None: + with req(service) as resp: + _directory = json.load(resp) + return _directory def register(keysize=4096): key = Crypto.PublicKey.RSA.generate(keysize, Crypto.Random.new().read) @@ -255,6 +272,47 @@ def httptoken(acct, ch): khash = base64url(dig.digest()) return ch["token"], ("%s.%s" % (ch["token"], khash)) +def finalize(acct, csr, orderid): + order, headers = jreq(orderid, None, acct) + if order["status"] == "valid": + pass + elif order["status"] == "ready": + jreq(order["finalize"], {"csr": base64url(csr.der())}, acct) + for n in range(30): + resp, headers = jreq(orderid, None, acct) + if resp["status"] == "processing": + time.sleep(2) + elif resp["status"] == "valid": + order = resp + break + else: + raise Exception("unexpected order status when finalizing: %s" % resp["status"]) + else: + raise Exception("order finalization timed out") + else: + raise Exception("unexpected order state when finalizing: %s" % (order["status"],)) + with req(order["certificate"]) as resp: + return resp.read().decode("us-ascii") + +## http-01 challenge + +class htconfig(object): + def __init__(self): + self.roots = {} + + @classmethod + def read(cls, fp): + self = cls() + for ln in fp: + words = ln.split() + if len(words) < 1 or ln[0] == '#': + continue + if words[0] == "root": + self.roots[words[1]] = words[2] + else: + sys.stderr.write("acmecert: warning: unknown htconfig directive: %s\n" % (words[0])) + return self + def authorder(acct, htconf, orderid): order, headers = jreq(orderid, None, acct) valid = False @@ -313,49 +371,7 @@ def authorder(acct, htconf, orderid): finally: os.unlink(tokpath) -def finalize(acct, csr, orderid): - order, headers = jreq(orderid, None, acct) - if order["status"] == "valid": - pass - elif order["status"] == "ready": - jreq(order["finalize"], {"csr": base64url(csr.der())}, acct) - for n in range(30): - resp, headers = jreq(orderid, None, acct) - if resp["status"] == "processing": - time.sleep(2) - elif resp["status"] == "valid": - order = resp - break - else: - raise Exception("unexpected order status when finalizing: %s" % resp["status"]) - else: - raise Exception("order finalization timed out") - else: - raise Exception("unexpected order state when finalizing: %s" % (order["status"],)) - with req(order["certificate"]) as resp: - return resp.read().decode("us-ascii") - -class maybeopen(object): - def __init__(self, name, mode): - if name == "-": - self.opened = False - if mode == "r": - self.fp = sys.stdin - elif mode == "w": - self.fp = sys.stdout - else: - raise ValueError(mode) - else: - self.opened = True - self.fp = open(name, mode) - - def __enter__(self): - return self.fp - - def __exit__(self, *excinfo): - if self.opened: - self.fp.close() - return False +### Invocation and commands invdata = threading.local() commands = {} @@ -367,6 +383,8 @@ class usageerr(msgerror): def report(self, out): out.write("%s\n" % (self.cmd.__doc__,)) +## User commands + def cmd_reg(args): "usage: acmecert reg [OUTPUT-FILE]" acct = register() @@ -459,6 +477,8 @@ def cmd_directory(args): pprint.pprint(directory()) commands["directory"] = cmd_directory +## Main invocation + def usage(out): out.write("usage: acmecert [-D SERVICE] COMMAND [ARGS...]\n") out.write(" acmecert -h [COMMAND]\n") -- 2.11.0