31bb172c8712cbbddf2c1214f09a3639f39f427d
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
1 package dolda.dolcon.protocol;
2
3 import java.io.*;
4 import java.net.Socket;
5 import java.util.*;
6
7 public class Connection {
8     private Socket s;
9     private Reader reader;
10     private Writer writer;
11     private Queue<Command> queue = new LinkedList<Command>();
12     private Queue<Command> pending = new LinkedList<Command>();
13     private int reqver = 2, revlo, revhi;
14     private String aspec;
15     private String state;
16     private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17     private Exception error;
18     
19     public interface ConnectListener {
20         public void connected() throws Exception;
21         public void error(Exception cause);
22     }
23
24     public Connection(String aspec) {
25         this.aspec = aspec;
26         state = "idle";
27     }
28     
29     public void connect() throws ConnectException {
30         synchronized(this) {
31             if(state != "idle")
32                 throw(new IllegalStateException("Already connected"));
33             state = "connecting";
34         }
35         try {
36             s = new Socket(aspec, 1500);
37         } catch(java.net.UnknownHostException e) {
38             throw(new ConnectException("Could not resolve host " + aspec, e));
39         } catch(IOException e) {
40             throw(new ConnectException("Could not connect to host " + aspec, e));
41         }
42         pending = new LinkedList<Command>();
43         Command ccmd = new Command(".connect");
44         ccmd.new Listener() {
45                 public void done(Response resp) throws Exception {
46                     try {
47                         checkver(resp);
48                     } catch(VersionException e) {
49                         error(e);
50                         throw(e);
51                     }
52                     synchronized(Connection.this) {
53                         state = "connected";
54                     }
55                     synchronized(connls) {
56                         try {
57                             for(ConnectListener l : connls)
58                                 l.connected();
59                         } finally {
60                             connls.clear();
61                         }
62                     }
63                 }
64                 
65                 public void error(Exception cause) {
66                     synchronized(connls) {
67                         try {
68                             for(ConnectListener l : connls)
69                                 l.error(cause);
70                         } finally {
71                             connls.clear();
72                         }
73                     }
74                 }
75             };
76         pending.offer(ccmd);
77         reader = new Reader();
78         writer = new Writer();
79         reader.start();
80         writer.start();
81     }
82     
83     private void error(Throwable c) {
84         boolean n = false;
85         if(c instanceof StopCondition) {
86             StopCondition s = (StopCondition)c;
87             n = s.normal;
88             c = s.getCause();
89         }
90         Exception e;
91         if(c instanceof Exception)
92             e = (Exception)c;
93         else
94             e = new Exception(c);
95         if(!n) {
96             close();
97             error = e;
98         }
99         synchronized(queue) {
100             Command cmd;
101             while((cmd = pending.poll()) != null) {
102                 cmd.error(e);
103             }
104             while((cmd = queue.poll()) != null) {
105                 cmd.error(e);
106             }
107         }
108     }
109     
110     private void checkthread() {
111         if(Thread.currentThread() == reader)
112             throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
113     }
114         
115     public void syncConnect() throws ConnectException, InterruptedException {
116         checkthread();
117         final boolean[] donep = new boolean[] {false};
118         final Exception[] errp = new Exception[] {null};
119         ConnectListener l = new ConnectListener() {
120                 public void connected() {
121                     donep[0] = true;
122                     synchronized(this) {
123                         notifyAll();
124                     }
125                 }
126                 
127                 public void error(Exception cause) {
128                     donep[0] = true;
129                     errp[0] = cause;
130                     synchronized(this) {
131                         notifyAll();
132                     }
133                 }
134             };
135         addConnectListener(l);
136         connect();
137         while(!donep[0]) {
138             synchronized(l) {
139                 l.wait();
140             }
141         }
142         if(errp[0] != null)
143             throw(new ConnectException("DC connection has been closed", errp[0]));
144     }
145
146     public void expectVersion(int reqver) {
147         this.reqver = reqver;
148     }
149     
150     private void checkver(Response resp) throws VersionException {
151         revlo = Integer.parseInt(resp.token(0, 0));
152         revhi = Integer.parseInt(resp.token(0, 1));
153         if((reqver < revlo) || (reqver > revhi))
154             throw(new VersionException(reqver, revlo, revhi));
155     }
156
157     public Exception join() throws InterruptedException {
158         while(reader.isAlive()) {
159             reader.join();
160         }
161         close();
162         return(error);
163     }
164
165     public synchronized void addConnectListener(ConnectListener l) {
166         if((state != "idle") && (state != "connecting"))
167             throw(new IllegalStateException("Already connected"));
168         synchronized(connls) {
169             connls.add(l);
170         }
171     }
172
173     public void qcmd(Command cmd) {
174         synchronized(queue) {
175             queue.offer(cmd);
176             queue.notifyAll();
177         }
178     }
179     
180     public void qcmd(String... tokens) {
181         qcmd(new Command(tokens));
182     }
183     
184     public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
185         checkthread();
186         final boolean[] donep = new boolean[] {false};
187         final Response[] resp = new Response[] {null};
188         final Exception[] errp = new Exception[] {null};
189         Object l = cmd.new Listener() {
190                 public synchronized void done(Response rsp) {
191                     resp[0] = rsp;
192                     donep[0] = true;
193                     notifyAll();
194                 }
195                 
196                 public synchronized void error(Exception e) {
197                     errp[0] = e;
198                     donep[0] = true;
199                     notifyAll();
200                 }
201             };
202         synchronized(l) {
203             while(!donep[0]) {
204                 l.wait();
205             }
206         }
207         if(errp[0] != null)
208             throw(new ClosedException(errp[0]));
209         return(resp[0]);
210     }
211     
212     public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
213         return(ecmd(new Command(tokens)));
214     }
215     
216     static private class StopCondition extends Error {
217         final boolean normal;
218         
219         public StopCondition(Exception cause, boolean normal) {
220             super(cause);
221             this.normal = normal;
222         }
223     }
224     
225     private class Writer extends Thread {
226         public Writer() {
227             setDaemon(true);
228         }
229         
230         private String quote(String t) {
231             if(t.length() == 0)
232                 return("\"\"");
233             StringBuilder sb = new StringBuilder();
234             boolean quote = false;
235             for(int i = 0; i < t.length(); i++) {
236                 char c = t.charAt(i);
237                 if(c == '\"') {
238                     sb.append("\\\"");
239                 } else if(Character.isWhitespace(c)) {
240                     quote = true;
241                     sb.append(c);
242                 } else {
243                     sb.append(c);
244                 }
245             }
246             if(quote)
247                 return("\"" + sb.toString() + "\"");
248             else
249                 return(sb.toString());
250         }
251
252         private void guarded() {
253             try {
254                 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
255                 while(true) {
256                     Command cmd;
257                     try {
258                         synchronized(queue) {
259                             while(pending.size() > 0)
260                                 queue.wait();
261                             while((cmd = queue.poll()) == null)
262                                 queue.wait();
263                             pending.offer(cmd);
264                         }
265                     } catch(InterruptedException e) {
266                         throw(new StopCondition(e, true));
267                     }
268                     StringBuilder out = new StringBuilder();
269                     for(String s : cmd.tokens) {
270                         if(out.length() > 0)
271                             out.append(' ');
272                         out.append(quote(s));
273                     }
274                     w.write(out.toString());
275                 }
276             } catch(IOException e) {
277                 throw(new StopCondition(e, false));
278             }
279         }
280         
281         public void run() {
282             try {
283                 guarded();
284             } catch(Throwable t) {
285                 error(t);
286             }
287         }
288     }
289
290     private class Reader extends Thread {
291         private void dispatch(Response resp) throws Exception {
292             if(resp.code < 600) {
293                 synchronized(queue) {
294                     try {
295                         resp.cmd = pending.remove();
296                     } catch(NoSuchElementException e) {
297                         throw(new RuntimeException("DC server sent reply without a pending command"));
298                     }
299                     queue.notifyAll();
300                 }
301                 resp.cmd.done(resp);
302             }
303         }
304
305         private void guarded() {
306             try {
307                 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
308                 String state = "start";
309                 StringBuilder ct = new StringBuilder();
310                 int code = -1;
311                 boolean last = true;
312                 List<List<String>> lines = new LinkedList<List<String>>();
313                 List<String> tokens = new LinkedList<String>();
314                 while(true) {
315                     char c;
316                     {
317                         int i;
318                         try {
319                             if((i = r.read()) < 0) {
320                                 throw(new IOException("The server closed the connection"));
321                             }
322                         } catch(java.nio.channels.ClosedByInterruptException e) {
323                             throw(new StopCondition(e, true));
324                         }
325                         c = (char)i;
326                     }
327                     eat: do {
328                         if(state == "start") {
329                             if(c == '\r') {
330                                 state = "nl";
331                             } else if(Character.isWhitespace(c)) {
332                             } else {
333                                 if(code == -1)
334                                     state = "code";
335                                 else
336                                     state = "token";
337                                 continue eat;
338                             }
339                         } else if(state == "nl") {
340                             if(c == '\n') {
341                                 if((code < 100) || (code >= 1000)) {
342                                     throw(new IOException("Illegal response code " + code + " from the server"));
343                                 }
344                                 lines.add(tokens);
345                                 tokens = new LinkedList<String>();
346                                 if(last) {
347                                     dispatch(new Response(code, lines));
348                                     lines = new LinkedList<List<String>>();
349                                 }
350                                 code = -1;
351                                 state = "start";
352                             } else {
353                                 state = "start";
354                                 continue eat;
355                             }
356                         } else if(state == "code") {
357                             if((c == '-') || Character.isWhitespace(c)) {
358                                 last = c != '-';
359                                 code = Integer.parseInt(ct.toString());
360                                 ct.setLength(0);
361                                 state = "start";
362                                 continue eat;
363                             } else {
364                                 ct.append(c);
365                             }
366                         } else if(state == "token") {
367                             if(Character.isWhitespace(c)) {
368                                 tokens.add(ct.toString());
369                                 ct.setLength(0);
370                                 state = "start";
371                                 continue eat;
372                             } else if(c == '\\') {
373                                 state = "bs";
374                             } else if(c == '"') {
375                                 state = "cited";
376                             } else {
377                                 ct.append(c);
378                             }
379                         } else if(state == "bs") {
380                             ct.append(c);
381                             state = "token";
382                         } else if(state == "cited") {
383                             if(c == '\\')
384                                 state = "cbs";
385                             else if(c == '"')
386                                 state = "token";
387                             else
388                                 ct.append(c);
389                         } else if(state == "cbs") {
390                             ct.append(c);
391                             state = "cited";
392                         } else {
393                             throw(new Error("invalid state " + state));
394                         }
395                         break;
396                     } while(true);
397                 }
398             } catch(Exception e) {
399                 throw(new StopCondition(e, false));
400             }
401         }
402         
403         public void run() {
404             try {
405                 guarded();
406             } catch(Throwable t) {
407                 error(t);
408             }
409         }
410     }
411
412     public void close() {
413         try {
414             s.close();
415         } catch(IOException e) {}
416         reader.interrupt();
417         writer.interrupt();
418     }
419 }