X-Git-Url: http://git.dolda2000.com/gitweb/?a=blobdiff_plain;f=src%2Fjagi%2Fevent%2FDriver.java;fp=src%2Fjagi%2Fevent%2FDriver.java;h=c9ca63856081003d49e1eec3b86e7e24cf148874;hb=aac2f975859e9b0bbbf582c4d84bebccd2e27e51;hp=0000000000000000000000000000000000000000;hpb=49ccd711f15e0fbb64afdef0e6698aca14ecbc79;p=jagi.git diff --git a/src/jagi/event/Driver.java b/src/jagi/event/Driver.java new file mode 100644 index 0000000..c9ca638 --- /dev/null +++ b/src/jagi/event/Driver.java @@ -0,0 +1,262 @@ +package jagi.event; + +import java.util.*; +import java.util.logging.*; +import java.util.concurrent.*; +import java.io.*; +import java.nio.*; +import java.nio.channels.*; +import java.nio.channels.spi.*; + +public class Driver { + private static final Logger log = Logger.getLogger("jagi.event"); + private static final Logger hlog = Logger.getLogger("jagi.event.handler"); + private static final ThreadLocal current = new ThreadLocal<>(); + private final Map selectors = new HashMap<>(); + private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(), + 5, TimeUnit.SECONDS, new LinkedBlockingQueue(128), + this::thread); + + protected Thread thread(Runnable tgt) { + return(new Thread(tgt)); + } + + protected void handle(Watcher w, int evs) { + try { + current.set(this); + w.handle(evs); + } catch(Throwable t) { + error(w, t); + } finally { + current.remove(); + } + } + + protected void submit(Runnable task) { + worker.submit(task); + } + + protected void error(Watcher w, Throwable t) { + hlog.log(Level.WARNING, w + ": uncaught error when handling event", t); + remove(w); + } + + class SelectPool implements Runnable { + final SelectorProvider provider; + final Selector poll; + final Map watching = new IdentityHashMap<>(); + final Heap timeheap = new Heap<>(Comparator.naturalOrder()); + final Map paused = new IdentityHashMap<>(); + + SelectPool(SelectorProvider provider) { + this.provider = provider; + try { + this.poll = provider.openSelector(); + } catch(IOException e) { + /* I think this counts more as an assertion error. */ + throw(new RuntimeException(e)); + } + } + + void handle(Watcher w, int evs) { + try { + pause(w); + submit(() -> { + try { + Driver.this.handle(w, evs); + } finally { + resume(w); + } + }); + } catch(Throwable t) { + try { + synchronized(selectors) { + remove(w); + } + } catch(Exception e) { + t.addSuppressed(e); + } + log.log(Level.SEVERE, "unexpected error when submitting event", t); + } + } + + void start() { + thread(this).start(); + } + + public void run() { + boolean quit = false; + Throwable error = null; + try { + while(true) { + double now = time(); + long timeout = 0; + synchronized(selectors) { + Double first = timeheap.keypeek(); + if((first == null) && watching.isEmpty()) { + quit = true; + selectors.remove(provider); + return; + } + if(first != null) + timeout = (long)Math.ceil((first - now) * 1000); + } + poll.selectedKeys().clear(); + try { + poll.select(timeout); + } catch(IOException e) { + throw(new RuntimeException(e)); + } + for(SelectionKey key : poll.selectedKeys()) + handle((Watcher)key.attachment(), key.readyOps()); + } + } catch(Throwable t) { + error = t; + throw(t); + } finally { + if(!quit) + log.log(Level.SEVERE, "selector exited abnormally", error); + } + } + + void pause(Watcher w) { + if(paused.containsKey(w)) + throw(new IllegalStateException(w + ": already paused")); + SelectionKey wc = watching.get(w); + Object tc = timeheap.remove(w); + if((wc == null) && (tc == null)) + throw(new IllegalStateException(w + ": not registered")); + if(wc != null) + wc.interestOps(0); + paused.put(w, this); + } + + void resume(Watcher w) { + if(paused.remove(w) == null) + return; + SelectionKey wc = watching.get(w); + int evs = w.events(); + double timeout = w.timeout(); + boolean hastime = timeout < Double.POSITIVE_INFINITY; + if((evs == 0) && !hastime) { + remove(w); + return; + } + wc.interestOps(evs); + if(hastime) + timeheap.add(w, timeout); + poll.wakeup(); + } + + void add(Watcher w, SelectableChannel ch) { + if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w)) + throw(new IllegalStateException(w + ": already registered")); + int evs = w.events(); + double timeout = w.timeout(); + boolean hastime = timeout < Double.POSITIVE_INFINITY; + if((evs == 0) && !hastime) { + w.close(); + return; + } + try { + watching.put(w, ch.register(poll, evs, w)); + } catch(ClosedChannelException e) { + throw(new RuntimeException("attempted to watch closed channel", e)); + } + if(hastime) + timeheap.add(w, timeout); + poll.wakeup(); + } + + void remove(Watcher w) { + SelectionKey wc = watching.remove(w); + Object tc = timeheap.remove(w); + Object pc = paused.remove(w); + if(wc != null) + wc.cancel(); + if(((wc != null) || (tc != null)) && (pc != null)) + throw(new RuntimeException(w + ": inconsistent internal state")); + if(wc == null) + throw(new IllegalStateException(w + ": not registered")); + w.close(); + poll.wakeup(); + } + + void update(Watcher w) { + SelectionKey wc = watching.get(w); + if(wc == null) + throw(new IllegalStateException(w + ": not registered")); + int evs = w.events(); + double timeout = w.timeout(); + boolean hastime = timeout < Double.POSITIVE_INFINITY; + if((evs == 0) && !hastime) { + remove(w); + return; + } + wc.interestOps(evs); + if(hastime) + timeheap.set(w, timeout); + else + timeheap.remove(w); + poll.wakeup(); + } + } + + private SelectPool pool(SelectorProvider provider) { + SelectPool pool = selectors.get(provider); + if(pool == null) { + pool = new SelectPool(provider); + selectors.put(provider, pool); + pool.start(); + } + return(pool); + } + + public void add(Watcher w) { + SelectableChannel ch = w.channel(); + synchronized(selectors) { + pool(ch.provider()).add(w, ch); + } + } + + public void remove(Watcher w) { + SelectableChannel ch = w.channel(); + synchronized(selectors) { + pool(ch.provider()).remove(w); + } + } + + public void update(Watcher w) { + SelectableChannel ch = w.channel(); + synchronized(selectors) { + pool(ch.provider()).update(w); + } + } + + public double time() { + return(rtime()); + } + + private static final long rtimeoff = System.nanoTime(); + public static double rtime() { + return((System.nanoTime() - rtimeoff) / 1e9); + } + + private static Driver global = null; + public static Driver get() { + if(global == null) { + synchronized(Driver.class) { + if(global == null) + global = new Driver(); + } + } + return(global); + } + + public static Driver current() { + Driver ret = current.get(); + if(ret == null) + throw(new IllegalStateException("no current driver")); + return(ret); + } +}