3 import sys, os, getopt, binascii, json, pprint, signal, time
5 import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5
7 service = "https://acme-v02.api.letsencrypt.org/directory"
11 if _directory is None:
12 with req(service) as resp:
13 _directory = json.loads(resp.read().decode("utf-8"))
17 return binascii.b2a_base64(dat).decode("us-ascii").translate({43: 45, 47: 95, 61: None}).strip()
21 if len(h) % 2 == 1: h = "0" + h
22 return base64url(binascii.a2b_hex(h))
25 with urllib.request.urlopen(directory()["newNonce"]) as resp:
27 return resp.headers["Replay-Nonce"]
29 def req(url, data=None, ctype=None, headers={}, method=None, **kws):
30 if data is not None and not isinstance(data, bytes):
31 data = json.dumps(data).encode("utf-8")
32 ctype = "application/jose+json"
33 req = urllib.request.Request(url, data=data, method=method)
34 for hnam, hval in headers.items():
35 req.add_header(hnam, hval)
37 req.add_header("Content-Type", ctype)
38 return urllib.request.urlopen(req)
40 def jreq(url, data, auth):
41 authdata = {"alg": "RS256", "url": url, "nonce": getnonce()}
42 authdata.update(auth.authdata())
43 authdata = base64url(json.dumps(authdata).encode("us-ascii"))
47 data = base64url(json.dumps(data).encode("us-ascii"))
48 seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii")))
49 enc = {"protected": authdata, "payload": data, "signature": seal}
50 with req(url, data=enc) as resp:
51 return json.loads(resp.read().decode("utf-8")), resp.headers
53 class signreq(object):
55 # No PCKS10 parser for Python?
57 with subprocess.Popen(["openssl", "req", "-noout", "-text"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl:
58 openssl.stdin.write(self.data.encode("us-ascii"))
60 resp = openssl.stdout.read().decode("utf8")
61 if openssl.wait() != 0:
62 raise Exception("openssl error")
63 m = re.search(r"X509v3 Subject Alternative Name:[^\n]*\n\s*((\w+:\S+,\s*)*\w+:\S+)\s*\n", resp)
67 for nm in m.group(1).split(","):
69 typ, nm = nm.split(":", 1)
76 with subprocess.Popen(["openssl", "req", "-outform", "der"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl:
77 openssl.stdin.write(self.data.encode("us-ascii"))
79 resp = openssl.stdout.read()
80 if openssl.wait() != 0:
81 raise Exception("openssl error")
90 class jwkauth(object):
91 def __init__(self, key):
95 return {"jwk": {"kty": "RSA", "e": ebignum(self.key.e), "n": ebignum(self.key.n)}}
98 dig = Crypto.Hash.SHA256.new()
100 return Crypto.Signature.PKCS1_v1_5.new(self.key).sign(dig)
102 class account(object):
103 def __init__(self, uri, key):
108 return {"kid": self.uri}
110 def sign(self, data):
111 dig = Crypto.Hash.SHA256.new()
113 return Crypto.Signature.PKCS1_v1_5.new(self.key).sign(dig)
116 data, headers = jreq(self.uri, None, self)
120 data = self.getinfo()
121 if data.get("status", "") != "valid":
122 raise Exception("account is not valid: %s" % (data.get("status", "\"\"")))
124 def write(self, out):
125 out.write("%s\n" % (self.uri,))
126 out.write("%s\n" % (self.key.exportKey().decode("us-ascii"),))
132 raise Exception("missing account URI")
134 key = Crypto.PublicKey.RSA.importKey(fp.read())
137 class htconfig(object):
146 if len(words) < 1 or ln[0] == '#':
148 if words[0] == "root":
149 self.roots[words[1]] = words[2]
151 sys.stderr.write("acmecert: warning: unknown htconfig directive: %s\n" % (words[0]))
154 def register(keysize=4096):
155 key = Crypto.PublicKey.RSA.generate(keysize, Crypto.Random.new().read)
156 # jwk = {"kty": "RSA", "e": ebignum(key.e), "n": ebignum(key.n)}
157 # cjwk = json.dumps(jwk, separators=(',', ':'), sort_keys=True)
158 data, headers = jreq(directory()["newAccount"], {"termsOfServiceAgreed": True}, jwkauth(key))
159 return account(headers["Location"], key)
161 def mkorder(acct, csr):
162 data, headers = jreq(directory()["newOrder"], {"identifiers": [{"type": "dns", "value": dn} for dn in csr.domains()]}, acct)
163 data["acmecert.location"] = headers["Location"]
166 def httptoken(acct, ch):
167 jwk = {"kty": "RSA", "e": ebignum(acct.key.e), "n": ebignum(acct.key.n)}
168 dig = Crypto.Hash.SHA256.new()
169 dig.update(json.dumps(jwk, separators=(',', ':'), sort_keys=True).encode("us-ascii"))
170 khash = base64url(dig.digest())
171 return ch["token"], ("%s.%s" % (ch["token"], khash))
173 def authorder(acct, htconf, orderid):
174 order, headers = jreq(orderid, None, acct)
181 raise Exception("challenges refuse to become valid even after 5 retries")
182 for authuri in order["authorizations"]:
183 auth, headers = jreq(authuri, None, acct)
184 if auth["status"] == "valid":
186 elif auth["status"] == "pending":
189 raise Exception("unknown authorization status: %s" % (auth["status"],))
191 if auth["identifier"]["type"] != "dns":
192 raise Exception("unknown authorization type: %s" % (auth["identifier"]["type"],))
193 dn = auth["identifier"]["value"]
194 if dn not in htconf.roots:
195 raise Exception("no configured ht-root for domain name %s" % (dn,))
196 for ch in auth["challenges"]:
197 if ch["type"] == "http-01":
200 raise Exception("no http-01 challenge for %s" % (dn,))
201 root = htconf.roots[dn]
202 tokid, tokval = httptoken(acct, ch)
203 tokpath = os.path.join(root, tokid);
204 fp = open(tokpath, "w")
208 with req("http://%s/.well-known/acme-challenge/%s" % (dn, tokid)) as resp:
209 if resp.read().decode("utf-8") != tokval:
210 raise Exception("challenge from %s does not match written value" % (dn,))
212 resp, headers = jreq(ch["url"], {}, acct)
213 if resp["status"] == "processing":
215 elif resp["status"] == "valid":
218 raise Exception("unexpected challenge status for %s when validating: %s" % (dn, resp["status"]))
220 raise Exception("challenge processing timed out for %s" % (dn,))
224 def finalize(acct, csr, orderid):
225 order, headers = jreq(orderid, None, acct)
226 if order["status"] == "valid":
228 elif order["status"] == "ready":
229 jreq(order["finalize"], {"csr": base64url(csr.der())}, acct)
231 resp, headers = jreq(orderid, None, acct)
232 if resp["status"] == "processing":
234 elif resp["status"] == "valid":
238 raise Exception("unexpected order status when finalizing: %s" % resp["status"])
240 raise Exception("order finalization timed out")
242 raise Exception("unexpected order state when finalizing: %s" % (order["status"],))
243 with req(order["certificate"]) as resp:
244 return resp.read().decode("us-ascii")
247 out.write("usage: acmecert [-h] [-D SERVICE]\n")
251 opts, args = getopt.getopt(argv[1:], "hD:")
262 register().write(sys.stdout)
263 elif args[0] == "validate-acct":
264 with open(args[1], "r") as fp:
265 account.read(fp).validate()
266 elif args[0] == "acctinfo":
267 with open(args[1], "r") as fp:
268 pprint.pprint(account.read(fp).getinfo())
269 elif args[0] == "order":
270 with open(args[1], "r") as fp:
271 acct = account.read(fp)
272 with open(args[2], "r") as fp:
273 csr = signreq.read(fp)
274 order = mkorder(acct, csr)
275 with open(args[3], "w") as fp:
276 fp.write("%s\n" % (order["acmecert.location"]))
277 elif args[0] == "http-auth":
278 with open(args[1], "r") as fp:
279 acct = account.read(fp)
280 with open(args[2], "r") as fp:
281 htconf = htconfig.read(fp)
282 with open(args[3], "r") as fp:
283 orderid = fp.readline().strip()
284 authorder(acct, htconf, orderid)
285 elif args[0] == "get":
286 with open(args[1], "r") as fp:
287 acct = account.read(fp)
288 with open(args[2], "r") as fp:
289 csr = signreq.read(fp)
290 with open(args[3], "r") as fp:
291 orderid = fp.readline().strip()
292 sys.stdout.write(finalize(acct, csr, orderid))
293 elif args[0] == "http-order":
294 with open(args[1], "r") as fp:
295 acct = account.read(fp)
296 with open(args[2], "r") as fp:
297 csr = signreq.read(fp)
298 with open(args[3], "r") as fp:
299 htconf = htconfig.read(fp)
300 orderid = mkorder(acct, csr)["acmecert.location"]
301 authorder(acct, htconf, orderid)
302 sys.stdout.write(finalize(acct, csr, orderid))
303 elif args[0] == "directory":
304 pprint.pprint(directory())
306 sys.stderr.write("acmecert: unknown command: %s\n" % (args[0],))
310 if __name__ == "__main__":
313 except KeyboardInterrupt:
314 signal.signal(signal.SIGINT, signal.SIG_DFL)
315 os.kill(os.getpid(), signal.SIGINT)