From aac2f975859e9b0bbbf582c4d84bebccd2e27e51 Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Sun, 13 Feb 2022 18:00:18 +0100 Subject: [PATCH] Added a basic event-loop driver. --- src/jagi/event/Driver.java | 262 ++++++++++++++++++++++++++++++++++++++++++++ src/jagi/event/Heap.java | 159 +++++++++++++++++++++++++++ src/jagi/event/Watcher.java | 12 ++ 3 files changed, 433 insertions(+) create mode 100644 src/jagi/event/Driver.java create mode 100644 src/jagi/event/Heap.java create mode 100644 src/jagi/event/Watcher.java diff --git a/src/jagi/event/Driver.java b/src/jagi/event/Driver.java new file mode 100644 index 0000000..c9ca638 --- /dev/null +++ b/src/jagi/event/Driver.java @@ -0,0 +1,262 @@ +package jagi.event; + +import java.util.*; +import java.util.logging.*; +import java.util.concurrent.*; +import java.io.*; +import java.nio.*; +import java.nio.channels.*; +import java.nio.channels.spi.*; + +public class Driver { + private static final Logger log = Logger.getLogger("jagi.event"); + private static final Logger hlog = Logger.getLogger("jagi.event.handler"); + private static final ThreadLocal current = new ThreadLocal<>(); + private final Map selectors = new HashMap<>(); + private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(), + 5, TimeUnit.SECONDS, new LinkedBlockingQueue(128), + this::thread); + + protected Thread thread(Runnable tgt) { + return(new Thread(tgt)); + } + + protected void handle(Watcher w, int evs) { + try { + current.set(this); + w.handle(evs); + } catch(Throwable t) { + error(w, t); + } finally { + current.remove(); + } + } + + protected void submit(Runnable task) { + worker.submit(task); + } + + protected void error(Watcher w, Throwable t) { + hlog.log(Level.WARNING, w + ": uncaught error when handling event", t); + remove(w); + } + + class SelectPool implements Runnable { + final SelectorProvider provider; + final Selector poll; + final Map watching = new IdentityHashMap<>(); + final Heap timeheap = new Heap<>(Comparator.naturalOrder()); + final Map paused = new IdentityHashMap<>(); + + SelectPool(SelectorProvider provider) { + this.provider = provider; + try { + this.poll = provider.openSelector(); + } catch(IOException e) { + /* I think this counts more as an assertion error. */ + throw(new RuntimeException(e)); + } + } + + void handle(Watcher w, int evs) { + try { + pause(w); + submit(() -> { + try { + Driver.this.handle(w, evs); + } finally { + resume(w); + } + }); + } catch(Throwable t) { + try { + synchronized(selectors) { + remove(w); + } + } catch(Exception e) { + t.addSuppressed(e); + } + log.log(Level.SEVERE, "unexpected error when submitting event", t); + } + } + + void start() { + thread(this).start(); + } + + public void run() { + boolean quit = false; + Throwable error = null; + try { + while(true) { + double now = time(); + long timeout = 0; + synchronized(selectors) { + Double first = timeheap.keypeek(); + if((first == null) && watching.isEmpty()) { + quit = true; + selectors.remove(provider); + return; + } + if(first != null) + timeout = (long)Math.ceil((first - now) * 1000); + } + poll.selectedKeys().clear(); + try { + poll.select(timeout); + } catch(IOException e) { + throw(new RuntimeException(e)); + } + for(SelectionKey key : poll.selectedKeys()) + handle((Watcher)key.attachment(), key.readyOps()); + } + } catch(Throwable t) { + error = t; + throw(t); + } finally { + if(!quit) + log.log(Level.SEVERE, "selector exited abnormally", error); + } + } + + void pause(Watcher w) { + if(paused.containsKey(w)) + throw(new IllegalStateException(w + ": already paused")); + SelectionKey wc = watching.get(w); + Object tc = timeheap.remove(w); + if((wc == null) && (tc == null)) + throw(new IllegalStateException(w + ": not registered")); + if(wc != null) + wc.interestOps(0); + paused.put(w, this); + } + + void resume(Watcher w) { + if(paused.remove(w) == null) + return; + SelectionKey wc = watching.get(w); + int evs = w.events(); + double timeout = w.timeout(); + boolean hastime = timeout < Double.POSITIVE_INFINITY; + if((evs == 0) && !hastime) { + remove(w); + return; + } + wc.interestOps(evs); + if(hastime) + timeheap.add(w, timeout); + poll.wakeup(); + } + + void add(Watcher w, SelectableChannel ch) { + if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w)) + throw(new IllegalStateException(w + ": already registered")); + int evs = w.events(); + double timeout = w.timeout(); + boolean hastime = timeout < Double.POSITIVE_INFINITY; + if((evs == 0) && !hastime) { + w.close(); + return; + } + try { + watching.put(w, ch.register(poll, evs, w)); + } catch(ClosedChannelException e) { + throw(new RuntimeException("attempted to watch closed channel", e)); + } + if(hastime) + timeheap.add(w, timeout); + poll.wakeup(); + } + + void remove(Watcher w) { + SelectionKey wc = watching.remove(w); + Object tc = timeheap.remove(w); + Object pc = paused.remove(w); + if(wc != null) + wc.cancel(); + if(((wc != null) || (tc != null)) && (pc != null)) + throw(new RuntimeException(w + ": inconsistent internal state")); + if(wc == null) + throw(new IllegalStateException(w + ": not registered")); + w.close(); + poll.wakeup(); + } + + void update(Watcher w) { + SelectionKey wc = watching.get(w); + if(wc == null) + throw(new IllegalStateException(w + ": not registered")); + int evs = w.events(); + double timeout = w.timeout(); + boolean hastime = timeout < Double.POSITIVE_INFINITY; + if((evs == 0) && !hastime) { + remove(w); + return; + } + wc.interestOps(evs); + if(hastime) + timeheap.set(w, timeout); + else + timeheap.remove(w); + poll.wakeup(); + } + } + + private SelectPool pool(SelectorProvider provider) { + SelectPool pool = selectors.get(provider); + if(pool == null) { + pool = new SelectPool(provider); + selectors.put(provider, pool); + pool.start(); + } + return(pool); + } + + public void add(Watcher w) { + SelectableChannel ch = w.channel(); + synchronized(selectors) { + pool(ch.provider()).add(w, ch); + } + } + + public void remove(Watcher w) { + SelectableChannel ch = w.channel(); + synchronized(selectors) { + pool(ch.provider()).remove(w); + } + } + + public void update(Watcher w) { + SelectableChannel ch = w.channel(); + synchronized(selectors) { + pool(ch.provider()).update(w); + } + } + + public double time() { + return(rtime()); + } + + private static final long rtimeoff = System.nanoTime(); + public static double rtime() { + return((System.nanoTime() - rtimeoff) / 1e9); + } + + private static Driver global = null; + public static Driver get() { + if(global == null) { + synchronized(Driver.class) { + if(global == null) + global = new Driver(); + } + } + return(global); + } + + public static Driver current() { + Driver ret = current.get(); + if(ret == null) + throw(new IllegalStateException("no current driver")); + return(ret); + } +} diff --git a/src/jagi/event/Heap.java b/src/jagi/event/Heap.java new file mode 100644 index 0000000..aec2991 --- /dev/null +++ b/src/jagi/event/Heap.java @@ -0,0 +1,159 @@ +package jagi.event; + +import java.util.*; + +public class Heap { + private static final Object[] empty = {}; + private final Comparator cmp; + private final Map index = new IdentityHashMap<>(); + private Object[] vbuf = empty, kbuf = empty; + private int size; + + public Heap(Comparator cmp) { + this.cmp = cmp; + } + + @SuppressWarnings("unchecked") + private V val(int i) {return((V)vbuf[i]);} + @SuppressWarnings("unchecked") + private K key(int i) {return((K)kbuf[i]);} + + private void raise(V val, K key, int i) { + while(i > 0) { + int p = (i - 1) >>> 1; + if(cmp.compare(key(p), key) <= 0) + break; + vbuf[i] = vbuf[p]; + kbuf[i] = kbuf[p]; + index.put(val(i), i); + i = p; + } + vbuf[i] = val; + kbuf[i] = key; + index.put(val, i); + } + + private void lower(V val, K key, int i) { + while(true) { + int c1 = (i << 1) + 1, c2 = c1 + 1; + if(c1 >= size) + break; + int c = ((c2 < size) && (cmp.compare(key(c1), key(c2)) > 0)) ? c2 : c1; + if(cmp.compare(key(c), key) > 0) + break; + vbuf[i] = vbuf[c]; + kbuf[i] = kbuf[c]; + index.put(val(i), i); + i = c; + } + vbuf[i] = val; + kbuf[i] = key; + index.put(val, i); + } + + private void adjust(V val, K key, int i) { + if((i > 0) && cmp.compare(key((i - 1) >> 1), key) > 0) + raise(val, key, i); + else + lower(val, key, i); + } + + public int size() { + return(size); + } + + public V peek() { + return((size > 0) ? val(0) : null); + } + + public V poll() { + if(size == 0) + return(null); + V ret = val(0); + remove(0); + return(ret); + } + + public V remove() { + if(size == 0) + throw(new NoSuchElementException()); + V ret = val(0); + remove(0); + return(ret); + } + + public K keypeek() { + return((size > 0) ? key(0) : null); + } + + public void add(V val, K key) { + if(index.containsKey(val)) + throw(new IllegalStateException()); + int p = size++; + if(p >= vbuf.length) { + int n = Math.max(vbuf.length * 2, 16); + vbuf = Arrays.copyOf(vbuf, n); + kbuf = Arrays.copyOf(kbuf, n); + } + raise(val, key, p); + } + + public K update(V val, K key) { + Integer p = index.get(val); + if(p == null) + throw(new NoSuchElementException()); + K ret = key(p); + adjust(val, key, p); + return(ret); + } + + public K set(V val, K key) { + Integer p = index.get(val); + if(p == null) { + add(val, key); + return(null); + } + K ret = key(p); + adjust(val, key, p); + return(ret); + } + + private K remove(int p) { + K ret = key(p); + size--; + if(p == size) { + } else if(p < size) { + adjust(val(size), key(size), p); + } else { + throw(new AssertionError()); + } + vbuf[size] = null; + kbuf[size] = null; + return(ret); + } + + public K remove(V val) { + Integer p = index.remove(val); + if(p == null) + return(null); + return(remove(p)); + } + + public boolean contains(V val) { + return(index.containsKey(val)); + } + + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append('['); + for(int i = 0; i < size; i++) { + if(i > 0) + buf.append(", "); + buf.append(String.valueOf(kbuf[i])); + buf.append('='); + buf.append(String.valueOf(vbuf[i])); + } + buf.append(']'); + return(buf.toString()); + } +} diff --git a/src/jagi/event/Watcher.java b/src/jagi/event/Watcher.java new file mode 100644 index 0000000..4b85c80 --- /dev/null +++ b/src/jagi/event/Watcher.java @@ -0,0 +1,12 @@ +package jagi.event; + +import java.io.*; +import java.nio.channels.*; + +public interface Watcher { + public SelectableChannel channel(); + public int events(); + public void handle(int events) throws Exception; + public default void close() {} + public default double timeout() {return(Double.POSITIVE_INFINITY);} +} -- 2.11.0