#!/usr/bin/python3

import sys, os, getopt, binascii, json, pprint, signal, time
import urllib.request
import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5

# URL of the ACME directory; may be overridden with the -D option in main().
service = "https://acme-v02.api.letsencrypt.org/directory"
# Cached directory object, filled in lazily by directory().
_directory = None


def directory():
    """Return the decoded ACME directory of `service`, fetching it on first use."""
    global _directory
    if _directory is None:
        with req(service) as resp:
            _directory = json.loads(resp.read().decode("utf-8"))
    return _directory


def base64url(dat):
    """Encode the bytes `dat` as unpadded URL-safe base64 and return it as str."""
    # The translation table maps '+' -> '-', '/' -> '_' and drops '='.
    return binascii.b2a_base64(dat).decode("us-ascii").translate({43: 45, 47: 95, 61: None}).strip()


def ebignum(num):
    """Return the integer `num` as base64url of its big-endian byte string."""
    h = "%x" % num
    if len(h) % 2 == 1:
        # a2b_hex needs an even number of hex digits.
        h = "0" + h
    return base64url(binascii.a2b_hex(h))


def getnonce():
    """Fetch a fresh anti-replay nonce from the directory's newNonce URL."""
    with urllib.request.urlopen(directory()["newNonce"]) as resp:
        resp.read()
        return resp.headers["Replay-Nonce"]


def req(url, data=None, ctype=None, headers=None, method=None, **kws):
    """Open `url` and return the response object from urllib.

    Non-bytes `data` is serialized as JSON and sent with the
    application/jose+json content type (overriding `ctype`); bytes data is
    sent as-is with `ctype`, if given.  `headers` is an optional mapping of
    extra request headers.  Additional keyword arguments are accepted and
    ignored.
    """
    if data is not None and not isinstance(data, bytes):
        data = json.dumps(data).encode("utf-8")
        ctype = "application/jose+json"
    request = urllib.request.Request(url, data=data, method=method)
    if headers:
        for hnam, hval in headers.items():
            request.add_header(hnam, hval)
    if ctype is not None:
        request.add_header("Content-Type", ctype)
    return urllib.request.urlopen(request)


def _signedreq(url, data, auth):
    """POST `data` to `url` as a JWS signed by `auth`; return the open response.

    `auth` supplies authdata() (the key-identifying part of the protected
    header) and sign().  A `data` of None yields an empty payload, i.e. a
    POST-as-GET request.
    """
    authdata = {"alg": "RS256", "url": url, "nonce": getnonce()}
    authdata.update(auth.authdata())
    authdata = base64url(json.dumps(authdata).encode("us-ascii"))
    if data is None:
        data = ""
    else:
        data = base64url(json.dumps(data).encode("us-ascii"))
    seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii")))
    enc = {"protected": authdata, "payload": data, "signature": seal}
    return req(url, data=enc)


def jreq(url, data, auth):
    """Send a signed request and return (decoded JSON body, response headers)."""
    with _signedreq(url, data, auth) as resp:
        return json.loads(resp.read().decode("utf-8")), resp.headers


def _openssl(args, data):
    """Run `openssl` with `args`, feeding it the text `data`; return its stdout bytes.

    Raises Exception("openssl error") if the command exits non-zero.
    """
    import subprocess
    proc = subprocess.run(["openssl"] + list(args), input=data.encode("us-ascii"),
                          stdout=subprocess.PIPE)
    if proc.returncode != 0:
        raise Exception("openssl error")
    return proc.stdout


def _jwk(key):
    """Return the JWK dictionary for the public part of the RSA `key`."""
    return {"kty": "RSA", "e": ebignum(key.e), "n": ebignum(key.n)}


def _rs256sign(key, data):
    """Return the PKCS#1 v1.5 signature by `key` over the SHA-256 of `data`."""
    dig = Crypto.Hash.SHA256.new()
    dig.update(data)
    return Crypto.Signature.PKCS1_v1_5.new(key).sign(dig)


