1 package dolda.dolcon.protocol;
4 import java.net.Socket;
7 public class Connection {
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17 private Set<NotifyListener> notls = new HashSet<NotifyListener>();
18 private Exception error;
20 public interface ConnectListener {
21 public void connected() throws Exception;
22 public void error(Exception cause);
25 public Connection(String aspec) {
30 public void connect() throws ConnectException {
33 throw(new IllegalStateException("Already connected"));
37 s = new Socket(aspec, 1500);
38 } catch(java.net.UnknownHostException e) {
39 throw(new ConnectException("Could not resolve host " + aspec, e));
40 } catch(IOException e) {
41 throw(new ConnectException("Could not connect to host " + aspec, e));
43 pending = new LinkedList<Command>();
44 Command ccmd = new Command(".connect");
46 public void done(Response resp) throws Exception {
49 } catch(VersionException e) {
53 synchronized(Connection.this) {
56 synchronized(connls) {
58 for(ConnectListener l : connls)
66 public void error(Exception cause) {
67 synchronized(connls) {
69 for(ConnectListener l : connls)
78 reader = new Reader();
79 writer = new Writer();
84 private void error(Throwable c) {
86 if(c instanceof StopCondition) {
87 StopCondition s = (StopCondition)c;
92 if(c instanceof Exception)
100 synchronized(queue) {
102 while((cmd = pending.poll()) != null) {
105 while((cmd = queue.poll()) != null) {
111 private void checkthread() {
112 if(Thread.currentThread() == reader)
113 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
116 public void syncConnect() throws ConnectException, InterruptedException {
118 final boolean[] donep = new boolean[] {false};
119 final Exception[] errp = new Exception[] {null};
120 ConnectListener l = new ConnectListener() {
121 public void connected() {
128 public void error(Exception cause) {
136 addConnectListener(l);
144 throw(new ConnectException("DC connection has been closed", errp[0]));
147 public void expectVersion(int reqver) {
148 this.reqver = reqver;
151 private void checkver(Response resp) throws VersionException {
152 revlo = Integer.parseInt(resp.token(0, 0));
153 revhi = Integer.parseInt(resp.token(0, 1));
154 if((reqver < revlo) || (reqver > revhi))
155 throw(new VersionException(reqver, revlo, revhi));
158 public Exception join() throws InterruptedException {
159 while(reader.isAlive()) {
166 public void addNotifyListener(NotifyListener l) {
167 synchronized(notls) {
172 public void removeNotifyListener(NotifyListener l) {
173 synchronized(notls) {
178 public synchronized void addConnectListener(ConnectListener l) {
179 if((state != "idle") && (state != "connecting"))
180 throw(new IllegalStateException("Already connected"));
181 synchronized(connls) {
186 public void qcmd(Command cmd) {
187 synchronized(queue) {
193 public void qcmd(String... tokens) {
194 qcmd(new Command(tokens));
197 public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
199 final boolean[] donep = new boolean[] {false};
200 final Response[] resp = new Response[] {null};
201 final Exception[] errp = new Exception[] {null};
202 Object l = cmd.new Listener() {
203 public synchronized void done(Response rsp) {
209 public synchronized void error(Exception e) {
222 throw(new ClosedException(errp[0]));
226 public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
227 return(ecmd(new Command(tokens)));
230 static private class StopCondition extends Error {
231 final boolean normal;
233 public StopCondition(Exception cause, boolean normal) {
235 this.normal = normal;
239 private class Writer extends Thread {
244 private String quote(String t) {
247 StringBuilder sb = new StringBuilder();
248 boolean quote = false;
249 for(int i = 0; i < t.length(); i++) {
250 char c = t.charAt(i);
253 } else if(Character.isWhitespace(c)) {
261 return("\"" + sb.toString() + "\"");
263 return(sb.toString());
266 private void guarded() {
268 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
272 synchronized(queue) {
273 while(pending.size() > 0)
275 while((cmd = queue.poll()) == null)
279 } catch(InterruptedException e) {
280 throw(new StopCondition(e, true));
282 StringBuilder out = new StringBuilder();
283 for(String s : cmd.tokens) {
286 out.append(quote(s));
289 w.write(out.toString());
292 } catch(IOException e) {
293 throw(new StopCondition(e, false));
300 } catch(Throwable t) {
306 private class Reader extends Thread {
307 private void dispatch(Response resp) throws Exception {
308 if(resp.code < 600) {
309 synchronized(queue) {
311 resp.cmd = pending.remove();
312 } catch(NoSuchElementException e) {
313 throw(new RuntimeException("DC server sent reply without a pending command"));
319 synchronized(notls) {
320 for(NotifyListener l : notls) {
327 private void guarded() {
329 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
330 String state = "start";
331 StringBuilder ct = new StringBuilder();
334 List<List<String>> lines = new LinkedList<List<String>>();
335 List<String> tokens = new LinkedList<String>();
341 if((i = r.read()) < 0) {
342 throw(new IOException("The server closed the connection"));
344 } catch(java.nio.channels.ClosedByInterruptException e) {
345 throw(new StopCondition(e, true));
350 if(state == "start") {
353 } else if(Character.isWhitespace(c)) {
361 } else if(state == "nl") {
363 if((code < 100) || (code >= 1000)) {
364 throw(new IOException("Illegal response code " + code + " from the server"));
367 tokens = new LinkedList<String>();
369 dispatch(new Response(code, lines));
370 lines = new LinkedList<List<String>>();
378 } else if(state == "code") {
379 if((c == '-') || Character.isWhitespace(c)) {
381 code = Integer.parseInt(ct.toString());
388 } else if(state == "token") {
389 if(Character.isWhitespace(c)) {
390 tokens.add(ct.toString());
394 } else if(c == '\\') {
396 } else if(c == '"') {
401 } else if(state == "bs") {
404 } else if(state == "cited") {
411 } else if(state == "cbs") {
415 throw(new Error("invalid state " + state));
420 } catch(Exception e) {
421 throw(new StopCondition(e, false));
428 } catch(Throwable t) {
434 public void close() {
437 } catch(IOException e) {}