4 import java.util.logging.*;
5 import java.util.concurrent.*;
8 import java.nio.channels.*;
9 import java.nio.channels.spi.*;
/* Logger for the event driver itself ("jagi.event"). */
12 private static final Logger log = Logger.getLogger("jagi.event");
/* Logger that error() writes watcher failures to ("jagi.event.handler"). */
13 private static final Logger hlog = Logger.getLogger("jagi.event.handler");
/* Per-thread Driver reference; read back by current(). */
14 private static final ThreadLocal<Driver> current = new ThreadLocal<>();
/* Select pools keyed by selector provider. This map is also the monitor
 * that the pools and the public add/remove/update methods synchronize on. */
15 private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
/* Worker pool: no core threads, at most one thread per available
 * processor, 5 second keep-alive, work queue bounded at 128 entries. */
16 private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
/* Returns a new, not yet started Thread whose target is tgt. */
20 protected Thread thread(Runnable tgt) {
21 return(new Thread(tgt));
/* Handles events for watcher w. evs is the ready-ops value taken from the
 * selection key, or 0 when the call comes from a timeout. A Throwable
 * caught here is passed to error() with the context "handling event". */
24 protected void handle(Watcher w, int evs) {
28 } catch(Throwable t) {
29 error(w, t, "handling event");
/* Close hook for watcher w; SelectPool schedules it through submit().
 * A Throwable caught here is passed to error() with the context "closing". */
35 protected void close(Watcher w) {
39 } catch(Throwable t) {
40 error(w, t, "closing");
/* Hands task off for execution; SelectPool uses this to schedule close(w).
 * NOTE(review): presumably delegates to the worker executor -- confirm. */
46 protected void submit(Runnable task) {
/* Logs a failure raised on behalf of watcher w at WARNING level on hlog.
 * thing completes the phrase "uncaught error when ..." and t is attached
 * to the log record as the cause. */
50 protected void error(Watcher w, Throwable t, String thing) {
51 hlog.log(Level.WARNING, w + ": uncaught error when " + thing, t);
/* The selector and watcher bookkeeping for one SelectorProvider. */
55 class SelectPool implements Runnable {
/* Provider this pool's selector is opened from. */
56 final SelectorProvider provider;
/* Watchers with a channel registration, keyed by identity, mapped to
 * their selection key. */
58 final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
/* Heap of watchers keyed by their timeout value, compared with the
 * natural ordering of Double. */
59 final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
/* Watchers that are currently paused, keyed by identity. */
60 final Map<Watcher, Object> paused = new IdentityHashMap<>();
/* Selection keys guarded by their own monitor: the selector loop snapshots
 * this set before selecting, removes the snapshot afterwards and calls
 * notifyAll(); remove() loops while its key is still present. */
61 final Collection<SelectionKey> cancelled = new HashSet<>();
/* Creates a pool for provider and opens its selector. An IOException from
 * openSelector() is rethrown wrapped in a RuntimeException. */
63 SelectPool(SelectorProvider provider) {
64 this.provider = provider;
66 this.poll = provider.openSelector();
67 } catch(IOException e) {
68 /* I think this counts more as an assertion error. */
69 throw(new RuntimeException(e));
/* Pool-level dispatch for watcher w: checks that w is still in watching
 * and forwards to Driver.this.handle(w, evs). A Throwable caught here is
 * logged at SEVERE as "unexpected error when submitting event". */
73 void handle(Watcher w, int evs) {
74 if(!watching.containsKey(w))
80 Driver.this.handle(w, evs);
85 } catch(Throwable t) {
/* NOTE(review): the recovery path takes the selectors monitor before
 * the failure is logged. */
87 synchronized(selectors) {
90 } catch(Exception e) {
93 log.log(Level.SEVERE, "unexpected error when submitting event", t);
/* Selector loop state. NOTE(review): presumably the body of run(), since
 * SelectPool implements Runnable -- confirm. */
102 boolean quit = false;
103 Throwable error = null;
/* Inspect the pool's bookkeeping under the driver-wide selectors monitor. */
108 synchronized(selectors) {
109 Double first = timeheap.keypeek();
/* No timeout entries and no watched channels: take this pool out of the
 * driver's provider map. */
110 if((first == null) && watching.isEmpty()) {
112 selectors.remove(provider);
/* Delay until the earliest timeout, converted from seconds to the
 * milliseconds that select() expects, rounded up and never below 1.
 * Presumably the floor of 1 exists because select(0) blocks indefinitely. */
116 timeout = Math.max((long)Math.ceil((first - now) * 1000), 1);
/* Snapshot the contents of cancelled under its monitor before selecting. */
118 Collection<SelectionKey> precancelled;
119 synchronized(cancelled) {
120 precancelled = new ArrayList<>(cancelled);
122 if(!precancelled.isEmpty())
124 poll.selectedKeys().clear();
/* Wait for ready channels using the timeout computed above. */
126 poll.select(timeout);
127 } catch(IOException e) {
128 throw(new RuntimeException(e));
/* The snapshot was taken before the select call; drop those keys from
 * cancelled and wake any thread waiting on that monitor. */
130 if(!precancelled.isEmpty()) {
131 synchronized(cancelled) {
132 cancelled.removeAll(precancelled);
133 cancelled.notifyAll();
/* Dispatch every selected key to the watcher stored as its attachment. */
136 for(SelectionKey key : poll.selectedKeys())
137 handle((Watcher)key.attachment(), key.readyOps());
/* Timeout processing: a watcher taken from the heap is handled with an
 * event value of 0. */
140 Double first = timeheap.keypeek();
141 if((first == null) || (first > now))
143 handle(timeheap.remove(), 0);
146 } catch(Throwable t) {
/* Report an abnormal exit of the selector loop. */
151 log.log(Level.SEVERE, "selector exited abnormally", error);
/* Pauses watcher w. Throws IllegalStateException if w is already paused,
 * or if it has neither a selection key nor a timeheap entry. */
155 void pause(Watcher w) {
156 if(paused.containsKey(w))
157 throw(new IllegalStateException(w + ": already paused"));
158 SelectionKey wc = watching.get(w);
/* Takes w out of the timeout heap; tc is treated as null when w had no
 * entry there (see the check below). */
159 Object tc = timeheap.remove(w);
160 if((wc == null) && (tc == null))
161 throw(new IllegalStateException(w + ": not registered"));
/* Resumes a paused watcher: removes it from paused, then re-reads its
 * event set and timeout. */
167 void resume(Watcher w) {
168 if(paused.remove(w) == null)
170 SelectionKey wc = watching.get(w);
171 int evs = w.events();
172 double timeout = w.timeout();
/* A timeout of Double.POSITIVE_INFINITY means the watcher has no timeout. */
173 boolean hastime = timeout < Double.POSITIVE_INFINITY;
180 timeheap.add(w, timeout);
/* Registers watcher w with this pool for channel ch. Throws
 * IllegalStateException if w is already present in watching, paused or
 * timeheap. A ClosedChannelException from register() is rethrown as a
 * RuntimeException. */
184 void add(Watcher w, SelectableChannel ch) {
185 if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
186 throw(new IllegalStateException(w + ": already registered"));
187 int evs = w.events();
188 double timeout = w.timeout();
/* A timeout of Double.POSITIVE_INFINITY means the watcher has no timeout. */
189 boolean hastime = timeout < Double.POSITIVE_INFINITY;
191 submit(() -> close(w));
/* Tell the watcher which driver it now belongs to. */
194 w.added(Driver.this);
/* Register the channel with this pool's selector, attaching w so the
 * selector loop can recover it from the key. */
196 watching.put(w, ch.register(poll, evs, w));
197 } catch(ClosedChannelException e) {
198 throw(new RuntimeException("attempted to watch closed channel", e));
201 timeheap.add(w, timeout);
/* Unregisters watcher w from watching, timeheap and paused, and schedules
 * close(w) through submit(). Throws RuntimeException when w was both
 * registered and paused. Also throws IllegalStateException "not registered";
 * NOTE(review): presumably when w was in none of the three structures. */
205 void remove(Watcher w) {
206 SelectionKey wc = watching.remove(w);
207 Object tc = timeheap.remove(w);
208 Object pc = paused.remove(w);
210 synchronized(cancelled) {
/* Loop until the selector loop has removed wc from cancelled; that loop
 * calls notifyAll() on the same monitor after doing so. */
215 while(cancelled.contains(wc)) {
218 } catch(InterruptedException e) {
/* Restore the interrupt flag of the calling thread. */
223 Thread.currentThread().interrupt();
226 if(((wc != null) || (tc != null)) && (pc != null))
227 throw(new RuntimeException(w + ": inconsistent internal state"));
229 throw(new IllegalStateException(w + ": not registered"));
230 submit(() -> close(w));
/* Re-reads the event set and timeout of watcher w and stores the timeout
 * in the heap via timeheap.set(). Throws IllegalStateException
 * "not registered"; NOTE(review): presumably when w has no selection
 * key -- confirm. */
233 void update(Watcher w) {
234 SelectionKey wc = watching.get(w);
236 throw(new IllegalStateException(w + ": not registered"));
237 int evs = w.events();
238 double timeout = w.timeout();
/* A timeout of Double.POSITIVE_INFINITY means the watcher has no timeout. */
239 boolean hastime = timeout < Double.POSITIVE_INFINITY;
246 timeheap.set(w, timeout);
/* Looks up the SelectPool for provider in selectors; a new pool is
 * constructed and stored in the map -- presumably only when the lookup
 * found none. The visible callers hold the selectors monitor. */
253 private SelectPool pool(SelectorProvider provider) {
254 SelectPool pool = selectors.get(provider);
256 pool = new SelectPool(provider);
257 selectors.put(provider, pool);
/* Registers watcher w: under the selectors monitor, delegates to the
 * pool for the provider of w's channel. */
263 public void add(Watcher w) {
264 SelectableChannel ch = w.channel();
265 synchronized(selectors) {
266 pool(ch.provider()).add(w, ch);
/* Unregisters watcher w: under the selectors monitor, delegates to the
 * pool for the provider of w's channel. */
270 public void remove(Watcher w) {
271 SelectableChannel ch = w.channel();
272 synchronized(selectors) {
273 pool(ch.provider()).remove(w);
/* Asks the pool for the provider of w's channel to update w, under the
 * selectors monitor. */
277 public void update(Watcher w) {
278 SelectableChannel ch = w.channel();
279 synchronized(selectors) {
280 pool(ch.provider()).update(w);
/* Returns a time value as a double.
 * NOTE(review): presumably the same clock as rtime() -- confirm. */
284 public double time() {
/* System.nanoTime() reading captured when this class is initialized;
 * the zero point for rtime(). */
288 private static final long rtimeoff = System.nanoTime();
/* Returns the seconds elapsed since rtimeoff was captured, as a double. */
289 public static double rtime() {
290 return((System.nanoTime() - rtimeoff) / 1e9);
/* Shared Driver instance; null until one is created by get(). */
293 private static Driver global = null;
/* Returns a Driver; the shared instance is created under the Driver.class
 * monitor. NOTE(review): presumably created only once, on first use. */
294 public static Driver get() {
296 synchronized(Driver.class) {
298 global = new Driver();
/* Reads the calling thread's Driver from the thread-local. Throws
 * IllegalStateException "no current driver"; NOTE(review): presumably
 * when the thread-local is unset -- confirm. */
304 public static Driver current() {
305 Driver ret = current.get();
307 throw(new IllegalStateException("no current driver"));