class certificate(object):
    """A certificate held as text in `self.data`, inspected via openssl."""

    # Seconds per unit suffix accepted by expiring().
    _units = {"y": 365 * 86400, "m": 30 * 86400, "w": 7 * 86400, "d": 86400, "h": 3600}

    @property
    def enddate(self):
        """The notAfter time reported by openssl, as seconds since the epoch."""
        # No X509 parser for Python?
        import re, calendar
        resp = _openssl(["x509", "-noout", "-enddate"], self.data).decode("utf-8")
        m = re.search(r"notAfter=(.*)$", resp)
        if m is None:
            raise Exception("unexpected openssl reply: %r" % (resp,))
        return calendar.timegm(time.strptime(m.group(1), "%b %d %H:%M:%S %Y GMT"))

    def expiring(self, timespec):
        """Return True if the certificate expires within `timespec` from now.

        `timespec` is a string holding an integer, optionally followed by one
        of the suffixes y, m, w, d or h; without a suffix it is in seconds.
        """
        suffix = timespec[-1:]
        if suffix in self._units:
            limit = int(timespec[:-1]) * self._units[suffix]
        else:
            limit = int(timespec)
        return (self.enddate - time.time()) < limit

    @classmethod
    def read(cls, fp):
        """Create an instance holding everything read from `fp`."""
        self = cls()
        self.data = fp.read()
        return self


class signreq(object):
    """A certificate signing request held as text in `self.data`."""

    def domains(self):
        """Return the DNS names in the request's Subject Alternative Name list.

        Returns an empty list if openssl's text dump shows no such extension.
        """
        # No PCKS10 parser for Python?
        import re
        resp = _openssl(["req", "-noout", "-text"], self.data).decode("utf-8")
        m = re.search(r"X509v3 Subject Alternative Name:[^\n]*\n\s*((\w+:\S+,\s*)*\w+:\S+)\s*\n", resp)
        if m is None:
            return []
        ret = []
        for nm in m.group(1).split(","):
            typ, nm = nm.strip().split(":", 1)
            if typ == "DNS":
                ret.append(nm)
        return ret

    def der(self):
        """Return the request converted to DER by openssl, as bytes."""
        return _openssl(["req", "-outform", "der"], self.data)

    @classmethod
    def read(cls, fp):
        """Create an instance holding everything read from `fp`."""
        self = cls()
        self.data = fp.read()
        return self


class jwkauth(object):
    """Request signer that identifies itself by embedding the public key."""

    def __init__(self, key):
        self.key = key

    def authdata(self):
        """Return the protected-header fields identifying the signer."""
        return {"jwk": _jwk(self.key)}

    def sign(self, data):
        """Return the RS256 signature over the bytes `data`."""
        return _rs256sign(self.key, data)


class account(object):
    """An account, given by its URI and RSA key; signs requests by key ID."""

    def __init__(self, uri, key):
        self.uri = uri
        self.key = key

    def authdata(self):
        """Return the protected-header fields identifying the signer."""
        return {"kid": self.uri}

    def sign(self, data):
        """Return the RS256 signature over the bytes `data`."""
        return _rs256sign(self.key, data)

    def getinfo(self):
        """Fetch and return the account object from the account URI."""
        data, headers = jreq(self.uri, None, self)
        return data

    def validate(self):
        """Raise an Exception unless the account's status is "valid"."""
        data = self.getinfo()
        if data.get("status", "") != "valid":
            raise Exception("account is not valid: %s" % (data.get("status", "\"\"")))

    def write(self, out):
        """Write the account URI line followed by the exported key to `out`."""
        out.write("%s\n" % (self.uri,))
        out.write("%s\n" % (self.key.exportKey().decode("us-ascii"),))

    @classmethod
    def read(cls, fp):
        """Read an account in the format produced by write() from `fp`."""
        uri = fp.readline()
        if uri == "":
            raise Exception("missing account URI")
        uri = uri.strip()
        key = Crypto.PublicKey.RSA.importKey(fp.read())
        return cls(uri, key)


