1 import sys, os, errno, threading, select, traceback
6 def __init__(self, check=None):
8 self.lock = threading.RLock()
12 self.loopcheck = set()
14 self.loopcheck.add(check)
19 return ((select.EPOLLIN if ch.readable else 0) |
20 (select.EPOLLOUT if ch.writable else 0))
23 if self.registered and self.th is None:
24 th = threading.Thread(target=self._run, name="Async epoll thread")
25 th.daemon = self._daemon
29 def exception(self, ch, *exc):
31 if self.exc_handler is None:
32 traceback.print_exception(*exc)
34 self.exc_handler(ch, *exc)
36 def _cb(self, ch, nm):
38 m = getattr(ch, nm, None)
40 raise AttributeError("%r has no %s method" % (ch, nm))
42 except Exception as exc:
43 self.exception(ch, *sys.exc_info())
47 while self.registered:
48 fd, (ch, evs) = next(iter(self.registered.items()))
49 del self.registered[fd]
50 self.ep.unregister(fd)
57 for fd, (ob, evs) in self.registered.items():
61 while self.registered:
62 for ck in self.loopcheck:
69 except IOError as exc:
70 if exc.errno == errno.EINTR:
73 for fd, evs in evlist:
75 if fd not in self.registered:
77 ch, cevs = self.registered[fd]
78 if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR):
80 if fd in self.registered and evs & select.EPOLLOUT:
82 if fd in self.registered:
83 nevs = self._evsfor(ch)
85 del self.registered[fd]
89 self.registered[fd] = ch, nevs
100 def daemon(self): return self._daemon
102 def daemon(self, value):
103 self._daemon = bool(value)
105 if self.th is not None:
106 self.th = daemon = self._daemon
111 if fd in self.registered:
112 raise KeyError("fd %i is already registered" % fd)
113 evs = self._evsfor(ch)
118 self.registered[fd] = (ch, evs)
120 self.ep.register(fd, evs)
123 def remove(self, ch, ignore=False):
126 if fd not in self.registered:
129 raise KeyError("fd %i is not registered" % fd)
130 pch, cevs = self.registered[fd]
132 raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch))
133 del self.registered[fd]
135 self.ep.unregister(fd)
138 def update(self, ch, ignore=False):
141 if fd not in self.registered:
144 raise KeyError("fd %i is not registered" % fd)
145 pch, cevs = self.registered[fd]
147 raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch))
148 evs = self._evsfor(ch)
150 del self.registered[fd]
152 self.ep.unregister(fd)
155 self.registered[fd] = ch, evs
157 self.ep.modify(fd, evs)
160 if threading.current_thread() == self.th:
173 class channel(object):
181 raise NotImplementedError("fileno()")
186 class sockbuffer(channel):
187 def __init__(self, socket, **kwargs):
188 super().__init__(**kwargs)
191 self.obuf = bytearray()
194 return self.sk.fileno()
199 def gotdata(self, data):
203 def send(self, data, eof=False):
204 self.obuf.extend(data)
207 if self.watcher is not None:
208 self.watcher.update(self, True)
215 data = self.sk.recv(1024)
223 return bool(self.obuf);
226 ret = self.sk.send(self.obuf)
227 self.obuf[:ret] = b""
232 class callbuffer(channel):
233 def __init__(self, **kwargs):
234 super().__init__(**kwargs)
236 self.rp, self.wp = os.pipe()
237 self.lock = threading.Lock()
260 data = os.read(self.rp, 1024)
265 cbs = list(self.queue)
275 raise Exception("stopped")
276 self.queue.append(cb)
277 os.write(self.wp, b"a")
285 def currentwatcher(io, current):
289 io.loopcheck.add(check)