--- /dev/null
+package jagi.event;
+
+import java.util.*;
+import java.util.logging.*;
+import java.util.concurrent.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.*;
+import java.nio.channels.spi.*;
+
+public class Driver {
+ private static final Logger log = Logger.getLogger("jagi.event");
+ private static final Logger hlog = Logger.getLogger("jagi.event.handler");
+ private static final ThreadLocal<Driver> current = new ThreadLocal<>();
+ private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
+ private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
+ 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
+ this::thread);
+
+ protected Thread thread(Runnable tgt) {
+ return(new Thread(tgt));
+ }
+
+ protected void handle(Watcher w, int evs) {
+ try {
+ current.set(this);
+ w.handle(evs);
+ } catch(Throwable t) {
+ error(w, t);
+ } finally {
+ current.remove();
+ }
+ }
+
+ protected void submit(Runnable task) {
+ worker.submit(task);
+ }
+
+ protected void error(Watcher w, Throwable t) {
+ hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
+ remove(w);
+ }
+
+ class SelectPool implements Runnable {
+ final SelectorProvider provider;
+ final Selector poll;
+ final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
+ final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
+ final Map<Watcher, Object> paused = new IdentityHashMap<>();
+
+ SelectPool(SelectorProvider provider) {
+ this.provider = provider;
+ try {
+ this.poll = provider.openSelector();
+ } catch(IOException e) {
+ /* I think this counts more as an assertion error. */
+ throw(new RuntimeException(e));
+ }
+ }
+
+ void handle(Watcher w, int evs) {
+ try {
+ pause(w);
+ submit(() -> {
+ try {
+ Driver.this.handle(w, evs);
+ } finally {
+ resume(w);
+ }
+ });
+ } catch(Throwable t) {
+ try {
+ synchronized(selectors) {
+ remove(w);
+ }
+ } catch(Exception e) {
+ t.addSuppressed(e);
+ }
+ log.log(Level.SEVERE, "unexpected error when submitting event", t);
+ }
+ }
+
+ void start() {
+ thread(this).start();
+ }
+
+ public void run() {
+ boolean quit = false;
+ Throwable error = null;
+ try {
+ while(true) {
+ double now = time();
+ long timeout = 0;
+ synchronized(selectors) {
+ Double first = timeheap.keypeek();
+ if((first == null) && watching.isEmpty()) {
+ quit = true;
+ selectors.remove(provider);
+ return;
+ }
+ if(first != null)
+ timeout = (long)Math.ceil((first - now) * 1000);
+ }
+ poll.selectedKeys().clear();
+ try {
+ poll.select(timeout);
+ } catch(IOException e) {
+ throw(new RuntimeException(e));
+ }
+ for(SelectionKey key : poll.selectedKeys())
+ handle((Watcher)key.attachment(), key.readyOps());
+ }
+ } catch(Throwable t) {
+ error = t;
+ throw(t);
+ } finally {
+ if(!quit)
+ log.log(Level.SEVERE, "selector exited abnormally", error);
+ }
+ }
+
+ void pause(Watcher w) {
+ if(paused.containsKey(w))
+ throw(new IllegalStateException(w + ": already paused"));
+ SelectionKey wc = watching.get(w);
+ Object tc = timeheap.remove(w);
+ if((wc == null) && (tc == null))
+ throw(new IllegalStateException(w + ": not registered"));
+ if(wc != null)
+ wc.interestOps(0);
+ paused.put(w, this);
+ }
+
+ void resume(Watcher w) {
+ if(paused.remove(w) == null)
+ return;
+ SelectionKey wc = watching.get(w);
+ int evs = w.events();
+ double timeout = w.timeout();
+ boolean hastime = timeout < Double.POSITIVE_INFINITY;
+ if((evs == 0) && !hastime) {
+ remove(w);
+ return;
+ }
+ wc.interestOps(evs);
+ if(hastime)
+ timeheap.add(w, timeout);
+ poll.wakeup();
+ }
+
+ void add(Watcher w, SelectableChannel ch) {
+ if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
+ throw(new IllegalStateException(w + ": already registered"));
+ int evs = w.events();
+ double timeout = w.timeout();
+ boolean hastime = timeout < Double.POSITIVE_INFINITY;
+ if((evs == 0) && !hastime) {
+ w.close();
+ return;
+ }
+ try {
+ watching.put(w, ch.register(poll, evs, w));
+ } catch(ClosedChannelException e) {
+ throw(new RuntimeException("attempted to watch closed channel", e));
+ }
+ if(hastime)
+ timeheap.add(w, timeout);
+ poll.wakeup();
+ }
+
+ void remove(Watcher w) {
+ SelectionKey wc = watching.remove(w);
+ Object tc = timeheap.remove(w);
+ Object pc = paused.remove(w);
+ if(wc != null)
+ wc.cancel();
+ if(((wc != null) || (tc != null)) && (pc != null))
+ throw(new RuntimeException(w + ": inconsistent internal state"));
+ if(wc == null)
+ throw(new IllegalStateException(w + ": not registered"));
+ w.close();
+ poll.wakeup();
+ }
+
+ void update(Watcher w) {
+ SelectionKey wc = watching.get(w);
+ if(wc == null)
+ throw(new IllegalStateException(w + ": not registered"));
+ int evs = w.events();
+ double timeout = w.timeout();
+ boolean hastime = timeout < Double.POSITIVE_INFINITY;
+ if((evs == 0) && !hastime) {
+ remove(w);
+ return;
+ }
+ wc.interestOps(evs);
+ if(hastime)
+ timeheap.set(w, timeout);
+ else
+ timeheap.remove(w);
+ poll.wakeup();
+ }
+ }
+
+ private SelectPool pool(SelectorProvider provider) {
+ SelectPool pool = selectors.get(provider);
+ if(pool == null) {
+ pool = new SelectPool(provider);
+ selectors.put(provider, pool);
+ pool.start();
+ }
+ return(pool);
+ }
+
+ public void add(Watcher w) {
+ SelectableChannel ch = w.channel();
+ synchronized(selectors) {
+ pool(ch.provider()).add(w, ch);
+ }
+ }
+
+ public void remove(Watcher w) {
+ SelectableChannel ch = w.channel();
+ synchronized(selectors) {
+ pool(ch.provider()).remove(w);
+ }
+ }
+
+ public void update(Watcher w) {
+ SelectableChannel ch = w.channel();
+ synchronized(selectors) {
+ pool(ch.provider()).update(w);
+ }
+ }
+
+ public double time() {
+ return(rtime());
+ }
+
+ private static final long rtimeoff = System.nanoTime();
+ public static double rtime() {
+ return((System.nanoTime() - rtimeoff) / 1e9);
+ }
+
+ private static Driver global = null;
+ public static Driver get() {
+ if(global == null) {
+ synchronized(Driver.class) {
+ if(global == null)
+ global = new Driver();
+ }
+ }
+ return(global);
+ }
+
+ public static Driver current() {
+ Driver ret = current.get();
+ if(ret == null)
+ throw(new IllegalStateException("no current driver"));
+ return(ret);
+ }
+}
--- /dev/null
+package jagi.event;
+
+import java.util.*;
+
+public class Heap<V, K> {
+ private static final Object[] empty = {};
+ private final Comparator<? super K> cmp;
+ private final Map<V, Integer> index = new IdentityHashMap<>();
+ private Object[] vbuf = empty, kbuf = empty;
+ private int size;
+
+ public Heap(Comparator<? super K> cmp) {
+ this.cmp = cmp;
+ }
+
+ @SuppressWarnings("unchecked")
+ private V val(int i) {return((V)vbuf[i]);}
+ @SuppressWarnings("unchecked")
+ private K key(int i) {return((K)kbuf[i]);}
+
+ private void raise(V val, K key, int i) {
+ while(i > 0) {
+ int p = (i - 1) >>> 1;
+ if(cmp.compare(key(p), key) <= 0)
+ break;
+ vbuf[i] = vbuf[p];
+ kbuf[i] = kbuf[p];
+ index.put(val(i), i);
+ i = p;
+ }
+ vbuf[i] = val;
+ kbuf[i] = key;
+ index.put(val, i);
+ }
+
+ private void lower(V val, K key, int i) {
+ while(true) {
+ int c1 = (i << 1) + 1, c2 = c1 + 1;
+ if(c1 >= size)
+ break;
+ int c = ((c2 < size) && (cmp.compare(key(c1), key(c2)) > 0)) ? c2 : c1;
+ if(cmp.compare(key(c), key) > 0)
+ break;
+ vbuf[i] = vbuf[c];
+ kbuf[i] = kbuf[c];
+ index.put(val(i), i);
+ i = c;
+ }
+ vbuf[i] = val;
+ kbuf[i] = key;
+ index.put(val, i);
+ }
+
+ private void adjust(V val, K key, int i) {
+ if((i > 0) && cmp.compare(key((i - 1) >> 1), key) > 0)
+ raise(val, key, i);
+ else
+ lower(val, key, i);
+ }
+
+ public int size() {
+ return(size);
+ }
+
+ public V peek() {
+ return((size > 0) ? val(0) : null);
+ }
+
+ public V poll() {
+ if(size == 0)
+ return(null);
+ V ret = val(0);
+ remove(0);
+ return(ret);
+ }
+
+ public V remove() {
+ if(size == 0)
+ throw(new NoSuchElementException());
+ V ret = val(0);
+ remove(0);
+ return(ret);
+ }
+
+ public K keypeek() {
+ return((size > 0) ? key(0) : null);
+ }
+
+ public void add(V val, K key) {
+ if(index.containsKey(val))
+ throw(new IllegalStateException());
+ int p = size++;
+ if(p >= vbuf.length) {
+ int n = Math.max(vbuf.length * 2, 16);
+ vbuf = Arrays.copyOf(vbuf, n);
+ kbuf = Arrays.copyOf(kbuf, n);
+ }
+ raise(val, key, p);
+ }
+
+ public K update(V val, K key) {
+ Integer p = index.get(val);
+ if(p == null)
+ throw(new NoSuchElementException());
+ K ret = key(p);
+ adjust(val, key, p);
+ return(ret);
+ }
+
+ public K set(V val, K key) {
+ Integer p = index.get(val);
+ if(p == null) {
+ add(val, key);
+ return(null);
+ }
+ K ret = key(p);
+ adjust(val, key, p);
+ return(ret);
+ }
+
+ private K remove(int p) {
+ K ret = key(p);
+ size--;
+ if(p == size) {
+ } else if(p < size) {
+ adjust(val(size), key(size), p);
+ } else {
+ throw(new AssertionError());
+ }
+ vbuf[size] = null;
+ kbuf[size] = null;
+ return(ret);
+ }
+
+ public K remove(V val) {
+ Integer p = index.remove(val);
+ if(p == null)
+ return(null);
+ return(remove(p));
+ }
+
+ public boolean contains(V val) {
+ return(index.containsKey(val));
+ }
+
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append('[');
+ for(int i = 0; i < size; i++) {
+ if(i > 0)
+ buf.append(", ");
+ buf.append(String.valueOf(kbuf[i]));
+ buf.append('=');
+ buf.append(String.valueOf(vbuf[i]));
+ }
+ buf.append(']');
+ return(buf.toString());
+ }
+}