def __init__(self, *parts):
self.parts = parts
+ small = object()
+ large = object()
+ def minim(self, *parts):
+ return parts + tuple([self.small] * (len(self.parts) - len(parts)))
+ def maxim(self, *parts):
+ return parts + tuple([self.large] * (len(self.parts) - len(parts)))
+
def encode(self, obs):
if len(obs) != len(self.parts):
raise ValueError("invalid length of compound data: " + str(len(obs)) + ", rather than " + len(self.parts))
buf = bytearray()
for ob, part in zip(obs, self.parts):
- dat = part.encode(ob)
- if len(dat) < 128:
- buf.append(0x80 | len(dat))
- buf.extend(dat)
+ if ob is self.small:
+ buf.append(0x01)
+ elif ob is self.large:
+ buf.append(0x02)
else:
- buf.extend(struct.pack(">i", len(dat)))
- buf.extend(dat)
+ dat = part.encode(ob)
+ if len(dat) < 128:
+ buf.append(0x80 | len(dat))
+ buf.extend(dat)
+ else:
+ buf.extend(struct.pack(">BI", 0, len(dat)))
+ buf.extend(dat)
return bytes(buf)
def decode(self, dat):
ret = []
off = 0
for part in self.parts:
- if dat[off] & 0x80:
- ln = dat[off] & 0x7f
- off += 1
+ fl = dat[off]
+ off += 1
+ if fl & 0x80:
+ ln = fl & 0x7f
+ elif fl == 0x01:
+ ret.append(self.small)
+ continue
+ elif fl == 0x02:
+ ret.append(self.large)
+ continue
else:
- ln = struct.unpack(">i", dat[off:off + 4])[0]
+ ln = struct.unpack(">I", dat[off:off + 4])[0]
off += 4
- ret.append(part.decode(dat[off:off + len]))
- off += len
+ ret.append(part.decode(dat[off:off + ln]))
+ off += ln
return tuple(ret)
def compare(self, al, bl):
if (len(al) != len(self.parts)) or (len(bl) != len(self.parts)):
raise ValueError("invalid length of compound data: " + str(len(al)) + ", " + str(len(bl)) + ", rather than " + len(self.parts))
for a, b, part in zip(al, bl, self.parts):
+ if a in (self.small, self.large) or b in (self.small, self.large):
+ if a is b:
+ return 0
+ if a is self.small:
+ return -1
+ elif b is self.small:
+ return 1
+ elif a is self.large:
+ return 1
+ elif b is self.large:
+ return -1
c = part.compare(a, b)
if c != 0:
return c
self.parts = parts
self.iattr = "__idx_%s_cur" % name
+ def minim(self, *parts):
+ return self.typ.minim(*parts)
+ def maxim(self, *parts):
+ return self.typ.maxim(*parts)
+
+ def get(self, *, partial=None, **spec):
+ if partial is not None:
+ return super().get(ge=self.minim(*partial), le = self.maxim(*partial), **spec)
+ else:
+ return super().get(**spec)
+
def register(self, id, obj, tx):
val = tuple(part.__get__(obj, None) for part in self.parts)
self.index().put(val, id, tx=tx)