self.lock = threading.RLock()
self.ep = None
self.th = None
+ self.stopped = False
self._daemon = True
@staticmethod
except Exception as exc:
self.exception(ch, *sys.exc_info())
+ def _closeall(self):
+ with self.lock:
+ while self.registered:
+ fd, (ch, evs) = next(iter(self.registered.items()))
+ del self.registered[fd]
+ self.ep.unregister(fd)
+ self._cb(ch, "close")
+
def _run(self):
ep = select.epoll()
try:
self.ep = ep
while self.registered:
+ if self.stopped:
+ self._closeall()
+ break
try:
evlist = ep.poll(10)
except IOError as exc:
if self.ep:
self.ep.modify(fd, evs)
+ def stop(self):
+ if threading.current_thread() == self.th:
+ self.stopped = True
+ else:
+ def tgt():
+ self.stopped = True
+ cb = callbuffer()
+ cb.call(tgt)
+ cb.stop()
+ self.add(cb)
+
def watcher():
return epoller()
if self.wp >= 0:
os.close(self.wp)
self.wp = -1
+
+def currentwatcher(io, current):
+ def run():
+ while current:
+ current.wait()
+ io.stop()
+ threading.Thread(target=run, name="Current watcher").start()