Java: Added ecmd to the connection manager.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
... / ...
CommitLineData
1package dolda.dolcon.protocol;
2
3import java.io.*;
4import java.net.Socket;
5import java.util.*;
6
7public class Connection {
8 private Socket s;
9 private Reader reader;
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
14 private String aspec;
15 private String state;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17 private Exception error;
18
19 public interface ConnectListener {
20 public void connected() throws Exception;
21 public void error(Exception cause);
22 }
23
24 public Connection(String aspec) {
25 this.aspec = aspec;
26 state = "idle";
27 }
28
29 public void connect() throws ConnectException {
30 synchronized(this) {
31 if(state != "idle")
32 throw(new IllegalStateException("Already connected"));
33 state = "connecting";
34 }
35 try {
36 s = new Socket(aspec, 1500);
37 } catch(java.net.UnknownHostException e) {
38 throw(new ConnectException("Could not resolve host " + aspec, e));
39 } catch(IOException e) {
40 throw(new ConnectException("Could not connect to host " + aspec, e));
41 }
42 pending = new LinkedList<Command>();
43 Command ccmd = new Command(".connect");
44 ccmd.new Listener() {
45 public void done(Response resp) throws Exception {
46 try {
47 checkver(resp);
48 } catch(VersionException e) {
49 error(e);
50 throw(e);
51 }
52 synchronized(Connection.this) {
53 state = "connected";
54 }
55 synchronized(connls) {
56 try {
57 for(ConnectListener l : connls)
58 l.connected();
59 } finally {
60 connls.clear();
61 }
62 }
63 }
64
65 public void error(Exception cause) {
66 synchronized(connls) {
67 try {
68 for(ConnectListener l : connls)
69 l.error(cause);
70 } finally {
71 connls.clear();
72 }
73 }
74 }
75 };
76 pending.offer(ccmd);
77 reader = new Reader();
78 writer = new Writer();
79 reader.start();
80 writer.start();
81 }
82
83 private void error(Throwable c) {
84 boolean n = false;
85 if(c instanceof StopCondition) {
86 StopCondition s = (StopCondition)c;
87 n = s.normal;
88 c = s.getCause();
89 }
90 Exception e;
91 if(c instanceof Exception)
92 e = (Exception)c;
93 else
94 e = new Exception(c);
95 if(!n) {
96 close();
97 error = e;
98 }
99 synchronized(queue) {
100 Command cmd;
101 while((cmd = pending.poll()) != null) {
102 cmd.error(e);
103 }
104 while((cmd = queue.poll()) != null) {
105 cmd.error(e);
106 }
107 }
108 }
109
110 private void checkthread() {
111 if(Thread.currentThread() == reader)
112 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
113 }
114
115 public void syncConnect() throws ConnectException, InterruptedException {
116 checkthread();
117 final boolean[] donep = new boolean[] {false};
118 final Exception[] errp = new Exception[] {null};
119 ConnectListener l = new ConnectListener() {
120 public void connected() {
121 donep[0] = true;
122 synchronized(this) {
123 notifyAll();
124 }
125 }
126
127 public void error(Exception cause) {
128 donep[0] = true;
129 errp[0] = cause;
130 synchronized(this) {
131 notifyAll();
132 }
133 }
134 };
135 addConnectListener(l);
136 connect();
137 while(!donep[0]) {
138 synchronized(l) {
139 l.wait();
140 }
141 }
142 if(errp[0] != null)
143 throw(new ConnectException("DC connection has been closed", errp[0]));
144 }
145
146 public void expectVersion(int reqver) {
147 this.reqver = reqver;
148 }
149
150 private void checkver(Response resp) throws VersionException {
151 revlo = Integer.parseInt(resp.token(0, 0));
152 revhi = Integer.parseInt(resp.token(0, 1));
153 if((reqver < revlo) || (reqver > revhi))
154 throw(new VersionException(reqver, revlo, revhi));
155 }
156
157 public Exception join() throws InterruptedException {
158 while(reader.isAlive()) {
159 reader.join();
160 }
161 close();
162 return(error);
163 }
164
165 public synchronized void addConnectListener(ConnectListener l) {
166 if((state != "idle") && (state != "connecting"))
167 throw(new IllegalStateException("Already connected"));
168 synchronized(connls) {
169 connls.add(l);
170 }
171 }
172
173 public void qcmd(Command cmd) {
174 synchronized(queue) {
175 queue.offer(cmd);
176 queue.notifyAll();
177 }
178 }
179
180 public void qcmd(String... tokens) {
181 qcmd(new Command(tokens));
182 }
183
184 public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
185 checkthread();
186 final boolean[] donep = new boolean[] {false};
187 final Response[] resp = new Response[] {null};
188 final Exception[] errp = new Exception[] {null};
189 Object l = cmd.new Listener() {
190 public synchronized void done(Response rsp) {
191 resp[0] = rsp;
192 donep[0] = true;
193 notifyAll();
194 }
195
196 public synchronized void error(Exception e) {
197 errp[0] = e;
198 donep[0] = true;
199 notifyAll();
200 }
201 };
202 synchronized(l) {
203 while(!donep[0]) {
204 l.wait();
205 }
206 }
207 if(errp[0] != null)
208 throw(new ClosedException(errp[0]));
209 return(resp[0]);
210 }
211
212 public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
213 return(ecmd(new Command(tokens)));
214 }
215
216 static private class StopCondition extends Error {
217 final boolean normal;
218
219 public StopCondition(Exception cause, boolean normal) {
220 super(cause);
221 this.normal = normal;
222 }
223 }
224
225 private class Writer extends Thread {
226 public Writer() {
227 setDaemon(true);
228 }
229
230 private String quote(String t) {
231 if(t.length() == 0)
232 return("\"\"");
233 StringBuilder sb = new StringBuilder();
234 boolean quote = false;
235 for(int i = 0; i < t.length(); i++) {
236 char c = t.charAt(i);
237 if(c == '\"') {
238 sb.append("\\\"");
239 } else if(Character.isWhitespace(c)) {
240 quote = true;
241 sb.append(c);
242 } else {
243 sb.append(c);
244 }
245 }
246 if(quote)
247 return("\"" + sb.toString() + "\"");
248 else
249 return(sb.toString());
250 }
251
252 private void guarded() {
253 try {
254 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
255 while(true) {
256 Command cmd;
257 try {
258 synchronized(queue) {
259 while(pending.size() > 0)
260 queue.wait();
261 while((cmd = queue.poll()) == null)
262 queue.wait();
263 pending.offer(cmd);
264 }
265 } catch(InterruptedException e) {
266 throw(new StopCondition(e, true));
267 }
268 StringBuilder out = new StringBuilder();
269 for(String s : cmd.tokens) {
270 if(out.length() > 0)
271 out.append(' ');
272 out.append(quote(s));
273 }
274 w.write(out.toString());
275 }
276 } catch(IOException e) {
277 throw(new StopCondition(e, false));
278 }
279 }
280
281 public void run() {
282 try {
283 guarded();
284 } catch(Throwable t) {
285 error(t);
286 }
287 }
288 }
289
290 private class Reader extends Thread {
291 private void dispatch(Response resp) throws Exception {
292 if(resp.code < 600) {
293 synchronized(queue) {
294 try {
295 resp.cmd = pending.remove();
296 } catch(NoSuchElementException e) {
297 throw(new RuntimeException("DC server sent reply without a pending command"));
298 }
299 queue.notifyAll();
300 }
301 resp.cmd.done(resp);
302 }
303 }
304
305 private void guarded() {
306 try {
307 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
308 String state = "start";
309 StringBuilder ct = new StringBuilder();
310 int code = -1;
311 boolean last = true;
312 List<List<String>> lines = new LinkedList<List<String>>();
313 List<String> tokens = new LinkedList<String>();
314 while(true) {
315 char c;
316 {
317 int i;
318 try {
319 if((i = r.read()) < 0) {
320 throw(new IOException("The server closed the connection"));
321 }
322 } catch(java.nio.channels.ClosedByInterruptException e) {
323 throw(new StopCondition(e, true));
324 }
325 c = (char)i;
326 }
327 eat: do {
328 if(state == "start") {
329 if(c == '\r') {
330 state = "nl";
331 } else if(Character.isWhitespace(c)) {
332 } else {
333 if(code == -1)
334 state = "code";
335 else
336 state = "token";
337 continue eat;
338 }
339 } else if(state == "nl") {
340 if(c == '\n') {
341 if((code < 100) || (code >= 1000)) {
342 throw(new IOException("Illegal response code " + code + " from the server"));
343 }
344 lines.add(tokens);
345 tokens = new LinkedList<String>();
346 if(last) {
347 dispatch(new Response(code, lines));
348 lines = new LinkedList<List<String>>();
349 }
350 code = -1;
351 state = "start";
352 } else {
353 state = "start";
354 continue eat;
355 }
356 } else if(state == "code") {
357 if((c == '-') || Character.isWhitespace(c)) {
358 last = c != '-';
359 code = Integer.parseInt(ct.toString());
360 ct.setLength(0);
361 state = "start";
362 continue eat;
363 } else {
364 ct.append(c);
365 }
366 } else if(state == "token") {
367 if(Character.isWhitespace(c)) {
368 tokens.add(ct.toString());
369 ct.setLength(0);
370 state = "start";
371 continue eat;
372 } else if(c == '\\') {
373 state = "bs";
374 } else if(c == '"') {
375 state = "cited";
376 } else {
377 ct.append(c);
378 }
379 } else if(state == "bs") {
380 ct.append(c);
381 state = "token";
382 } else if(state == "cited") {
383 if(c == '\\')
384 state = "cbs";
385 else if(c == '"')
386 state = "token";
387 else
388 ct.append(c);
389 } else if(state == "cbs") {
390 ct.append(c);
391 state = "cited";
392 } else {
393 throw(new Error("invalid state " + state));
394 }
395 break;
396 } while(true);
397 }
398 } catch(Exception e) {
399 throw(new StopCondition(e, false));
400 }
401 }
402
403 public void run() {
404 try {
405 guarded();
406 } catch(Throwable t) {
407 error(t);
408 }
409 }
410 }
411
412 public void close() {
413 try {
414 s.close();
415 } catch(IOException e) {}
416 reader.interrupt();
417 writer.interrupt();
418 }
419}