More Java work.
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 18 Nov 2007 04:05:58 +0000 (05:05 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 18 Nov 2007 04:05:58 +0000 (05:05 +0100)
lib/java/dolda/dolcon/ClosedException.java [new file with mode: 0644]
lib/java/dolda/dolcon/Command.java [new file with mode: 0644]
lib/java/dolda/dolcon/Connection.java
lib/java/dolda/dolcon/Response.java
lib/java/dolda/dolcon/VersionException.java [new file with mode: 0644]

diff --git a/lib/java/dolda/dolcon/ClosedException.java b/lib/java/dolda/dolcon/ClosedException.java
new file mode 100644 (file)
index 0000000..e1944c0
--- /dev/null
@@ -0,0 +1,11 @@
+package dolda.dolcon;
+
+public class ClosedException extends Exception {
+    public ClosedException(Throwable cause) {
+       super("DC connection has been closed", cause);
+    }
+
+    public ClosedException() {
+       this(null);
+    }
+}
diff --git a/lib/java/dolda/dolcon/Command.java b/lib/java/dolda/dolcon/Command.java
new file mode 100644 (file)
index 0000000..fa72357
--- /dev/null
@@ -0,0 +1,37 @@
+package dolda.dolcon;
+
+import java.util.*;
+
+public class Command {
+    List<String> tokens;
+    Set<Listener> listeners = new HashSet<Listener>();
+    Response resp;
+    
+    public interface Listener {
+       public void done(Response resp) throws Exception;
+       public void error(Exception cause);
+    }
+
+    public Command(List<String> tokens) {
+       this.tokens = tokens;
+    }
+    
+    public Command(String... tokens) {
+       this(Arrays.asList(tokens));
+    }
+    
+    public void addListener(Listener l) {
+       listeners.add(l);
+    }
+    
+    public void done(Response resp) throws Exception {
+       this.resp = resp;
+       for(Listener l : listeners)
+           l.done(resp);
+    }
+    
+    public void error(Exception cause) {
+       for(Listener l : listeners)
+           l.error(cause);
+    }
+}
index 0d85f76..b31151e 100644 (file)
@@ -5,11 +5,29 @@ import java.net.Socket;
 import java.util.*;
 
 public class Connection {
-    Socket s;
-    Reader reader;
-    LinkedList<Response> resps = new LinkedList<Response>();
+    private Socket s;
+    private Reader reader;
+    private Writer writer;
+    private Queue<Command> queue = new LinkedList<Command>();
+    private Queue<Command> pending = new LinkedList<Command>();
+    private int reqver = 2, revlo, revhi;
+    private String aspec;
+    private String state;
+    private Set<ConnectListener> connls = new HashSet<ConnectListener>();
+    private Exception error;
     
-    public Connection(String aspec) throws ConnectException {
+    public interface ConnectListener {
+       public void connected() throws Exception;
+       public void error(Exception cause);
+    }
+
+    public Connection(String aspec) {
+       this.aspec = aspec;
+       state = "idle";
+    }
+    
+    public void connect() throws ConnectException {
+       state = "connecting";
        try {
            s = new Socket(aspec, 1500);
        } catch(java.net.UnknownHostException e) {
@@ -17,157 +35,352 @@ public class Connection {
        } catch(IOException e) {
            throw(new ConnectException("Could not connect to host " + aspec, e));
        }
-       reader = new Reader(s, resps);
+       pending = new LinkedList<Command>();
+       Command ccmd = new Command(".connect");
+       ccmd.addListener(new Command.Listener() {
+               public void done(Response resp) throws Exception {
+                   try {
+                       checkver(resp);
+                   } catch(VersionException e) {
+                       error(e);
+                       throw(e);
+                   }
+                   synchronized(connls) {
+                       state = "connected";
+                       try {
+                           for(ConnectListener l : connls)
+                               l.connected();
+                       } finally {
+                           connls.clear();
+                       }
+                   }
+               }
+               
+               public void error(Exception cause) {
+                   synchronized(connls) {
+                       try {
+                           for(ConnectListener l : connls)
+                               l.error(cause);
+                       } finally {
+                           connls.clear();
+                       }
+                   }
+               }
+           });
+       pending.offer(ccmd);
+       reader = new Reader(s, pending);
+       writer = new Writer(s, queue, pending);
+       Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+               public void uncaughtException(Thread t, Throwable c) {
+                   boolean n = false;
+                   if(c instanceof StopCondition) {
+                       StopCondition s = (StopCondition)c;
+                       n = s.normal;
+                       c = s.getCause();
+                   }
+                   Exception e;
+                   if(c instanceof Exception)
+                       e = (Exception)c;
+                   else
+                       e = new Exception(c);
+                   if(!n) {
+                       close();
+                       error = e;
+                   }
+                   synchronized(pending) {
+                       Command cmd;
+                       while((cmd = pending.poll()) != null) {
+                           cmd.error(e);
+                       }
+                   }
+                   synchronized(queue) {
+                       Command cmd;
+                       while((cmd = queue.poll()) != null) {
+                           cmd.error(e);
+                       }
+                   }
+               }
+           };
+       reader.setUncaughtExceptionHandler(h);
+       writer.setUncaughtExceptionHandler(h);
        reader.start();
+       writer.start();
     }
     
-    static private class Reader extends Thread {
-       Exception error = null;
+    private void checkthread() {
+       if(Thread.currentThread() == reader)
+           throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
+    }
+        
+    public void syncConnect() throws ConnectException, ClosedException, InterruptedException {
+       checkthread();
+       final boolean[] donep = new boolean[] {false};
+       final Exception[] errp = new Exception[] {null};
+       ConnectListener l = new ConnectListener() {
+               public void connected() {
+                   donep[0] = true;
+                   synchronized(this) {
+                       notifyAll();
+                   }
+               }
+               
+               public void error(Exception cause) {
+                   donep[0] = true;
+                   errp[0] = cause;
+                   synchronized(this) {
+                       notifyAll();
+                   }
+               }
+           };
+       addConnectListener(l);
+       connect();
+       while(!donep[0]) {
+           synchronized(l) {
+               l.wait();
+           }
+       }
+       if(errp[0] != null)
+           throw(new ClosedException(errp[0]));
+    }
+
+    public void expectVersion(int reqver) {
+       this.reqver = reqver;
+    }
+    
+    private void checkver(Response resp) throws VersionException {
+       revlo = Integer.parseInt(resp.token(0, 0));
+       revhi = Integer.parseInt(resp.token(0, 1));
+       if((reqver < revlo) || (reqver > revhi))
+           throw(new VersionException(reqver, revlo, revhi));
+    }
+
+    public Exception join() throws InterruptedException {
+       while(reader.isAlive()) {
+           reader.join();
+       }
+       close();
+       return(error);
+    }
+
+    public void addConnectListener(ConnectListener l) {
+       synchronized(connls) {
+           if((state != "idle") && (state != "connecting"))
+               throw(new IllegalStateException("Already connected"));
+           connls.add(l);
+       }
+    }
+
+    private void qcmd(Command cmd) {
+       synchronized(queue) {
+           queue.offer(cmd);
+           queue.notifyAll();
+       }
+    }
+    
+    static private class StopCondition extends Error {
+       final boolean normal;
+       
+       public StopCondition(Exception cause, boolean normal) {
+           super(cause);
+           this.normal = normal;
+       }
+    }
+    
+    static private class Writer extends Thread {
        Socket s;
-       Collection<Response> resps;
+       Queue<Command> queue, pending;
        
-       public Reader(Socket s, Collection<Response> resps) {
+       public Writer(Socket s, Queue<Command> queue, Queue<Command> pending) {
            this.s = s;
-           this.resps = resps;
+           this.queue = queue;
+           this.pending = pending;
            setDaemon(true);
        }
        
-       public void run() {
-           java.io.Reader r;
-           try {
-               r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
-           } catch(IOException e) {
-               synchronized(resps) {
-                   resps.notifyAll();
-                   error = e;
+       private String quote(String t) {
+           if(t.length() == 0)
+               return("\"\"");
+           StringBuilder sb = new StringBuilder();
+           boolean quote = false;
+           for(int i = 0; i < t.length(); i++) {
+               char c = t.charAt(i);
+               if(c == '\"') {
+                   sb.append("\\\"");
+               } else if(Character.isWhitespace(c)) {
+                   quote = true;
+                   sb.append(c);
+               } else {
+                   sb.append(c);
                }
-               return;
            }
-           String state = "start";
-           StringBuilder ct = new StringBuilder();
-           int code = -1;
-           boolean last = true;
-           List<List<String>>lines = new LinkedList<List<String>>();
-           List<String>tokens = new LinkedList<String>();
-           while(true) {
-               char c;
-               {
-                   int i;
+           if(quote)
+               return("\"" + sb.toString() + "\"");
+           else
+               return(sb.toString());
+       }
+
+       public void run() {
+           try {
+               java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
+               while(true) {
+                   Command cmd;
                    try {
-                       if((i = r.read()) < 0) {
-                           throw(new IOException("The server closed the connection"));
+                       synchronized(pending) {
+                           while(pending.size() > 0)
+                               pending.wait();
                        }
-                   } catch(IOException e) {
-                       synchronized(resps) {
-                           resps.notifyAll();
-                           error = e;
+                       synchronized(queue) {
+                           do {
+                               if((cmd = queue.poll()) != null)
+                                   break;
+                               queue.wait();
+                           } while(true);
                        }
-                       return;
+                   } catch(InterruptedException e) {
+                       throw(new StopCondition(e, true));
                    }
-                   c = (char)i;
+                   StringBuilder out = new StringBuilder();
+                   for(String s : cmd.tokens) {
+                       if(out.length() > 0)
+                           out.append(' ');
+                       out.append(quote(s));
+                   }
+                   w.write(out.toString());
                }
-               eat: do {
-                   if(state == "start") {
-                       if(c == '\r') {
-                           state = "nl";
-                       } else if(Character.isWhitespace(c)) {
-                       } else {
-                           if(code == -1)
-                               state = "code";
-                           else
-                               state = "token";
-                           continue eat;
+           } catch(IOException e) {
+               throw(new StopCondition(e, false));
+           }
+       }
+    }
+
+    static private class Reader extends Thread {
+       Socket s;
+       Queue<Command> pending;
+       
+       public Reader(Socket s, Queue<Command> pending) {
+           this.s = s;
+           this.pending = pending;
+       }
+       
+       private void dispatch(Response resp) throws Exception {
+           if(resp.code < 600) {
+               synchronized(pending) {
+                   resp.cmd = pending.remove();
+                   pending.notifyAll();
+               }
+               resp.cmd.done(resp);
+           }
+       }
+
+       public void run() {
+           try {
+               java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
+               String state = "start";
+               StringBuilder ct = new StringBuilder();
+               int code = -1;
+               boolean last = true;
+               List<List<String>> lines = new LinkedList<List<String>>();
+               List<String> tokens = new LinkedList<String>();
+               while(true) {
+                   char c;
+                   {
+                       int i;
+                       try {
+                           if((i = r.read()) < 0) {
+                               throw(new IOException("The server closed the connection"));
+                           }
+                       } catch(java.nio.channels.ClosedByInterruptException e) {
+                           throw(new StopCondition(e, true));
                        }
-                   } else if(state == "nl") {
-                       if(c == '\n') {
-                           if(code == -1) {
-                               synchronized(resps) {
-                                   resps.notifyAll();
-                                   try {
-                                       throw(new IOException("Illegal response code " + code + " from the server"));
-                                   } catch(IOException e) {
-                                       error = e;
-                                   }
-                               }
-                               return;
+                       c = (char)i;
+                   }
+                   eat: do {
+                       if(state == "start") {
+                           if(c == '\r') {
+                               state = "nl";
+                           } else if(Character.isWhitespace(c)) {
+                           } else {
+                               if(code == -1)
+                                   state = "code";
+                               else
+                                   state = "token";
+                               continue eat;
                            }
-                           lines.add(tokens);
-                           tokens = new LinkedList<String>();
-                           if(last) {
-                               synchronized(resps) {
-                                   resps.add(new Response(code, lines));
-                                   resps.notifyAll();
+                       } else if(state == "nl") {
+                           if(c == '\n') {
+                               if((code < 100) || (code >= 1000)) {
+                                   throw(new IOException("Illegal response code " + code + " from the server"));
                                }
-                               lines = new LinkedList<List<String>>();
+                               lines.add(tokens);
+                               tokens = new LinkedList<String>();
+                               if(last) {
+                                   dispatch(new Response(code, lines));
+                                   lines = new LinkedList<List<String>>();
+                               }
+                               code = -1;
+                               state = "start";
+                           } else {
+                               state = "start";
+                               continue eat;
                            }
-                           state = "start";
-                       } else {
-                           state = "start";
-                           continue eat;
-                       }
-                   } else if(state == "code") {
-                       if((c == '-') || Character.isWhitespace(c)) {
-                           last = c != '-';
-                           code = Integer.parseInt(ct.toString());
-                           ct.setLength(0);
-                           state = "start";
-                           continue eat;
-                       } else {
+                       } else if(state == "code") {
+                           if((c == '-') || Character.isWhitespace(c)) {
+                               last = c != '-';
+                               code = Integer.parseInt(ct.toString());
+                               ct.setLength(0);
+                               state = "start";
+                               continue eat;
+                           } else {
+                               ct.append(c);
+                           }
+                       } else if(state == "token") {
+                           if(Character.isWhitespace(c)) {
+                               tokens.add(ct.toString());
+                               ct.setLength(0);
+                               state = "start";
+                               continue eat;
+                           } else if(c == '\\') {
+                               state = "bs";
+                           } else if(c == '"') {
+                               state = "cited";
+                           } else {
+                               ct.append(c);
+                           }
+                       } else if(state == "bs") {
+                           ct.append(c);
+                           state = "token";
+                       } else if(state == "cited") {
+                           if(c == '\\')
+                               state = "cbs";
+                           else if(c == '"')
+                               state = "token";
+                           else
+                               ct.append(c);
+                       } else if(state == "cbs") {
                            ct.append(c);
-                       }
-                   } else if(state == "token") {
-                       if(Character.isWhitespace(c)) {
-                           tokens.add(ct.toString());
-                           ct.setLength(0);
-                           state = "start";
-                           code = -1;
-                           continue eat;
-                       } else if(c == '\\') {
-                           state = "bs";
-                       } else if(c == '"') {
                            state = "cited";
                        } else {
-                           ct.append(c);
+                           throw(new Error("invalid state " + state));
                        }
-                   } else if(state == "bs") {
-                       ct.append(c);
-                       state = "token";
-                   } else if(state == "cited") {
-                       if(c == '\\')
-                           state = "cbs";
-                       else if(c == '"')
-                           state = "token";
-                       else
-                           ct.append(c);
-                   } else if(state == "cbs") {
-                       ct.append(c);
-                       state = "cited";
-                   } else {
-                       throw(new Error("invalid state " + state));
-                   }
-                   break;
-               } while(true);
+                       break;
+                   } while(true);
+               }
+           } catch(Exception e) {
+               throw(new StopCondition(e, false));
            }
        }
     }
 
-    protected void finalize() {
+    public void close() {
        try {
            s.close();
        } catch(IOException e) {
        }
        reader.interrupt();
+       writer.interrupt();
     }
 
-    public static void main(String[] args) throws Exception {
-       Connection c = new Connection("pc18");
-       while(true) {
-           while(c.resps.size() > 0) {
-               System.out.println(c.resps.remove(0));
-           }
-           synchronized(c.resps) {
-               c.resps.wait();
-           }
-       }
+    protected void finalize() {
+       close();
     }
 }
index 5ce1115..d5ee921 100644 (file)
@@ -3,10 +3,11 @@ package dolda.dolcon;
 import java.util.*;
 
 public class Response {
-    List<List<String>>lines;
+    List<List<String>> lines;
+    Command cmd;
     int code;
     
-    public Response(int code, List<List<String>>lines) {
+    public Response(int code, List<List<String>> lines) {
        this.code = code;
        this.lines = lines;
     }
@@ -14,4 +15,8 @@ public class Response {
     public String toString() {
        return("Response " + code + ": " + lines.toString());
     }
+    
+    public String token(int line, int token) {
+       return(lines.get(line).get(token));
+    }
 }
diff --git a/lib/java/dolda/dolcon/VersionException.java b/lib/java/dolda/dolcon/VersionException.java
new file mode 100644 (file)
index 0000000..ca3bc74
--- /dev/null
@@ -0,0 +1,12 @@
+package dolda.dolcon;
+
+public class VersionException extends Exception {
+    public final int r, l, h;
+    
+    public VersionException(int r, int l, int h) {
+       super("Unexpected protocol revision: " + l + "-" + h + ", wanted " + r);
+       this.r = r;
+       this.l = l;
+       this.h = h;
+    }
+}