}
public void connect() throws ConnectException {
- state = "connecting";
+ synchronized(this) {
+ if(state != "idle")
+ throw(new IllegalStateException("Already connected"));
+ state = "connecting";
+ }
try {
s = new Socket(aspec, 1500);
} catch(java.net.UnknownHostException e) {
error(e);
throw(e);
}
- synchronized(connls) {
+ synchronized(Connection.this) {
state = "connected";
+ }
+ synchronized(connls) {
try {
for(ConnectListener l : connls)
l.connected();
}
});
pending.offer(ccmd);
- reader = new Reader(s, pending);
- writer = new Writer(s, queue, pending);
- Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
- public void uncaughtException(Thread t, Throwable c) {
- boolean n = false;
- if(c instanceof StopCondition) {
- StopCondition s = (StopCondition)c;
- n = s.normal;
- c = s.getCause();
- }
- Exception e;
- if(c instanceof Exception)
- e = (Exception)c;
- else
- e = new Exception(c);
- if(!n) {
- close();
- error = e;
- }
- synchronized(pending) {
- Command cmd;
- while((cmd = pending.poll()) != null) {
- cmd.error(e);
- }
- }
- synchronized(queue) {
- Command cmd;
- while((cmd = queue.poll()) != null) {
- cmd.error(e);
- }
- }
- }
- };
- reader.setUncaughtExceptionHandler(h);
- writer.setUncaughtExceptionHandler(h);
+ reader = new Reader();
+ writer = new Writer();
reader.start();
writer.start();
}
+ private void error(Throwable c) {
+ boolean n = false;
+ if(c instanceof StopCondition) {
+ StopCondition s = (StopCondition)c;
+ n = s.normal;
+ c = s.getCause();
+ }
+ Exception e;
+ if(c instanceof Exception)
+ e = (Exception)c;
+ else
+ e = new Exception(c);
+ if(!n) {
+ close();
+ error = e;
+ }
+ synchronized(queue) {
+ Command cmd;
+ while((cmd = pending.poll()) != null) {
+ cmd.error(e);
+ }
+ while((cmd = queue.poll()) != null) {
+ cmd.error(e);
+ }
+ }
+ }
+
private void checkthread() {
if(Thread.currentThread() == reader)
throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
return(error);
}
- public void addConnectListener(ConnectListener l) {
+ public synchronized void addConnectListener(ConnectListener l) {
+ if((state != "idle") && (state != "connecting"))
+ throw(new IllegalStateException("Already connected"));
synchronized(connls) {
- if((state != "idle") && (state != "connecting"))
- throw(new IllegalStateException("Already connected"));
connls.add(l);
}
}
}
}
- static private class Writer extends Thread {
- Socket s;
- Queue<Command> queue, pending;
-
- public Writer(Socket s, Queue<Command> queue, Queue<Command> pending) {
- this.s = s;
- this.queue = queue;
- this.pending = pending;
+ private class Writer extends Thread {
+ public Writer() {
setDaemon(true);
}
return(sb.toString());
}
- public void run() {
+ private void guarded() {
try {
java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
while(true) {
Command cmd;
try {
- synchronized(pending) {
- while(pending.size() > 0)
- pending.wait();
- }
synchronized(queue) {
- do {
- if((cmd = queue.poll()) != null)
- break;
+ while(pending.size() > 0)
+ queue.wait();
+ while((cmd = queue.poll()) == null)
queue.wait();
- } while(true);
+ pending.offer(cmd);
}
} catch(InterruptedException e) {
throw(new StopCondition(e, true));
throw(new StopCondition(e, false));
}
}
- }
-
- static private class Reader extends Thread {
- Socket s;
- Queue<Command> pending;
- public Reader(Socket s, Queue<Command> pending) {
- this.s = s;
- this.pending = pending;
+ public void run() {
+ try {
+ guarded();
+ } catch(Throwable t) {
+ error(t);
+ }
}
-
+ }
+
+ private class Reader extends Thread {
private void dispatch(Response resp) throws Exception {
if(resp.code < 600) {
- synchronized(pending) {
- resp.cmd = pending.remove();
- pending.notifyAll();
+ synchronized(queue) {
+ try {
+ resp.cmd = pending.remove();
+ } catch(NoSuchElementException e) {
+ throw(new RuntimeException("DC server sent reply without a pending command"));
+ }
+ queue.notifyAll();
}
resp.cmd.done(resp);
}
}
- public void run() {
+ private void guarded() {
try {
java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
String state = "start";
throw(new StopCondition(e, false));
}
}
+
+ public void run() {
+ try {
+ guarded();
+ } catch(Throwable t) {
+ error(t);
+ }
+ }
}
public void close() {
try {
s.close();
- } catch(IOException e) {
- }
+ } catch(IOException e) {}
reader.interrupt();
writer.interrupt();
}
-
- protected void finalize() {
- close();
- }
}