Added initial binary decoder.
author Fredrik Tolf <fredrik@dolda2000.com>
Mon, 3 Jan 2022 18:49:33 +0000 (19:49 +0100)
committer Fredrik Tolf <fredrik@dolda2000.com>
Mon, 3 Jan 2022 18:49:33 +0000 (19:49 +0100)
.gitignore [new file with mode: 0644]
coe/__init__.py [new file with mode: 0644]
coe/bin.py [new file with mode: 0644]
coe/data.py [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..0d20b64
--- /dev/null
@@ -0,0 +1 @@
+*.pyc
diff --git a/coe/__init__.py b/coe/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/coe/bin.py b/coe/bin.py
new file mode 100644 (file)
index 0000000..fae69b9
--- /dev/null
@@ -0,0 +1,178 @@
+from . import data
+
+# Primary type tags: the low three bits of a tag byte (see
+# decoder.loadtagged).  T_END is also what terminates lists and maps.
+T_END = 0
+T_INT = 1
+T_STR = 2
+T_BIT = 3
+T_NIL = 4
+T_SYM = 5
+T_CON = 6
+
+# Secondary tag (the upper five bits of a tag byte) for T_INT: the
+# integer is an index into the decoder's back-reference table.
+INT_REF = 1
+
+# Secondary tag for T_STR: the string is returned as a symbol with an
+# empty namespace.
+STR_SYM = 1
+
+# Secondary tags for T_BIT.  Not referenced by the decoder in this
+# file; presumably binary/decimal float encodings -- TODO confirm.
+BIT_BFLOAT = 1
+BIT_DFLOAT = 2
+
+# Secondary tags for T_CON.  loadtagged only tests CON_MAP and CON_OBJ
+# explicitly; every other value is decoded as a list.
+CON_LIST = 0
+CON_SET = 1
+CON_MAP = 2
+CON_OBJ = 3
+
+# Secondary tags for T_NIL; any other value decodes to None.
+NIL_FALSE = 1
+NIL_TRUE = 2
+
+class fmterror(Exception):
+    """Base class for the errors raised when decoding malformed data."""
+
+class eoferror(fmterror):
+    """Raised when the input ends in the middle of a value."""
+
+    def __init__(self):
+        # The message is fixed; callers construct this without arguments.
+        fmterror.__init__(self, "unexpected end-of-data")
+
+class referror(fmterror):
+    """Raised when a back-reference index lies outside the reference table."""
+
+    def __init__(self):
+        # The message is fixed; callers construct this without arguments.
+        fmterror.__init__(self, "bad backref")
+
+class namedtype(type):
+    """Metaclass of the classes that decoder.loadobj creates on the fly,
+    one per distinct object type name."""
+
+class decoder(object):
+    """Decoder for the tagged binary serialization format.
+
+    A decoder carries two pieces of state between calls: `reftab', the
+    table of previously decoded values that back-references index
+    into, and `namedtypes', a cache of the classes created for decoded
+    objects, keyed by their type name.
+    """
+
+    def __init__(self):
+        self.reftab = []
+        self.namedtypes = {}
+
+    @staticmethod
+    def byte(fp):
+        """Read a single byte from fp and return it as an int.
+
+        Raises eoferror if fp has no more data.
+        """
+        b = fp.read(1)
+        if b == b"":
+            raise eoferror()
+        return b[0]
+
+    @staticmethod
+    def loadint(fp):
+        """Read a variable-length signed integer from fp.
+
+        Each byte contributes its low seven bits, least significant
+        group first; bit 0x80 marks that another byte follows.  Bit
+        0x40 of the final byte is the sign bit.
+
+        Raises eoferror if the data ends before the final byte.
+        """
+        ret = 0
+        p = 0
+        while True:
+            b = decoder.byte(fp)
+            ret += (b & 0x7f) << p
+            p += 7
+            if (b & 0x80) == 0:
+                break
+        if (b & 0x40) != 0:
+            # Sign bit set: reinterpret the p bits read as two's complement.
+            ret = ret - (1 << p)
+        return ret
+
+    @staticmethod
+    def loadstr(fp):
+        """Read a NUL-terminated UTF-8 string from fp.
+
+        The terminating NUL is consumed but not included in the result.
+        Raises eoferror if the data ends before the terminator.
+        """
+        buf = bytearray()
+        while True:
+            b = decoder.byte(fp)
+            if b == 0:
+                break
+            buf.append(b)
+        return buf.decode("utf-8")
+
+    def loadsym(self, fp):
+        """Read a symbol (namespace and name) from fp.
+
+        The low bit of the header byte selects how the namespace is
+        given: when set, as the reftab index of an already decoded
+        symbol whose namespace is reused; otherwise as an inline
+        string.  The name always follows as an inline string.
+
+        Raises fmterror if the namespace reference is out of range or
+        does not refer to a symbol.
+        """
+        h = self.byte(fp)
+        if h & 0x1:
+            nsref = self.loadint(fp)
+            if not 0 <= nsref < len(self.reftab):
+                raise fmterror("illegal namespace ref: " + str(nsref))
+            nssym = self.reftab[nsref]
+            if not isinstance(nssym, data.symbol):
+                raise fmterror("illegal namespace ref: " + str(nsref))
+            ns = nssym.ns
+        else:
+            ns = self.loadstr(fp)
+        nm = self.loadstr(fp)
+        return data.symbol.get(ns, nm)
+
+    def loadlist(self, fp, buf):
+        """Append tagged values from fp to buf until an end-tag is read.
+
+        Returns buf.
+        """
+        while True:
+            tag = self.byte(fp)
+            if tag == T_END:
+                return buf
+            buf.append(self.loadtagged(fp, tag))
+
+    def loadmap(self, fp, buf):
+        """Store key/value pairs from fp into buf until an end-tag is read.
+
+        An end-tag in value position also ends the map, in which case
+        the key just read is discarded.  Returns buf.
+        """
+        while True:
+            tag = self.byte(fp)
+            if tag == T_END:
+                return buf
+            key = self.loadtagged(fp, tag)
+            tag = self.byte(fp)
+            if tag == T_END:
+                return buf
+            buf[key] = self.loadtagged(fp, tag)
+
+    def loadobj(self, fp, ref=False):
+        """Read an object: a type name followed by a map of its fields.
+
+        The class for each type name is created on first use and
+        cached in self.namedtypes.  When ref is true, the object's
+        reftab slot is reserved before anything else is decoded, so
+        that its index precedes those of its type name and fields.
+        """
+        if ref:
+            refid = len(self.reftab)
+            self.reftab.append(None)
+        nm = self.load(fp)
+        typ = self.namedtypes.get(nm)
+        if typ is None:
+            typ = self.namedtypes[nm] = namedtype(str(nm), (data.obj, object), {})
+            typ.typename = nm
+        ret = typ()
+        if ref:
+            # Fill the slot before decoding the fields, so that they
+            # can refer back to the object itself.
+            self.reftab[refid] = ret
+        ret.__dict__.update(self.loadmap(fp, {}))
+        return ret
+
+    def addref(self, obj):
+        """Append obj to the back-reference table and return it."""
+        self.reftab.append(obj)
+        return obj
+
+    def loadtagged(self, fp, tag):
+        """Decode the value that follows the already-read tag byte.
+
+        The low three bits of tag select the primary type and the
+        upper five bits the secondary type.  Every decoded value other
+        than a back-reference is entered into reftab.
+
+        Raises fmterror on an end-tag, an unknown primary type or a
+        negative bit-string length, referror on an out-of-range
+        back-reference, and eoferror on truncated data.
+        """
+        pri, sec = (tag & 0x7), (tag & 0xf8) >> 3
+        if pri == T_END:
+            raise fmterror("unexpected end-tag")
+        elif pri == T_INT:
+            if sec == INT_REF:
+                idx = self.loadint(fp)
+                if not 0 <= idx < len(self.reftab):
+                    raise referror()
+                return self.reftab[idx]
+            return self.addref(self.loadint(fp))
+        elif pri == T_STR:
+            ret = self.addref(self.loadstr(fp))
+            if sec == STR_SYM:
+                return data.symbol.get("", ret)
+            return ret
+        elif pri == T_BIT:
+            ln = self.loadint(fp)
+            if ln < 0:
+                # A negative size would make fp.read() consume all
+                # remaining input instead of failing.
+                raise fmterror("illegal bit-string length: " + str(ln))
+            ret = self.addref(fp.read(ln))
+            if len(ret) < ln:
+                raise eoferror()
+            return ret
+        elif pri == T_NIL:
+            if sec == NIL_TRUE:
+                return self.addref(True)
+            elif sec == NIL_FALSE:
+                return self.addref(False)
+            return self.addref(None)
+        elif pri == T_SYM:
+            return self.addref(self.loadsym(fp))
+        elif pri == T_CON:
+            if sec == CON_MAP:
+                return self.loadmap(fp, self.addref({}))
+            elif sec == CON_OBJ:
+                return self.loadobj(fp, ref=True)
+            else:
+                # All remaining container kinds are decoded as lists.
+                return self.loadlist(fp, self.addref([]))
+        else:
+            raise fmterror("unknown primary: " + str(pri))
+
+    def load(self, fp):
+        """Read one tag byte from fp and decode the value it introduces."""
+        tag = self.byte(fp)
+        return self.loadtagged(fp, tag)
+
+def load(fp):
+    """Decode a single value from the binary stream fp and return it.
+
+    A fresh decoder is used for each call, so back-references do not
+    carry over between calls.
+    """
+    return decoder().load(fp)
diff --git a/coe/data.py b/coe/data.py
new file mode 100644 (file)
index 0000000..05e129b
--- /dev/null
@@ -0,0 +1,73 @@
+class symbol(object):
+    """Base class for symbols: a name within a (possibly empty) namespace.
+
+    The methods here read the `ns' and `name' attributes, which the
+    subclasses provide.  Use symbol.get() to construct a symbol.
+    """
+    # Characters that may appear unquoted in any position.
+    isafe = set("abcdefghijklmnopqrstuvwxyz" +
+                "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
+                "$%^&*_=/.")
+    # Characters that may appear unquoted after the first position.
+    qsafe = isafe | set("0123456789-+")
+
+    def __eq__(s, o):
+        if not isinstance(o, symbol):
+            return False
+        return (s.name == o.name) and (s.ns == o.ns)
+
+    def __hash__(self):
+        return (hash(self.ns) * 31) + hash(self.name)
+
+    @staticmethod
+    def _sortkey(o):
+        """Return the (ns, name) ordering key of o, or None if o is not
+        comparable with symbols.  A plain string is treated as a name
+        in the empty namespace."""
+        if isinstance(o, str):
+            return ("", o)
+        if isinstance(o, symbol):
+            return (o.ns, o.name)
+        return None
+
+    def __lt__(s, o):
+        # Order lexicographically by namespace, then by name.
+        key = symbol._sortkey(o)
+        if key is None:
+            return NotImplemented
+        return (s.ns, s.name) < key
+    def __gt__(s, o):
+        key = symbol._sortkey(o)
+        if key is None:
+            return NotImplemented
+        return (s.ns, s.name) > key
+
+    @staticmethod
+    def quote(st):
+        """Return st in printed form.
+
+        Every character outside the safe set for its position is
+        preceded by a backslash, and if any character needed that, the
+        whole result is wrapped in vertical bars.
+        """
+        parts = []
+        q = False
+        cset = symbol.isafe
+        for c in st:
+            if c not in cset:
+                q = True
+                parts.append("\\" + c)
+            else:
+                parts.append(c)
+            # Only the first character is held to the stricter set.
+            cset = symbol.qsafe
+        ret = "".join(parts)
+        if q:
+            ret = "|" + ret + "|"
+        return ret
+
+    @staticmethod
+    def get(ns, name):
+        """Return the symbol for ns and name: a usymbol when ns is the
+        empty string, otherwise an nssymbol."""
+        if ns == "":
+            return usymbol(name)
+        return nssymbol(ns, name)
+
+    def __repr__(self):
+        if self.ns == "":
+            return self.quote(self.name)
+        return "%s:%s" % (self.quote(self.ns), self.quote(self.name))
+
+class usymbol(symbol):
+    """Symbol in the empty namespace.
+
+    Hashes like its name and compares equal to a plain string holding
+    that name.
+    """
+    ns = ""
+
+    def __init__(self, name):
+        self.name = name
+
+    def __hash__(self):
+        # Same hash as the bare name, consistent with __eq__ below.
+        return hash(self.name)
+
+    def __eq__(s, o):
+        if not isinstance(o, str):
+            return super().__eq__(o)
+        return s.name == o
+
+class nssymbol(symbol):
+    """Symbol that carries an explicit namespace."""
+
+    def __init__(self, ns, name, *args, **kw):
+        # Extra arguments are passed on up the class hierarchy.
+        super(nssymbol, self).__init__(*args, **kw)
+        self.name, self.ns = name, ns
+
+class obj(object):
+    """Object whose attributes can also be accessed with subscript syntax.
+
+    Subscripting reads, writes and deletes entries of the instance
+    dictionary directly, so keys need not be valid identifiers.
+    """
+
+    def __getitem__(self, key):
+        return vars(self)[key]
+
+    def __setitem__(self, key, val):
+        vars(self)[key] = val
+
+    def __delitem__(self, key):
+        del vars(self)[key]