1 import sys, os, errno, threading, select, traceback
8 self.lock = threading.RLock()
16 return ((select.EPOLLIN if ch.readable else 0) |
17 (select.EPOLLOUT if ch.writable else 0))
20 if self.registered and self.th is None:
21 th = threading.Thread(target=self._run, name="Async epoll thread")
22 th.daemon = self._daemon
# Report an exception that occurred while handling channel `ch`. `exc` collects
# the remaining positional arguments; the call at embedded line 40 passes
# *sys.exc_info().
# NOTE(review): the embedded numbering skips lines 27 and 30, so only part of
# this method is visible in this excerpt; the comments cover visible lines only.
26 def exception(self, ch, *exc):
# No handler installed in self.exc_handler: fall back to the traceback module.
28 if self.exc_handler is None:
# NOTE(review): `exc` is passed here as a single tuple, whereas the handler call
# below unpacks it with *exc -- traceback.print_exception(*exc) looks intended;
# confirm.
29 traceback.print_exception(exc)
# The handler receives the channel followed by the unpacked exception info
# (presumably the else branch; line 30 is not visible).
31 self.exc_handler(ch, *exc)
# Look up the attribute named `nm` on channel `ch`, and route any Exception to
# self.exception() together with the channel.
# NOTE(review): embedded lines 34, 36 and 38 are not visible in this excerpt
# (presumably the `try:`, the missing-attribute test and the actual call --
# confirm); the comments cover visible lines only.
33 def _cb(self, ch, nm):
# getattr with a None default, so an absent attribute does not raise here.
35 m = getattr(ch, nm, None)
# The message carries the channel's repr and the requested name; the condition
# guarding this raise is not visible.
37 raise AttributeError("%r has no %s method" % (ch, nm))
# NOTE(review): `exc` is bound here, but the visible handler line uses
# sys.exc_info() instead.
39 except Exception as exc:
40 self.exception(ch, *sys.exc_info())
44 while self.registered:
45 fd, (ch, evs) = next(iter(self.registered.items()))
46 del self.registered[fd]
47 self.ep.unregister(fd)
54 for fd, (ob, evs) in self.registered.items():
58 while self.registered:
64 except IOError as exc:
65 if exc.errno == errno.EINTR:
68 for fd, evs in evlist:
70 if fd not in self.registered:
72 ch, cevs = self.registered[fd]
73 if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR):
75 if fd in self.registered and evs & select.EPOLLOUT:
77 if fd in self.registered:
78 nevs = self._evsfor(ch)
80 del self.registered[fd]
84 self.registered[fd] = ch, nevs
# Return the stored daemon flag (self._daemon).
# NOTE(review): a second `daemon` definition taking a value follows at embedded
# line 97, which suggests a property getter/setter pair; the decorator lines are
# not visible in this excerpt -- confirm.
95 def daemon(self): return self._daemon
# Store `value`, coerced to bool, as the daemon flag in self._daemon.
# NOTE(review): embedded line 99 is not visible in this excerpt.
97 def daemon(self, value):
98 self._daemon = bool(value)
# Only acts when something is already stored in self.th.
100 if self.th is not None:
# NOTE(review): this chained assignment rebinds both self.th and a local name
# `daemon` to the boolean, so the previous self.th value is replaced;
# `self.th.daemon = self._daemon` looks intended -- confirm.
101 self.th = daemon = self._daemon
106 if fd in self.registered:
107 raise KeyError("fd %i is already registered" % fd)
108 evs = self._evsfor(ch)
113 self.registered[fd] = (ch, evs)
115 self.ep.register(fd, evs)
# Drop the registration of channel `ch`: delete its entry from self.registered
# and unregister the fd from self.ep.
# NOTE(review): embedded lines 119-120, 122-123, 126 and 129 are not visible in
# this excerpt, including where `fd` is assigned and where `ignore` is used; the
# comments cover visible lines only.
118 def remove(self, ch, ignore=False):
121 if fd not in self.registered:
# An unknown fd is reported as KeyError (how `ignore` affects this is not
# visible here).
124 raise KeyError("fd %i is not registered" % fd)
# Entries are 2-tuples; embedded line 113 stores them as (ch, evs).
125 pch, cevs = self.registered[fd]
# NOTE(review): the format string has three placeholders (%i, %r, %r) but the
# tuple supplies only two values, so the formatting itself would raise TypeError
# before the ValueError is built; (fd, pch, ch) looks intended -- confirm.
127 raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch))
128 del self.registered[fd]
130 self.ep.unregister(fd)
# Recompute the event mask for an already registered channel `ch` and apply it.
# NOTE(review): embedded lines 134-135, 137-138, 141, 144, 146, 148-149 and 151
# are not visible in this excerpt, including where `fd` is assigned, where
# `ignore` is used, and the conditions selecting between the unregister and the
# modify statements below; the comments cover visible lines only.
133 def update(self, ch, ignore=False):
136 if fd not in self.registered:
# An unknown fd is reported as KeyError (how `ignore` affects this is not
# visible here).
139 raise KeyError("fd %i is not registered" % fd)
# Entries are 2-tuples; embedded line 113 stores them as (ch, evs).
140 pch, cevs = self.registered[fd]
# NOTE(review): the format string has three placeholders (%i, %r, %r) but the
# tuple supplies only two values, so the formatting itself would raise TypeError
# before the ValueError is built; (fd, pch, ch) looks intended -- confirm.
142 raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch))
# New mask computed by _evsfor; embedded lines 16-17 show it is built from
# ch.readable and ch.writable.
143 evs = self._evsfor(ch)
# Visible statements that drop the entry and unregister the fd.
145 del self.registered[fd]
147 self.ep.unregister(fd)
# Visible statements that store the new mask and pass it to self.ep.modify.
150 self.registered[fd] = ch, evs
152 self.ep.modify(fd, evs)
155 if threading.current_thread() == self.th:
# Buffer object around `sk`, which the visible lines use through fileno(),
# recv() and send().
# NOTE(review): only some of this class's lines are visible in this excerpt and
# several method headers are missing, so the comments below are attached to
# individual statements rather than to whole methods.
168 class sockbuffer(object):
169 def __init__(self, sk):
# Byte buffer that send() appends to and that is later handed to sk.send().
172 self.obuf = bytearray()
# The file descriptor comes from the wrapped object.
176 return self.sk.fileno()
181 def gotdata(self, data):
# Append `data` to the output buffer; how `eof` is used is not visible here.
185 def send(self, data, eof=False):
186 self.obuf.extend(data)
# When a watcher is attached, ask it to update this object; the second argument
# is presumably the `ignore` flag of an update(ch, ignore) method -- confirm.
189 if self.watcher is not None:
190 self.watcher.update(self, True)
# Read at most 1024 bytes per call.
197 data = self.sk.recv(1024)
# True while the output buffer is non-empty.
# NOTE(review): the trailing semicolon is redundant.
205 return bool(self.obuf);
# `ret` is whatever sk.send() returned; that many leading bytes are removed from
# the buffer.
208 ret = self.sk.send(self.obuf)
209 self.obuf[:ret] = b""
# NOTE(review): only a few lines of this class are visible in this excerpt and
# no method header is shown, so the comments below are attached to individual
# statements; the class-level purpose cannot be stated from the visible lines.
214 class callbuffer(object):
# Pipe pair: the visible lines read from rp with os.read and write to wp with
# os.write.
217 self.rp, self.wp = os.pipe()
# Plain threading.Lock (not the reentrant RLock variant).
218 self.lock = threading.Lock()
# Read at most 1024 bytes from the pipe's read end.
241 data = os.read(self.rp, 1024)
# Copy the queue's current contents into a new list.
246 cbs = list(self.queue)
# The condition guarding this raise is not visible.
256 raise Exception("stopped")
# Append the callback, then write a single byte to the pipe's write end
# (presumably to wake whatever reads rp -- confirm).
257 self.queue.append(cb)
258 os.write(self.wp, b"a")
# NOTE(review): embedded lines 267-270 are not visible in this excerpt; `run` is
# presumably defined there, and how `io` and `current` are used cannot be seen
# -- confirm.
266 def currentwatcher(io, current):
# Start a thread named "Current watcher" with `run` as its target; the Thread
# object is not kept.
271 threading.Thread(target=run, name="Current watcher").start()