Commit | Line | Data |
---|---|---|
a95055e8 FT |
1 | import time, threading, struct |
2 | from . import lib | |
3 | from bsddb3 import db as bd | |
4 | ||
5 | deadlock = bd.DBLockDeadlockError | |
6 | ||
7 | class environment(lib.closable): | |
8 | def __init__(self, path, *, create=True, recover=False, mode=0o666): | |
9 | self.env = bd.DBEnv() | |
10 | self.env.set_lk_detect(bd.DB_LOCK_RANDOM) | |
11 | fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN | |
12 | if recover: | |
13 | fl |= bd.DB_RECOVER | |
14 | if create: | |
15 | fl |= bd.DB_CREATE | |
16 | self.env.open(path, fl, mode) | |
17 | self.lastckp = self.lastarch = time.time() | |
18 | self.lock = threading.Lock() | |
19 | self.dbs = {} | |
20 | ||
21 | def close(self): | |
22 | env = self.env | |
23 | if env is None: | |
24 | return | |
25 | env.close() | |
26 | self.env = None | |
27 | ||
28 | def maint(self): | |
29 | now = time.time() | |
30 | try: | |
31 | if now - self.lastckp > 60: | |
32 | self.env.txn_checkpoint(1024) | |
33 | self.lastckp = now | |
34 | if now - self.lastarch > 3600: | |
35 | self.env.log_archive(bd.DB_ARCH_REMOVE) | |
36 | self.lastarch = now | |
37 | except deadlock: | |
38 | pass | |
39 | ||
40 | def db(self, name, create=True, mode=0o666): | |
41 | with self.lock: | |
42 | if name not in self.dbs: | |
43 | self.dbs[name] = database(self, name, create, mode) | |
44 | return self.dbs[name] | |
45 | ||
46 | def __del__(self): | |
47 | self.close() | |
48 | def __enter__(self): | |
49 | return self | |
50 | def __exit__(self, *excinfo): | |
51 | self.close() | |
52 | return False | |
53 | ||
54 | def opendb(env, fnm, dnm, typ, fl, mode): | |
55 | ret = bd.DB(env) | |
56 | while True: | |
57 | try: | |
58 | self.main.open(fnm, dnm, typ, fl, mode) | |
59 | except deadlock: | |
60 | continue | |
61 | return ret | |
62 | ||
63 | class txn(object): | |
64 | def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC): | |
65 | self.tx = env.txn_begin(None, flags) | |
66 | self.done = False | |
da5de014 | 67 | self.pcommit = set() |
a95055e8 FT |
68 | |
69 | def commit(self): | |
70 | self.done = True | |
71 | self.tx.commit(0) | |
da5de014 FT |
72 | def run1(list): |
73 | if len(list) > 0: | |
74 | try: | |
75 | list[0]() | |
76 | finally: | |
77 | run1(list[1:]) | |
78 | run1(list(self.pcommit)) | |
a95055e8 FT |
79 | |
80 | def abort(self): | |
81 | self.done = True | |
82 | self.tx.abort() | |
83 | ||
84 | def __enter__(self): | |
85 | return self | |
86 | ||
87 | def __exit__(self, etype, exc, tb): | |
88 | if not self.done: | |
89 | self.abort() | |
90 | return False | |
91 | ||
da5de014 FT |
92 | def postcommit(self, fun): |
93 | self.pcommit.add(fun) | |
94 | ||
8950191c FT |
95 | def txnfun(envfun): |
96 | def fxf(fun): | |
97 | def wrapper(self, *args, tx=None, **kwargs): | |
98 | if tx is None: | |
99 | while True: | |
100 | try: | |
101 | with txn(envfun(self)) as ltx: | |
102 | ret = fun(self, *args, tx=ltx, **kwargs) | |
103 | ltx.commit() | |
104 | return ret | |
105 | except deadlock: | |
106 | continue | |
107 | else: | |
108 | return fun(self, *args, tx=tx, **kwargs) | |
109 | return wrapper | |
110 | return fxf | |
111 | ||
a95055e8 FT |
112 | class database(object): |
113 | def __init__(self, env, name, create, mode): | |
114 | self.env = env | |
115 | self.mode = mode | |
116 | self.fnm = name | |
117 | fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT | |
118 | if create: | |
119 | fl |= bd.DB_CREATE | |
120 | self.cf = self._opendb("cf", bd.DB_HASH, fl) | |
121 | self.ob = self._opendb("ob", bd.DB_HASH, fl) | |
122 | ||
123 | def _opendb(self, dnm, typ, fl, init=None): | |
124 | ret = bd.DB(self.env.env) | |
125 | if init: init(ret) | |
126 | while True: | |
127 | try: | |
128 | ret.open(self.fnm, dnm, typ, fl, self.mode) | |
129 | except deadlock: | |
130 | continue | |
131 | return ret | |
132 | ||
8950191c FT |
133 | @txnfun(lambda self: self.env.env) |
134 | def _nextseq(self, *, tx): | |
a95055e8 FT |
135 | if self.cf.has_key(b"seq", txn=tx.tx): |
136 | seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0] | |
137 | else: | |
138 | seq = 1 | |
139 | self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx) | |
140 | return seq | |
141 | ||
8950191c FT |
142 | @txnfun(lambda self: self.env.env) |
143 | def add(self, ob, *, tx): | |
144 | seq = self._nextseq(tx=tx) | |
145 | self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE) | |
146 | return seq | |
a95055e8 | 147 | |
8950191c FT |
148 | @txnfun(lambda self: self.env.env) |
149 | def replace(self, id, ob, *, tx): | |
150 | key = struct.pack(">Q", id) | |
151 | if not self.ob.has_key(key, txn=tx.tx): | |
152 | raise KeyError(id) | |
153 | self.ob.put(key, ob, txn=tx.tx) | |
154 | ||
155 | @txnfun(lambda self: self.env.env) | |
156 | def get(self, id, *, tx): | |
157 | ret = self.ob.get(struct.pack(">Q", id), None) | |
158 | if ret is None: | |
159 | raise KeyError(id) | |
160 | return ret | |
a95055e8 | 161 | |
8950191c FT |
162 | @txnfun(lambda self: self.env.env) |
163 | def remove(self, id, *, tx): | |
164 | key = struct.pack(">Q", id) | |
165 | if not self.ob.has_key(key, txn=tx.tx): | |
166 | raise KeyError(id) | |
167 | self.ob.delete(key, txn=tx.tx) |