From: Fredrik Tolf Date: Tue, 16 Nov 2021 13:26:58 +0000 (+0100) Subject: acmecert: Added a simple built-in DER decoder to not have to rely on external openssl. X-Git-Url: http://git.dolda2000.com/gitweb/?p=utils.git;a=commitdiff_plain;h=d2252d106a1bf0a18e0838b5f945ada648029773 acmecert: Added a simple built-in DER decoder to not have to rely on external openssl. --- diff --git a/acmecert b/acmecert index 4c56ad8..56b1a58 100755 --- a/acmecert +++ b/acmecert @@ -2,7 +2,7 @@ #### ACME client (only http-01 challenges supported thus far) -import sys, os, getopt, binascii, json, pprint, signal, time, threading +import sys, os, getopt, binascii, json, pprint, signal, time, calendar, threading import urllib.request import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5 @@ -44,20 +44,202 @@ class maybeopen(object): ### Crypto utilities +class dererror(Exception): + pass + +class pemerror(Exception): + pass + +def pemdec(pem, ptypes): + if isinstance(ptypes, str): + ptypes = [ptypes] + p = 0 + while True: + p = pem.find("-----BEGIN ", p) + if p < 0: + raise pemerror("could not find any %s in PEM-encoded data" % (ptypes,)) + p2 = pem.find("-----", p + 11) + if p2 < 0: + raise pemerror("incomplete PEM header") + ptype = pem[p + 11 : p2] + if ptype not in ptypes: + p = p2 + 5 + continue + p3 = pem.find("-----END " + ptype + "-----", p2 + 5) + if p3 < 0: + raise pemerror("incomplete PEM data") + pem = pem[p2 + 5 : p3] + return binascii.a2b_base64(pem) + +class derdecoder(object): + def __init__(self, data, offset=0, size=None): + self.data = data + self.offset = offset + self.size = len(data) if size is None else size + + def end(self): + return self.offset >= self.size + + def byte(self): + if self.offset >= self.size: + raise dererror("unexpected end-of-data") + ret = self.data[self.offset] + self.offset += 1 + return ret + + def splice(self, ln): + if self.offset + ln > self.size: + raise dererror("unexpected end-of-data") + ret = self.data[self.offset : self.offset + ln] + self.offset += ln + return ret + + def dectag(self): + h = self.byte() + cl = (h & 0xc0) >> 6 + cons = (h & 0x20) != 0 + tag = h & 0x1f + if tag == 0x1f: + raise dererror("extended type tags not supported") + return cl, cons, tag + + def declen(self): + h = self.byte() + if (h & 0x80) == 0: + return h + if h == 0x80: + raise dererror("indefinite lengths not supported in DER") + if h == 0xff: + raise dererror("invalid length byte") + n = h & 0x7f + ret = 0 + for i in range(n): + ret = (ret << 8) + self.byte() + return ret + + def get(self): + cl, cons, tag = self.dectag() + ln = self.declen() + return cons, cl, tag, self.splice(ln) + + def getcons(self, ckcl, cktag): + cons, cl, tag, data = self.get() + if not cons: + raise dererror("expected constructed value") + if (ckcl != None and ckcl != cl) or (cktag != None and cktag != tag): + raise dererror("unexpected value tag: got (%d, %d), expected (%d, %d)" % (cl, tag, ckcl, cktag)) + return derdecoder(data) + + def getint(self): + cons, cl, tag, data = self.get() + if (cons, cl, tag) == (False, 0, 2): + ret = 0 + for b in data: + ret = (ret << 8) + b + return ret + raise dererror("unexpected integer type: (%s, %d, %d)" % (cons, cl, tag)) + + def getstr(self): + cons, cl, tag, data = self.get() + if (cons, cl, tag) == (False, 0, 12): + return data.decode("utf-8") + if (cons, cl, tag) == (False, 0, 13): + return data.decode("us-ascii") + if (cons, cl, tag) == (False, 0, 22): + return data.decode("us-ascii") + if (cons, cl, tag) == (False, 0, 30): + return data.decode("utf-16-be") + raise dererror("unexpected string type: (%s, %d, %d)" % (cons, cl, tag)) + + def getbytes(self): + cons, cl, tag, data = self.get() + if (cons, cl, tag) == (False, 0, 4): + return data + raise dererror("unexpected byte-string type: (%s, %d, %d)" % (cons, cl, tag)) + + def getoid(self): + cons, cl, tag, data = self.get() + if (cons, cl, tag) == (False, 0, 6): + ret = [] + ret.append(data[0] // 40) + ret.append(data[0] % 40) + p = 1 + while p < len(data): + n = 0 + v = data[p] + p += 1 + while v & 0x80: + n = (n + (v & 0x7f)) * 128 + v = data[p] + p += 1 + n += v + ret.append(n) + return tuple(ret) + raise dererror("unexpected object-id type: (%s, %d, %d)" % (cons, cl, tag)) + + @staticmethod + def parsetime(data, c): + if c: + y = int(data[0:4]) + data = data[4:] + else: + y = int(data[0:2]) + y += 1900 if y > 50 else 2000 + data = data[2:] + m = int(data[0:2]) + d = int(data[2:4]) + H = int(data[4:6]) + data = data[6:] + if data[:1].isdigit(): + M = int(data[0:2]) + data = data[2:] + else: + M = 0 + if data[:1].isdigit(): + S = int(data[0:2]) + data = data[2:] + else: + S = 0 + if data[:1] == '.': + p = 1 + while len(data) < p and data[p].isdigit(): + p += 1 + S += float("0." + data[1:p]) + data = data[p:] + if len(data) < 1: + raise dererror("unspecified local time not supported for decoding") + if data[0] == 'Z': + tz = 0 + elif data[0] == '+': + tz = (int(data[1:3]) * 60) + int(data[3:5]) + elif data[0] == '-': + tz = -((int(data[1:3]) * 60) + int(data[3:5])) + else: + raise dererror("cannot parse X.690 timestamp") + return calendar.timegm((y, m, d, H, M, S)) - (tz * 60) + + def gettime(self): + cons, cl, tag, data = self.get() + if (cons, cl, tag) == (False, 0, 23): + return self.parsetime(data.decode("us-ascii"), False) + if (cons, cl, tag) == (False, 0, 24): + return self.parsetime(data.decode("us-ascii"), True) + raise dererror("unexpected time type: (%s, %d, %d)" % (cons, cl, tag)) + + @classmethod + def frompem(cls, pem, ptypes): + return cls(pemdec(pem, ptypes)) + class certificate(object): - @property - def enddate(self): - # No X509 parser for Python? - import subprocess, re, calendar - with subprocess.Popen(["openssl", "x509", "-noout", "-enddate"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl: - openssl.stdin.write(self.data.encode("us-ascii")) - openssl.stdin.close() - resp = openssl.stdout.read().decode("utf-8") - if openssl.wait() != 0: - raise Exception("openssl error") - m = re.search(r"notAfter=(.*)$", resp) - if m is None: raise Exception("unexpected openssl reply: %r" % (resp,)) - return calendar.timegm(time.strptime(m.group(1), "%b %d %H:%M:%S %Y GMT")) + def __init__(self, der): + ci = der.getcons(0, 16).getcons(0, 16) + self.ver = ci.getcons(2, 0).getint() + self.serial = ci.getint() + ci.getcons(0, 16) # Signature algorithm + ci.getcons(0, 16) # Issuer + vl = ci.getcons(0, 16) + self.startdate = vl.gettime() + self.enddate = vl.gettime() def expiring(self, timespec): if timespec.endswith("y"): @@ -76,46 +258,44 @@ class certificate(object): @classmethod def read(cls, fp): - self = cls() - self.data = fp.read() - return self + return cls(derdecoder.frompem(fp.read(), {"CERTIFICATE", "X509 CERTIFICATE"})) class signreq(object): + def __init__(self, der): + self.raw = der + req = derdecoder(der).getcons(0, 16).getcons(0, 16) + self.ver = req.getint() + req.getcons(0, 16) # Subject + req.getcons(0, 16) # Public key + self.altnames = [] + if not req.end(): + attrs = req.getcons(2, 0) + while not attrs.end(): + attr = attrs.getcons(0, 16) + anm = attr.getoid() + if anm == (1, 2, 840, 113549, 1, 9, 14): + # Certificate extension request + exts = attr.getcons(0, 17).getcons(0, 16) + while not exts.end(): + ext = exts.getcons(0, 16) + extnm = ext.getoid() + if extnm == (2, 5, 29, 17): + # Subject alternative names + names = derdecoder(ext.getbytes()).getcons(0, 16) + while not names.end(): + cons, cl, tag, data = names.get() + if (cons, cl, tag) == (False, 2, 2): + self.altnames.append(("DNS", data.decode("us-ascii"))) + def domains(self): - # No PCKS10 parser for Python? - import subprocess, re - with subprocess.Popen(["openssl", "req", "-noout", "-text"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl: - openssl.stdin.write(self.data.encode("us-ascii")) - openssl.stdin.close() - resp = openssl.stdout.read().decode("utf-8") - if openssl.wait() != 0: - raise Exception("openssl error") - m = re.search(r"X509v3 Subject Alternative Name:[^\n]*\n\s*((\w+:\S+,\s*)*\w+:\S+)\s*\n", resp) - if m is None: - return [] - ret = [] - for nm in m.group(1).split(","): - nm = nm.strip() - typ, nm = nm.split(":", 1) - if typ == "DNS": - ret.append(nm) - return ret + return [nm[1] for nm in self.altnames if nm[0] == "DNS"] def der(self): - import subprocess - with subprocess.Popen(["openssl", "req", "-outform", "der"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl: - openssl.stdin.write(self.data.encode("us-ascii")) - openssl.stdin.close() - resp = openssl.stdout.read() - if openssl.wait() != 0: - raise Exception("openssl error") - return resp + return self.raw @classmethod def read(cls, fp): - self = cls() - self.data = fp.read() - return self + return cls(pemdec(fp.read(), {"CERTIFICATE REQUEST"})) ### Somewhat general request utilities