from . import lib
from bsddb3 import db as bd
+__all__ = ["environment", "database"]
+
deadlock = bd.DBLockDeadlockError
class environment(lib.closable):
def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
self.tx = env.txn_begin(None, flags)
self.done = False
+ self.pcommit = set()
def commit(self):
self.done = True
self.tx.commit(0)
+ def run1(list):
+ if len(list) > 0:
+ try:
+ list[0]()
+ finally:
+ run1(list[1:])
+ run1(list(self.pcommit))
def abort(self):
self.done = True
self.abort()
return False
+ def postcommit(self, fun):
+ self.pcommit.add(fun)
+
+def dloopfun(fun):
+ def wrapper(self, *args, **kwargs):
+ while True:
+ try:
+ return fun(self, *args, **kwargs)
+ except deadlock:
+ continue
+ return wrapper
+
+def txnfun(envfun):
+ def fxf(fun):
+ def wrapper(self, *args, tx=None, **kwargs):
+ if tx is None:
+ while True:
+ try:
+ with txn(envfun(self)) as ltx:
+ ret = fun(self, *args, tx=ltx, **kwargs)
+ ltx.commit()
+ return ret
+ except deadlock:
+ continue
+ else:
+ return fun(self, *args, tx=tx, **kwargs)
+ return wrapper
+ return fxf
+
class database(object):
def __init__(self, env, name, create, mode):
self.env = env
continue
return ret
- def _nextseq(self, tx):
+ @txnfun(lambda self: self.env.env)
+ def _nextseq(self, *, tx):
if self.cf.has_key(b"seq", txn=tx.tx):
seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
else:
self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
return seq
- def add(self, ob):
- while True:
- try:
- with txn(self.env.env) as tx:
- seq = self._nextseq(tx)
- self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
- tx.commit()
- return seq
- except deadlock:
- continue
-
- def replace(self, id, ob):
- while True:
- try:
- with txn(self.env.env) as tx:
- key = struct.pack(">Q", id)
- if not self.ob.has_key(key, txn=tx.tx):
- raise KeyError(id)
- self.ob.put(key, ob, txn=tx.tx)
- tx.commit()
- return
- except deadlock:
- continue
+ @txnfun(lambda self: self.env.env)
+ def add(self, ob, *, tx):
+ seq = self._nextseq(tx=tx)
+ self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
+ return seq
- def get(self, id):
- while True:
- try:
- return self.ob[struct.pack(">Q", id)]
- except KeyError:
- raise KeyError(id) from None
- except deadlock:
- continue
+ @txnfun(lambda self: self.env.env)
+ def replace(self, id, ob, *, tx):
+ key = struct.pack(">Q", id)
+ if not self.ob.has_key(key, txn=tx.tx):
+ raise KeyError(id)
+ self.ob.put(key, ob, txn=tx.tx)
+
+ @txnfun(lambda self: self.env.env)
+ def get(self, id, *, tx):
+ ret = self.ob.get(struct.pack(">Q", id), None)
+ if ret is None:
+ raise KeyError(id)
+ return ret
- def remove(self, id):
- while True:
- try:
- with txn(self.env.env) as tx:
- self.ob.delete(struct.pack(">Q", id), txn=tx.tx)
- tx.commit()
- return
- except deadlock:
- continue
+ @txnfun(lambda self: self.env.env)
+ def remove(self, id, *, tx):
+ key = struct.pack(">Q", id)
+ if not self.ob.has_key(key, txn=tx.tx):
+ raise KeyError(id)
+ self.ob.delete(key, txn=tx.tx)