diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 0d5b2f850..02302e165 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -407,17 +407,19 @@ public void handleIO() throws IOException { int selected = selector.select(delay); Set selectedKeys = selector.selectedKeys(); - if (selectedKeys.isEmpty() && !shutDown) { + if (selector.selectedKeys().isEmpty() && !shutDown) { handleEmptySelects(); } else { getLogger().debug("Selected %d, selected %d keys", selected, - selectedKeys.size()); + selector.selectedKeys().size()); emptySelects = 0; - - for (SelectionKey sk : selectedKeys) { + + Iterator iterator = selector.selectedKeys().iterator(); + while (iterator.hasNext()) { + SelectionKey sk = iterator.next(); handleIO(sk); + iterator.remove(); } - selectedKeys.clear(); } handleOperationalTasks(); @@ -1005,10 +1007,11 @@ public void redistributeOperation(Operation op) { return; } - // The operation gets redistributed but has never been actually written, - // it we just straight re-add it without cloning. + // The operation gets redistributed but has never actually been written, + // then we just straight re-add it without cloning. if (op.getState() == OperationState.WRITE_QUEUED) { - addOperation(op.getHandlingNode(), op); + addOperation(((KeyedOperation)op).getKeys().iterator().next(), op); + return; } if (op instanceof MultiGetOperationImpl) { @@ -1120,7 +1123,7 @@ private void potentiallyCloseLeakingChannel(final SocketChannel ch, final MemcachedNode node) { if (ch != null && !ch.isConnected() && !ch.isConnectionPending()) { try { - ch.close(); + ch.socket().close(); } catch (IOException e) { getLogger().error("Exception closing channel: %s", node, e); } diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java index bbd97f29b..12854148c 100644 --- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java +++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -60,7 +60,7 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject implements private final long opQueueMaxBlockTime; private final long authWaitTime; private final ConnectionFactory connectionFactory; - private AtomicInteger reconnectAttempt = new AtomicInteger(1); + private AtomicInteger reconnectAttempt = new AtomicInteger(0); private SocketChannel channel; private int toWrite = 0; protected Operation optimizedOp = null; diff --git a/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java b/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java index b23915b8b..7b3d4b85e 100644 --- a/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java +++ b/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java @@ -154,7 +154,7 @@ protected Collection cloneGet(KeyedOperation op) { if(getCb != null) { rv.add(get(k, getCb)); } else if(getsCb != null) { - rv.add(get(k, getCb)); + rv.add(gets(k, getsCb)); } else { rv.add(replicaGet(k, ((ReplicaGetOperationImpl)op).getReplicaIndex() ,replicaGetCb)); }