Initial import
[ldd.git] / ldd / rec.py
diff --git a/ldd/rec.py b/ldd/rec.py
new file mode 100644 (file)
index 0000000..4145343
--- /dev/null
@@ -0,0 +1,342 @@
+#    ldd - DNS implementation in Python
+#    Copyright (C) 2006 Fredrik Tolf <fredrik@dolda2000.com>
+#
+#    This program is free software; you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation; either version 2 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program; if not, write to the Free Software
+#    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+
+import socket
+import struct
+
+import proto
+import dn
+
+rtypes = []
+
+def addrtype(id, name, syntax):
+    rtypes.append((id, name, syntax))
+
+def rtypebyid(id):
+    for rtype in rtypes:
+        if rtype[0] == id:
+            return rtype
+    return None
+
+def rtypebyname(name):
+    for rtype in rtypes:
+        if rtype[1] == name.upper():
+            return rtype[0]
+    return None
+
+class error(Exception):
+    def __init__(self, text):
+        self.text = text
+
+    def __str__(self):
+        return self.text
+
+class malformedrr(Exception):
+    def __init__(self, text):
+        self.text = text
+
+    def __str__(self):
+        return self.text
+
+class rrhead:
+    def __init__(self, name = None, rtype = None, rclass = None):
+        if rclass is None: rclass = CLASSIN
+        if type(name) == str:
+            self.name = dn.fromstring(name)
+        else:
+            self.name = name
+        if type(rtype) == str:
+            self.rtype = rtypebyname(rtype)
+            if self.rtype is None:
+                raise error("no such rtype " + rtype)
+        else:
+            self.rtype = rtype
+        self.rclass = rclass
+
+    def encode(self, names, offset):
+        ret, names = proto.encodename(self.name, names, offset)
+        ret += struct.pack(">HH", self.rtype, self.rclass)
+        return ret, names
+
+    def __eq__(self, other):
+        return self.name == other.name and self.rtype == other.rtype
+
+    def __str__(self):
+        rtype = rtypebyid(self.rtype)
+        if rtype is None:
+            return "%02x RRhead %s" % (self.rtype, self.name)
+        else:
+            return "%s RRhead %s" % (rtype[1], self.name)
+
+    def istype(self, rtype):
+        if type(rtype) == str:
+            rtype = rtypebyname(rtype)
+        return self.rtype == rtype
+        
+    def decode(self, packet, offset):
+        name, offset = proto.decodename(packet, offset)
+        rtype, rclass = struct.unpack(">HH", packet[offset:offset + struct.calcsize(">HH")])
+        offset += struct.calcsize(">HH")
+        ret = rrhead(name, rtype, rclass)
+        return ret, offset
+    decode = classmethod(decode)
+
+class rrdata:
+    def __init__(self, rtype, *args):
+        if type(rtype) == tuple and type(args[0]) == dict:
+            self.rtype = rtype
+            self.rdata = args[0]
+            return
+        
+        if type(rtype) == str:
+            self.rtype = rtypebyname(rtype)
+            if self.rtype is None:
+                raise error("no such rtype " + rtype)
+        else:
+            self.rtype = rtype
+        rtid = self.rtype
+        self.rtype = rtypebyid(rtid)
+        if self.rtype is None:
+            raise error("no such rtype " + rtid)
+        self.rdata = {}
+        for i, e in enumerate(self.rtype[2]):
+            d = self.convdata(e[0], args[i])
+            self.rdata[e[1]] = d
+
+    def __eq__(self, other):
+        return(self.rdata == other.rdata)
+
+    def __str__(self):
+        ret = "{"
+        first = True
+        for e in self.rtype[2]:
+            if not first:
+                ret += ", "
+            first = False
+            ret += e[1] + ": "
+            d = self.rdata[e[1]]
+            if e[0] == "4":
+                ret += socket.inet_ntop(socket.AF_INET, d)
+            elif e[0] == "6":
+                ret += socket.inet_ntop(socket.AF_INET6, d)
+            elif e[0] == "s":
+                ret += '"' + d + '"'
+            else:
+                ret += str(d)
+        ret += "}"
+        return ret
+
+    def istype(self, rtype):
+        if type(rtype) == str:
+            rtype = rtypebyname(rtype)
+        return self.rtype[0] == rtype
+        
+    def convdata(self, dtype, data):
+        if dtype == "4":
+            if type(data) != str:
+                raise error("IPv4 address must be a string")
+            if len(data) == 4:
+                d = data
+            else:
+                d = socket.inet_pton(socket.AF_INET, data)
+        if dtype == "6":
+            if type(data) != str:
+                raise error("IPv6 address must be a string")
+            if len(data) == 16 and data.find(":") == -1:
+                d = data
+            else:
+                d = socket.inet_pton(socket.AF_INET6, data)
+        if dtype == "d":
+            if type(data) == str:
+                d = dn.fromstring(data)
+            elif isinstance(data, dn.domainname):
+                d = data
+            else:
+                raise error("Domain name must be either proper or string")
+        if dtype == "s":
+            d = str(data)
+        if dtype == "i":
+            d = int(data)
+        return d
+    
+    def __iter__(self):
+        return iter(self.rdata)
+    
+    def __getitem__(self, i):
+        return self.rdata[i]
+
+    def __setitem__(self, i, v):
+        for e in self.rtype[2]:
+            if e[1] == i:
+                break
+        else:
+            raise error("No such data for " + self.rtype[1] + " record: " + str(i))
+        self.rdata[i] = self.convdata(e[0], v)
+
+    def encode(self, names, offset):
+        ret = ""
+        for e in self.rtype[2]:
+            d = self.rdata[e[1]]
+            if e[2] == "strc":
+                ret += d
+                offset += len(d)
+            if e[2] == "cmdn":
+                buf, names = proto.encodename(d, names, offset)
+                ret += buf
+                offset += len(buf)
+            if e[2] == "lstr":
+                ret += chr(len(d)) + d
+                offset += 1 + len(d)
+            if e[2] == "llstr":
+                ret += struct.pack(">H", len(d)) + d
+                offset += struct.calcsize(">H") + len(d)
+            if e[2] == "short":
+                ret += struct.pack(">H", d)
+                offset += struct.calcsize(">H")
+            if e[2] == "long":
+                ret += struct.pack(">L", d)
+                offset += struct.calcsize(">L")
+            if e[2] == "int6":
+                ret += struct.pack(">Q", d)[-6:]
+                offset += 6
+        return ret, names
+
+    def decode(self, rtid, packet, offset, dlen):
+        rtype = rtypebyid(rtid)
+        origoff = offset
+        rdata = {}
+        if rtype is None:
+            rtype = (rtid, "Unknown", [("s", "unknown", "strc", dlen)])
+        for e in rtype[2]:
+            if e[2] == "strc":
+                d = packet[offset:offset + e[3]]
+                offset += e[3]
+            if e[2] == "cmdn":
+                d, offset = proto.decodename(packet, offset)
+            if e[2] == "lstr":
+                dl = ord(packet[offset])
+                offset += 1
+                d = packet[offset:offset + dl]
+                offset += dl
+            if e[2] == "llstr":
+                (dl,) = struct.unpack(">H", packet[offset:offset + struct.calcsize(">H")])
+                offset += struct.calcsize(">H")
+                d = packet[offset:offset + dl]
+                offset += dl
+            if e[2] == "short":
+                (d,) = struct.unpack(">H", packet[offset:offset + struct.calcsize(">H")])
+                offset += struct.calcsize(">H")
+            if e[2] == "long":
+                (d,) = struct.unpack(">L", packet[offset:offset + struct.calcsize(">L")])
+                offset += struct.calcsize(">L")
+            if e[2] == "int6":
+                (d,) = struct.unpack(">Q", ("\0" * (struct.calcsize(">Q") - 6)) + packet[offset:offset + 6])
+                offset += 6
+            rdata[e[1]] = d
+        if origoff + dlen != offset:
+            raise malformedrr(rtype[1] + " RR data length mismatch")
+        return rrdata(rtype, rdata)
+    decode = classmethod(decode)
+
+class rr:
+    def __init__(self, head, ttl, data):
+        if type(head) == tuple:
+            self.head = rrhead(*head)
+        else:
+            self.head = head
+        self.ttl = ttl
+        self.data = data
+        self.flags = set()
+
+    def setflags(self, flags):
+        self.flags |= set(flags)
+
+    def clrflags(self, flags):
+        self.flags -= set(flags)
+    
+    def encode(self, names, offset):
+        ret, names = self.head.encode(names, offset)
+        if self.data is None:
+            data = ""
+        else:
+            data, names = self.data.encode(names, offset + len(ret) + struct.calcsize(">LH"))
+        ret += struct.pack(">LH", self.ttl, len(data))
+        ret += data
+        return ret, names
+
+    def __eq__(self, other):
+        return self.head == other.head and self.ttl == other.ttl and self.data == other.data
+
+    def __str__(self):
+        rtype = rtypebyid(self.head.rtype)
+        if rtype is None:
+            ret = "%02x" % self.head.rtype
+        else:
+            ret = rtype[1]
+        ret += " RR %s, TTL=%i: %s" % (self.head.name, self.ttl, self.data)
+        if len(self.flags) > 0:
+            ret += " (Flags:"
+            for f in self.flags:
+                ret += " " + f
+            ret += ")"
+        return ret
+    
+    def decode(self, packet, offset):
+        head, offset = rrhead.decode(packet, offset)
+        ttl, dlen = struct.unpack(">LH", packet[offset:offset + struct.calcsize(">LH")])
+        offset += struct.calcsize(">LH")
+        if dlen == 0:
+            data = None
+        else:
+            data = rrdata.decode(head.rtype, packet, offset, dlen)
+            offset += dlen
+        return rr(head, ttl, data), offset
+    decode = classmethod(decode)
+
+addrtype(0x01, "A", [("4", "address", "strc", 4)])
+addrtype(0x02, "NS", [("d", "nsname", "cmdn")])
+addrtype(0x05, "CNAME", [("d", "priname", "cmdn")])
+addrtype(0x06, "SOA", [("d", "priserv", "cmdn"),
+                       ("d", "mailbox", "cmdn"),
+                       ("i", "serial", "long"),
+                       ("i", "refresh", "long"),
+                       ("i", "retry", "long"),
+                       ("i", "expire", "long"),
+                       ("i", "minttl", "long")])
+addrtype(0x0c, "PTR", [("d", "target", "cmdn")])
+addrtype(0x0f, "MX", [("i", "prio", "short"),
+                      ("d", "target", "cmdn")])
+addrtype(0x10, "TXT", [("s", "rrtext", "lstr")])
+addrtype(0x1c, "AAAA", [("6", "address", "strc", 16)])
+addrtype(0x21, "SRV", [("i", "prio", "short"),
+                       ("i", "weight", "short"),
+                       ("i", "port", "short"),
+                       ("d", "target", "cmdn")])
+addrtype(0xfa, "TSIG", [("d", "algo", "cmdn"),
+                        ("i", "stime", "int6"),
+                        ("i", "fudge", "short"),
+                        ("s", "mac", "llstr"),
+                        ("i", "orgid", "short"),
+                        ("i", "err", "short"),
+                        ("s", "other", "llstr")])
+
+CLASSIN = 1
+CLASSCS = 2
+CLASSCH = 3
+CLASSHS = 4
+CLASSNONE = 254
+CLASSANY = 255