package dolda.dolcon;
+import java.util.*;
+
public class Hub {
- int id, numpeers;
- String fnet, name, gid;
- String state;
+ int id, numpeers = 0;
+ final String fnet;
+ String name = "", gid = "";
+ String state = "syn";
+ Set<Listener> ls = new HashSet<Listener>();
- public Hub(int id) {
+ public Hub(int id, String fnet) {
this.id = id;
+ this.fnet = fnet.intern();
+ }
+
+ public interface Listener {
+ public void chName(Hub hub);
+ public void chNumPeers(Hub hub);
+ public void chState(Hub hub);
+ }
+
+ public Hub copy() {
+ Hub ret = new Hub(id, fnet);
+ ret.numpeers = numpeers;
+ ret.gid = gid;
+ ret.state = state;
+ ret.name = name;
+ return(ret);
}
public int getId() {
public String getState() {
return(state);
}
+
+ public void addListener(Listener ls) {
+ synchronized(this.ls) {
+ this.ls.add(ls);
+ }
+ }
+
+ public String toString() {
+ return("Hub (" + id + ", " + fnet + ", \"" + name + "\")");
+ }
}
--- /dev/null
+package dolda.dolcon;
+
+import java.util.*;
+import dolda.dolcon.protocol.*;
+
+class HubManager implements NotifyListener {
+ private Set<HubListener> hubls = new HashSet<HubListener>();
+ private Set<HubListener> delayed = new HashSet<HubListener>();
+ private Map<Integer, Hub> hubs = new TreeMap<Integer, Hub>();
+ private String state = "none";
+ private Session sess;
+
+ HubManager(Session sess) {
+ this.sess = sess;
+ }
+
+ private int atoi(String a) {
+ return(Integer.parseInt(a));
+ }
+
+ private void addall(final HubListener ls) {
+ for(final Hub hub : hubs.values()) {
+ sess.dispatch(new Runnable() {
+ public void run() {
+ ls.added(hub);
+ }
+ });
+ }
+ }
+
+ private void fetchhubs() {
+ synchronized(this) {
+ if(state != "none")
+ return;
+ state = "fetch";
+ }
+ Command cmd = new Command("lsnodes");
+ cmd.new Listener() {
+ public void done(Response r) {
+ if(r.code != 200)
+ return;
+ for(List<String> line : r.lines) {
+ Hub h = new Hub(atoi(line.get(0)), line.get(1));
+ h.name = line.get(2);
+ h.numpeers = atoi(line.get(3));
+ h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))];
+ h.gid = line.get(5);
+ hubs.put(h.id, h);
+ }
+ synchronized(HubManager.this) {
+ state = "";
+ HubManager.this.notifyAll();
+ for(HubListener ls : delayed) {
+ addall(ls);
+ }
+ }
+ }
+
+ public void error(Exception e) {
+ synchronized(HubManager.this) {
+ state = "closed";
+ }
+ }
+ };
+ sess.conn.qcmd(new Command("notify", "fn:act", "on"), cmd);
+ sess.conn.addNotifyListener(this);
+ }
+
+ public Collection<Hub> gethubs() throws InterruptedException {
+ fetchhubs();
+ synchronized(this) {
+ while((state != "") && (state != "closed"))
+ wait();
+ }
+ Collection<Hub> ret = new LinkedList<Hub>();
+ synchronized(hubs) {
+ for(Hub h : hubs.values())
+ ret.add(h.copy());
+ }
+ return(ret);
+ }
+
+ public void addls(HubListener hl, boolean addexisting) {
+ fetchhubs();
+ synchronized(hubls) {
+ hubls.add(hl);
+ }
+ if(addexisting) {
+ synchronized(this) {
+ if(state != "")
+ delayed.add(hl);
+ else
+ addall(hl);
+ }
+ }
+ }
+
+ public void rmls(HubListener hl) {
+ synchronized(sess) {
+ synchronized(hubls) {
+ hubls.remove(hl);
+ if(hubls.isEmpty()) {
+ synchronized(hubs) {
+ hubs.clear();
+ }
+ state = "closed";
+ sess.conn.removeNotifyListener(this);
+ sess.hm = null;
+ }
+ }
+ }
+ }
+
+ public void notified(Response resp) {
+ synchronized(this) {
+ if(state != "")
+ return;
+ }
+ if(resp.code == 604) {
+ final Hub h = new Hub(atoi(resp.token(0, 0)), resp.token(0, 1));
+ synchronized(hubs) {
+ hubs.put(h.id, h);
+ }
+ sess.dispatch(new Runnable() {
+ public void run() {
+ synchronized(hubls) {
+ for(HubListener ls : hubls)
+ ls.added(h);
+ }
+ }
+ });
+ } else if(resp.code == 603) {
+ final Hub h;
+ synchronized(hubs) {
+ h = hubs.remove(atoi(resp.token(0, 0)));
+ }
+ sess.dispatch(new Runnable() {
+ public void run() {
+ synchronized(hubls) {
+ for(HubListener ls : hubls)
+ ls.removed(h);
+ }
+ }
+ });
+ } else if(resp.code == 601) {
+ final Hub h;
+ final String state = new String[] {"syn", "hs", "est", "dead"}[atoi(resp.token(0, 1))];
+ synchronized(hubs) {
+ h = hubs.get(atoi(resp.token(0, 0)));
+ }
+ h.state = state;
+ sess.dispatch(new Runnable() {
+ public void run() {
+ synchronized(h.ls) {
+ for(Hub.Listener ls : h.ls) {
+ ls.chState(h);
+ }
+ }
+ }
+ });
+ } else if(resp.code == 602) {
+ final Hub h;
+ final String name = resp.token(0, 1);
+ synchronized(hubs) {
+ h = hubs.get(atoi(resp.token(0, 0)));
+ }
+ h.name = name;
+ sess.dispatch(new Runnable() {
+ public void run() {
+ synchronized(h.ls) {
+ for(Hub.Listener ls : h.ls) {
+ ls.chName(h);
+ }
+ }
+ }
+ });
+ } else if(resp.code == 605) {
+ final Hub h;
+ final int np = atoi(resp.token(0, 1));
+ synchronized(hubs) {
+ h = hubs.get(atoi(resp.token(0, 0)));
+ }
+ h.numpeers = np;
+ sess.dispatch(new Runnable() {
+ public void run() {
+ synchronized(h.ls) {
+ for(Hub.Listener ls : h.ls) {
+ ls.chNumPeers(h);
+ }
+ }
+ }
+ });
+ }
+ }
+}
}
public String handles(List<String> name) {
- System.out.println(name);
if(name.contains("pam"))
return("pam");
return(null);
import java.util.*;
import dolda.dolcon.protocol.*;
-public class Session implements NotifyListener {
- private Connection conn;
+public class Session {
+ Connection conn;
private String state;
- private Set<HubListener> hubls = new HashSet<HubListener>();
private boolean listening = false;
- private String[] hubstate = {"none"};
- private String[][] states = {hubstate};
- private Map<Integer, Hub> hubs = new TreeMap<Integer, Hub>();
+ private Dispatcher dispatcher;
+ HubManager hm = null;
public Session(String aspec, String username, List<Authenticator> auth) throws AuthException, ProtocolException, InterruptedException {
state = "connecting";
state = "auth";
authenticate(username, auth);
state = "";
+ dispatcher = new Dispatcher();
+ dispatcher.start();
}
public Session(String aspec, String username, Authenticator... auth) throws AuthException, ProtocolException, InterruptedException {
String use = null;
Authenticator au = null;
for(Authenticator a : auth) {
- System.out.println(a);
use = a.handles(mechs);
if(use != null) {
au = a;
}
}
- private void checkstates() {
- boolean active = false;
- for(String[] sp : states) {
- if(sp[0] != "none") {
- active = true;
- break;
- }
+ private HubManager gethm() {
+ if(hm == null) {
+ hm = new HubManager(this);
}
- if(listening && !active)
- conn.removeNotifyListener(this);
- else if(!listening && active)
- conn.addNotifyListener(this);
- }
-
- private int atoi(String a) {
- return(Integer.parseInt(a));
- }
-
- private void fetchhubs() {
- synchronized(hubstate) {
- if(hubstate[0] != "none")
- return;
- hubstate[0] = "fetch";
- }
- Command cmd = new Command("lsnodes");
- cmd.new Listener() {
- public void done(Response r) {
- if(r.code != 200)
- return;
- for(List<String> line : r.lines) {
- Hub h = new Hub(atoi(line.get(0)));
- h.fnet = line.get(1).intern();
- h.name = line.get(2);
- h.numpeers = atoi(line.get(3));
- h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))];
- h.gid = line.get(5);
- hubs.put(h.id, h);
- }
- }
-
- public void error(Exception e) {
- }
- };
- conn.qcmd(new Command("notify fn:act on"), cmd);
+ return(hm);
}
- public void addHubListener(HubListener hl, boolean addexisting) {
- fetchhubs();
- synchronized(hubls) {
- hubls.add(hl);
- }
+ public synchronized void addHubListener(HubListener hl, boolean addexisting) {
+ gethm().addls(hl, addexisting);
}
- public void removeHubListener(HubListener hl) {
- synchronized(hubls) {
- hubls.remove(hl);
- if(hubls.isEmpty()) {
- hubs.clear();
- hubstate[0] = "none";
- checkstates();
- }
- }
+ public synchronized void removeHubListener(HubListener hl) {
+ gethm().rmls(hl);
}
-
- public void notified(Response resp) {
+
+ public synchronized Collection<Hub> getHubs() throws InterruptedException {
+ return(gethm().gethubs());
}
public void close() {
protected void finalize() {
if(state != "closed")
close();
+ dispatcher.interrupt();
+ }
+
+ void dispatch(Runnable ev) {
+ dispatcher.dispatch(ev);
+ }
+
+ private static class Dispatcher extends Thread {
+ private Queue<Runnable> q = new LinkedList<Runnable>();
+
+ private Dispatcher() {
+ setDaemon(true);
+ }
+
+ public void dispatch(Runnable ev) {
+ synchronized(q) {
+ q.offer(ev);
+ q.notifyAll();
+ }
+ }
+
+ public void run() {
+ while(true) {
+ try {
+ Runnable r;
+ synchronized(q) {
+ while((r = q.poll()) == null)
+ q.wait();
+ }
+ r.run();
+ } catch(Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ }
}
}
--- /dev/null
+package dolda.dolcon;
+
+import java.util.*;
+
+class Test {
+ public static void main(String[] args) throws Exception {
+ System.out.print("Password: ");
+ PasswordAuth auth = new PasswordAuth(new Scanner(System.in).nextLine());
+ long st = System.currentTimeMillis();
+ Session sess = new Session(args[0], args[1], auth);
+ sess.addHubListener(new HubListener() {
+ public void added(Hub h) {
+ h.addListener(new Hub.Listener() {
+ public void chState(Hub h) {
+ System.out.println(h.getId() + ": " + h.getState());
+ }
+
+ public void chNumPeers(Hub h) {
+ System.out.println(h.getId() + ": " + h.getNumPeers());
+ }
+
+ public void chName(Hub h) {
+ System.out.println(h.getId() + ": " + h.getName());
+ }
+ });
+ }
+
+ public void removed(Hub h) {
+ }
+ }, true);
+ /*
+ System.out.println(sess.getHubs());
+ sess.close();
+ System.out.println(System.currentTimeMillis() - st);
+ */
+ }
+}