1 import sys, os, errno, threading, select, traceback
8 self.lock = threading.RLock()
16 return ((select.EPOLLIN if ch.readable else 0) |
17 (select.EPOLLOUT if ch.writable else 0))
20 if self.registered and self.th is None:
21 th = threading.Thread(target=self._run, name="Async epoll thread")
22 th.daemon = self._daemon
26 def exception(self, ch, *exc):
28 if self.exc_handler is None:
29 traceback.print_exception(exc)
31 self.exc_handler(ch, *exc)
33 def _cb(self, ch, nm):
35 m = getattr(ch, nm, None)
37 raise AttributeError("%r has no %s method" % (ch, nm))
39 except Exception as exc:
40 self.exception(ch, *sys.exc_info())
44 while self.registered:
45 fd, (ch, evs) = next(iter(self.registered.items()))
46 del self.registered[fd]
47 self.ep.unregister(fd)
54 for fd, (ob, evs) in self.registered.items():
58 while self.registered:
64 except IOError as exc:
65 if exc.errno == errno.EINTR:
68 for fd, evs in evlist:
70 if fd not in self.registered:
72 ch, cevs = self.registered[fd]
73 if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR):
75 if fd in self.registered and evs & select.EPOLLOUT:
77 if fd in self.registered:
78 nevs = self._evsfor(ch)
80 del self.registered[fd]
84 self.registered[fd] = ch, nevs
95 def daemon(self): return self._daemon
97 def daemon(self, value):
98 self._daemon = bool(value)
100 if self.th is not None:
101 self.th = daemon = self._daemon
106 if fd in self.registered:
107 raise KeyError("fd %i is already registered" % fd)
108 evs = self._evsfor(ch)
113 self.registered[fd] = (ch, evs)
115 self.ep.register(fd, evs)
118 def remove(self, ch, ignore=False):
121 if fd not in self.registered:
124 raise KeyError("fd %i is not registered" % fd)
125 pch, cevs = self.registered[fd]
127 raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch))
128 del self.registered[fd]
130 self.ep.unregister(fd)
133 def update(self, ch, ignore=False):
136 if fd not in self.registered:
139 raise KeyError("fd %i is not registered" % fd)
140 pch, cevs = self.registered[fd]
142 raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch))
143 evs = self._evsfor(ch)
145 del self.registered[fd]
147 self.ep.unregister(fd)
150 self.registered[fd] = ch, evs
152 self.ep.modify(fd, evs)
155 if threading.current_thread() == self.th:
168 class channel(object):
176 raise NotImplementedError("fileno()")
181 class sockbuffer(channel):
182 def __init__(self, socket, **kwargs):
183 super().__init__(**kwargs)
186 self.obuf = bytearray()
189 return self.sk.fileno()
194 def gotdata(self, data):
198 def send(self, data, eof=False):
199 self.obuf.extend(data)
202 if self.watcher is not None:
203 self.watcher.update(self, True)
210 data = self.sk.recv(1024)
218 return bool(self.obuf);
221 ret = self.sk.send(self.obuf)
222 self.obuf[:ret] = b""
227 class callbuffer(channel):
228 def __init__(self, **kwargs):
229 super().__init__(**kwargs)
231 self.rp, self.wp = os.pipe()
232 self.lock = threading.Lock()
255 data = os.read(self.rp, 1024)
260 cbs = list(self.queue)
270 raise Exception("stopped")
271 self.queue.append(cb)
272 os.write(self.wp, b"a")
280 def currentwatcher(io, current):
285 threading.Thread(target=run, name="Current watcher").start()