tgt -> new Thread(tgt, "Request handler thread"));
public EventServer(ServerSocketChannel sk, Function handler) {
- try {
- sk.configureBlocking(false);
- } catch(IOException e) {
- throw(new RuntimeException(e));
- }
this.sk = sk;
this.handler = handler;
}
public void handle(int events) throws IOException {
if(!eof && (buf.remaining() == 0)) {
- buf.rewind();
+ buf.clear();
while(buf.remaining() > 0) {
- if(in.read(buf) < 0)
+ if(in.read(buf) < 0) {
+ eof = true;
break;
+ }
}
+ buf.flip();
}
double now = Driver.current().time();
if((events & SelectionKey.OP_WRITE) != 0) {
public final SocketChannel sk;
public final WritableByteChannel out;
private final ByteBuffer buf;
+ private final long max;
private boolean eof = false;
private double lastread;
+ private long cur = 0;
- public TransferInput(SocketChannel sk, WritableByteChannel out) {
+ public TransferInput(SocketChannel sk, WritableByteChannel out, long max) {
this.sk = sk;
this.out = out;
+ this.max = max;
buf = ByteBuffer.allocate(65536);
buf.flip();
}
public void handle(int events) throws IOException {
double now = Driver.current().time();
if((events & SelectionKey.OP_READ) != 0) {
- buf.rewind();
+ buf.clear();
+ if(buf.remaining() > max - cur)
+ buf.limit(buf.position() + (int)Math.min(max - cur, Integer.MAX_VALUE));
int rv = sk.read(buf);
- if(rv < 0)
+ if(rv < 0) {
eof = true;
- else if(rv > 0)
+ } else if(rv > 0) {
lastread = now;
+ if((cur += rv) >= max)
+ eof = true;
+ }
buf.flip();
while(buf.remaining() > 0)
out.write(buf);
switch(st) {
case "feed-input":
Object sink = resp.get("jagi.input-sink");
+ Object clen = req.env.get("HTTP_CONTENT_LENGTH");
+ long max = 0;
+ if(clen instanceof String) {
+ try {
+ max = Long.parseLong((String)clen);
+ } catch(NumberFormatException e) {
+ }
+ }
if(sink instanceof WritableByteChannel) {
- ev.add(new TransferInput(req.sk, (WritableByteChannel)sink).then(() -> submit(() -> handle(req, next))));
+ ev.add(new TransferInput(req.sk, (WritableByteChannel)sink, max).then(() -> submit(() -> handle(req, next))));
} else if(sink instanceof OutputStream) {
- ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink)).then(() -> submit(() -> handle(req, next))));
+ ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink), max).then(() -> submit(() -> handle(req, next))));
} else {
throw(new IllegalArgumentException("input-sink: " + sink));
}
handoff = true;
break;
+ case "chain":
+ submit(() -> handle(req, next));
+ handoff = true;
+ break;
default:
throw(new IllegalArgumentException("jagi.status: " + st));
}
} else if((st = (String)resp.get("http.status")) != null) {
respond(req, st, resp);
handoff = true;
+ } else {
+ throw(new IllegalArgumentException("neither http.status nor jagi.status set"));
}
} catch(Throwable t) {
error = t;
int headlen = 0;
ByteBuffer head = null;
Map<Object, Object> env = null;
+ Request req = null;
Client(SocketChannel sk) {
this.sk = sk;
if((events & SelectionKey.OP_READ) != 0) {
if((env == null) && !readhead())
return;
- Request req = new Request(env, sk);
- submit(() -> EventServer.this.handle(req, handler));
+ req = new Request(env, sk);
handoff = true;
}
if(Driver.current().time() > (lastread + timeout))
}
public void close() {
+ if(req != null)
+ submit(() -> EventServer.this.handle(req, handler));
if(!handoff) {
try {
sk.close();
public int events() {return(SelectionKey.OP_ACCEPT);}
public void handle(int events) throws IOException {
- if((events & SelectionKey.OP_ACCEPT) != 0) {
- SocketChannel cl = sk.accept();
- cl.configureBlocking(false);
- Driver.current().add(new Client(cl));
- }
+ if((events & SelectionKey.OP_ACCEPT) != 0)
+ Driver.current().add(new Client(sk.accept()));
}
public void close() {