class epoller(object):
exc_handler = None
- def __init__(self):
+ def __init__(self, check=None):
self.registered = {}
self.lock = threading.RLock()
self.ep = None
self.th = None
+ self.stopped = False
+ self.loopcheck = set()
+ if check is not None:
+ self.loopcheck.add(check)
self._daemon = True
@staticmethod
def exception(self, ch, *exc):
self.remove(ch)
if self.exc_handler is None:
- traceback.print_exception(exc)
+ traceback.print_exception(*exc)
else:
self.exc_handler(ch, *exc)
except Exception as exc:
self.exception(ch, *sys.exc_info())
+ def _closeall(self):
+ with self.lock:
+ while self.registered:
+ fd, (ch, evs) = next(iter(self.registered.items()))
+ del self.registered[fd]
+ self.ep.unregister(fd)
+ self._cb(ch, "close")
+
def _run(self):
ep = select.epoll()
try:
self.ep = ep
while self.registered:
+ for ck in self.loopcheck:
+ ck(self)
+ if self.stopped:
+ self._closeall()
+ break
try:
evlist = ep.poll(10)
except IOError as exc:
if self.ep:
self.ep.modify(fd, evs)
+ def stop(self):
+ if threading.current_thread() == self.th:
+ self.stopped = True
+ else:
+ def tgt():
+ self.stopped = True
+ cb = callbuffer()
+ cb.call(tgt)
+ cb.stop()
+ self.add(cb)
+
def watcher():
return epoller()
-class sockbuffer(object):
- def __init__(self, sk):
- self.sk = sk
+class channel(object):
+ readable = False
+ writable = False
+
+ def __init__(self):
+ self.watcher = None
+
+ def fileno(self):
+ raise NotImplementedError("fileno()")
+
+ def close(self):
+ pass
+
+class sockbuffer(channel):
+ def __init__(self, socket, **kwargs):
+ super().__init__(**kwargs)
+ self.sk = socket
self.eof = False
self.obuf = bytearray()
- self.watcher = None
def fileno(self):
return self.sk.fileno()
self.obuf[:] = b""
self.eof = True
-class callbuffer(object):
- def __init__(self):
+class callbuffer(channel):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
self.queue = []
self.rp, self.wp = os.pipe()
self.lock = threading.Lock()
if self.wp >= 0:
os.close(self.wp)
self.wp = -1
+
+def currentwatcher(io, current):
+ def check(io):
+ if not current:
+ io.stop()
+ io.loopcheck.add(check)