From: Fredrik Tolf Date: Fri, 18 Feb 2022 02:57:27 +0000 (+0100) Subject: Handle unterminated streams in feed-input. X-Git-Url: http://git.dolda2000.com/gitweb/?a=commitdiff_plain;h=a1480d6f1457fab85b52f3caab83baa527ae0571;p=jagi.git Handle unterminated streams in feed-input. --- diff --git a/src/jagi/scgi/EventServer.java b/src/jagi/scgi/EventServer.java index e2055c7..1a6bc2a 100644 --- a/src/jagi/scgi/EventServer.java +++ b/src/jagi/scgi/EventServer.java @@ -152,12 +152,15 @@ public class EventServer implements Runnable { public final SocketChannel sk; public final WritableByteChannel out; private final ByteBuffer buf; + private final long max; private boolean eof = false; private double lastread; + private long cur = 0; - public TransferInput(SocketChannel sk, WritableByteChannel out) { + public TransferInput(SocketChannel sk, WritableByteChannel out, long max) { this.sk = sk; this.out = out; + this.max = max; buf = ByteBuffer.allocate(65536); buf.flip(); } @@ -171,11 +174,15 @@ public class EventServer implements Runnable { double now = Driver.current().time(); if((events & SelectionKey.OP_READ) != 0) { buf.rewind(); + if(buf.remaining() > max - cur) + buf.limit(buf.position() + (int)Math.min(max - cur, Integer.MAX_VALUE)); int rv = sk.read(buf); - if(rv < 0) + if(rv < 0) { eof = true; - else if(rv > 0) + } else if(rv > 0) { lastread = now; + cur += rv; + } buf.flip(); while(buf.remaining() > 0) out.write(buf); @@ -270,15 +277,27 @@ public class EventServer implements Runnable { switch(st) { case "feed-input": Object sink = resp.get("jagi.input-sink"); + Object clen = req.env.get("HTTP_CONTENT_LENGTH"); + long max = 0; + if(clen instanceof String) { + try { + max = Long.parseLong((String)clen); + } catch(NumberFormatException e) { + } + } if(sink instanceof WritableByteChannel) { - ev.add(new TransferInput(req.sk, (WritableByteChannel)sink).then(() -> submit(() -> handle(req, next)))); + ev.add(new TransferInput(req.sk, (WritableByteChannel)sink, max).then(() -> submit(() -> handle(req, next)))); } else if(sink instanceof OutputStream) { - ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink)).then(() -> submit(() -> handle(req, next)))); + ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink), max).then(() -> submit(() -> handle(req, next)))); } else { throw(new IllegalArgumentException("input-sink: " + sink)); } handoff = true; break; + case "chain": + submit(() -> handle(req, next)); + handoff = true; + break; default: throw(new IllegalArgumentException("jagi.status: " + st)); }