Commit | Line | Data |
---|---|---|
aac2f975 FT |
1 | package jagi.event; |
2 | ||
3 | import java.util.*; | |
4 | import java.util.logging.*; | |
5 | import java.util.concurrent.*; | |
6 | import java.io.*; | |
7 | import java.nio.*; | |
8 | import java.nio.channels.*; | |
9 | import java.nio.channels.spi.*; | |
10 | ||
11 | public class Driver { | |
12 | private static final Logger log = Logger.getLogger("jagi.event"); | |
13 | private static final Logger hlog = Logger.getLogger("jagi.event.handler"); | |
14 | private static final ThreadLocal<Driver> current = new ThreadLocal<>(); | |
15 | private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>(); | |
16 | private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(), | |
17 | 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128), | |
18 | this::thread); | |
19 | ||
20 | protected Thread thread(Runnable tgt) { | |
21 | return(new Thread(tgt)); | |
22 | } | |
23 | ||
24 | protected void handle(Watcher w, int evs) { | |
25 | try { | |
26 | current.set(this); | |
27 | w.handle(evs); | |
28 | } catch(Throwable t) { | |
7a3bde5c FT |
29 | error(w, t, "handling event"); |
30 | } finally { | |
31 | current.remove(); | |
32 | } | |
33 | } | |
34 | ||
35 | protected void close(Watcher w) { | |
36 | try { | |
37 | current.set(this); | |
38 | w.close(); | |
39 | } catch(Throwable t) { | |
40 | error(w, t, "closing"); | |
aac2f975 FT |
41 | } finally { |
42 | current.remove(); | |
43 | } | |
44 | } | |
45 | ||
46 | protected void submit(Runnable task) { | |
47 | worker.submit(task); | |
48 | } | |
49 | ||
7a3bde5c FT |
50 | protected void error(Watcher w, Throwable t, String thing) { |
51 | hlog.log(Level.WARNING, w + ": uncaught error when " + thing, t); | |
aac2f975 FT |
52 | remove(w); |
53 | } | |
54 | ||
55 | class SelectPool implements Runnable { | |
56 | final SelectorProvider provider; | |
57 | final Selector poll; | |
58 | final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>(); | |
59 | final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder()); | |
60 | final Map<Watcher, Object> paused = new IdentityHashMap<>(); | |
e6788877 | 61 | final Collection<SelectionKey> cancelled = new HashSet<>(); |
aac2f975 FT |
62 | |
/**
 * Creates a pool around a freshly opened selector from {@code provider}.
 *
 * @param provider the selector provider to open the selector from
 * @throws RuntimeException if the selector cannot be opened
 */
