acmecert: Initial commit.
authorFredrik Tolf <fredrik@dolda2000.com>
Mon, 8 Nov 2021 18:17:46 +0000 (19:17 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Mon, 8 Nov 2021 18:17:46 +0000 (19:17 +0100)
acmecert [new file with mode: 0755]

diff --git a/acmecert b/acmecert
new file mode 100755 (executable)
index 0000000..91fcece
--- /dev/null
+++ b/acmecert
@@ -0,0 +1,315 @@
+#!/usr/bin/python3
+
+import sys, os, getopt, binascii, json, pprint, signal, time
+import urllib.request
+import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5
+
+service = "https://acme-v02.api.letsencrypt.org/directory"
+_directory = None
+def directory():
+    global _directory
+    if _directory is None:
+        with req(service) as resp:
+            _directory = json.loads(resp.read().decode("utf-8"))
+    return _directory
+
+def base64url(dat):
+    return binascii.b2a_base64(dat).decode("us-ascii").translate({43: 45, 47: 95, 61: None}).strip()
+
+def ebignum(num):
+    h = "%x" % num
+    if len(h) % 2 == 1: h = "0" + h
+    return base64url(binascii.a2b_hex(h))
+
+def getnonce():
+    with urllib.request.urlopen(directory()["newNonce"]) as resp:
+        resp.read()
+        return resp.headers["Replay-Nonce"]
+
+def req(url, data=None, ctype=None, headers={}, method=None, **kws):
+    if data is not None and not isinstance(data, bytes):
+        data = json.dumps(data).encode("utf-8")
+        ctype = "application/jose+json"
+    req = urllib.request.Request(url, data=data, method=method)
+    for hnam, hval in headers.items():
+        req.add_header(hnam, hval)
+    if ctype is not None:
+        req.add_header("Content-Type", ctype)
+    return urllib.request.urlopen(req)
+
+def jreq(url, data, auth):
+    authdata = {"alg": "RS256", "url": url, "nonce": getnonce()}
+    authdata.update(auth.authdata())
+    authdata = base64url(json.dumps(authdata).encode("us-ascii"))
+    if data is None:
+        data = ""
+    else:
+        data = base64url(json.dumps(data).encode("us-ascii"))
+    seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii")))
+    enc = {"protected": authdata, "payload": data, "signature": seal}
+    with req(url, data=enc) as resp:
+        return json.loads(resp.read().decode("utf-8")), resp.headers
+
+class signreq(object):
+    def domains(self):
+        # No PCKS10 parser for Python?
+        import subprocess, re
+        with subprocess.Popen(["openssl", "req", "-noout", "-text"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl:
+            openssl.stdin.write(self.data.encode("us-ascii"))
+            openssl.stdin.close()
+            resp = openssl.stdout.read().decode("utf8")
+            if openssl.wait() != 0:
+                raise Exception("openssl error")
+        m = re.search(r"X509v3 Subject Alternative Name:[^\n]*\n\s*((\w+:\S+,\s*)*\w+:\S+)\s*\n", resp)
+        if m is None:
+            return []
+        ret = []
+        for nm in m.group(1).split(","):
+            nm = nm.strip()
+            typ, nm = nm.split(":", 1)
+            if typ == "DNS":
+                ret.append(nm)
+        return ret
+
+    def der(self):
+        import subprocess
+        with subprocess.Popen(["openssl", "req", "-outform", "der"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl:
+            openssl.stdin.write(self.data.encode("us-ascii"))
+            openssl.stdin.close()
+            resp = openssl.stdout.read()
+            if openssl.wait() != 0:
+                raise Exception("openssl error")
+        return resp
+
+    @classmethod
+    def read(cls, fp):
+        self = cls()
+        self.data = fp.read()
+        return self
+
+class jwkauth(object):
+    def __init__(self, key):
+        self.key = key
+
+    def authdata(self):
+        return {"jwk": {"kty": "RSA", "e": ebignum(self.key.e), "n": ebignum(self.key.n)}}
+
+    def sign(self, data):
+        dig = Crypto.Hash.SHA256.new()
+        dig.update(data)
+        return Crypto.Signature.PKCS1_v1_5.new(self.key).sign(dig)
+
+class account(object):
+    def __init__(self, uri, key):
+        self.uri = uri
+        self.key = key
+
+    def authdata(self):
+        return {"kid": self.uri}
+
+    def sign(self, data):
+        dig = Crypto.Hash.SHA256.new()
+        dig.update(data)
+        return Crypto.Signature.PKCS1_v1_5.new(self.key).sign(dig)
+
+    def getinfo(self):
+        data, headers = jreq(self.uri, None, self)
+        return data
+
+    def validate(self):
+        data = self.getinfo()
+        if data.get("status", "") != "valid":
+            raise Exception("account is not valid: %s" % (data.get("status", "\"\"")))
+
+    def write(self, out):
+        out.write("%s\n" % (self.uri,))
+        out.write("%s\n" % (self.key.exportKey().decode("us-ascii"),))
+
+    @classmethod
+    def read(cls, fp):
+        uri = fp.readline()
+        if uri == "":
+            raise Exception("missing account URI")
+        uri = uri.strip()
+        key = Crypto.PublicKey.RSA.importKey(fp.read())
+        return cls(uri, key)
+
+class htconfig(object):
+    def __init__(self):
+        self.roots = {}
+
+    @classmethod
+    def read(cls, fp):
+        self = cls()
+        for ln in fp:
+            words = ln.split()
+            if len(words) < 1 or ln[0] == '#':
+                continue
+            if words[0] == "root":
+                self.roots[words[1]] = words[2]
+            else:
+                sys.stderr.write("acmecert: warning: unknown htconfig directive: %s\n" % (words[0]))
+        return self
+
+def register(keysize=4096):
+    key = Crypto.PublicKey.RSA.generate(keysize, Crypto.Random.new().read)
+    # jwk = {"kty": "RSA", "e": ebignum(key.e), "n": ebignum(key.n)}
+    # cjwk = json.dumps(jwk, separators=(',', ':'), sort_keys=True)
+    data, headers = jreq(directory()["newAccount"], {"termsOfServiceAgreed": True}, jwkauth(key))
+    return account(headers["Location"], key)
+    
+def mkorder(acct, csr):
+    data, headers = jreq(directory()["newOrder"], {"identifiers": [{"type": "dns", "value": dn} for dn in csr.domains()]}, acct)
+    data["acmecert.location"] = headers["Location"]
+    return data
+
+def httptoken(acct, ch):
+    jwk = {"kty": "RSA", "e": ebignum(acct.key.e), "n": ebignum(acct.key.n)}
+    dig = Crypto.Hash.SHA256.new()
+    dig.update(json.dumps(jwk, separators=(',', ':'), sort_keys=True).encode("us-ascii"))
+    khash = base64url(dig.digest())
+    return ch["token"], ("%s.%s" % (ch["token"], khash))
+
+def authorder(acct, htconf, orderid):
+    order, headers = jreq(orderid, None, acct)
+    valid = False
+    tries = 0
+    while not valid:
+        valid = True
+        tries += 1
+        if tries > 5:
+            raise Exception("challenges refuse to become valid even after 5 retries")
+        for authuri in order["authorizations"]:
+            auth, headers = jreq(authuri, None, acct)
+            if auth["status"] == "valid":
+                continue
+            elif auth["status"] == "pending":
+                pass
+            else:
+                raise Exception("unknown authorization status: %s" % (auth["status"],))
+            valid = False
+            if auth["identifier"]["type"] != "dns":
+                raise Exception("unknown authorization type: %s" % (auth["identifier"]["type"],))
+            dn = auth["identifier"]["value"]
+            if dn not in htconf.roots:
+                raise Exception("no configured ht-root for domain name %s" % (dn,))
+            for ch in auth["challenges"]:
+                if ch["type"] == "http-01":
+                    break
+            else:
+                raise Exception("no http-01 challenge for %s" % (dn,))
+            root = htconf.roots[dn]
+            tokid, tokval = httptoken(acct, ch)
+            tokpath = os.path.join(root, tokid);
+            fp = open(tokpath, "w")
+            try:
+                with fp:
+                    fp.write(tokval)
+                with req("http://%s/.well-known/acme-challenge/%s" % (dn, tokid)) as resp:
+                    if resp.read().decode("utf-8") != tokval:
+                        raise Exception("challenge from %s does not match written value" % (dn,))
+                for n in range(30):
+                    resp, headers = jreq(ch["url"], {}, acct)
+                    if resp["status"] == "processing":
+                        time.sleep(2)
+                    elif resp["status"] == "valid":
+                        break
+                    else:
+                        raise Exception("unexpected challenge status for %s when validating: %s" % (dn, resp["status"]))
+                else:
+                    raise Exception("challenge processing timed out for %s" % (dn,))
+            finally:
+                os.unlink(tokpath)
+
+def finalize(acct, csr, orderid):
+    order, headers = jreq(orderid, None, acct)
+    if order["status"] == "valid":
+        pass
+    elif order["status"] == "ready":
+        jreq(order["finalize"], {"csr": base64url(csr.der())}, acct)
+        for n in range(30):
+            resp, headers = jreq(orderid, None, acct)
+            if resp["status"] == "processing":
+                time.sleep(2)
+            elif resp["status"] == "valid":
+                order = resp
+                break
+            else:
+                raise Exception("unexpected order status when finalizing: %s" % resp["status"])
+        else:
+            raise Exception("order finalization timed out")
+    else:
+        raise Exception("unexpected order state when finalizing: %s" % (order["status"],))
+    with req(order["certificate"]) as resp:
+        return resp.read().decode("us-ascii")
+
+def usage(out):
+    out.write("usage: acmecert [-h] [-D SERVICE]\n")
+
+def main(argv):
+    global service
+    opts, args = getopt.getopt(argv[1:], "hD:")
+    for o, a in opts:
+        if o == "-h":
+            usage(sys.stdout)
+            sys.exit(0)
+        elif o == "-D":
+            service = a
+    if len(args) < 1:
+        usage(sys.stderr)
+        sys.exit(1)
+    if args[0] == "reg":
+        register().write(sys.stdout)
+    elif args[0] == "validate-acct":
+        with open(args[1], "r") as fp:
+            account.read(fp).validate()
+    elif args[0] == "acctinfo":
+        with open(args[1], "r") as fp:
+            pprint.pprint(account.read(fp).getinfo())
+    elif args[0] == "order":
+        with open(args[1], "r") as fp:
+            acct = account.read(fp)
+        with open(args[2], "r") as fp:
+            csr = signreq.read(fp)
+        order = mkorder(acct, csr)
+        with open(args[3], "w") as fp:
+            fp.write("%s\n" % (order["acmecert.location"]))
+    elif args[0] == "http-auth":
+        with open(args[1], "r") as fp:
+            acct = account.read(fp)
+        with open(args[2], "r") as fp:
+            htconf = htconfig.read(fp)
+        with open(args[3], "r") as fp:
+            orderid = fp.readline().strip()
+        authorder(acct, htconf, orderid)
+    elif args[0] == "get":
+        with open(args[1], "r") as fp:
+            acct = account.read(fp)
+        with open(args[2], "r") as fp:
+            csr = signreq.read(fp)
+        with open(args[3], "r") as fp:
+            orderid = fp.readline().strip()
+        sys.stdout.write(finalize(acct, csr, orderid))
+    elif args[0] == "http-order":
+        with open(args[1], "r") as fp:
+            acct = account.read(fp)
+        with open(args[2], "r") as fp:
+            csr = signreq.read(fp)
+        with open(args[3], "r") as fp:
+            htconf = htconfig.read(fp)
+        orderid = mkorder(acct, csr)["acmecert.location"]
+        authorder(acct, htconf, orderid)
+        sys.stdout.write(finalize(acct, csr, orderid))
+    elif args[0] == "directory":
+        pprint.pprint(directory())
+    else:
+        sys.stderr.write("acmecert: unknown command: %s\n" % (args[0],))
+        usage(sys.stderr)
+        sys.exit(1)
+
+if __name__ == "__main__":
+    try:
+        main(sys.argv)
+    except KeyboardInterrupt:
+        signal.signal(signal.SIGINT, signal.SIG_DFL)
+        os.kill(os.getpid(), signal.SIGINT)