final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
final Map<Watcher, Object> paused = new IdentityHashMap<>();
+ final Collection<SelectionKey> cancelled = new HashSet<>();
SelectPool(SelectorProvider provider) {
this.provider = provider;
return;
}
if(first != null)
- timeout = (long)Math.ceil((first - now) * 1000);
+ timeout = Math.max((long)Math.ceil((first - now) * 1000), 1);
}
+ Collection<SelectionKey> precancelled;
+ synchronized(cancelled) {
+ precancelled = new ArrayList<>(cancelled);
+ }
+ if(!precancelled.isEmpty())
+ timeout = 1;
poll.selectedKeys().clear();
try {
poll.select(timeout);
} catch(IOException e) {
throw(new RuntimeException(e));
}
+ if(!precancelled.isEmpty()) {
+ synchronized(cancelled) {
+ cancelled.removeAll(precancelled);
+ cancelled.notifyAll();
+ }
+ }
for(SelectionKey key : poll.selectedKeys())
handle((Watcher)key.attachment(), key.readyOps());
now = time();
void add(Watcher w, SelectableChannel ch) {
if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
throw(new IllegalStateException(w + ": already registered"));
+ try {
+ ch.configureBlocking(false);
+ } catch(IOException e) {
+ throw(new RuntimeException(ch + ": could not make non-blocking", e));
+ }
int evs = w.events();
double timeout = w.timeout();
boolean hastime = timeout < Double.POSITIVE_INFINITY;
SelectionKey wc = watching.remove(w);
Object tc = timeheap.remove(w);
Object pc = paused.remove(w);
- if(wc != null)
- wc.cancel();
+ if(wc != null) {
+ synchronized(cancelled) {
+ cancelled.add(wc);
+ wc.cancel();
+ poll.wakeup();
+ boolean irq = false;
+ while(cancelled.contains(wc)) {
+ try {
+ cancelled.wait();
+ } catch(InterruptedException e) {
+ irq = true;
+ }
+ }
+ if(irq)
+ Thread.currentThread().interrupt();
+ }
+ }
if(((wc != null) || (tc != null)) && (pc != null))
throw(new RuntimeException(w + ": inconsistent internal state"));
if(wc == null)
throw(new IllegalStateException(w + ": not registered"));
submit(() -> close(w));
- poll.wakeup();
}
void update(Watcher w) {