From: Fredrik Tolf Date: Mon, 8 Nov 2021 18:17:46 +0000 (+0100) Subject: acmecert: Initial commit. X-Git-Url: http://git.dolda2000.com/gitweb/?a=commitdiff_plain;ds=sidebyside;h=61d08fc22b43cdf7a277f4f263ce53ac060e8991;p=utils.git acmecert: Initial commit. --- diff --git a/acmecert b/acmecert new file mode 100755 index 0000000..91fcece --- /dev/null +++ b/acmecert @@ -0,0 +1,315 @@ +#!/usr/bin/python3 + +import sys, os, getopt, binascii, json, pprint, signal, time +import urllib.request +import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5 + +service = "https://acme-v02.api.letsencrypt.org/directory" +_directory = None +def directory(): + global _directory + if _directory is None: + with req(service) as resp: + _directory = json.loads(resp.read().decode("utf-8")) + return _directory + +def base64url(dat): + return binascii.b2a_base64(dat).decode("us-ascii").translate({43: 45, 47: 95, 61: None}).strip() + +def ebignum(num): + h = "%x" % num + if len(h) % 2 == 1: h = "0" + h + return base64url(binascii.a2b_hex(h)) + +def getnonce(): + with urllib.request.urlopen(directory()["newNonce"]) as resp: + resp.read() + return resp.headers["Replay-Nonce"] + +def req(url, data=None, ctype=None, headers={}, method=None, **kws): + if data is not None and not isinstance(data, bytes): + data = json.dumps(data).encode("utf-8") + ctype = "application/jose+json" + req = urllib.request.Request(url, data=data, method=method) + for hnam, hval in headers.items(): + req.add_header(hnam, hval) + if ctype is not None: + req.add_header("Content-Type", ctype) + return urllib.request.urlopen(req) + +def jreq(url, data, auth): + authdata = {"alg": "RS256", "url": url, "nonce": getnonce()} + authdata.update(auth.authdata()) + authdata = base64url(json.dumps(authdata).encode("us-ascii")) + if data is None: + data = "" + else: + data = base64url(json.dumps(data).encode("us-ascii")) + seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii"))) + enc = {"protected": authdata, "payload": data, "signature": seal} + with req(url, data=enc) as resp: + return json.loads(resp.read().decode("utf-8")), resp.headers + +class signreq(object): + def domains(self): + # No PCKS10 parser for Python? + import subprocess, re + with subprocess.Popen(["openssl", "req", "-noout", "-text"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl: + openssl.stdin.write(self.data.encode("us-ascii")) + openssl.stdin.close() + resp = openssl.stdout.read().decode("utf8") + if openssl.wait() != 0: + raise Exception("openssl error") + m = re.search(r"X509v3 Subject Alternative Name:[^\n]*\n\s*((\w+:\S+,\s*)*\w+:\S+)\s*\n", resp) + if m is None: + return [] + ret = [] + for nm in m.group(1).split(","): + nm = nm.strip() + typ, nm = nm.split(":", 1) + if typ == "DNS": + ret.append(nm) + return ret + + def der(self): + import subprocess + with subprocess.Popen(["openssl", "req", "-outform", "der"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl: + openssl.stdin.write(self.data.encode("us-ascii")) + openssl.stdin.close() + resp = openssl.stdout.read() + if openssl.wait() != 0: + raise Exception("openssl error") + return resp + + @classmethod + def read(cls, fp): + self = cls() + self.data = fp.read() + return self + +class jwkauth(object): + def __init__(self, key): + self.key = key + + def authdata(self): + return {"jwk": {"kty": "RSA", "e": ebignum(self.key.e), "n": ebignum(self.key.n)}} + + def sign(self, data): + dig = Crypto.Hash.SHA256.new() + dig.update(data) + return Crypto.Signature.PKCS1_v1_5.new(self.key).sign(dig) + +class account(object): + def __init__(self, uri, key): + self.uri = uri + self.key = key + + def authdata(self): + return {"kid": self.uri} + + def sign(self, data): + dig = Crypto.Hash.SHA256.new() + dig.update(data) + return Crypto.Signature.PKCS1_v1_5.new(self.key).sign(dig) + + def getinfo(self): + data, headers = jreq(self.uri, None, self) + return data + + def validate(self): + data = self.getinfo() + if data.get("status", "") != "valid": + raise Exception("account is not valid: %s" % (data.get("status", "\"\""))) + + def write(self, out): + out.write("%s\n" % (self.uri,)) + out.write("%s\n" % (self.key.exportKey().decode("us-ascii"),)) + + @classmethod + def read(cls, fp): + uri = fp.readline() + if uri == "": + raise Exception("missing account URI") + uri = uri.strip() + key = Crypto.PublicKey.RSA.importKey(fp.read()) + return cls(uri, key) + +class htconfig(object): + def __init__(self): + self.roots = {} + + @classmethod + def read(cls, fp): + self = cls() + for ln in fp: + words = ln.split() + if len(words) < 1 or ln[0] == '#': + continue + if words[0] == "root": + self.roots[words[1]] = words[2] + else: + sys.stderr.write("acmecert: warning: unknown htconfig directive: %s\n" % (words[0])) + return self + +def register(keysize=4096): + key = Crypto.PublicKey.RSA.generate(keysize, Crypto.Random.new().read) + # jwk = {"kty": "RSA", "e": ebignum(key.e), "n": ebignum(key.n)} + # cjwk = json.dumps(jwk, separators=(',', ':'), sort_keys=True) + data, headers = jreq(directory()["newAccount"], {"termsOfServiceAgreed": True}, jwkauth(key)) + return account(headers["Location"], key) + +def mkorder(acct, csr): + data, headers = jreq(directory()["newOrder"], {"identifiers": [{"type": "dns", "value": dn} for dn in csr.domains()]}, acct) + data["acmecert.location"] = headers["Location"] + return data + +def httptoken(acct, ch): + jwk = {"kty": "RSA", "e": ebignum(acct.key.e), "n": ebignum(acct.key.n)} + dig = Crypto.Hash.SHA256.new() + dig.update(json.dumps(jwk, separators=(',', ':'), sort_keys=True).encode("us-ascii")) + khash = base64url(dig.digest()) + return ch["token"], ("%s.%s" % (ch["token"], khash)) + +def authorder(acct, htconf, orderid): + order, headers = jreq(orderid, None, acct) + valid = False + tries = 0 + while not valid: + valid = True + tries += 1 + if tries > 5: + raise Exception("challenges refuse to become valid even after 5 retries") + for authuri in order["authorizations"]: + auth, headers = jreq(authuri, None, acct) + if auth["status"] == "valid": + continue + elif auth["status"] == "pending": + pass + else: + raise Exception("unknown authorization status: %s" % (auth["status"],)) + valid = False + if auth["identifier"]["type"] != "dns": + raise Exception("unknown authorization type: %s" % (auth["identifier"]["type"],)) + dn = auth["identifier"]["value"] + if dn not in htconf.roots: + raise Exception("no configured ht-root for domain name %s" % (dn,)) + for ch in auth["challenges"]: + if ch["type"] == "http-01": + break + else: + raise Exception("no http-01 challenge for %s" % (dn,)) + root = htconf.roots[dn] + tokid, tokval = httptoken(acct, ch) + tokpath = os.path.join(root, tokid); + fp = open(tokpath, "w") + try: + with fp: + fp.write(tokval) + with req("http://%s/.well-known/acme-challenge/%s" % (dn, tokid)) as resp: + if resp.read().decode("utf-8") != tokval: + raise Exception("challenge from %s does not match written value" % (dn,)) + for n in range(30): + resp, headers = jreq(ch["url"], {}, acct) + if resp["status"] == "processing": + time.sleep(2) + elif resp["status"] == "valid": + break + else: + raise Exception("unexpected challenge status for %s when validating: %s" % (dn, resp["status"])) + else: + raise Exception("challenge processing timed out for %s" % (dn,)) + finally: + os.unlink(tokpath) + +def finalize(acct, csr, orderid): + order, headers = jreq(orderid, None, acct) + if order["status"] == "valid": + pass + elif order["status"] == "ready": + jreq(order["finalize"], {"csr": base64url(csr.der())}, acct) + for n in range(30): + resp, headers = jreq(orderid, None, acct) + if resp["status"] == "processing": + time.sleep(2) + elif resp["status"] == "valid": + order = resp + break + else: + raise Exception("unexpected order status when finalizing: %s" % resp["status"]) + else: + raise Exception("order finalization timed out") + else: + raise Exception("unexpected order state when finalizing: %s" % (order["status"],)) + with req(order["certificate"]) as resp: + return resp.read().decode("us-ascii") + +def usage(out): + out.write("usage: acmecert [-h] [-D SERVICE]\n") + +def main(argv): + global service + opts, args = getopt.getopt(argv[1:], "hD:") + for o, a in opts: + if o == "-h": + usage(sys.stdout) + sys.exit(0) + elif o == "-D": + service = a + if len(args) < 1: + usage(sys.stderr) + sys.exit(1) + if args[0] == "reg": + register().write(sys.stdout) + elif args[0] == "validate-acct": + with open(args[1], "r") as fp: + account.read(fp).validate() + elif args[0] == "acctinfo": + with open(args[1], "r") as fp: + pprint.pprint(account.read(fp).getinfo()) + elif args[0] == "order": + with open(args[1], "r") as fp: + acct = account.read(fp) + with open(args[2], "r") as fp: + csr = signreq.read(fp) + order = mkorder(acct, csr) + with open(args[3], "w") as fp: + fp.write("%s\n" % (order["acmecert.location"])) + elif args[0] == "http-auth": + with open(args[1], "r") as fp: + acct = account.read(fp) + with open(args[2], "r") as fp: + htconf = htconfig.read(fp) + with open(args[3], "r") as fp: + orderid = fp.readline().strip() + authorder(acct, htconf, orderid) + elif args[0] == "get": + with open(args[1], "r") as fp: + acct = account.read(fp) + with open(args[2], "r") as fp: + csr = signreq.read(fp) + with open(args[3], "r") as fp: + orderid = fp.readline().strip() + sys.stdout.write(finalize(acct, csr, orderid)) + elif args[0] == "http-order": + with open(args[1], "r") as fp: + acct = account.read(fp) + with open(args[2], "r") as fp: + csr = signreq.read(fp) + with open(args[3], "r") as fp: + htconf = htconfig.read(fp) + orderid = mkorder(acct, csr)["acmecert.location"] + authorder(acct, htconf, orderid) + sys.stdout.write(finalize(acct, csr, orderid)) + elif args[0] == "directory": + pprint.pprint(directory()) + else: + sys.stderr.write("acmecert: unknown command: %s\n" % (args[0],)) + usage(sys.stderr) + sys.exit(1) + +if __name__ == "__main__": + try: + main(sys.argv) + except KeyboardInterrupt: + signal.signal(signal.SIGINT, signal.SIG_DFL) + os.kill(os.getpid(), signal.SIGINT)