class htconfig(object):
    """HTTP challenge configuration: maps domain names to challenge directories."""

    def __init__(self):
        self.roots = {}

    @classmethod
    def read(cls, fp):
        """Parse `root DOMAIN DIRECTORY` lines from `fp`.

        Blank lines and lines starting with '#' are skipped; unknown
        directives produce a warning on stderr.  Raises Exception for a root
        directive that lacks its two arguments.
        """
        self = cls()
        for ln in fp:
            words = ln.split()
            if len(words) < 1 or ln[0] == '#':
                continue
            if words[0] == "root":
                if len(words) < 3:
                    raise Exception("htconfig: root directive requires a domain name and a directory")
                self.roots[words[1]] = words[2]
            else:
                sys.stderr.write("acmecert: warning: unknown htconfig directive: %s\n" % (words[0],))
        return self


def register(keysize=4096):
    """Generate a new RSA key of `keysize` bits and register an account with it.

    Agrees to the terms of service and returns the resulting `account`.
    """
    key = Crypto.PublicKey.RSA.generate(keysize, Crypto.Random.new().read)
    data, headers = jreq(directory()["newAccount"], {"termsOfServiceAgreed": True}, jwkauth(key))
    return account(headers["Location"], key)


def mkorder(acct, csr):
    """Create an order for the DNS names in `csr` and return the order object.

    The order's URL is stored in the result under "acmecert.location".
    """
    idents = [{"type": "dns", "value": dn} for dn in csr.domains()]
    data, headers = jreq(directory()["newOrder"], {"identifiers": idents}, acct)
    data["acmecert.location"] = headers["Location"]
    return data


def httptoken(acct, ch):
    """Return (token, key authorization) for the challenge object `ch`.

    The key authorization is the token joined by a dot to the base64url
    SHA-256 digest of the account key's JWK, serialized compactly with
    sorted keys.
    """
    dig = Crypto.Hash.SHA256.new()
    dig.update(json.dumps(_jwk(acct.key), separators=(',', ':'), sort_keys=True).encode("us-ascii"))
    khash = base64url(dig.digest())
    return ch["token"], ("%s.%s" % (ch["token"], khash))


def _httpchallenge(acct, root, dn, ch):
    """Publish the response to challenge `ch` under `root` and wait for validation.

    The token file is removed again whether or not validation succeeds.
    """
    tokid, tokval = httptoken(acct, ch)
    tokpath = os.path.join(root, tokid)
    fp = open(tokpath, "w")
    try:
        with fp:
            fp.write(tokval)
        # Check that the file is actually being served before asking the
        # server to look at it.
        with req("http://%s/.well-known/acme-challenge/%s" % (dn, tokid)) as resp:
            if resp.read().decode("utf-8") != tokval:
                raise Exception("challenge from %s does not match written value" % (dn,))
        for n in range(30):
            resp, headers = jreq(ch["url"], {}, acct)
            if resp["status"] == "processing":
                time.sleep(2)
            elif resp["status"] == "valid":
                break
            else:
                raise Exception("unexpected challenge status for %s when validating: %s" % (dn, resp["status"]))
        else:
            raise Exception("challenge processing timed out for %s" % (dn,))
    finally:
        os.unlink(tokpath)


def authorder(acct, htconf, orderid):
    """Complete the http-01 challenges for every authorization of an order.

    `orderid` is the order URL and `htconf` an htconfig naming the directory
    to write challenge files in for each domain.  All authorizations are
    re-examined until every one reports valid, for at most five passes.
    """
    order, headers = jreq(orderid, None, acct)
    valid = False
    tries = 0
    while not valid:
        valid = True
        tries += 1
        if tries > 5:
            raise Exception("challenges refuse to become valid even after 5 retries")
        for authuri in order["authorizations"]:
            auth, headers = jreq(authuri, None, acct)
            if auth["status"] == "valid":
                continue
            if auth["status"] != "pending":
                raise Exception("unknown authorization status: %s" % (auth["status"],))
            # At least one authorization was still pending; go around again
            # after handling it to confirm that it became valid.
            valid = False
            if auth["identifier"]["type"] != "dns":
                raise Exception("unknown authorization type: %s" % (auth["identifier"]["type"],))
            dn = auth["identifier"]["value"]
            if dn not in htconf.roots:
                raise Exception("no configured ht-root for domain name %s" % (dn,))
            for ch in auth["challenges"]:
                if ch["type"] == "http-01":
                    break
            else:
                raise Exception("no http-01 challenge for %s" % (dn,))
            _httpchallenge(acct, htconf.roots[dn], dn, ch)


