acmecert: Added a simple built-in DER decoder to not have to rely on external openssl.
authorFredrik Tolf <fredrik@dolda2000.com>
Tue, 16 Nov 2021 13:26:58 +0000 (14:26 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Tue, 16 Nov 2021 13:26:58 +0000 (14:26 +0100)
acmecert

index 4c56ad8..56b1a58 100755 (executable)
--- a/acmecert
+++ b/acmecert
@@ -2,7 +2,7 @@
 
 #### ACME client (only http-01 challenges supported thus far)
 
-import sys, os, getopt, binascii, json, pprint, signal, time, threading
+import sys, os, getopt, binascii, json, pprint, signal, time, calendar, threading
 import urllib.request
 import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5
 
@@ -44,20 +44,202 @@ class maybeopen(object):
 
 ### Crypto utilities
 
+class dererror(Exception):
+    pass
+
+class pemerror(Exception):
+    pass
+
+def pemdec(pem, ptypes):
+    if isinstance(ptypes, str):
+        ptypes = [ptypes]
+    p = 0
+    while True:
+        p = pem.find("-----BEGIN ", p)
+        if p < 0:
+            raise pemerror("could not find any %s in PEM-encoded data" % (ptypes,))
+        p2 = pem.find("-----", p + 11)
+        if p2 < 0:
+            raise pemerror("incomplete PEM header")
+        ptype = pem[p + 11 : p2]
+        if ptype not in ptypes:
+            p = p2 + 5
+            continue
+        p3 = pem.find("-----END " + ptype + "-----", p2 + 5)
+        if p3 < 0:
+            raise pemerror("incomplete PEM data")
+        pem = pem[p2 + 5 : p3]
+        return binascii.a2b_base64(pem)
+
+class derdecoder(object):
+    def __init__(self, data, offset=0, size=None):
+        self.data = data
+        self.offset = offset
+        self.size = len(data) if size is None else size
+
+    def end(self):
+        return self.offset >= self.size
+
+    def byte(self):
+        if self.offset >= self.size:
+            raise dererror("unexpected end-of-data")
+        ret = self.data[self.offset]
+        self.offset += 1
+        return ret
+
+    def splice(self, ln):
+        if self.offset + ln > self.size:
+            raise dererror("unexpected end-of-data")
+        ret = self.data[self.offset : self.offset + ln]
+        self.offset += ln
+        return ret
+
+    def dectag(self):
+        h = self.byte()
+        cl = (h & 0xc0) >> 6
+        cons = (h & 0x20) != 0
+        tag = h & 0x1f
+        if tag == 0x1f:
+            raise dererror("extended type tags not supported")
+        return cl, cons, tag
+
+    def declen(self):
+        h = self.byte()
+        if (h & 0x80) == 0:
+            return h
+        if h == 0x80:
+            raise dererror("indefinite lengths not supported in DER")
+        if h == 0xff:
+            raise dererror("invalid length byte")
+        n = h & 0x7f
+        ret = 0
+        for i in range(n):
+            ret = (ret << 8) + self.byte()
+        return ret
+
+    def get(self):
+        cl, cons, tag = self.dectag()
+        ln = self.declen()
+        return cons, cl, tag, self.splice(ln)
+
+    def getcons(self, ckcl, cktag):
+        cons, cl, tag, data = self.get()
+        if not cons:
+            raise dererror("expected constructed value")
+        if (ckcl != None and ckcl != cl) or (cktag != None and cktag != tag):
+            raise dererror("unexpected value tag: got (%d, %d), expected (%d, %d)" % (cl, tag, ckcl, cktag))
+        return derdecoder(data)
+
+    def getint(self):
+        cons, cl, tag, data = self.get()
+        if (cons, cl, tag) == (False, 0, 2):
+            ret = 0
+            for b in data:
+                ret = (ret << 8) + b
+            return ret
+        raise dererror("unexpected integer type: (%s, %d, %d)" % (cons, cl, tag))
+
+    def getstr(self):
+        cons, cl, tag, data = self.get()
+        if (cons, cl, tag) == (False, 0, 12):
+            return data.decode("utf-8")
+        if (cons, cl, tag) == (False, 0, 13):
+            return data.decode("us-ascii")
+        if (cons, cl, tag) == (False, 0, 22):
+            return data.decode("us-ascii")
+        if (cons, cl, tag) == (False, 0, 30):
+            return data.decode("utf-16-be")
+        raise dererror("unexpected string type: (%s, %d, %d)" % (cons, cl, tag))
+
+    def getbytes(self):
+        cons, cl, tag, data = self.get()
+        if (cons, cl, tag) == (False, 0, 4):
+            return data
+        raise dererror("unexpected byte-string type: (%s, %d, %d)" % (cons, cl, tag))
+
+    def getoid(self):
+        cons, cl, tag, data = self.get()
+        if (cons, cl, tag) == (False, 0, 6):
+            ret = []
+            ret.append(data[0] // 40)
+            ret.append(data[0] % 40)
+            p = 1
+            while p < len(data):
+                n = 0
+                v = data[p]
+                p += 1
+                while v & 0x80:
+                    n = (n + (v & 0x7f)) * 128
+                    v = data[p]
+                    p += 1
+                n += v
+                ret.append(n)
+            return tuple(ret)
+        raise dererror("unexpected object-id type: (%s, %d, %d)" % (cons, cl, tag))
+
+    @staticmethod
+    def parsetime(data, c):
+        if c:
+            y = int(data[0:4])
+            data = data[4:]
+        else:
+            y = int(data[0:2])
+            y += 1900 if y > 50 else 2000
+            data = data[2:]
+        m = int(data[0:2])
+        d = int(data[2:4])
+        H = int(data[4:6])
+        data = data[6:]
+        if data[:1].isdigit():
+            M = int(data[0:2])
+            data = data[2:]
+        else:
+            M = 0
+        if data[:1].isdigit():
+            S = int(data[0:2])
+            data = data[2:]
+        else:
+            S = 0
+        if data[:1] == '.':
+            p = 1
+            while len(data) < p and data[p].isdigit():
+                p += 1
+            S += float("0." + data[1:p])
+            data = data[p:]
+        if len(data) < 1:
+            raise dererror("unspecified local time not supported for decoding")
+        if data[0] == 'Z':
+            tz = 0
+        elif data[0] == '+':
+            tz = (int(data[1:3]) * 60) + int(data[3:5])
+        elif data[0] == '-':
+            tz = -((int(data[1:3]) * 60) + int(data[3:5]))
+        else:
+            raise dererror("cannot parse X.690 timestamp")
+        return calendar.timegm((y, m, d, H, M, S)) - (tz * 60)
+
+    def gettime(self):
+        cons, cl, tag, data = self.get()
+        if (cons, cl, tag) == (False, 0, 23):
+            return self.parsetime(data.decode("us-ascii"), False)
+        if (cons, cl, tag) == (False, 0, 24):
+            return self.parsetime(data.decode("us-ascii"), True)
+        raise dererror("unexpected time type: (%s, %d, %d)" % (cons, cl, tag))
+
+    @classmethod
+    def frompem(cls, pem, ptypes):
+        return cls(pemdec(pem, ptypes))
+
 class certificate(object):
-    @property
-    def enddate(self):
-        # No X509 parser for Python?
-        import subprocess, re, calendar
-        with subprocess.Popen(["openssl", "x509", "-noout", "-enddate"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl:
-            openssl.stdin.write(self.data.encode("us-ascii"))
-            openssl.stdin.close()
-            resp = openssl.stdout.read().decode("utf-8")
-            if openssl.wait() != 0:
-                raise Exception("openssl error")
-        m = re.search(r"notAfter=(.*)$", resp)
-        if m is None: raise Exception("unexpected openssl reply: %r" % (resp,))
-        return calendar.timegm(time.strptime(m.group(1), "%b %d %H:%M:%S %Y GMT"))
+    def __init__(self, der):
+        ci = der.getcons(0, 16).getcons(0, 16)
+        self.ver = ci.getcons(2, 0).getint()
+        self.serial = ci.getint()
+        ci.getcons(0, 16)       # Signature algorithm
+        ci.getcons(0, 16)       # Issuer
+        vl = ci.getcons(0, 16)
+        self.startdate = vl.gettime()
+        self.enddate = vl.gettime()
 
     def expiring(self, timespec):
         if timespec.endswith("y"):
@@ -76,46 +258,44 @@ class certificate(object):
 
     @classmethod
     def read(cls, fp):
-        self = cls()
-        self.data = fp.read()
-        return self
+        return cls(derdecoder.frompem(fp.read(), {"CERTIFICATE", "X509 CERTIFICATE"}))
 
 class signreq(object):
+    def __init__(self, der):
+        self.raw = der
+        req = derdecoder(der).getcons(0, 16).getcons(0, 16)
+        self.ver = req.getint()
+        req.getcons(0, 16)      # Subject
+        req.getcons(0, 16)      # Public key
+        self.altnames = []
+        if not req.end():
+            attrs = req.getcons(2, 0)
+            while not attrs.end():
+                attr = attrs.getcons(0, 16)
+                anm = attr.getoid()
+                if anm == (1, 2, 840, 113549, 1, 9, 14):
+                    # Certificate extension request
+                    exts = attr.getcons(0, 17).getcons(0, 16)
+                    while not exts.end():
+                        ext = exts.getcons(0, 16)
+                        extnm = ext.getoid()
+                        if extnm == (2, 5, 29, 17):
+                            # Subject alternative names
+                            names = derdecoder(ext.getbytes()).getcons(0, 16)
+                            while not names.end():
+                                cons, cl, tag, data = names.get()
+                                if (cons, cl, tag) == (False, 2, 2):
+                                    self.altnames.append(("DNS", data.decode("us-ascii")))
+
     def domains(self):
-        # No PCKS10 parser for Python?
-        import subprocess, re
-        with subprocess.Popen(["openssl", "req", "-noout", "-text"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl:
-            openssl.stdin.write(self.data.encode("us-ascii"))
-            openssl.stdin.close()
-            resp = openssl.stdout.read().decode("utf-8")
-            if openssl.wait() != 0:
-                raise Exception("openssl error")
-        m = re.search(r"X509v3 Subject Alternative Name:[^\n]*\n\s*((\w+:\S+,\s*)*\w+:\S+)\s*\n", resp)
-        if m is None:
-            return []
-        ret = []
-        for nm in m.group(1).split(","):
-            nm = nm.strip()
-            typ, nm = nm.split(":", 1)
-            if typ == "DNS":
-                ret.append(nm)
-        return ret
+        return [nm[1] for nm in self.altnames if nm[0] == "DNS"]
 
     def der(self):
-        import subprocess
-        with subprocess.Popen(["openssl", "req", "-outform", "der"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as openssl:
-            openssl.stdin.write(self.data.encode("us-ascii"))
-            openssl.stdin.close()
-            resp = openssl.stdout.read()
-            if openssl.wait() != 0:
-                raise Exception("openssl error")
-        return resp
+        return self.raw
 
     @classmethod
     def read(cls, fp):
-        self = cls()
-        self.data = fp.read()
-        return self
+        return cls(pemdec(fp.read(), {"CERTIFICATE REQUEST"}))
 
 ### Somewhat general request utilities