1 import sys, os, errno, threading, select, traceback
# Constructor fragment: only some of its lines are visible in this listing.
6 def __init__(self, check=None):
8 self.lock = threading.RLock()  # re-entrant lock kept on the instance
12 self.loopcheck = set()  # collection of check objects; iterated as `ck` by the loop further down
14 self.loopcheck.add(check)  # NOTE(review): presumably guarded by a `check is not None` test on a line not shown - confirm
# Builds an epoll event mask from ch: EPOLLIN when ch.readable is truthy,
# EPOLLOUT when ch.writable is truthy; each flag contributes 0 otherwise.
# NOTE(review): the enclosing def is not visible; presumably this is the body of _evsfor - confirm.
19 return ((select.EPOLLIN if ch.readable else 0) |
20 (select.EPOLLOUT if ch.writable else 0))
# Thread-creation fragment (enclosing def not visible): a thread is only
# created when something is registered and no thread is recorded in self.th.
23 if self.registered and self.th is None:
24 th = threading.Thread(target=self._run, name="Async epoll thread")
25 th.daemon = self._daemon  # daemon flag is applied before the thread is started; the start/assignment lines are not shown
# Reports an exception associated with channel ch; exc is forwarded unchanged
# as positional arguments to whichever reporter is used.
29 def exception(self, ch, *exc):
31 if self.exc_handler is None:
32 traceback.print_exception(*exc)  # no handler configured: print via the traceback module
34 self.exc_handler(ch, *exc)  # NOTE(review): presumably the else-branch; the line in between is not shown - confirm
# Looks up the attribute named nm on ch and routes failures to self.exception.
# The `try:` header, the guard before the raise and the actual call are on lines not shown.
36 def _cb(self, ch, nm):
38 m = getattr(ch, nm, None)  # None when ch has no attribute called nm
40 raise AttributeError("%r has no %s method" % (ch, nm))  # NOTE(review): presumably only reached when m is None - confirm
42 except Exception as exc:  # `exc` itself is not used on the visible lines
43 self.exception(ch, *sys.exc_info())  # passes (type, value, traceback) of the active exception
# Drain loop (enclosing def not visible): repeats until self.registered is empty,
# removing one entry per pass from both the dict and the epoll object.
47 while self.registered:
48 fd, (ch, evs) = next(iter(self.registered.items()))  # take the first entry in iteration order
49 del self.registered[fd]
50 self.ep.unregister(fd)  # what is done with ch afterwards is on lines not shown
58 for fd, (ob, evs) in self.registered.items():  # loop body is on lines not shown
61 self.registered.clear()  # empties the fd -> (channel, events) mapping; NOTE(review): the branch this belongs to is not visible
# Event-loop fragment; NOTE(review): presumably part of _run (the thread target) - confirm.
# The poll call, the callback invocations and several branch headers are on lines not shown.
65 while self.registered:  # keep looping while any fd is registered
66 for ck in self.loopcheck:  # what is done with each ck is not shown
73 except IOError as exc:
74 if exc.errno == errno.EINTR:  # interrupted system call; the handling is not shown
77 for fd, evs in evlist:  # evlist is assigned on a line not shown; presumably the poll result - confirm
79 if fd not in self.registered:  # skip fds that are no longer tracked (action not shown)
81 ch, cevs = self.registered[fd]
82 if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR):  # input, hang-up or error event; membership is re-tested before each step
84 if fd in self.registered and evs & select.EPOLLOUT:  # output-ready event
86 if fd in self.registered:
87 nevs = self._evsfor(ch)  # recompute the event mask for ch
89 del self.registered[fd]  # NOTE(review): the condition guarding this removal is not shown
93 self.registered[fd] = ch, nevs  # store the refreshed mask alongside the channel
104 def daemon(self): return self._daemon
106 def daemon(self, value):
107 self._daemon = bool(value)
109 if self.th is not None:
110 self.th = daemon = self._daemon
# Registration fragment (enclosing def not visible): refuses an fd that is
# already tracked, otherwise records (ch, evs) and registers the fd with epoll.
115 if fd in self.registered:
116 raise KeyError("fd %i is already registered" % fd)
117 evs = self._evsfor(ch)  # event mask computed for ch
122 self.registered[fd] = (ch, evs)
124 self.ep.register(fd, evs)  # NOTE(review): lines not shown here may guard this call - confirm
127 def remove(self, ch, ignore=False):
130 if fd not in self.registered:
133 raise KeyError("fd %i is not registered" % fd)
134 pch, cevs = self.registered[fd]
136 raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch))
137 del self.registered[fd]
139 self.ep.unregister(fd)
142 def update(self, ch, ignore=False):
145 if fd not in self.registered:
148 raise KeyError("fd %i is not registered" % fd)
149 pch, cevs = self.registered[fd]
151 raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch))
152 evs = self._evsfor(ch)
154 del self.registered[fd]
156 self.ep.unregister(fd)
159 self.registered[fd] = ch, evs
161 self.ep.modify(fd, evs)
164 if threading.current_thread() == self.th:  # true when called from the thread recorded in self.th; the enclosing def and both branches are not shown
177 class channel(object):  # base class; sockbuffer and callbuffer in this listing derive from it
185 raise NotImplementedError("fileno()")  # NOTE(review): presumably the body of a fileno() stub that subclasses override; the def line is not shown - confirm
# channel subclass that keeps an outgoing byte buffer (self.obuf) for self.sk.
# Several method headers and statements are on lines not shown in this listing.
190 class sockbuffer(channel):
191 def __init__(self, socket, **kwargs):
192 super().__init__(**kwargs)  # remaining keyword arguments go to the base class
195 self.obuf = bytearray()  # bytes queued for sending
198 return self.sk.fileno()  # self.sk is assigned on a line not shown; presumably the `socket` argument - confirm
203 def gotdata(self, data):  # body not shown
207 def send(self, data, eof=False):  # queues data; the handling of eof is not shown
208 self.obuf.extend(data)
211 if self.watcher is not None:
212 self.watcher.update(self, True)  # NOTE(review): True is presumably the `ignore` argument of the watcher's update(ch, ignore=False) - confirm
219 data = self.sk.recv(1024)  # receive at most 1024 bytes
227 return bool(self.obuf);  # true while queued bytes remain; the trailing `;` is redundant
230 ret = self.sk.send(self.obuf)  # presumably the number of bytes actually sent
231 self.obuf[:ret] = b""  # remove the first ret bytes from the buffer
# channel subclass built around an os.pipe() pair and a queue of callbacks.
# Method headers and the creation of self.queue are on lines not shown in this listing.
236 class callbuffer(channel):
237 def __init__(self, **kwargs):
238 super().__init__(**kwargs)  # keyword arguments go to the base class
240 self.rp, self.wp = os.pipe()  # rp: read end, wp: write end
241 self.lock = threading.Lock()
264 data = os.read(self.rp, 1024)  # read up to 1024 bytes from the pipe
269 cbs = list(self.queue)  # copy of the queued callbacks
279 raise Exception("stopped")  # the condition leading here is not shown
280 self.queue.append(cb)
281 os.write(self.wp, b"a")  # writes one byte to the pipe; NOTE(review): presumably to wake the reading side - confirm
289 def currentwatcher(io, current):  # most of the body is on lines not shown
293 io.loopcheck.add(check)  # adds `check` to io's loopcheck set; `check` is defined on a line not shown