Java: Fix ecmd and Writer bugs.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
1 package dolda.dolcon.protocol;
2
3 import java.io.*;
4 import java.net.Socket;
5 import java.util.*;
6
7 public class Connection {
8     private Socket s;
9     private Reader reader;
10     private Writer writer;
11     private Queue<Command> queue = new LinkedList<Command>();
12     private Queue<Command> pending = new LinkedList<Command>();
13     private int reqver = 2, revlo, revhi;
14     private String aspec;
15     private String state;
16     private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17     private Set<NotifyListener> notls = new HashSet<NotifyListener>();
18     private Exception error;
19     
20     public interface ConnectListener {
21         public void connected() throws Exception;
22         public void error(Exception cause);
23     }
24     
25     public Connection(String aspec) {
26         this.aspec = aspec;
27         state = "idle";
28     }
29     
30     public void connect() throws ConnectException {
31         synchronized(this) {
32             if(state != "idle")
33                 throw(new IllegalStateException("Already connected"));
34             state = "connecting";
35         }
36         try {
37             s = new Socket(aspec, 1500);
38         } catch(java.net.UnknownHostException e) {
39             throw(new ConnectException("Could not resolve host " + aspec, e));
40         } catch(IOException e) {
41             throw(new ConnectException("Could not connect to host " + aspec, e));
42         }
43         pending = new LinkedList<Command>();
44         Command ccmd = new Command(".connect");
45         ccmd.new Listener() {
46                 public void done(Response resp) throws Exception {
47                     try {
48                         checkver(resp);
49                     } catch(VersionException e) {
50                         error(e);
51                         throw(e);
52                     }
53                     synchronized(Connection.this) {
54                         state = "connected";
55                     }
56                     synchronized(connls) {
57                         try {
58                             for(ConnectListener l : connls)
59                                 l.connected();
60                         } finally {
61                             connls.clear();
62                         }
63                     }
64                 }
65                 
66                 public void error(Exception cause) {
67                     synchronized(connls) {
68                         try {
69                             for(ConnectListener l : connls)
70                                 l.error(cause);
71                         } finally {
72                             connls.clear();
73                         }
74                     }
75                 }
76             };
77         pending.offer(ccmd);
78         reader = new Reader();
79         writer = new Writer();
80         reader.start();
81         writer.start();
82     }
83     
84     private void error(Throwable c) {
85         boolean n = false;
86         if(c instanceof StopCondition) {
87             StopCondition s = (StopCondition)c;
88             n = s.normal;
89             c = s.getCause();
90         }
91         Exception e;
92         if(c instanceof Exception)
93             e = (Exception)c;
94         else
95             e = new Exception(c);
96         if(!n) {
97             close();
98             error = e;
99         }
100         synchronized(queue) {
101             Command cmd;
102             while((cmd = pending.poll()) != null) {
103                 cmd.error(e);
104             }
105             while((cmd = queue.poll()) != null) {
106                 cmd.error(e);
107             }
108         }
109     }
110     
111     private void checkthread() {
112         if(Thread.currentThread() == reader)
113             throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
114     }
115         
116     public void syncConnect() throws ConnectException, InterruptedException {
117         checkthread();
118         final boolean[] donep = new boolean[] {false};
119         final Exception[] errp = new Exception[] {null};
120         ConnectListener l = new ConnectListener() {
121                 public void connected() {
122                     donep[0] = true;
123                     synchronized(this) {
124                         notifyAll();
125                     }
126                 }
127                 
128                 public void error(Exception cause) {
129                     donep[0] = true;
130                     errp[0] = cause;
131                     synchronized(this) {
132                         notifyAll();
133                     }
134                 }
135             };
136         addConnectListener(l);
137         connect();
138         while(!donep[0]) {
139             synchronized(l) {
140                 l.wait();
141             }
142         }
143         if(errp[0] != null)
144             throw(new ConnectException("DC connection has been closed", errp[0]));
145     }
146
147     public void expectVersion(int reqver) {
148         this.reqver = reqver;
149     }
150     
151     private void checkver(Response resp) throws VersionException {
152         revlo = Integer.parseInt(resp.token(0, 0));
153         revhi = Integer.parseInt(resp.token(0, 1));
154         if((reqver < revlo) || (reqver > revhi))
155             throw(new VersionException(reqver, revlo, revhi));
156     }
157
158     public Exception join() throws InterruptedException {
159         while(reader.isAlive()) {
160             reader.join();
161         }
162         close();
163         return(error);
164     }
165
166     public void addNotifyListener(NotifyListener l) {
167         synchronized(notls) {
168             notls.add(l);
169         }
170     }
171
172     public void removeNotifyListener(NotifyListener l) {
173         synchronized(notls) {
174             notls.remove(l);
175         }
176     }
177
178     public synchronized void addConnectListener(ConnectListener l) {
179         if((state != "idle") && (state != "connecting"))
180             throw(new IllegalStateException("Already connected"));
181         synchronized(connls) {
182             connls.add(l);
183         }
184     }
185
186     public void qcmd(Command cmd) {
187         synchronized(queue) {
188             queue.offer(cmd);
189             queue.notifyAll();
190         }
191     }
192     
193     public void qcmd(String... tokens) {
194         qcmd(new Command(tokens));
195     }
196     
197     public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
198         checkthread();
199         final boolean[] donep = new boolean[] {false};
200         final Response[] resp = new Response[] {null};
201         final Exception[] errp = new Exception[] {null};
202         Object l = cmd.new Listener() {
203                 public synchronized void done(Response rsp) {
204                     resp[0] = rsp;
205                     donep[0] = true;
206                     notifyAll();
207                 }
208                 
209                 public synchronized void error(Exception e) {
210                     errp[0] = e;
211                     donep[0] = true;
212                     notifyAll();
213                 }
214             };
215         qcmd(cmd);
216         synchronized(l) {
217             while(!donep[0]) {
218                 l.wait();
219             }
220         }
221         if(errp[0] != null)
222             throw(new ClosedException(errp[0]));
223         return(resp[0]);
224     }
225     
226     public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
227         return(ecmd(new Command(tokens)));
228     }
229     
230     static private class StopCondition extends Error {
231         final boolean normal;
232         
233         public StopCondition(Exception cause, boolean normal) {
234             super(cause);
235             this.normal = normal;
236         }
237     }
238     
239     private class Writer extends Thread {
240         public Writer() {
241             setDaemon(true);
242         }
243         
244         private String quote(String t) {
245             if(t.length() == 0)
246                 return("\"\"");
247             StringBuilder sb = new StringBuilder();
248             boolean quote = false;
249             for(int i = 0; i < t.length(); i++) {
250                 char c = t.charAt(i);
251                 if(c == '\"') {
252                     sb.append("\\\"");
253                 } else if(Character.isWhitespace(c)) {
254                     quote = true;
255                     sb.append(c);
256                 } else {
257                     sb.append(c);
258                 }
259             }
260             if(quote)
261                 return("\"" + sb.toString() + "\"");
262             else
263                 return(sb.toString());
264         }
265
266         private void guarded() {
267             try {
268                 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
269                 while(true) {
270                     Command cmd;
271                     try {
272                         synchronized(queue) {
273                             while(pending.size() > 0)
274                                 queue.wait();
275                             while((cmd = queue.poll()) == null)
276                                 queue.wait();
277                             pending.offer(cmd);
278                         }
279                     } catch(InterruptedException e) {
280                         throw(new StopCondition(e, true));
281                     }
282                     StringBuilder out = new StringBuilder();
283                     for(String s : cmd.tokens) {
284                         if(out.length() > 0)
285                             out.append(' ');
286                         out.append(quote(s));
287                     }
288                     out.append("\r\n");
289                     w.write(out.toString());
290                     w.flush();
291                 }
292             } catch(IOException e) {
293                 throw(new StopCondition(e, false));
294             }
295         }
296         
297         public void run() {
298             try {
299                 guarded();
300             } catch(Throwable t) {
301                 error(t);
302             }
303         }
304     }
305
306     private class Reader extends Thread {
307         private void dispatch(Response resp) throws Exception {
308             if(resp.code < 600) {
309                 synchronized(queue) {
310                     try {
311                         resp.cmd = pending.remove();
312                     } catch(NoSuchElementException e) {
313                         throw(new RuntimeException("DC server sent reply without a pending command"));
314                     }
315                     queue.notifyAll();
316                 }
317                 resp.cmd.done(resp);
318             } else {
319                 synchronized(notls) {
320                     for(NotifyListener l : notls) {
321                         l.notified(resp);
322                     }
323                 }
324             }
325         }
326
327         private void guarded() {
328             try {
329                 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
330                 String state = "start";
331                 StringBuilder ct = new StringBuilder();
332                 int code = -1;
333                 boolean last = true;
334                 List<List<String>> lines = new LinkedList<List<String>>();
335                 List<String> tokens = new LinkedList<String>();
336                 while(true) {
337                     char c;
338                     {
339                         int i;
340                         try {
341                             if((i = r.read()) < 0) {
342                                 throw(new IOException("The server closed the connection"));
343                             }
344                         } catch(java.nio.channels.ClosedByInterruptException e) {
345                             throw(new StopCondition(e, true));
346                         }
347                         c = (char)i;
348                     }
349                     eat: do {
350                         if(state == "start") {
351                             if(c == '\r') {
352                                 state = "nl";
353                             } else if(Character.isWhitespace(c)) {
354                             } else {
355                                 if(code == -1)
356                                     state = "code";
357                                 else
358                                     state = "token";
359                                 continue eat;
360                             }
361                         } else if(state == "nl") {
362                             if(c == '\n') {
363                                 if((code < 100) || (code >= 1000)) {
364                                     throw(new IOException("Illegal response code " + code + " from the server"));
365                                 }
366                                 lines.add(tokens);
367                                 tokens = new LinkedList<String>();
368                                 if(last) {
369                                     dispatch(new Response(code, lines));
370                                     lines = new LinkedList<List<String>>();
371                                 }
372                                 code = -1;
373                                 state = "start";
374                             } else {
375                                 state = "start";
376                                 continue eat;
377                             }
378                         } else if(state == "code") {
379                             if((c == '-') || Character.isWhitespace(c)) {
380                                 last = c != '-';
381                                 code = Integer.parseInt(ct.toString());
382                                 ct.setLength(0);
383                                 state = "start";
384                                 continue eat;
385                             } else {
386                                 ct.append(c);
387                             }
388                         } else if(state == "token") {
389                             if(Character.isWhitespace(c)) {
390                                 tokens.add(ct.toString());
391                                 ct.setLength(0);
392                                 state = "start";
393                                 continue eat;
394                             } else if(c == '\\') {
395                                 state = "bs";
396                             } else if(c == '"') {
397                                 state = "cited";
398                             } else {
399                                 ct.append(c);
400                             }
401                         } else if(state == "bs") {
402                             ct.append(c);
403                             state = "token";
404                         } else if(state == "cited") {
405                             if(c == '\\')
406                                 state = "cbs";
407                             else if(c == '"')
408                                 state = "token";
409                             else
410                                 ct.append(c);
411                         } else if(state == "cbs") {
412                             ct.append(c);
413                             state = "cited";
414                         } else {
415                             throw(new Error("invalid state " + state));
416                         }
417                         break;
418                     } while(true);
419                 }
420             } catch(Exception e) {
421                 throw(new StopCondition(e, false));
422             }
423         }
424         
425         public void run() {
426             try {
427                 guarded();
428             } catch(Throwable t) {
429                 error(t);
430             }
431         }
432     }
433
434     public void close() {
435         try {
436             s.close();
437         } catch(IOException e) {}
438         reader.interrupt();
439         writer.interrupt();
440     }
441 }