#!/usr/bin/python3
+#### ACME client (only http-01 challenges supported thus far)
+
import sys, os, getopt, binascii, json, pprint, signal, time, threading
import urllib.request
import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5
+### General utilities
+
class msgerror(Exception):
def report(self, out):
out.write("acmecert: undefined error\n")
-service = "https://acme-v02.api.letsencrypt.org/directory"
-_directory = None
-def directory():
- global _directory
- if _directory is None:
- with req(service) as resp:
- _directory = json.load(resp)
- return _directory
-
def base64url(dat):
return binascii.b2a_base64(dat).decode("us-ascii").translate({43: 45, 47: 95, 61: None}).strip()
if len(h) % 2 == 1: h = "0" + h
return base64url(binascii.a2b_hex(h))
-def getnonce():
- with urllib.request.urlopen(directory()["newNonce"]) as resp:
- resp.read()
- return resp.headers["Replay-Nonce"]
-
-def req(url, data=None, ctype=None, headers={}, method=None, **kws):
- if data is not None and not isinstance(data, bytes):
- data = json.dumps(data).encode("utf-8")
- ctype = "application/jose+json"
- req = urllib.request.Request(url, data=data, method=method)
- for hnam, hval in headers.items():
- req.add_header(hnam, hval)
- if ctype is not None:
- req.add_header("Content-Type", ctype)
- return urllib.request.urlopen(req)
-
-class problem(msgerror):
- def __init__(self, code, data, *args, url=None, **kw):
- super().__init__(*args, **kw)
- self.code = code
- self.data = data
- self.url = url
- if not isinstance(data, dict):
- raise ValueError("unexpected problem object type: %r" % (data,))
-
- @property
- def type(self):
- return self.data.get("type", "about:blank")
- @property
- def title(self):
- return self.data.get("title")
- @property
- def detail(self):
- return self.data.get("detail")
-
- def report(self, out):
- extra = None
- if self.title is None:
- msg = self.detail
- if "\n" in msg:
- extra, msg = msg, None
+class maybeopen(object):
+ def __init__(self, name, mode):
+ if name == "-":
+ self.opened = False
+ if mode == "r":
+ self.fp = sys.stdin
+ elif mode == "w":
+ self.fp = sys.stdout
+ else:
+ raise ValueError(mode)
else:
- msg = self.title
- extra = self.detail
- if msg is None:
- msg = self.data.get("type")
- if msg is not None:
- out.write("acemcert: %s: %s\n" % (
- ("remote service error" if self.url is None else self.url),
- ("unspecified error" if msg is None else msg)))
- if extra is not None:
- out.write("%s\n" % (extra,))
+ self.opened = True
+ self.fp = open(name, mode)
- @classmethod
- def read(cls, err, **kw):
- self = cls(err.code, json.load(err), **kw)
- return self
+ def __enter__(self):
+ return self.fp
-def jreq(url, data, auth):
- authdata = {"alg": "RS256", "url": url, "nonce": getnonce()}
- authdata.update(auth.authdata())
- authdata = base64url(json.dumps(authdata).encode("us-ascii"))
- if data is None:
- data = ""
- else:
- data = base64url(json.dumps(data).encode("us-ascii"))
- seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii")))
- enc = {"protected": authdata, "payload": data, "signature": seal}
- try:
- with req(url, data=enc) as resp:
- return json.load(resp), resp.headers
- except urllib.error.HTTPError as exc:
- if exc.headers["Content-Type"] == "application/problem+json":
- raise problem.read(exc, url=url)
- raise
+ def __exit__(self, *excinfo):
+ if self.opened:
+ self.fp.close()
+ return False
+
+### Crypto utilities
class certificate(object):
@property
self.data = fp.read()
return self
+### Somewhat general request utilities
+
+def getnonce():
+ with urllib.request.urlopen(directory()["newNonce"]) as resp:
+ resp.read()
+ return resp.headers["Replay-Nonce"]
+
+def req(url, data=None, ctype=None, headers={}, method=None, **kws):
+ if data is not None and not isinstance(data, bytes):
+ data = json.dumps(data).encode("utf-8")
+ ctype = "application/jose+json"
+ req = urllib.request.Request(url, data=data, method=method)
+ for hnam, hval in headers.items():
+ req.add_header(hnam, hval)
+ if ctype is not None:
+ req.add_header("Content-Type", ctype)
+ return urllib.request.urlopen(req)
+
+class problem(msgerror):
+ def __init__(self, code, data, *args, url=None, **kw):
+ super().__init__(*args, **kw)
+ self.code = code
+ self.data = data
+ self.url = url
+ if not isinstance(data, dict):
+ raise ValueError("unexpected problem object type: %r" % (data,))
+
+ @property
+ def type(self):
+ return self.data.get("type", "about:blank")
+ @property
+ def title(self):
+ return self.data.get("title")
+ @property
+ def detail(self):
+ return self.data.get("detail")
+
+ def report(self, out):
+ extra = None
+ if self.title is None:
+ msg = self.detail
+ if "\n" in msg:
+ extra, msg = msg, None
+ else:
+ msg = self.title
+ extra = self.detail
+ if msg is None:
+ msg = self.data.get("type")
+ if msg is not None:
+ out.write("acemcert: %s: %s\n" % (
+ ("remote service error" if self.url is None else self.url),
+ ("unspecified error" if msg is None else msg)))
+ if extra is not None:
+ out.write("%s\n" % (extra,))
+
+ @classmethod
+ def read(cls, err, **kw):
+ self = cls(err.code, json.load(err), **kw)
+ return self
+
+def jreq(url, data, auth):
+ authdata = {"alg": "RS256", "url": url, "nonce": getnonce()}
+ authdata.update(auth.authdata())
+ authdata = base64url(json.dumps(authdata).encode("us-ascii"))
+ if data is None:
+ data = ""
+ else:
+ data = base64url(json.dumps(data).encode("us-ascii"))
+ seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii")))
+ enc = {"protected": authdata, "payload": data, "signature": seal}
+ try:
+ with req(url, data=enc) as resp:
+ return json.load(resp), resp.headers
+ except urllib.error.HTTPError as exc:
+ if exc.headers["Content-Type"] == "application/problem+json":
+ raise problem.read(exc, url=url)
+ raise
+
+## Authentication
+
class jwkauth(object):
def __init__(self, key):
self.key = key
key = Crypto.PublicKey.RSA.importKey(fp.read())
return cls(uri, key)
-class htconfig(object):
- def __init__(self):
- self.roots = {}
+### ACME protocol
- @classmethod
- def read(cls, fp):
- self = cls()
- for ln in fp:
- words = ln.split()
- if len(words) < 1 or ln[0] == '#':
- continue
- if words[0] == "root":
- self.roots[words[1]] = words[2]
- else:
- sys.stderr.write("acmecert: warning: unknown htconfig directive: %s\n" % (words[0]))
- return self
+service = "https://acme-v02.api.letsencrypt.org/directory"
+_directory = None
+def directory():
+ global _directory
+ if _directory is None:
+ with req(service) as resp:
+ _directory = json.load(resp)
+ return _directory
def register(keysize=4096):
key = Crypto.PublicKey.RSA.generate(keysize, Crypto.Random.new().read)
khash = base64url(dig.digest())
return ch["token"], ("%s.%s" % (ch["token"], khash))
+def finalize(acct, csr, orderid):
+ order, headers = jreq(orderid, None, acct)
+ if order["status"] == "valid":
+ pass
+ elif order["status"] == "ready":
+ jreq(order["finalize"], {"csr": base64url(csr.der())}, acct)
+ for n in range(30):
+ resp, headers = jreq(orderid, None, acct)
+ if resp["status"] == "processing":
+ time.sleep(2)
+ elif resp["status"] == "valid":
+ order = resp
+ break
+ else:
+ raise Exception("unexpected order status when finalizing: %s" % resp["status"])
+ else:
+ raise Exception("order finalization timed out")
+ else:
+ raise Exception("unexpected order state when finalizing: %s" % (order["status"],))
+ with req(order["certificate"]) as resp:
+ return resp.read().decode("us-ascii")
+
+## http-01 challenge
+
+class htconfig(object):
+ def __init__(self):
+ self.roots = {}
+
+ @classmethod
+ def read(cls, fp):
+ self = cls()
+ for ln in fp:
+ words = ln.split()
+ if len(words) < 1 or ln[0] == '#':
+ continue
+ if words[0] == "root":
+ self.roots[words[1]] = words[2]
+ else:
+ sys.stderr.write("acmecert: warning: unknown htconfig directive: %s\n" % (words[0]))
+ return self
+
def authorder(acct, htconf, orderid):
order, headers = jreq(orderid, None, acct)
valid = False
finally:
os.unlink(tokpath)
-def finalize(acct, csr, orderid):
- order, headers = jreq(orderid, None, acct)
- if order["status"] == "valid":
- pass
- elif order["status"] == "ready":
- jreq(order["finalize"], {"csr": base64url(csr.der())}, acct)
- for n in range(30):
- resp, headers = jreq(orderid, None, acct)
- if resp["status"] == "processing":
- time.sleep(2)
- elif resp["status"] == "valid":
- order = resp
- break
- else:
- raise Exception("unexpected order status when finalizing: %s" % resp["status"])
- else:
- raise Exception("order finalization timed out")
- else:
- raise Exception("unexpected order state when finalizing: %s" % (order["status"],))
- with req(order["certificate"]) as resp:
- return resp.read().decode("us-ascii")
-
-class maybeopen(object):
- def __init__(self, name, mode):
- if name == "-":
- self.opened = False
- if mode == "r":
- self.fp = sys.stdin
- elif mode == "w":
- self.fp = sys.stdout
- else:
- raise ValueError(mode)
- else:
- self.opened = True
- self.fp = open(name, mode)
-
- def __enter__(self):
- return self.fp
-
- def __exit__(self, *excinfo):
- if self.opened:
- self.fp.close()
- return False
+### Invocation and commands
invdata = threading.local()
commands = {}
def report(self, out):
out.write("%s\n" % (self.cmd.__doc__,))
+## User commands
+
def cmd_reg(args):
"usage: acmecert reg [OUTPUT-FILE]"
acct = register()
pprint.pprint(directory())
commands["directory"] = cmd_directory
+## Main invocation
+
def usage(out):
out.write("usage: acmecert [-D SERVICE] COMMAND [ARGS...]\n")
out.write(" acmecert -h [COMMAND]\n")