Reworked the Java connection handler a bit.
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 27 Jan 2008 18:21:28 +0000 (19:21 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 27 Jan 2008 18:21:28 +0000 (19:21 +0100)
lib/java/dolda/dolcon/protocol/Connection.java

index 5d805e6..20c06d6 100644 (file)
@@ -27,7 +27,11 @@ public class Connection {
     }
     
     public void connect() throws ConnectException {
-       state = "connecting";
+       synchronized(this) {
+           if(state != "idle")
+               throw(new IllegalStateException("Already connected"));
+           state = "connecting";
+       }
        try {
            s = new Socket(aspec, 1500);
        } catch(java.net.UnknownHostException e) {
@@ -45,8 +49,10 @@ public class Connection {
                        error(e);
                        throw(e);
                    }
-                   synchronized(connls) {
+                   synchronized(Connection.this) {
                        state = "connected";
+                   }
+                   synchronized(connls) {
                        try {
                            for(ConnectListener l : connls)
                                l.connected();
@@ -68,45 +74,39 @@ public class Connection {
                }
            });
        pending.offer(ccmd);
-       reader = new Reader(s, pending);
-       writer = new Writer(s, queue, pending);
-       Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
-               public void uncaughtException(Thread t, Throwable c) {
-                   boolean n = false;
-                   if(c instanceof StopCondition) {
-                       StopCondition s = (StopCondition)c;
-                       n = s.normal;
-                       c = s.getCause();
-                   }
-                   Exception e;
-                   if(c instanceof Exception)
-                       e = (Exception)c;
-                   else
-                       e = new Exception(c);
-                   if(!n) {
-                       close();
-                       error = e;
-                   }
-                   synchronized(pending) {
-                       Command cmd;
-                       while((cmd = pending.poll()) != null) {
-                           cmd.error(e);
-                       }
-                   }
-                   synchronized(queue) {
-                       Command cmd;
-                       while((cmd = queue.poll()) != null) {
-                           cmd.error(e);
-                       }
-                   }
-               }
-           };
-       reader.setUncaughtExceptionHandler(h);
-       writer.setUncaughtExceptionHandler(h);
+       reader = new Reader();
+       writer = new Writer();
        reader.start();
        writer.start();
     }
     
+    private void error(Throwable c) {
+       boolean n = false;
+       if(c instanceof StopCondition) {
+           StopCondition s = (StopCondition)c;
+           n = s.normal;
+           c = s.getCause();
+       }
+       Exception e;
+       if(c instanceof Exception)
+           e = (Exception)c;
+       else
+           e = new Exception(c);
+       if(!n) {
+           close();
+           error = e;
+       }
+       synchronized(queue) {
+           Command cmd;
+           while((cmd = pending.poll()) != null) {
+               cmd.error(e);
+           }
+           while((cmd = queue.poll()) != null) {
+               cmd.error(e);
+           }
+       }
+    }
+    
     private void checkthread() {
        if(Thread.currentThread() == reader)
            throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
@@ -162,10 +162,10 @@ public class Connection {
        return(error);
     }
 
-    public void addConnectListener(ConnectListener l) {
+    public synchronized void addConnectListener(ConnectListener l) {
+       if((state != "idle") && (state != "connecting"))
+           throw(new IllegalStateException("Already connected"));
        synchronized(connls) {
-           if((state != "idle") && (state != "connecting"))
-               throw(new IllegalStateException("Already connected"));
            connls.add(l);
        }
     }
@@ -186,14 +186,8 @@ public class Connection {
        }
     }
     
-    static private class Writer extends Thread {
-       Socket s;
-       Queue<Command> queue, pending;
-       
-       public Writer(Socket s, Queue<Command> queue, Queue<Command> pending) {
-           this.s = s;
-           this.queue = queue;
-           this.pending = pending;
+    private class Writer extends Thread {
+       public Writer() {
            setDaemon(true);
        }
        
@@ -219,22 +213,18 @@ public class Connection {
                return(sb.toString());
        }
 
-       public void run() {
+       private void guarded() {
            try {
                java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
                while(true) {
                    Command cmd;
                    try {
-                       synchronized(pending) {
-                           while(pending.size() > 0)
-                               pending.wait();
-                       }
                        synchronized(queue) {
-                           do {
-                               if((cmd = queue.poll()) != null)
-                                   break;
+                           while(pending.size() > 0)
+                               queue.wait();
+                           while((cmd = queue.poll()) == null)
                                queue.wait();
-                           } while(true);
+                           pending.offer(cmd);
                        }
                    } catch(InterruptedException e) {
                        throw(new StopCondition(e, true));
@@ -251,28 +241,32 @@ public class Connection {
                throw(new StopCondition(e, false));
            }
        }
-    }
-
-    static private class Reader extends Thread {
-       Socket s;
-       Queue<Command> pending;
        
-       public Reader(Socket s, Queue<Command> pending) {
-           this.s = s;
-           this.pending = pending;
+       public void run() {
+           try {
+               guarded();
+           } catch(Throwable t) {
+               error(t);
+           }
        }
-       
+    }
+
+    private class Reader extends Thread {
        private void dispatch(Response resp) throws Exception {
            if(resp.code < 600) {
-               synchronized(pending) {
-                   resp.cmd = pending.remove();
-                   pending.notifyAll();
+               synchronized(queue) {
+                   try {
+                       resp.cmd = pending.remove();
+                   } catch(NoSuchElementException e) {
+                       throw(new RuntimeException("DC server sent reply without a pending command"));
+                   }
+                   queue.notifyAll();
                }
                resp.cmd.done(resp);
            }
        }
 
-       public void run() {
+       private void guarded() {
            try {
                java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
                String state = "start";
@@ -369,18 +363,21 @@ public class Connection {
                throw(new StopCondition(e, false));
            }
        }
+       
+       public void run() {
+           try {
+               guarded();
+           } catch(Throwable t) {
+               error(t);
+           }
+       }
     }
 
     public void close() {
        try {
            s.close();
-       } catch(IOException e) {
-       }
+       } catch(IOException e) {}
        reader.interrupt();
        writer.interrupt();
     }
-
-    protected void finalize() {
-       close();
-    }
 }