From b080a59cbbe6b293d754bfa97c386eeede6b51ee Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Fri, 20 Mar 2015 05:21:29 +0100 Subject: [PATCH] Added live-object store and simple indexing. --- didex/cache.py | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ didex/store.py | 90 +++++++++++++++++++++++++++++++++ didex/values.py | 102 +++++++++++++++++++++++++++++++++++++ 3 files changed, 345 insertions(+) create mode 100644 didex/cache.py create mode 100644 didex/store.py create mode 100644 didex/values.py diff --git a/didex/cache.py b/didex/cache.py new file mode 100644 index 0000000..76742aa --- /dev/null +++ b/didex/cache.py @@ -0,0 +1,153 @@ +import threading, weakref + +class entry(object): + __slots__ = ["p", "n", "id", "obj", "st", "lk"] + def __init__(self, id, c): + self.id = id + self.obj = None + self.st = None + self.lk = None + self.n = c.mru + self.p = None + if c.mru is not None: + c.mru.p = self + c.mru = self + else: + c.mru = c.lru = self + c.n += 1 + + def relink(self, c): + if c.mru is self: + return + if self.n is not None: + self.n.p = self.p + self.p.n = self.n + if c.lru is self: + c.lru = self.p + self.p = None + self.n = c.mru + c.mru.p = self + + def remove(self, c): + if self.n is not None: + self.n.p = self.p + if self.p is not None: + self.p.n = self.n + if c.mru is self: + c.mru = self.n + if c.lru is self: + c.lru = self.p + c.n -= 1 + +class cache(object): + def __init__(self, *, keep=1000): + self.keep = keep + self.cur = {} + self.mru = self.lru = None + self.n = 0 + self.lk = threading.Lock() + + def _trim(self, n): + ent = self.lru + for i in range(self.n - n): + if ent.st == "l": + ent.obj = weakref.ref(ent.obj) + ent.st = "w" + elif ent.st == "w" and ent.obj() is None: + del self.cur[ent.id] + ent.remove(self) + ent.st = "r" + ent = ent.p + + def get(self, id, load=True): + while True: + with self.lk: + ent = self.cur.get(id) + if ent is None: + if not load: + raise KeyError(id) + self.cur[id] = ent = entry(id, self) + ent.lk = lk = threading.Lock() + ent.st = "ld" + st = None + self._trim(self.keep) + elif ent.st == "l": + ent.relink(self) + return ent.obj + elif ent.st == "w": + ret = ent.obj() + if ret is None: + del self.cur[id] + ent.remove(self) + ent.st = "r" + continue + return ret + elif ent.st == "ld": + lk = ent.lk + st = "ld" + if lk is None: + continue + elif ent.st == "r": + continue + with lk: + if st is None: + try: + ret = ent.obj = self.load(id) + ent.st = "l" + return ret + except: + with self.lk: + del self.cur[id] + ent.remove(self) + ent.st = "r" + raise + finally: + ent.lk = None + elif st == "ld": + continue + + def put(self, id, ob): + while True: + with self.lk: + ent = self.cur.get(id) + if ent is None: + self.cur[id] = ent = entry(id, self) + ent.obj = ob + ent.st = "l" + self._trim(self.keep) + return + elif ent.st == "l": + ent.obj = ob + return + elif ent.st == "w": + ent.obj = ob + return + elif ent.st == "r": + continue + elif ent.st == "ld": + lk = ent.lk + if lk is None: + continue + with lk: + continue + + def remove(self, id): + while True: + with self.lk: + ent = self.cur.get(id) + if ent is None: + return + elif ent.st == "ld": + lk = ent.lk + if lk is None: + continue + else: + del self.cur[id] + ent.remove(self) + ent.st = "r" + return + with lk: + continue + + def load(self, id): + raise KeyError() diff --git a/didex/store.py b/didex/store.py new file mode 100644 index 0000000..15d2eca --- /dev/null +++ b/didex/store.py @@ -0,0 +1,90 @@ +import threading, pickle +from . import db, index, cache +from .db import txnfun + +class environment(object): + def __init__(self, path): + self.path = path + self.lk = threading.Lock() + self.bk = None + + def __call__(self): + with self.lk: + if self.bk is None: + self.bk = db.environment(self.path) + return self.bk + + def close(self): + with self.lk: + if self.bk is not None: + self.bk.close() + self.bk = None + +class storedesc(object): + pass + +def storedescs(obj): + t = type(obj) + ret = getattr(t, "__didex_attr", None) + if ret is None: + ret = [] + for nm, val in t.__dict__.items(): + if isinstance(val, storedesc): + ret.append((nm, val)) + t.__didex_attr = ret + return ret + +class store(object): + def __init__(self, name, *, env=None, path=".", ncache=None): + self.name = name + self.lk = threading.Lock() + if env: + self.env = env + else: + self.env = environment(path) + self._db = None + if ncache is None: + ncache = cache.cache() + self.cache = ncache + self.cache.load = self._load + + def db(self): + with self.lk: + if self._db is None: + self._db = self.env().db(self.name) + return self._db + + def _load(self, id): + try: + return pickle.loads(self.db().get(id)) + except: + raise KeyError(id, "could not unpickle data") + + def _encode(self, obj): + return pickle.dumps(obj) + + def get(self, id, *, load=True): + return self.cache.get(id, load=load) + + @txnfun(lambda self: self.db().env.env) + def register(self, obj, *, tx): + id = self.db().add(self._encode(obj), tx=tx) + for nm, attr in storedescs(obj): + attr.register(id, obj, tx) + self.cache.put(id, obj) + return id + + @txnfun(lambda self: self.db().env.env) + def unregister(self, id, *, tx): + obj = self.get(id) + for nm, attr in storedescs(obj): + attr.unregister(id, obj, tx) + self.db().remove(id, tx=tx) + self.cache.remove(id) + + @txnfun(lambda self: self.db().env.env) + def update(self, id, *, tx): + obj = self.get(id, load=False) + for nm, attr, in storedescs(obj): + attr.update(id, obj, tx) + self.db().replace(id, self._encode(obj), tx=tx) diff --git a/didex/values.py b/didex/values.py new file mode 100644 index 0000000..0e7bddc --- /dev/null +++ b/didex/values.py @@ -0,0 +1,102 @@ +import threading +from . import store, lib +from .store import storedesc + +class cursor(lib.closable): + def __init__(self, bk, st): + self.bk = bk + self.st = st + + def close(self): + self.bk.close() + + def __iter__(self): + return self + + def __next__(self): + k, id = next(self.bk) + return k, self.st.get(id) + + def skip(self, n=1): + self.bk.skip(n) + +class base(storedesc): + def __init__(self, store, indextype, name, datatype, default): + self.store = store + self.indextype = indextype + self.name = name + self.typ = datatype + self.default = default + self.idx = None + self.lk = threading.Lock() + self.mattr = "__idx_%s_new" % name + self.iattr = "__idx_%s_cur" % name + + def index(self): + with self.lk: + if self.idx is None: + self.idx = self.indextype(self.store.db(), self.name, self.typ) + return self.idx + + def __get__(self, obj, cls): + if obj is None: return self + return getattr(obj, self.mattr, self.default) + + def __set__(self, obj, val): + setattr(obj, self.mattr, val) + + def __delete__(self, obj): + delattr(obj, self.mattr) + + def get(self, **kwargs): + return cursor(self.index().get(**kwargs), self.store) + +class simple(base): + def __init__(self, store, indextype, name, datatype, default=None): + super().__init__(store, indextype, name, datatype, default) + + def register(self, id, obj, tx): + val = self.__get__(obj, None) + self.index().put(val, id, tx=tx) + tx.postcommit(lambda: setattr(obj, self.iattr, val)) + + def unregister(self, id, obj, tx): + self.index().remove(getattr(obj, self.iattr), id, tx=tx) + tx.postcommit(lambda: delattr(obj, self.iattr)) + + def update(self, id, obj, tx): + val = self.__get__(obj, None) + ival = getattr(obj, self.iattr) + if val != ival: + idx = self.index() + idx.remove(ival, id, tx=tx) + idx.put(val, id, tx=tx) + tx.postcommit(lambda: setattr(obj, self.iattr, val)) + +class multi(base): + def __init__(self, store, indextype, name, datatype): + super().__init__(store, indextype, name, datatype, ()) + + def register(self, id, obj, tx): + vals = frozenset(self.__get__(obj, None)) + idx = self.index() + for val in vals: + idx.put(val, id, tx=tx) + tx.postcommit(lambda: setattr(obj, self.iattr, vals)) + + def unregister(self, id, obj, tx): + idx = self.index() + for val in getattr(obj, self.iattr): + idx.remove(val, id, tx=tx) + tx.postcommit(lambda: delattr(obj, self.iattr)) + + def update(self, id, obj, tx): + vals = frozenset(self.__get__(obj, None)) + ivals = getattr(obj, self.iattr) + if vals != ivals: + idx = self.index() + for val in ivals - vals: + idx.remove(val, id, tx=tx) + for val in vals - ivals: + idx.put(val, id, tx=tx) + tx.postcommit(lambda: setattr(obj, self.iattr, vals)) -- 2.11.0