SelectPool(SelectorProvider provider) {
    Selector opened;
    try {
        opened = provider.openSelector();
    } catch(IOException e) {
        /* I think this counts more as an assertion error. */
        throw(new RuntimeException(e));
    }
    this.provider = provider;
    this.poll = opened;
}
72 | ||
73 | void handle(Watcher w, int evs) { | |
2a11bb22 FT |
74 | if(!watching.containsKey(w)) |
75 | return; | |
aac2f975 FT |
76 | try { |
77 | pause(w); | |
78 | submit(() -> { | |
79 | try { | |
80 | Driver.this.handle(w, evs); | |
81 | } finally { | |
82 | resume(w); | |
83 | } | |
84 | }); | |
85 | } catch(Throwable t) { | |
86 | try { | |
87 | synchronized(selectors) { | |
88 | remove(w); | |
89 | } | |
90 | } catch(Exception e) { | |
91 | t.addSuppressed(e); | |
92 | } | |
93 | log.log(Level.SEVERE, "unexpected error when submitting event", t); | |
94 | } | |
95 | } | |
96 | ||
97 | void start() { | |
98 | thread(this).start(); | |
99 | } | |
100 | ||
101 | public void run() { | |
102 | boolean quit = false; | |
103 | Throwable error = null; | |
104 | try { | |
2a11bb22 | 105 | double now = time(); |
aac2f975 | 106 | while(true) { |
aac2f975 FT |
107 | long timeout = 0; |
108 | synchronized(selectors) { | |
109 | Double first = timeheap.keypeek(); | |
110 | if((first == null) && watching.isEmpty()) { | |
111 | quit = true; | |
112 | selectors.remove(provider); | |
113 | return; | |
114 | } | |
115 | if(first != null) | |
e6788877 | 116 | timeout = Math.max((long)Math.ceil((first - now) * 1000), 1); |
aac2f975 | 117 | } |
e6788877 FT |
118 | Collection<SelectionKey> precancelled; |
119 | synchronized(cancelled) { | |
120 | precancelled = new ArrayList<>(cancelled); | |
121 | } | |
122 | if(!precancelled.isEmpty()) | |
123 | timeout = 1; | |
aac2f975 FT |
124 | poll.selectedKeys().clear(); |
125 | try { | |
126 | poll.select(timeout); | |
127 | } catch(IOException e) { | |
128 | throw(new RuntimeException(e)); | |
129 | } | |
e6788877 FT |
130 | if(!precancelled.isEmpty()) { |
131 | synchronized(cancelled) { | |
132 | cancelled.removeAll(precancelled); | |
133 | cancelled.notifyAll(); | |
134 | } | |
135 | } | |
aac2f975 FT |
136 | for(SelectionKey key : poll.selectedKeys()) |
137 | handle((Watcher)key.attachment(), key.readyOps()); | |
2a11bb22 FT |
138 | now = time(); |
139 | while(true) { | |
140 | Double first = timeheap.keypeek(); | |
141 | if((first == null) || (first > now)) | |
142 | break; | |
143 | handle(timeheap.remove(), 0); | |
144 | } | |
aac2f975 FT |
145 | } |
146 | } catch(Throwable t) { | |
147 | error = t; | |
148 | throw(t); | |
149 | } finally { | |
150 | if(!quit) | |
151 | log.log(Level.SEVERE, "selector exited abnormally", error); | |
152 | } | |
153 | } | |
154 | ||
155 | void pause(Watcher w) { | |
156 | if(paused.containsKey(w)) | |
157 | throw(new IllegalStateException(w + ": already paused")); | |
158 | SelectionKey wc = watching.get(w); | |
159 | Object tc = timeheap.remove(w); | |
160 | if((wc == null) && (tc == null)) | |
161 | throw(new IllegalStateException(w + ": not registered")); | |
162 | if(wc != null) | |
163 | wc.interestOps(0); | |
164 | paused.put(w, this); | |
165 | } | |
166 | ||
167 | void resume(Watcher w) { | |
168 | if(paused.remove(w) == null) | |
169 | return; | |
170 | SelectionKey wc = watching.get(w); | |
171 | int evs = w.events(); | |
172 | double timeout = w.timeout(); | |
173 | boolean hastime = timeout < Double.POSITIVE_INFINITY; | |
2c1781f3 | 174 | if(evs < 0) { |
aac2f975 FT |
175 | remove(w); |
176 | return; | |
177 | } | |
178 | wc.interestOps(evs); | |
179 | if(hastime) | |
180 | timeheap.add(w, timeout); | |
181 | poll.wakeup(); | |
182 | } | |
183 | ||
184 | void add(Watcher w, SelectableChannel ch) { | |
185 | if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w)) | |
186 | throw(new IllegalStateException(w + ": already registered")); | |
1ee6412b FT |
187 | try { |
188 | ch.configureBlocking(false); | |
189 | } catch(IOException e) { | |
190 | throw(new RuntimeException(ch + ": could not make non-blocking", e)); | |
191 | } | |
aac2f975 FT |
192 | int evs = w.events(); |
193 | double timeout = w.timeout(); | |
194 | boolean hastime = timeout < Double.POSITIVE_INFINITY; | |
2c1781f3 | 195 | if(evs < 0) { |
7a3bde5c | 196 | submit(() -> close(w)); |
aac2f975 FT |
197 | return; |
198 | } | |
2c1781f3 | 199 | w.added(Driver.this); |
aac2f975 FT |
200 | try { |
201 | watching.put(w, ch.register(poll, evs, w)); | |
202 | } catch(ClosedChannelException e) { | |
203 | throw(new RuntimeException("attempted to watch closed channel", e)); | |
204 | } | |
205 | if(hastime) | |
206 | timeheap.add(w, timeout); | |
207 | poll.wakeup(); | |
208 | } | |
209 | ||
210 | void remove(Watcher w) { | |
211 | SelectionKey wc = watching.remove(w); | |
212 | Object tc = timeheap.remove(w); | |
213 | Object pc = paused.remove(w); | |
e6788877 FT |
214 | if(wc != null) { |
215 | synchronized(cancelled) { | |
216 | cancelled.add(wc); | |
217 | wc.cancel(); | |
218 | poll.wakeup(); | |
219 | boolean irq = false; | |
220 | while(cancelled.contains(wc)) { | |
221 | try { | |
222 | cancelled.wait(); | |
223 | } catch(InterruptedException e) { | |
224 | irq = true; | |
225 | } | |
226 | } | |
227 | if(irq) | |
228 | Thread.currentThread().interrupt(); | |
229 | } | |
230 | } | |
aac2f975 FT |
231 | if(((wc != null) || (tc != null)) && (pc != null)) |
232 | throw(new RuntimeException(w + ": inconsistent internal state")); | |
233 | if(wc == null) | |
234 | throw(new IllegalStateException(w + ": not registered")); | |
7a3bde5c | 235 | submit(() -> close(w)); |
aac2f975 FT |
236 | } |
237 | ||
238 | void update(Watcher w) { | |
239 | SelectionKey wc = watching.get(w); | |
240 | if(wc == null) | |
241 | throw(new IllegalStateException(w + ": not registered")); | |
242 | int evs = w.events(); | |
243 | double timeout = w.timeout(); | |
244 | boolean hastime = timeout < Double.POSITIVE_INFINITY; | |
2c1781f3 | 245 | if(evs < 0) { |
aac2f975 FT |
246 | remove(w); |
247 | return; | |
248 | } | |
249 | wc.interestOps(evs); | |
250 | if(hastime) | |
251 | timeheap.set(w, timeout); | |
252 | else | |
253 | timeheap.remove(w); | |
254 | poll.wakeup(); | |
255 | } | |
256 | } | |
257 | ||
258 | private SelectPool pool(SelectorProvider provider) { | |
259 | SelectPool pool = selectors.get(provider); | |
260 | if(pool == null) { | |
261 | pool = new SelectPool(provider); | |
262 | selectors.put(provider, pool); | |
263 | pool.start(); | |
264 | } | |
265 | return(pool); | |
266 | } | |
267 | ||
268 | public void add(Watcher w) { | |
269 | SelectableChannel ch = w.channel(); | |
270 | synchronized(selectors) { | |
271 | pool(ch.provider()).add(w, ch); | |
272 | } | |
273 | } | |
274 | ||
275 | public void remove(Watcher w) { | |
276 | SelectableChannel ch = w.channel(); | |
277 | synchronized(selectors) { | |
278 | pool(ch.provider()).remove(w); | |
279 | } | |
280 | } | |
281 | ||
282 | public void update(Watcher w) { | |
283 | SelectableChannel ch = w.channel(); | |
284 | synchronized(selectors) { | |
285 | pool(ch.provider()).update(w); | |
286 | } | |
287 | } | |
288 | ||
289 | public double time() { | |
290 | return(rtime()); | |
291 | } | |
292 | ||
293 | private static final long rtimeoff = System.nanoTime(); | |
294 | public static double rtime() { | |
295 | return((System.nanoTime() - rtimeoff) / 1e9); | |
296 | } | |
297 | ||
298 | private static Driver global = null; | |
299 | public static Driver get() { | |
300 | if(global == null) { | |
301 | synchronized(Driver.class) { | |
302 | if(global == null) | |
303 | global = new Driver(); | |
304 | } | |
305 | } | |
306 | return(global); | |
307 | } | |
308 | ||
309 | public static Driver current() { | |
310 | Driver ret = current.get(); | |
311 | if(ret == null) | |
312 | throw(new IllegalStateException("no current driver")); | |
313 | return(ret); | |
314 | } | |
315 | } |