03152bce6783aeec86c775bd369fa232043451cd
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
1 package dolda.dolcon.protocol;
2
3 import java.io.*;
4 import java.net.Socket;
5 import java.util.*;
6
7 public class Connection {
8     private Socket s;
9     private Reader reader;
10     private Writer writer;
11     private Queue<Command> queue = new LinkedList<Command>();
12     private Queue<Command> pending = new LinkedList<Command>();
13     private int reqver = 2, revlo, revhi;
14     private String aspec;
15     private String state;
16     private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17     private Set<NotifyListener> notls = new HashSet<NotifyListener>();
18     private Exception error;
19     
20     public interface ConnectListener {
21         public void connected() throws Exception;
22         public void error(Exception cause);
23     }
24     
25     public Connection(String aspec) {
26         this.aspec = aspec;
27         state = "idle";
28     }
29     
30     public void connect() throws ConnectException {
31         synchronized(this) {
32             if(state != "idle")
33                 throw(new IllegalStateException("Already connected"));
34             state = "connecting";
35         }
36         try {
37             s = new Socket(aspec, 1500);
38         } catch(java.net.UnknownHostException e) {
39             throw(new ConnectException("Could not resolve host " + aspec, e));
40         } catch(IOException e) {
41             throw(new ConnectException("Could not connect to host " + aspec, e));
42         }
43         pending = new LinkedList<Command>();
44         Command ccmd = new Command(".connect");
45         ccmd.new Listener() {
46                 public void done(Response resp) throws Exception {
47                     try {
48                         checkver(resp);
49                     } catch(VersionException e) {
50                         error(e);
51                         throw(e);
52                     }
53                     synchronized(Connection.this) {
54                         state = "connected";
55                     }
56                     synchronized(connls) {
57                         try {
58                             for(ConnectListener l : connls)
59                                 l.connected();
60                         } finally {
61                             connls.clear();
62                         }
63                     }
64                 }
65                 
66                 public void error(Exception cause) {
67                     synchronized(connls) {
68                         try {
69                             for(ConnectListener l : connls)
70                                 l.error(cause);
71                         } finally {
72                             connls.clear();
73                         }
74                     }
75                 }
76             };
77         pending.offer(ccmd);
78         reader = new Reader();
79         writer = new Writer();
80         reader.start();
81         writer.start();
82     }
83     
84     private void error(Throwable c) {
85         boolean n = false;
86         if(c instanceof StopCondition) {
87             StopCondition s = (StopCondition)c;
88             n = s.normal;
89             c = s.getCause();
90         }
91         Exception e;
92         if(c instanceof Exception)
93             e = (Exception)c;
94         else
95             e = new Exception(c);
96         if(!n) {
97             close();
98             error = e;
99         }
100         synchronized(queue) {
101             Command cmd;
102             while((cmd = pending.poll()) != null) {
103                 cmd.error(e);
104             }
105             while((cmd = queue.poll()) != null) {
106                 cmd.error(e);
107             }
108         }
109     }
110     
111     private void checkthread() {
112         if(Thread.currentThread() == reader)
113             throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
114     }
115         
116     public void syncConnect() throws ConnectException, InterruptedException {
117         checkthread();
118         final boolean[] donep = new boolean[] {false};
119         final Exception[] errp = new Exception[] {null};
120         ConnectListener l = new ConnectListener() {
121                 public void connected() {
122                     donep[0] = true;
123                     synchronized(this) {
124                         notifyAll();
125                     }
126                 }
127                 
128                 public void error(Exception cause) {
129                     donep[0] = true;
130                     errp[0] = cause;
131                     synchronized(this) {
132                         notifyAll();
133                     }
134                 }
135             };
136         addConnectListener(l);
137         connect();
138         while(!donep[0]) {
139             synchronized(l) {
140                 l.wait();
141             }
142         }
143         if(errp[0] != null)
144             throw(new ConnectException("DC connection has been closed", errp[0]));
145     }
146
147     public void expectVersion(int reqver) {
148         this.reqver = reqver;
149     }
150     
151     private void checkver(Response resp) throws VersionException {
152         revlo = Integer.parseInt(resp.token(0, 0));
153         revhi = Integer.parseInt(resp.token(0, 1));
154         if((reqver < revlo) || (reqver > revhi))
155             throw(new VersionException(reqver, revlo, revhi));
156     }
157
158     public Exception join() throws InterruptedException {
159         while(reader.isAlive()) {
160             reader.join();
161         }
162         close();
163         return(error);
164     }
165
166     public void addNotifyListener(NotifyListener l) {
167         synchronized(notls) {
168             notls.add(l);
169         }
170     }
171
172     public void removeNotifyListener(NotifyListener l) {
173         synchronized(notls) {
174             notls.remove(l);
175         }
176     }
177
178     public synchronized void addConnectListener(ConnectListener l) {
179         if((state != "idle") && (state != "connecting"))
180             throw(new IllegalStateException("Already connected"));
181         synchronized(connls) {
182             connls.add(l);
183         }
184     }
185
186     public void qcmd(Command cmd) {
187         synchronized(queue) {
188             queue.offer(cmd);
189             queue.notifyAll();
190         }
191     }
192     
193     public void qcmd(String... tokens) {
194         qcmd(new Command(tokens));
195     }
196     
197     public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
198         checkthread();
199         final boolean[] donep = new boolean[] {false};
200         final Response[] resp = new Response[] {null};
201         final Exception[] errp = new Exception[] {null};
202         Object l = cmd.new Listener() {
203                 public synchronized void done(Response rsp) {
204                     resp[0] = rsp;
205                     donep[0] = true;
206                     notifyAll();
207                 }
208                 
209                 public synchronized void error(Exception e) {
210                     errp[0] = e;
211                     donep[0] = true;
212                     notifyAll();
213                 }
214             };
215         synchronized(l) {
216             while(!donep[0]) {
217                 l.wait();
218             }
219         }
220         if(errp[0] != null)
221             throw(new ClosedException(errp[0]));
222         return(resp[0]);
223     }
224     
225     public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
226         return(ecmd(new Command(tokens)));
227     }
228     
229     static private class StopCondition extends Error {
230         final boolean normal;
231         
232         public StopCondition(Exception cause, boolean normal) {
233             super(cause);
234             this.normal = normal;
235         }
236     }
237     
238     private class Writer extends Thread {
239         public Writer() {
240             setDaemon(true);
241         }
242         
243         private String quote(String t) {
244             if(t.length() == 0)
245                 return("\"\"");
246             StringBuilder sb = new StringBuilder();
247             boolean quote = false;
248             for(int i = 0; i < t.length(); i++) {
249                 char c = t.charAt(i);
250                 if(c == '\"') {
251                     sb.append("\\\"");
252                 } else if(Character.isWhitespace(c)) {
253                     quote = true;
254                     sb.append(c);
255                 } else {
256                     sb.append(c);
257                 }
258             }
259             if(quote)
260                 return("\"" + sb.toString() + "\"");
261             else
262                 return(sb.toString());
263         }
264
265         private void guarded() {
266             try {
267                 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
268                 while(true) {
269                     Command cmd;
270                     try {
271                         synchronized(queue) {
272                             while(pending.size() > 0)
273                                 queue.wait();
274                             while((cmd = queue.poll()) == null)
275                                 queue.wait();
276                             pending.offer(cmd);
277                         }
278                     } catch(InterruptedException e) {
279                         throw(new StopCondition(e, true));
280                     }
281                     StringBuilder out = new StringBuilder();
282                     for(String s : cmd.tokens) {
283                         if(out.length() > 0)
284                             out.append(' ');
285                         out.append(quote(s));
286                     }
287                     w.write(out.toString());
288                 }
289             } catch(IOException e) {
290                 throw(new StopCondition(e, false));
291             }
292         }
293         
294         public void run() {
295             try {
296                 guarded();
297             } catch(Throwable t) {
298                 error(t);
299             }
300         }
301     }
302
303     private class Reader extends Thread {
304         private void dispatch(Response resp) throws Exception {
305             if(resp.code < 600) {
306                 synchronized(queue) {
307                     try {
308                         resp.cmd = pending.remove();
309                     } catch(NoSuchElementException e) {
310                         throw(new RuntimeException("DC server sent reply without a pending command"));
311                     }
312                     queue.notifyAll();
313                 }
314                 resp.cmd.done(resp);
315             } else {
316                 synchronized(notls) {
317                     for(NotifyListener l : notls) {
318                         l.notified(resp);
319                     }
320                 }
321             }
322         }
323
324         private void guarded() {
325             try {
326                 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
327                 String state = "start";
328                 StringBuilder ct = new StringBuilder();
329                 int code = -1;
330                 boolean last = true;
331                 List<List<String>> lines = new LinkedList<List<String>>();
332                 List<String> tokens = new LinkedList<String>();
333                 while(true) {
334                     char c;
335                     {
336                         int i;
337                         try {
338                             if((i = r.read()) < 0) {
339                                 throw(new IOException("The server closed the connection"));
340                             }
341                         } catch(java.nio.channels.ClosedByInterruptException e) {
342                             throw(new StopCondition(e, true));
343                         }
344                         c = (char)i;
345                     }
346                     eat: do {
347                         if(state == "start") {
348                             if(c == '\r') {
349                                 state = "nl";
350                             } else if(Character.isWhitespace(c)) {
351                             } else {
352                                 if(code == -1)
353                                     state = "code";
354                                 else
355                                     state = "token";
356                                 continue eat;
357                             }
358                         } else if(state == "nl") {
359                             if(c == '\n') {
360                                 if((code < 100) || (code >= 1000)) {
361                                     throw(new IOException("Illegal response code " + code + " from the server"));
362                                 }
363                                 lines.add(tokens);
364                                 tokens = new LinkedList<String>();
365                                 if(last) {
366                                     dispatch(new Response(code, lines));
367                                     lines = new LinkedList<List<String>>();
368                                 }
369                                 code = -1;
370                                 state = "start";
371                             } else {
372                                 state = "start";
373                                 continue eat;
374                             }
375                         } else if(state == "code") {
376                             if((c == '-') || Character.isWhitespace(c)) {
377                                 last = c != '-';
378                                 code = Integer.parseInt(ct.toString());
379                                 ct.setLength(0);
380                                 state = "start";
381                                 continue eat;
382                             } else {
383                                 ct.append(c);
384                             }
385                         } else if(state == "token") {
386                             if(Character.isWhitespace(c)) {
387                                 tokens.add(ct.toString());
388                                 ct.setLength(0);
389                                 state = "start";
390                                 continue eat;
391                             } else if(c == '\\') {
392                                 state = "bs";
393                             } else if(c == '"') {
394                                 state = "cited";
395                             } else {
396                                 ct.append(c);
397                             }
398                         } else if(state == "bs") {
399                             ct.append(c);
400                             state = "token";
401                         } else if(state == "cited") {
402                             if(c == '\\')
403                                 state = "cbs";
404                             else if(c == '"')
405                                 state = "token";
406                             else
407                                 ct.append(c);
408                         } else if(state == "cbs") {
409                             ct.append(c);
410                             state = "cited";
411                         } else {
412                             throw(new Error("invalid state " + state));
413                         }
414                         break;
415                     } while(true);
416                 }
417             } catch(Exception e) {
418                 throw(new StopCondition(e, false));
419             }
420         }
421         
422         public void run() {
423             try {
424                 guarded();
425             } catch(Throwable t) {
426                 error(t);
427             }
428         }
429     }
430
431     public void close() {
432         try {
433             s.close();
434         } catch(IOException e) {}
435         reader.interrupt();
436         writer.interrupt();
437     }
438 }