def finalize(acct, csr, orderid):
    """Finalize the order at `orderid` with `csr` and return the certificate text.

    An order that is already valid is not finalized again.  Raises Exception
    on an unexpected order status or if the order does not become valid
    within 30 polls.
    """
    order, headers = jreq(orderid, None, acct)
    if order["status"] == "valid":
        pass
    elif order["status"] == "ready":
        jreq(order["finalize"], {"csr": base64url(csr.der())}, acct)
        for n in range(30):
            resp, headers = jreq(orderid, None, acct)
            if resp["status"] == "processing":
                time.sleep(2)
            elif resp["status"] == "valid":
                order = resp
                break
            else:
                raise Exception("unexpected order status when finalizing: %s" % resp["status"])
        else:
            raise Exception("order finalization timed out")
    else:
        raise Exception("unexpected order state when finalizing: %s" % (order["status"],))
    # The certificate is downloaded with a signed POST-as-GET, like every
    # other resource here, rather than with a plain unauthenticated GET.
    with _signedreq(order["certificate"], None, acct) as resp:
        return resp.read().decode("us-ascii")


class maybeopen(object):
    """Context manager yielding stdin/stdout for the name "-", else an opened file.

    Only files opened here are closed on exit; the standard streams are left
    open.
    """

    def __init__(self, name, mode):
        if name == "-":
            self.opened = False
            if mode == "r":
                self.fp = sys.stdin
            elif mode == "w":
                self.fp = sys.stdout
            else:
                raise ValueError(mode)
        else:
            self.opened = True
            self.fp = open(name, mode)

    def __enter__(self):
        return self.fp

    def __exit__(self, *excinfo):
        if self.opened:
            self.fp.close()
        return False


class usageerr(Exception):
    """Raised by a command to make main() print that command's usage string."""
    pass


def _readorderid(arg):
    """Return `arg` itself if it contains "://", else the first line of the file it names."""
    if "://" in arg:
        return arg
    with maybeopen(arg, "r") as fp:
        return fp.readline().strip()


# Maps command names to their implementations.  Each command's docstring
# doubles as its usage message (see main()), so those are kept verbatim.
commands = {}


def cmd_reg(args):
    "usage: acmecert reg [OUTPUT-FILE]"
    acct = register()
    # The output holds the private key; keep it unreadable to others.
    os.umask(0o077)
    with maybeopen(args[1] if len(args) > 1 else "-", "w") as fp:
        acct.write(fp)
commands["reg"] = cmd_reg


def cmd_validate_acct(args):
    "usage: acmecert validate-acct ACCOUNT-FILE"
    if len(args) < 2:
        raise usageerr()
    with maybeopen(args[1], "r") as fp:
        account.read(fp).validate()
commands["validate-acct"] = cmd_validate_acct


def cmd_acct_info(args):
    "usage: acmecert acct-info ACCOUNT-FILE"
    if len(args) < 2:
        raise usageerr()
    with maybeopen(args[1], "r") as fp:
        pprint.pprint(account.read(fp).getinfo())
commands["acct-info"] = cmd_acct_info


def cmd_order(args):
    "usage: acmecert order ACCOUNT-FILE CSR [OUTPUT-FILE]"
    # Only ACCOUNT-FILE and CSR are mandatory; OUTPUT-FILE defaults to stdout.
    if len(args) < 3:
        raise usageerr()
    with maybeopen(args[1], "r") as fp:
        acct = account.read(fp)
    with maybeopen(args[2], "r") as fp:
        csr = signreq.read(fp)
    order = mkorder(acct, csr)
    with maybeopen(args[3] if len(args) > 3 else "-", "w") as fp:
        fp.write("%s\n" % (order["acmecert.location"]))
commands["order"] = cmd_order


