Allow jagidir modules to include other compiled modules.
[jagi.git] /
... / ...
1package jagi.scgi;
3import jagi.*;
4import jagi.event.*;
5import java.util.*;
6import java.util.function.*;
7import java.util.concurrent.*;
8import java.util.logging.*;
10import java.nio.*;
11import java.nio.channels.*;
13public class EventServer implements Runnable {
14 private static final double timeout = 5;
15 private static final Logger log = Logger.getLogger("jagi.server");
16 private final ServerSocketChannel sk;
17 private final Function handler;
18 private final Driver ev = Driver.get();
19 private final ExecutorService handlers = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors() * 2,
20 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(64),
21 tgt -> new Thread(tgt, "Request handler thread"));
23 public EventServer(ServerSocketChannel sk, Function handler) {
24 try {
25 sk.configureBlocking(false);
26 } catch(IOException e) {
27 throw(new RuntimeException(e));
28 }
29 = sk;
30 this.handler = handler;
31 }
33 public static class Request {
34 public final Map<Object, Object> env;
35 public final SocketChannel sk;
37 public Request(Map<Object, Object> env, SocketChannel sk) {
38 this.env = env;
39 = sk;
40 }
42 public void close() {
43 ArrayList<Object> cleanup = new ArrayList<>((Collection<?>)env.get("jagi.cleanup"));
44 cleanup.add(sk);
45 RuntimeException ce = null;
46 for(Object obj : cleanup) {
47 if(obj instanceof AutoCloseable) {
48 try {
49 ((AutoCloseable)obj).close();
50 } catch(Exception e) {
51 if(ce == null)
52 ce = new RuntimeException("error(s) occurred during cleanup");
53 ce.addSuppressed(e);
54 }
55 }
56 }
57 if(ce != null)
58 throw(ce);
59 }
60 }
62 protected void error(Request req, Throwable error) {
63 log.log(Level.WARNING, "uncaught exception while handling request", error);
64 }
66 public static abstract class ChainWatcher implements Watcher {
67 private Runnable then;
68 public ChainWatcher then(Runnable then) {this.then = then; return(this);}
70 public void close() {
71 if(then != null)
73 }
74 }
76 public static class BufferedOutput extends ChainWatcher {
77 public final SocketChannel sk;
78 public final ByteBuffer buf;
79 private double lastwrite;
81 public BufferedOutput(SocketChannel sk, ByteBuffer buf) {
82 = sk;
83 this.buf = buf;
84 }
86 public void added(Driver d) {lastwrite = d.time();}
87 public SelectableChannel channel() {return(sk);}
88 public int events() {return((buf.remaining() > 0) ? SelectionKey.OP_WRITE : -1);}
89 public double timeout() {return(lastwrite + timeout);}
91 public void handle(int events) throws IOException {
92 double now = Driver.current().time();
93 if((events & SelectionKey.OP_WRITE) != 0) {
94 if(sk.write(buf) > 0)
95 lastwrite = now;
96 }
97 if(now > lastwrite + timeout)
98 buf.position(buf.limit());
99 }
100 }
102 public static class TransferOutput extends ChainWatcher {
103 public final SocketChannel sk;
104 public final ReadableByteChannel in;
105 private final ByteBuffer buf;
106 private boolean eof = false;
107 private double lastwrite;
109 public TransferOutput(SocketChannel sk, ReadableByteChannel in) {
110 = sk;
111 = in;
112 buf = ByteBuffer.allocate(65536);
113 buf.flip();
114 }
116 public void added(Driver d) {lastwrite = d.time();}
117 public SelectableChannel channel() {return(sk);}
118 public int events() {return((eof && (buf.remaining() == 0)) ? -1 : SelectionKey.OP_WRITE);}
119 public double timeout() {return(lastwrite + timeout);}
121 public void handle(int events) throws IOException {
122 if(!eof && (buf.remaining() == 0)) {
123 buf.rewind();
124 while(buf.remaining() > 0) {
125 if( < 0)
126 break;
127 }
128 }
129 double now = Driver.current().time();
130 if((events & SelectionKey.OP_WRITE) != 0) {
131 if(sk.write(buf) > 0)
132 lastwrite = now;
133 }
134 if(now > lastwrite + timeout) {
135 eof = true;
136 buf.position(buf.limit());
137 }
138 }
140 public void close() {
141 try {
142 in.close();
143 } catch(IOException e) {
144 log.log(Level.WARNING, "failed to close transfer channel: " + in, e);
145 } finally {
146 super.close();
147 }
148 }
149 }
151 public static class TransferInput extends ChainWatcher {
152 public final SocketChannel sk;
153 public final WritableByteChannel out;
154 private final ByteBuffer buf;
155 private boolean eof = false;
156 private double lastread;
158 public TransferInput(SocketChannel sk, WritableByteChannel out) {
159 = sk;
160 this.out = out;
161 buf = ByteBuffer.allocate(65536);
162 buf.flip();
163 }
165 public void added(Driver d) {lastread = d.time();}
166 public SelectableChannel channel() {return(sk);}
167 public int events() {return(eof ? -1 : SelectionKey.OP_READ);}
168 public double timeout() {return(lastread + timeout);}
170 public void handle(int events) throws IOException {
171 double now = Driver.current().time();
172 if((events & SelectionKey.OP_READ) != 0) {
173 buf.rewind();
174 int rv =;
175 if(rv < 0)
176 eof = true;
177 else if(rv > 0)
178 lastread = now;
179 buf.flip();
180 while(buf.remaining() > 0)
181 out.write(buf);
182 }
183 if(now > lastread + timeout) {
184 eof = true;
185 buf.position(buf.limit());
186 }
187 }
189 public void close() {
190 try {
191 out.close();
192 } catch(IOException e) {
193 log.log(Level.WARNING, "failed to close transfer channel: " + out, e);
194 } finally {
195 super.close();
196 }
197 }
198 }
200 protected void respond(Request req, String status, Map resp) {
201 Object output = resp.get("jagi.output");
202 ByteArrayOutputStream buf = new ByteArrayOutputStream();
203 try {
204 Writer head = new OutputStreamWriter(buf, Utils.UTF8);
205 head.write("Status: ");
206 head.write(status);
207 head.write("\n");
208 for(Iterator it = resp.entrySet().iterator(); it.hasNext();) {
209 Map.Entry ent = (Map.Entry);
210 Object val = ent.getValue();
211 if((ent.getKey() instanceof String) && (val != null)) {
212 String key = (String)ent.getKey();
213 if(key.startsWith("http.")) {
214 String nm = key.substring(5);
215 if(nm.equalsIgnoreCase("status"))
216 continue;
217 if(val instanceof Collection) {
218 for(Object part : (Collection)val) {
219 head.write(nm);
220 head.write(": ");
221 head.write(part.toString());
222 head.write("\n");
223 }
224 } else {
225 head.write(nm);
226 head.write(": ");
227 head.write(val.toString());
228 head.write("\n");
229 }
230 }
231 }
232 }
233 head.write("\n");
234 head.flush();
235 } catch(IOException e) {
236 throw(new RuntimeException("cannot happen"));
237 }
238 ChainWatcher out;
239 if(output == null) {
240 out = new BufferedOutput(, ByteBuffer.allocate(0));
241 } else if(output instanceof byte[]) {
242 out = new BufferedOutput(, ByteBuffer.wrap((byte[])output));
243 } else if(output instanceof ByteBuffer) {
244 out = new BufferedOutput(, (ByteBuffer)output);
245 } else if(output instanceof String) {
246 out = new BufferedOutput(, ByteBuffer.wrap(((String)output).getBytes(Utils.UTF8)));
247 } else if(output instanceof CharSequence) {
248 out = new BufferedOutput(, Utils.UTF8.encode(CharBuffer.wrap((CharSequence)output)));
249 } else if(output instanceof InputStream) {
250 out = new TransferOutput(, Channels.newChannel((InputStream)output));
251 } else if(output instanceof ReadableByteChannel) {
252 out = new TransferOutput(, (ReadableByteChannel)output);
253 } else {
254 throw(new IllegalArgumentException("response-body: " + output));
255 }
256 out.then(() -> submit(req::close));
257 ev.add(new BufferedOutput(, ByteBuffer.wrap(buf.toByteArray())).then(() -> ev.add(out)));
258 }
260 @SuppressWarnings("unchecked")
261 protected void handle(Request req, Function handler) {
262 boolean handoff = false;
263 try {
264 Throwable error = null;
265 try {
266 Map resp = (Map)handler.apply(req.env);
267 String st;
268 if((st = (String)resp.get("jagi.status")) != null) {
269 Function next = (Function)resp.get("");
270 switch(st) {
271 case "feed-input":
272 Object sink = resp.get("jagi.input-sink");
273 if(sink instanceof WritableByteChannel) {
274 ev.add(new TransferInput(, (WritableByteChannel)sink).then(() -> submit(() -> handle(req, next))));
275 } else if(sink instanceof OutputStream) {
276 ev.add(new TransferInput(, Channels.newChannel((OutputStream)sink)).then(() -> submit(() -> handle(req, next))));
277 } else {
278 throw(new IllegalArgumentException("input-sink: " + sink));
279 }
280 handoff = true;
281 break;
282 default:
283 throw(new IllegalArgumentException("jagi.status: " + st));
284 }
285 } else if((st = (String)resp.get("http.status")) != null) {
286 respond(req, st, resp);
287 handoff = true;
288 }
289 } catch(Throwable t) {
290 error = t;
291 throw(t);
292 } finally {
293 if(!handoff) {
294 try {
295 req.close();
296 } catch(Throwable ce) {
297 if(error == null) {
298 throw(ce);
299 } else {
300 error.addSuppressed(ce);
301 }
302 }
303 }
304 }
305 } catch(Throwable t) {
306 error(req, t);
307 }
308 }
310 protected void submit(Runnable task) {
311 handlers.submit(task);
312 }
314 class Client implements Watcher {
315 final SocketChannel sk;
316 double lastread;
317 boolean eof = false, handoff = false;
318 int headlen = 0;
319 ByteBuffer head = null;
320 Map<Object, Object> env = null;
322 Client(SocketChannel sk) {
323 = sk;
324 }
326 public void added(Driver d) {lastread = d.time();}
327 public SelectableChannel channel() {return(sk);}
328 public double timeout() {return(lastread + timeout);}
329 public int events() {
330 if(eof)
331 return(-1);
332 if(env == null)
333 return(SelectionKey.OP_READ);
334 return(-1);
335 }
337 boolean readhead() throws IOException {
338 if(head == null) {
339 ByteBuffer buf = ByteBuffer.allocate(1);
340 while(true) {
341 buf.rewind();
342 int rv =;
343 if(rv < 0) {
344 eof = true;
345 return(false);
346 } else if(rv == 0) {
347 return(false);
348 } else {
349 lastread = Driver.current().time();
350 int c = buf.get(0);
351 if((c >= '0') && (c <= '9')) {
352 headlen = (headlen * 10) + (c - '0');
353 } else if(c == ':') {
354 head = ByteBuffer.allocate(headlen + 1);
355 break;
356 } else {
357 eof = true;
358 return(false);
359 }
360 }
361 }
362 }
363 while(true) {
364 if(head.remaining() == 0) {
365 if(head.get(head.limit() - 1) != ',') {
366 /* Unterminated netstring */
367 eof = true;
368 return(false);
369 }
370 head.limit(head.limit() - 1);
371 env = Jagi.mkenv(Scgi.splithead(head), sk);
372 return(true);
373 }
374 int rv =;
375 if(rv < 0) {
376 eof = true;
377 return(false);
378 } else if(rv == 0) {
379 return(false);
380 }
381 }
382 }
384 public void handle(int events) throws IOException {
385 if((events & SelectionKey.OP_READ) != 0) {
386 if((env == null) && !readhead())
387 return;
388 Request req = new Request(env, sk);
389 submit(() -> EventServer.this.handle(req, handler));
390 handoff = true;
391 }
392 if(Driver.current().time() > (lastread + timeout))
393 eof = true;
394 }
396 public void close() {
397 if(!handoff) {
398 try {
399 sk.close();
400 } catch(IOException e) {
401 }
402 }
403 }
404 }
406 class Accepter implements Watcher {
407 boolean closed = false;
409 public SelectableChannel channel() {return(sk);}
410 public int events() {return(SelectionKey.OP_ACCEPT);}
412 public void handle(int events) throws IOException {
413 if((events & SelectionKey.OP_ACCEPT) != 0) {
414 SocketChannel cl = sk.accept();
415 cl.configureBlocking(false);
416 Driver.current().add(new Client(cl));
417 }
418 }
420 public void close() {
421 synchronized(this) {
422 closed = true;
423 notifyAll();
424 }
425 }
426 }
428 public void run() {
429 Accepter main = new Accepter();
430 ev.add(main);
431 try {
432 synchronized(main) {
433 while(!main.closed) {
434 main.wait();
435 }
436 }
437 } catch(InterruptedException e) {
438 ev.remove(main);
439 } finally {
440 try {
441 sk.close();
442 } catch(IOException e) {
443 throw(new RuntimeException(e));
444 }
445 }
446 }