--- /dev/null
+import time, threading, struct
+from . import lib
+from bsddb3 import db as bd
+
+deadlock = bd.DBLockDeadlockError
+
+class environment(lib.closable):
+ def __init__(self, path, *, create=True, recover=False, mode=0o666):
+ self.env = bd.DBEnv()
+ self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
+ fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
+ if recover:
+ fl |= bd.DB_RECOVER
+ if create:
+ fl |= bd.DB_CREATE
+ self.env.open(path, fl, mode)
+ self.lastckp = self.lastarch = time.time()
+ self.lock = threading.Lock()
+ self.dbs = {}
+
+ def close(self):
+ env = self.env
+ if env is None:
+ return
+ env.close()
+ self.env = None
+
+ def maint(self):
+ now = time.time()
+ try:
+ if now - self.lastckp > 60:
+ self.env.txn_checkpoint(1024)
+ self.lastckp = now
+ if now - self.lastarch > 3600:
+ self.env.log_archive(bd.DB_ARCH_REMOVE)
+ self.lastarch = now
+ except deadlock:
+ pass
+
+ def db(self, name, create=True, mode=0o666):
+ with self.lock:
+ if name not in self.dbs:
+ self.dbs[name] = database(self, name, create, mode)
+ return self.dbs[name]
+
+ def __del__(self):
+ self.close()
+ def __enter__(self):
+ return self
+ def __exit__(self, *excinfo):
+ self.close()
+ return False
+
+def opendb(env, fnm, dnm, typ, fl, mode):
+ ret = bd.DB(env)
+ while True:
+ try:
+ self.main.open(fnm, dnm, typ, fl, mode)
+ except deadlock:
+ continue
+ return ret
+
+class txn(object):
+ def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
+ self.tx = env.txn_begin(None, flags)
+ self.done = False
+
+ def commit(self):
+ self.done = True
+ self.tx.commit(0)
+
+ def abort(self):
+ self.done = True
+ self.tx.abort()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, etype, exc, tb):
+ if not self.done:
+ self.abort()
+ return False
+
+class database(object):
+ def __init__(self, env, name, create, mode):
+ self.env = env
+ self.mode = mode
+ self.fnm = name
+ fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
+ if create:
+ fl |= bd.DB_CREATE
+ self.cf = self._opendb("cf", bd.DB_HASH, fl)
+ self.ob = self._opendb("ob", bd.DB_HASH, fl)
+
+ def _opendb(self, dnm, typ, fl, init=None):
+ ret = bd.DB(self.env.env)
+ if init: init(ret)
+ while True:
+ try:
+ ret.open(self.fnm, dnm, typ, fl, self.mode)
+ except deadlock:
+ continue
+ return ret
+
+ def _nextseq(self, tx):
+ if self.cf.has_key(b"seq", txn=tx.tx):
+ seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
+ else:
+ seq = 1
+ self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
+ return seq
+
+ def add(self, ob):
+ while True:
+ try:
+ with txn(self.env.env) as tx:
+ seq = self._nextseq(tx)
+ self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
+ tx.commit()
+ return seq
+ except deadlock:
+ continue
+
+ def replace(self, id, ob):
+ while True:
+ try:
+ with txn(self.env.env) as tx:
+ key = struct.pack(">Q", id)
+ if not self.ob.has_key(key, txn=tx.tx):
+ raise KeyError(id)
+ self.ob.put(key, ob, txn=tx.tx)
+ tx.commit()
+ return
+ except deadlock:
+ continue
+
+ def get(self, id):
+ while True:
+ try:
+ return self.ob[struct.pack(">Q", id)]
+ except KeyError:
+ raise KeyError(id) from None
+ except deadlock:
+ continue
+
+ def remove(self, id):
+ while True:
+ try:
+ with txn(self.env.env) as tx:
+ self.ob.delete(struct.pack(">Q", id), txn=tx.tx)
+ tx.commit()
+ return
+ except deadlock:
+ continue
--- /dev/null
+import struct, contextlib
+from . import db, lib
+from .db import bd
+
+deadlock = bd.DBLockDeadlockError
+notfound = bd.DBNotFoundError
+
+class simpletype(object):
+ def __init__(self, encode, decode):
+ self.enc = encode
+ self.dec = decode
+
+ def encode(self, ob):
+ return self.enc(ob)
+ def decode(self, dat):
+ return self.dec(dat)
+ def compare(self, a, b):
+ if a < b:
+ return -1
+ elif a > b:
+ return 1
+ else:
+ return 0
+
+ @classmethod
+ def struct(cls, fmt):
+ return cls(lambda ob: struct.pack(fmt, ob),
+ lambda dat: struct.unpack(fmt, dat)[0])
+
+class maybe(object):
+ def __init__(self, bk):
+ self.bk = bk
+
+ def encode(self, ob):
+ if ob is None: return b""
+ return b"\0" + self.bk.encode(ob)
+ def decode(self, dat):
+ if dat == b"": return None
+ return self.bk.dec(dat[1:])
+ def compare(self, a, b):
+ if a is b is None:
+ return 0
+ elif a is None:
+ return -1
+ elif b is None:
+ return 1
+ else:
+ return self.bk.compare(a[1:], b[1:])
+
+t_int = simpletype.struct(">Q")
+
+class index(object):
+ def __init__(self, db, name, datatype):
+ self.db = db
+ self.nm = name
+ self.typ = datatype
+
+missing = object()
+
+class ordered(index, lib.closable):
+ def __init__(self, db, name, datatype, duplicates, create=True):
+ super().__init__(db, name, datatype)
+ self.dup = duplicates
+ fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
+ if create: fl |= bd.DB_CREATE
+ def initdb(db):
+ def compare(a, b):
+ if a == b == "": return 0
+ return self.typ.compare(self.typ.decode(a), self.typ.decode(b))
+ db.set_flags(bd.DB_DUPSORT)
+ db.set_bt_compare(compare)
+ self.bk = db._opendb("i-" + name, bd.DB_BTREE, fl, initdb)
+ self.bk.set_get_returns_none(False)
+
+ def close(self):
+ self.bk.close()
+
+ class cursor(lib.closable):
+ def __init__(self, idx, cur, item, stop):
+ self.idx = idx
+ self.cur = cur
+ self.item = item
+ self.stop = stop
+
+ def close(self):
+ if self.cur is not None:
+ self.cur.close()
+
+ def __iter__(self):
+ return self
+
+ def peek(self):
+ if self.item is None:
+ raise StopIteration()
+ rk, rv = self.item
+ rk = self.idx.typ.decode(rk)
+ rv = struct.unpack(">Q", rv)[0]
+ if self.stop(rk):
+ self.item = None
+ raise StopIteration()
+ return rk, rv
+
+ def __next__(self):
+ rk, rv = self.peek()
+ try:
+ while True:
+ try:
+ self.item = self.cur.next()
+ break
+ except deadlock:
+ continue
+ except notfound:
+ self.item = None
+ return rk, rv
+
+ def skip(self, n=1):
+ try:
+ for i in range(n):
+ next(self)
+ except StopIteration:
+ return
+
+ def get(self, *, match=missing, ge=missing, gt=missing, lt=missing, le=missing, all=False):
+ while True:
+ try:
+ cur = self.bk.cursor()
+ done = False
+ try:
+ if match is not missing:
+ try:
+ k, v = cur.set(self.typ.encode(match))
+ except notfound:
+ return self.cursor(None, None, None, None)
+ else:
+ done = True
+ return self.cursor(self, cur, (k, v), lambda o: (self.typ.compare(o, match) != 0))
+ elif all:
+ try:
+ k, v = cur.first()
+ except notfound:
+ return self.cursor(None, None, None, None)
+ else:
+ done = True
+ return self.cursor(self, cur, (k, v), lambda o: False)
+ elif ge is not missing or gt is not missing or lt is not missing or le is not missing:
+ skip = False
+ try:
+ if ge is not missing:
+ k, v = cur.set_range(self.typ.encode(ge))
+ elif gt is not missing:
+ k, v = cur.set_range(self.typ.encode(gt))
+ skip = True
+ else:
+ k, v = cur.first()
+ except notfound:
+ return self.cursor(None, None, None, None)
+ if lt is not missing:
+ stop = lambda o: self.typ.compare(o, lt) >= 0
+ elif le is not missing:
+ stop = lambda o: self.typ.compare(o, le) > 0
+ else:
+ stop = lambda o: False
+ ret = self.cursor(self, cur, (k, v), stop)
+ if skip:
+ try:
+ while self.typ.compare(ret.peek()[0], gt) == 0:
+ next(ret)
+ except StopIteration:
+ pass
+ done = True
+ return ret
+ else:
+ raise NameError("invalid get() specification")
+ finally:
+ if not done:
+ cur.close()
+ except deadlock:
+ continue
+
+ def put(self, key, id):
+ while True:
+ try:
+ with db.txn(self.db.env.env) as tx:
+ obid = struct.pack(">Q", id)
+ if not self.db.ob.has_key(obid, txn=tx.tx):
+ raise ValueError("no such object in database: " + str(id))
+ try:
+ self.bk.put(self.typ.encode(key), obid, txn=tx.tx, flags=bd.DB_NODUPDATA)
+ except bd.DBKeyExistError:
+ return False
+ tx.commit()
+ return True
+ except deadlock:
+ continue
+
+ def remove(self, key, id):
+ while True:
+ try:
+ with db.txn(self.db.env.env) as tx:
+ obid = struct.pack(">Q", id)
+ if not self.db.ob.has_key(obid, txn=tx.tx):
+ raise ValueError("no such object in database: " + str(id))
+ cur = self.bk.cursor(txn=tx.tx)
+ try:
+ try:
+ cur.get_both(self.typ.encode(key), obid)
+ except notfound:
+ return False
+ cur.delete()
+ finally:
+ cur.close()
+ tx.commit()
+ return True
+ except deadlock:
+ continue