def cmd_http_auth(args):
    "usage: acmecert http-auth ACCOUNT-FILE HTTP-CONFIG {ORDER-ID|ORDER-FILE}"
    if len(args) < 4:
        raise usageerr()
    with maybeopen(args[1], "r") as fp:
        acct = account.read(fp)
    with maybeopen(args[2], "r") as fp:
        htconf = htconfig.read(fp)
    authorder(acct, htconf, _readorderid(args[3]))
commands["http-auth"] = cmd_http_auth


def cmd_get(args):
    "usage: acmecert get ACCOUNT-FILE CSR {ORDER-ID|ORDER-FILE}"
    if len(args) < 4:
        raise usageerr()
    with maybeopen(args[1], "r") as fp:
        acct = account.read(fp)
    with maybeopen(args[2], "r") as fp:
        csr = signreq.read(fp)
    sys.stdout.write(finalize(acct, csr, _readorderid(args[3])))
commands["get"] = cmd_get


def cmd_http_order(args):
    "usage: acmecert http-order ACCOUNT-FILE CSR HTTP-CONFIG [OUTPUT-FILE]"
    if len(args) < 4:
        raise usageerr()
    with maybeopen(args[1], "r") as fp:
        acct = account.read(fp)
    with maybeopen(args[2], "r") as fp:
        csr = signreq.read(fp)
    with maybeopen(args[3], "r") as fp:
        htconf = htconfig.read(fp)
    orderid = mkorder(acct, csr)["acmecert.location"]
    authorder(acct, htconf, orderid)
    with maybeopen(args[4] if len(args) > 4 else "-", "w") as fp:
        fp.write(finalize(acct, csr, orderid))
commands["http-order"] = cmd_http_order


def cmd_check_cert(args):
    "usage: acmecert check-cert CERT-FILE TIME-SPEC"
    if len(args) < 3:
        raise usageerr()
    with maybeopen(args[1], "r") as fp:
        crt = certificate.read(fp)
    # Exit status 1 means the certificate expires within TIME-SPEC.
    sys.exit(1 if crt.expiring(args[2]) else 0)
commands["check-cert"] = cmd_check_cert


def cmd_directory(args):
    "usage: acmecert directory"
    pprint.pprint(directory())
commands["directory"] = cmd_directory


def usage(out):
    """Write the general usage message, with the command list wrapped, to `out`."""
    out.write("usage: acmecert [-D SERVICE] COMMAND [ARGS...]\n")
    out.write("       acmecert -h [COMMAND]\n")
    buf = "       COMMAND is any of: "
    f = True    # True while nothing has been added to the current line
    for cmd in commands:
        if len(buf) + len(cmd) > 70:
            out.write("%s\n" % (buf,))
            buf = "           "
            f = True
        if not f:
            buf += ", "
        buf += cmd
        f = False
    if not f:
        out.write("%s\n" % (buf,))


def main(argv):
    """Parse the command line in `argv` and run the selected command."""
    global service
    try:
        opts, args = getopt.getopt(argv[1:], "hD:")
    except getopt.GetoptError as exc:
        sys.stderr.write("acmecert: %s\n" % (exc,))
        usage(sys.stderr)
        sys.exit(1)
    for o, a in opts:
        if o == "-h":
            if len(args) > 0:
                cmd = commands.get(args[0])
                if cmd is None:
                    sys.stderr.write("acmecert: unknown command: %s\n" % (args[0],))
                    sys.exit(1)
                sys.stdout.write("%s\n" % (cmd.__doc__,))
            else:
                usage(sys.stdout)
            sys.exit(0)
        elif o == "-D":
            service = a
    if len(args) < 1:
        usage(sys.stderr)
        sys.exit(1)
    cmd = commands.get(args[0])
    if cmd is None:
        sys.stderr.write("acmecert: unknown command: %s\n" % (args[0],))
        usage(sys.stderr)
        sys.exit(1)
    try:
        cmd(args)
    except usageerr:
        sys.stderr.write("%s\n" % (cmd.__doc__,))
        sys.exit(1)


if __name__ == "__main__":
    try:
        main(sys.argv)
    except KeyboardInterrupt:
        # Die from the signal itself so the parent sees an interrupt rather
        # than a Python traceback.
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGINT)