1 package dolda.dolcon.protocol;
4 import java.net.Socket;
7 public class Connection {
// Thread object for outgoing traffic; assigned in connect().
10 private Writer writer;
// Commands that the Writer thread polls for sending.
11 private Queue<Command> queue = new LinkedList<Command>();
// Commands for which a server reply is still expected: the Reader takes the
// head of this queue when a response with code < 600 arrives, and the Writer
// waits while this queue is non-empty before polling the next command.
12 private Queue<Command> pending = new LinkedList<Command>();
// reqver: protocol revision this client requires (defaults to 2, changed via
// expectVersion()). revlo/revhi: revision range parsed from the server's
// response in checkver().
13 private int reqver = 2, revlo, revhi;
// Listeners registered through addConnectListener(); accesses visible in this
// file are wrapped in synchronized(connls).
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
// NOTE(review): presumably the failure recorded when the connection breaks;
// the code that assigns it is not visible here -- confirm.
17 private Exception error;
// Callback interface for the outcome of a connection attempt. Instances are
// registered with addConnectListener() and kept in connls.
19 public interface ConnectListener {
// Success callback; implementations are allowed to throw any Exception.
20 public void connected() throws Exception;
// Failure callback; cause is the exception describing what went wrong.
21 public void error(Exception cause);
// Creates a connection object for the given address specification.
// NOTE(review): the constructor body is not visible here; presumably it stores
// aspec, which connect() later passes to new Socket(aspec, 1500) as the host
// name -- confirm.
24 public Connection(String aspec) {
// Opens the socket to the server and creates the Reader and Writer thread
// objects. Resolution and I/O failures while opening the socket are reported
// as ConnectException (with the original exception as cause).
// NOTE(review): several statements of this method are not visible in this
// excerpt; comments marked "presumably" describe what the hidden lines most
// likely do and should be confirmed.
29 public void connect() throws ConnectException {
// Rejects a second connect attempt (the guarding condition is not visible).
32 throw(new IllegalStateException("Already connected"));
// The port number 1500 is hard-coded.
36 s = new Socket(aspec, 1500);
37 } catch(java.net.UnknownHostException e) {
38 throw(new ConnectException("Could not resolve host " + aspec, e));
39 } catch(IOException e) {
40 throw(new ConnectException("Could not connect to host " + aspec, e));
// Start with a fresh, empty queue of commands awaiting replies.
42 pending = new LinkedList<Command>();
// Pseudo-command named ".connect"; presumably it is placed in pending so that
// the first reply from the server is delivered to the listener below.
43 Command ccmd = new Command(".connect");
44 ccmd.addListener(new Command.Listener() {
// Reply received: presumably checkver(resp) runs in the hidden try block, since
// a VersionException is caught right after it.
45 public void done(Response resp) throws Exception {
48 } catch(VersionException e) {
// Takes the Connection's own monitor (presumably to update the state field).
52 synchronized(Connection.this) {
// Walk all registered listeners while holding the connls lock (presumably
// calling connected() on each).
55 synchronized(connls) {
57 for(ConnectListener l : connls)
// Failure path: walk all registered listeners while holding the connls lock
// (presumably calling error(cause) on each).
65 public void error(Exception cause) {
66 synchronized(connls) {
68 for(ConnectListener l : connls)
// Create the two I/O thread objects (presumably started in hidden lines).
77 reader = new Reader();
78 writer = new Writer();
// Internal failure/shutdown handler. Distinguishes StopCondition from ordinary
// Exception causes and then empties both command queues.
// NOTE(review): most of the body is not visible here, including what is done
// with each drained command -- confirm before relying on details.
83 private void error(Throwable c) {
85 if(c instanceof StopCondition) {
// Note: this local is named "s", the same name connect() uses for the socket
// it assigns; within its scope the local takes precedence.
86 StopCondition s = (StopCondition)c;
91 if(c instanceof Exception)
// Drain commands that were still awaiting a reply ...
101 while((cmd = pending.poll()) != null) {
// ... and then commands that had not been taken off the send queue yet.
104 while((cmd = queue.poll()) != null) {
// Guard for synchronous (blocking) methods: throws RuntimeException when the
// calling thread is the reader thread itself.
110 private void checkthread() {
111 if(Thread.currentThread() == reader)
112 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
// Blocking variant of connecting: registers a ConnectListener and reports a
// failure as ClosedException wrapping the exception stored in errp[0].
// NOTE(review): the calls that start the connection and wait for the result
// are not visible in this excerpt -- confirm.
115 public void syncConnect() throws ConnectException, ClosedException, InterruptedException {
// One-element final arrays act as mutable cells that the anonymous listener
// below can reach (presumably it sets donep[0] / errp[0] in its callbacks).
117 final boolean[] donep = new boolean[] {false};
118 final Exception[] errp = new Exception[] {null};
119 ConnectListener l = new ConnectListener() {
120 public void connected() {
127 public void error(Exception cause) {
135 addConnectListener(l);
143 throw(new ClosedException(errp[0]));
// Sets the protocol revision this client requires; checkver() later compares
// it with the range reported by the server. The field defaults to 2.
146 public void expectVersion(int reqver) {
147 this.reqver = reqver;
// Reads the supported revision range from the response (tokens 0 and 1 of
// line 0, parsed as integers) into revlo/revhi and throws VersionException
// when reqver lies outside [revlo, revhi]. revlo/revhi are assigned before
// the check, so they are updated even when it fails. Integer.parseInt throws
// NumberFormatException if a token is not a valid integer.
150 private void checkver(Response resp) throws VersionException {
151 revlo = Integer.parseInt(resp.token(0, 0));
152 revhi = Integer.parseInt(resp.token(0, 1));
153 if((reqver < revlo) || (reqver > revhi))
154 throw(new VersionException(reqver, revlo, revhi));
// Loops for as long as the reader thread is alive and returns an Exception.
// NOTE(review): the loop body and the return statement are not visible here;
// presumably it joins the reader thread and returns the recorded error --
// confirm.
157 public Exception join() throws InterruptedException {
158 while(reader.isAlive()) {
// Registers a listener for the outcome of the connection attempt. Only allowed
// while the state is "idle" or "connecting"; otherwise IllegalStateException
// is thrown. Runs under the Connection's monitor and additionally locks connls.
165 public synchronized void addConnectListener(ConnectListener l) {
// NOTE(review): != compares String references, not contents. This only works
// if state is always assigned from string literals (which are interned);
// consider equals() -- confirm against the assignments to state.
166 if((state != "idle") && (state != "connecting"))
167 throw(new IllegalStateException("Already connected"));
168 synchronized(connls) {
// Takes the queue lock for the given command.
// NOTE(review): the statements inside the synchronized block are not visible;
// presumably the command is appended to queue and the waiting Writer thread is
// notified -- confirm.
173 private void qcmd(Command cmd) {
174 synchronized(queue) {
// Internal Error subclass thrown by the reader and writer loops to make them
// stop. It is thrown with normal == true for InterruptedException and
// ClosedByInterruptException, and with normal == false for IOException and
// other Exceptions.
180 static private class StopCondition extends Error {
// true when the stop was caused by an interrupt, false when caused by a failure.
181 final boolean normal;
// cause: the exception that triggered the stop (presumably passed on to the
// Error constructor in a line not visible here).
183 public StopCondition(Exception cause, boolean normal) {
185 this.normal = normal;
// Thread class that takes commands off the send queue and writes them to the
// socket's output stream as UTF-8 text.
// NOTE(review): parts of this class are not visible in this excerpt; comments
// marked "presumably" should be confirmed.
189 private class Writer extends Thread {
// Prepares one token for transmission. Returns either the built string wrapped
// in double quotes or the built string as-is; the local flag "quote" presumably
// selects between the two and is presumably set when whitespace is found.
194 private String quote(String t) {
197 StringBuilder sb = new StringBuilder();
198 boolean quote = false;
199 for(int i = 0; i < t.length(); i++) {
200 char c = t.charAt(i);
203 } else if(Character.isWhitespace(c)) {
211 return("\"" + sb.toString() + "\"");
213 return(sb.toString());
// Send loop body; failures are converted to StopCondition.
216 private void guarded() {
218 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
// Under the queue lock: hold off while any command is still awaiting its
// reply, so at most one command is outstanding, then loop until a command can
// be polled from the send queue (presumably waiting on the lock in between).
222 synchronized(queue) {
223 while(pending.size() > 0)
225 while((cmd = queue.poll()) == null)
// An interrupt while waiting is treated as a normal stop request.
229 } catch(InterruptedException e) {
230 throw(new StopCondition(e, true));
// Build the text to send from the command's tokens, each passed through quote().
232 StringBuilder out = new StringBuilder();
233 for(String s : cmd.tokens) {
236 out.append(quote(s));
238 w.write(out.toString());
// An I/O failure is an abnormal stop.
240 } catch(IOException e) {
241 throw(new StopCondition(e, false));
// Catch-all (presumably forwarding to Connection.error(Throwable)).
248 } catch(Throwable t) {
// Thread class that reads the server's output as UTF-8 text, parses it
// character by character into responses and dispatches them.
// NOTE(review): parts of this class are not visible in this excerpt; comments
// marked "presumably" should be confirmed.
254 private class Reader extends Thread {
// Routes one parsed response. Codes below 600 are treated as the reply to the
// oldest pending command, which is removed from pending under the queue lock
// and attached to the response. A reply with no pending command is reported
// as RuntimeException.
255 private void dispatch(Response resp) throws Exception {
256 if(resp.code < 600) {
257 synchronized(queue) {
259 resp.cmd = pending.remove();
260 } catch(NoSuchElementException e) {
261 throw(new RuntimeException("DC server sent reply without a pending command"));
// Read loop body; failures are converted to StopCondition.
269 private void guarded() {
271 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
// Parser state, kept as a local String and compared below with ==.
// NOTE(review): reference comparison only works if every assignment to this
// variable uses a string literal (literals are interned) -- confirm.
272 String state = "start";
// ct: text collected for the item being parsed (its contents are parsed as the
// response code and added as tokens below).
273 StringBuilder ct = new StringBuilder();
// lines: the token lists that are passed to the Response constructor below;
// tokens: the tokens of the current line.
276 List<List<String>> lines = new LinkedList<List<String>>();
277 List<String> tokens = new LinkedList<String>();
// A negative return from read() means end of stream.
283 if((i = r.read()) < 0) {
284 throw(new IOException("The server closed the connection"));
// An interrupt that closed the channel is treated as a normal stop request.
286 } catch(java.nio.channels.ClosedByInterruptException e) {
287 throw(new StopCondition(e, true));
292 if(state == "start") {
295 } else if(Character.isWhitespace(c)) {
// "nl" state: only codes in the range 100..999 are accepted.
303 } else if(state == "nl") {
305 if((code < 100) || (code >= 1000)) {
306 throw(new IOException("Illegal response code " + code + " from the server"));
309 tokens = new LinkedList<String>();
// Hand the collected lines to dispatch() and start a new response.
311 dispatch(new Response(code, lines));
312 lines = new LinkedList<List<String>>();
// "code" state: a '-' or whitespace ends the numeric code, which is then parsed
// from ct.
320 } else if(state == "code") {
321 if((c == '-') || Character.isWhitespace(c)) {
323 code = Integer.parseInt(ct.toString());
// "token" state: whitespace ends the token; backslash and double quote get
// special treatment (presumably escaping and quoted strings).
330 } else if(state == "token") {
331 if(Character.isWhitespace(c)) {
332 tokens.add(ct.toString());
336 } else if(c == '\\') {
338 } else if(c == '"') {
// Remaining states; judging by their names presumably: "bs" = after a
// backslash, "cited" = inside double quotes, "cbs" = backslash inside quotes.
343 } else if(state == "bs") {
346 } else if(state == "cited") {
353 } else if(state == "cbs") {
// Any other state value indicates a bug in the parser itself.
357 throw(new Error("invalid state " + state));
// Any Exception caught here is an abnormal stop.
362 } catch(Exception e) {
363 throw(new StopCondition(e, false));
// Catch-all (presumably forwarding to Connection.error(Throwable)).
370 } catch(Throwable t) {
// Closes the connection.
// NOTE(review): the try body is not visible here; presumably it closes the
// socket -- confirm.
376 public void close() {
// NOTE(review): an IOException is silently discarded here; if that is
// intentional best-effort behaviour, say so explicitly.
379 } catch(IOException e) {}