diff --git a/pom.xml b/pom.xml index 4a07d5fbc..ae0d49c61 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ 4.0.0 net.spy spymemcached - 2.999.999-SNAPSHOT + 2.12.3-kwai-0.0.11 Spymemcached A client library for memcached. @@ -42,6 +42,19 @@ http://couchbase.com/ + + + + kuaishou.releases + Internal Release Repository + http://nexus.corp.kuaishou.com:88/nexus/content/repositories/releases/ + + + kuaishou.snapshots + Internal Snapshot Repository + http://nexus.corp.kuaishou.com:88/nexus/content/repositories/snapshots/ + + @@ -85,5 +98,22 @@ + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.0 + + + attach-sources + + jar + + + + + + diff --git a/src/main/java/net/spy/memcached/AsyncOpListener.java b/src/main/java/net/spy/memcached/AsyncOpListener.java new file mode 100644 index 000000000..17d66f420 --- /dev/null +++ b/src/main/java/net/spy/memcached/AsyncOpListener.java @@ -0,0 +1,17 @@ +package net.spy.memcached; + +import net.spy.memcached.TimeoutListener.Method; +import net.spy.memcached.internal.BulkGetFuture; +import net.spy.memcached.internal.GetFuture; +import net.spy.memcached.internal.OperationFuture; + +public interface AsyncOpListener { + + T beforeInvoke(Method method); + + void onBulkGetCompletion(T before, BulkGetFuture future); + + void onGetCompletion(T before, GetFuture future); + + void onOperationCompletion(T before, Method method, OperationFuture future); +} diff --git a/src/main/java/net/spy/memcached/ConnectionFactory.java b/src/main/java/net/spy/memcached/ConnectionFactory.java index a85a6dda3..2ed6af30d 100644 --- a/src/main/java/net/spy/memcached/ConnectionFactory.java +++ b/src/main/java/net/spy/memcached/ConnectionFactory.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import net.spy.memcached.auth.AuthDescriptor; @@ -87,7 +88,7 @@ MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, * Get the ExecutorService which is used to asynchronously execute listeners * on futures. */ - ExecutorService getListenerExecutorService(); + Executor getListenerExecutorService(); /** * Returns true if the default provided {@link ExecutorService} has not been diff --git a/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java b/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java index b3ece2eec..cfc040149 100644 --- a/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java +++ b/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import net.spy.memcached.auth.AuthDescriptor; @@ -75,7 +76,7 @@ public class ConnectionFactoryBuilder { protected MetricType metricType = null; protected MetricCollector collector = null; - protected ExecutorService executorService = null; + protected Executor executor = null; protected long authWaitTime = DefaultConnectionFactory.DEFAULT_AUTH_WAIT_TIME; /** @@ -311,8 +312,8 @@ public ConnectionFactoryBuilder setMetricCollector(MetricCollector collector) { * * @param executorService the ExecutorService to use. */ - public ConnectionFactoryBuilder setListenerExecutorService(ExecutorService executorService) { - this.executorService = executorService; + public ConnectionFactoryBuilder setListenerExecutorService(Executor executorService) { + this.executor = executorService; return this; } @@ -447,13 +448,13 @@ public MetricCollector getMetricCollector() { } @Override - public ExecutorService getListenerExecutorService() { - return executorService == null ? super.getListenerExecutorService() : executorService; + public Executor getListenerExecutorService() { + return executor == null ? super.getListenerExecutorService() : executor; } @Override public boolean isDefaultExecutorService() { - return executorService == null; + return executor == null; } @Override diff --git a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java index 16623cd3b..5c5f5f828 100644 --- a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java +++ b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java @@ -32,10 +32,9 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -132,9 +131,9 @@ public class DefaultConnectionFactory extends SpyObject implements private MetricCollector metrics; /** - * The ExecutorService in which the listener callbacks will be executed. + * The Executor in which the listener callbacks will be executed. */ - private ExecutorService executorService; + private Executor executor; /** * Construct a DefaultConnectionFactory with the given parameters. @@ -289,16 +288,16 @@ public long getAuthWaitTime() { * @return the stored {@link ExecutorService}. */ @Override - public ExecutorService getListenerExecutorService() { - if (executorService == null) { + public Executor getListenerExecutorService() { + if (executor == null) { ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "FutureNotifyListener"); + return new Thread(r, "FutureNotifyListener[" + DefaultConnectionFactory.this.getName() + "]"); } }; - executorService = new ThreadPoolExecutor( + executor = new ThreadPoolExecutor( 0, Runtime.getRuntime().availableProcessors(), 60L, @@ -308,7 +307,7 @@ public Thread newThread(Runnable r) { ); } - return executorService; + return executor; } @Override diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 77d0f048e..eb1c2f18d 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -23,12 +23,53 @@ package net.spy.memcached; +import static net.spy.memcached.TimeoutListener.Method.cas; +import static net.spy.memcached.TimeoutListener.Method.from; +import static net.spy.memcached.TimeoutListener.Method.get; +import static net.spy.memcached.TimeoutListener.Method.getBulk; +import static net.spy.memcached.TimeoutListener.Method.gets; +import static net.spy.memcached.TimeoutListener.Method.touch; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.auth.AuthDescriptor; import net.spy.memcached.auth.AuthThreadMonitor; import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.internal.BulkFuture; +import net.spy.memcached.internal.BulkGetCompletionListener; import net.spy.memcached.internal.BulkGetFuture; +import net.spy.memcached.internal.GetCompletionListener; import net.spy.memcached.internal.GetFuture; +import net.spy.memcached.internal.OperationCompletionListener; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.SingleElementInfiniteIterator; import net.spy.memcached.ops.CASOperationStatus; @@ -53,32 +94,6 @@ import net.spy.memcached.transcoders.Transcoder; import net.spy.memcached.util.StringUtils; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - /** * Client to a memcached server. * @@ -157,7 +172,11 @@ public class MemcachedClient extends SpyObject implements MemcachedClientIF, protected final AuthThreadMonitor authMonitor = new AuthThreadMonitor(); - protected final ExecutorService executorService; + protected final Executor executor; + + private final List timeoutListeners = new ArrayList(); + + private final List> asyncOpListeners = new ArrayList>(); /** * Get a memcache client operating on the specified memcached locations. @@ -210,7 +229,7 @@ public MemcachedClient(ConnectionFactory cf, List addrs) assert mconn != null : "Connection factory failed to make a connection"; operationTimeout = cf.getOperationTimeout(); authDescriptor = cf.getAuthDescriptor(); - executorService = cf.getListenerExecutorService(); + executor = cf.getListenerExecutorService(); if (authDescriptor != null) { addObserver(this); } @@ -301,11 +320,13 @@ private CountDownLatch broadcastOp(BroadcastOpFactory of, private OperationFuture asyncStore(StoreType storeType, String key, int exp, T value, Transcoder tc) { + Method method = from(storeType); + final Map, Object> before = before(method); CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = - new OperationFuture(key, latch, operationTimeout, - executorService); + new OperationFuture(key, latch, operationTimeout, executor); + Operation op = opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() { @Override @@ -323,11 +344,33 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } + private void addOpAsyncListener(final Method method, + final Map, Object> before, OperationFuture rv) { + rv.addListener(new OperationCompletionListener() { + + public void onComplete(OperationFuture future) { + for (Entry, Object> entry : before.entrySet()) { + entry.getKey().onOperationCompletion(entry.getValue(), method, future); + } + } + }); + } + + private Map, Object> before(Method method) { + Map, Object> result = new IdentityHashMap, Object>(); + for (AsyncOpListener asyncOpListener : asyncOpListeners) { + result.put(asyncOpListener, asyncOpListener.beforeInvoke(method)); + } + return result; + } + private OperationFuture asyncStore(StoreType storeType, String key, int exp, Object value) { return asyncStore(storeType, key, exp, value, transcoder); @@ -335,10 +378,13 @@ private OperationFuture asyncStore(StoreType storeType, String key, private OperationFuture asyncCat(ConcatenationType catType, long cas, String key, T value, Transcoder tc) { + Method method = from(catType); + final Map, Object> before = before(method); CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, - latch, operationTimeout, executorService); + latch, operationTimeout, executor); + Operation op = opFact.cat(catType, cas, key, co.getData(), new OperationCallback() { @Override @@ -352,8 +398,10 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -387,10 +435,11 @@ public OperationFuture touch(final String key, final int exp) { @Override public OperationFuture touch(final String key, final int exp, final Transcoder tc) { + Method method = touch; + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = - new OperationFuture(key, latch, operationTimeout, - executorService); + new OperationFuture(key, latch, operationTimeout, executor); Operation op = opFact.touch(key, exp, new OperationCallback() { @Override @@ -404,8 +453,10 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -620,11 +671,13 @@ public OperationFuture prepend(String key, T val, @Override public OperationFuture asyncCAS(String key, long casId, int exp, T value, Transcoder tc) { + Method method = cas; + Map, Object> before = before(method); CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = - new OperationFuture(key, latch, operationTimeout, - executorService); + new OperationFuture(key, latch, operationTimeout, executor); + Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() { @Override @@ -649,8 +702,10 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -1016,9 +1071,9 @@ public OperationFuture replace(String key, int exp, Object o) { @Override public GetFuture asyncGet(final String key, final Transcoder tc) { + final Map, Object> before = before(get); final CountDownLatch latch = new CountDownLatch(1); - final GetFuture rv = new GetFuture(latch, operationTimeout, key, - executorService); + final GetFuture rv = new GetFuture(latch, operationTimeout, key, executor); Operation op = opFact.get(key, new GetOperation.Callback() { private Future val; @@ -1040,8 +1095,17 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(get, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + rv.addListener(new GetCompletionListener() { + + public void onComplete(GetFuture future) { + for (Entry, Object> entry : before.entrySet()) { + entry.getKey().onGetCompletion(entry.getValue(), future); + } + } + }); return rv; } @@ -1072,10 +1136,11 @@ public GetFuture asyncGet(final String key) { public OperationFuture> asyncGets(final String key, final Transcoder tc) { + Method method = gets; + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = - new OperationFuture>(key, latch, operationTimeout, - executorService); + new OperationFuture>(key, latch, operationTimeout, executor); Operation op = opFact.gets(key, new GetsOperation.Callback() { private CASValue val; @@ -1099,8 +1164,10 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -1273,6 +1340,7 @@ public Object get(String key) { @Override public BulkFuture> asyncGetBulk(Iterator keyIter, Iterator> tcIter) { + final Map, Object> before = before(getBulk); final Map> m = new ConcurrentHashMap>(); // This map does not need to be a ConcurrentHashMap @@ -1319,7 +1387,9 @@ public BulkFuture> asyncGetBulk(Iterator keyIter, int initialLatchCount = chunks.isEmpty() ? 0 : 1; final CountDownLatch latch = new CountDownLatch(initialLatchCount); final Collection ops = new ArrayList(chunks.size()); - final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, executorService); + final String name = connFactory instanceof DefaultConnectionFactory ? ((DefaultConnectionFactory) connFactory) + .getName() : null; + final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, executor, name); GetOperation.Callback cb = new GetOperation.Callback() { @Override @@ -1352,14 +1422,23 @@ public void complete() { final Map mops = new HashMap(); - for (Map.Entry> me : chunks.entrySet()) { + for (Entry> me : chunks.entrySet()) { Operation op = opFact.get(me.getValue(), cb); mops.put(me.getKey(), op); ops.add(op); } assert mops.size() == chunks.size(); + rv.setTimeoutListeners(null, timeoutListeners); mconn.checkState(); mconn.addOperations(mops); + rv.addListener(new BulkGetCompletionListener() { + + public void onComplete(BulkGetFuture future) { + for (Entry, Object> entry : before.entrySet()) { + entry.getKey().onBulkGetCompletion(entry.getValue(), future); + } + } + }); return rv; } @@ -1503,9 +1582,11 @@ public OperationFuture> asyncGetAndTouch(final String key, @Override public OperationFuture> asyncGetAndTouch(final String key, final int exp, final Transcoder tc) { + Method method = Method.getAndTouch; + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>( - key, latch, operationTimeout, executorService); + key, latch, operationTimeout, executor); Operation op = opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback() { @@ -1530,8 +1611,10 @@ public void gotData(String k, int flags, long cas, byte[] data) { tc.getMaxSize()))); } }); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -1767,6 +1850,13 @@ public void complete() { })); try { if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) { + for (TimeoutListener timeoutListener : timeoutListeners) { + try { + timeoutListener.onTimeout(from(m), null); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); + } + } throw new OperationTimeoutException("Mutate operation timed out," + "unable to modify counter [" + key + ']'); } @@ -1985,9 +2075,12 @@ private OperationFuture asyncMutate(Mutator m, String key, long by, + "binary protocol or the sync variant."); } + Method method = from(m); + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = - new OperationFuture(key, latch, operationTimeout, executorService); + new OperationFuture(key, latch, operationTimeout, executor); + Operation op = opFact.mutate(m, key, by, def, exp, new OperationCallback() { @Override @@ -2001,8 +2094,10 @@ public void complete() { rv.signalComplete(); } }); - mconn.enqueueOperation(key, op); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); + mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -2306,9 +2401,11 @@ public OperationFuture delete(String key) { */ @Override public OperationFuture delete(String key, long cas) { + Method method = Method.delete; + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, - latch, operationTimeout, executorService); + latch, operationTimeout, executor); DeleteOperation.Callback callback = new DeleteOperation.Callback() { @Override @@ -2335,8 +2432,10 @@ public void complete() { op = opFact.delete(key, cas, callback); } + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -2375,7 +2474,7 @@ public void complete() { }); return new OperationFuture(null, blatch, flushResult, - operationTimeout, executorService) { + operationTimeout, executor) { @Override public void set(Boolean o, OperationStatus s) { @@ -2493,10 +2592,12 @@ public boolean shutdown(long timeout, TimeUnit unit) { mconn.setName(baseName + " - SHUTTING DOWN"); boolean rv = true; if (connFactory.isDefaultExecutorService()) { - try { - executorService.shutdown(); - } catch (Exception ex) { - getLogger().warn("Failed shutting down the ExecutorService: ", ex); + if(executor instanceof ExecutorService) { + try { + ((ExecutorService) executor).shutdown(); + } catch (Exception ex) { + getLogger().warn("Failed shutting down the ExecutorService: ", ex); + } } } try { @@ -2581,6 +2682,22 @@ public boolean addObserver(ConnectionObserver obs) { return rv; } + public MemcachedClient addTimeoutListener(TimeoutListener listener) { + if (listener == null) { + throw new NullPointerException(); + } + timeoutListeners.add(listener); + return this; + } + + public MemcachedClient addAsyncOpListener(AsyncOpListener listener) { + if (listener == null) { + throw new NullPointerException(); + } + asyncOpListeners.add((AsyncOpListener) listener); + return this; + } + /** * Remove a connection observer. * @@ -2639,8 +2756,8 @@ public TranscodeService getTranscoderService() { return tcService; } - public ExecutorService getExecutorService() { - return executorService; + public Executor getExecutorService() { + return executor; } @Override diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 47f432fb1..4435ebd7c 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -23,27 +23,6 @@ package net.spy.memcached; -import net.spy.memcached.compat.SpyThread; -import net.spy.memcached.compat.log.Logger; -import net.spy.memcached.compat.log.LoggerFactory; -import net.spy.memcached.internal.OperationFuture; -import net.spy.memcached.metrics.MetricCollector; -import net.spy.memcached.metrics.MetricType; -import net.spy.memcached.ops.GetOperation; -import net.spy.memcached.ops.KeyedOperation; -import net.spy.memcached.ops.NoopOperation; -import net.spy.memcached.ops.Operation; -import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationException; -import net.spy.memcached.ops.OperationState; -import net.spy.memcached.ops.OperationStatus; -import net.spy.memcached.ops.TapOperation; -import net.spy.memcached.ops.VBucketAware; -import net.spy.memcached.protocol.binary.BinaryOperationFactory; -import net.spy.memcached.protocol.binary.MultiGetOperationImpl; -import net.spy.memcached.protocol.binary.TapAckOperationImpl; -import net.spy.memcached.util.StringUtils; - import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -71,9 +50,30 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import net.spy.memcached.compat.SpyThread; +import net.spy.memcached.compat.log.Logger; +import net.spy.memcached.compat.log.LoggerFactory; +import net.spy.memcached.internal.OperationFuture; +import net.spy.memcached.metrics.MetricCollector; +import net.spy.memcached.metrics.MetricType; +import net.spy.memcached.ops.GetOperation; +import net.spy.memcached.ops.KeyedOperation; +import net.spy.memcached.ops.NoopOperation; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationException; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.TapOperation; +import net.spy.memcached.ops.VBucketAware; +import net.spy.memcached.protocol.binary.BinaryOperationFactory; +import net.spy.memcached.protocol.binary.MultiGetOperationImpl; +import net.spy.memcached.protocol.binary.TapAckOperationImpl; +import net.spy.memcached.util.StringUtils; + /** * Main class for handling connections to a memcached cluster. */ @@ -227,9 +227,9 @@ public class MemcachedConnection extends SpyThread { private final boolean verifyAliveOnConnect; /** - * The {@link ExecutorService} to use for callbacks. + * The {@link Executor} to use for callbacks. */ - private final ExecutorService listenerExecutorService; + private final Executor listenerExecutor; /** * The {@link MetricCollector} to accumulate metrics (or dummy). @@ -276,7 +276,7 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f, selector = Selector.open(); retryOps = Collections.synchronizedList(new ArrayList()); nodesToShutdown = new ConcurrentLinkedQueue(); - listenerExecutorService = f.getListenerExecutorService(); + listenerExecutor = f.getListenerExecutorService(); this.bufSize = bufSize; this.connectionFactory = f; @@ -302,7 +302,9 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f, registerMetrics(); - setName("Memcached IO over " + this); + String name = f instanceof DefaultConnectionFactory ? ((DefaultConnectionFactory) f) + .getName() : getName(); + setName("Memcached IO over [" + name + "] " + this); setDaemon(f.isDaemon()); start(); } @@ -751,7 +753,7 @@ private void finishConnect(final SelectionKey sk, final MemcachedNode node) if (verifyAliveOnConnect) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture("noop", - latch, 2500, listenerExecutorService); + latch, 2500, listenerExecutor); NoopOperation testOp = opFact.noop(new OperationCallback() { public void receivedStatus(OperationStatus status) { rv.set(status.isSuccess(), status); @@ -1468,6 +1470,8 @@ public void run() { logRunException(e); } catch (ConcurrentModificationException e) { logRunException(e); + } catch (OutOfMemoryError e) { + logRunException(e); } } getLogger().info("Shut down memcached client"); @@ -1481,7 +1485,7 @@ public void run() { * * @param e the exception to log. */ - private void logRunException(final Exception e) { + private void logRunException(final Throwable e) { if (shutDown) { getLogger().debug("Exception occurred during shutdown", e); } else { diff --git a/src/main/java/net/spy/memcached/TimeoutListener.java b/src/main/java/net/spy/memcached/TimeoutListener.java new file mode 100644 index 000000000..f7616a307 --- /dev/null +++ b/src/main/java/net/spy/memcached/TimeoutListener.java @@ -0,0 +1,55 @@ +package net.spy.memcached; + +import java.util.EventListener; +import java.util.concurrent.Future; + +import net.spy.memcached.ops.ConcatenationType; +import net.spy.memcached.ops.Mutator; +import net.spy.memcached.ops.StoreType; + +public interface TimeoutListener extends EventListener { + + /** + * @param future may be {@code null} if from some non async operations. + */ + void onTimeout(Method method, Future future) throws Exception; + + enum Method { + get, getAndTouch, getBulk, getBulkSome, gets, set, replace, add, append, prepend, touch, cas, incr, decr, delete; + + static Method from(Mutator mutator) { + switch (mutator) { + case incr: + return incr; + case decr: + return decr; + default: + return null; + } + } + + static Method from(ConcatenationType concatenationType) { + switch (concatenationType) { + case append: + return append; + case prepend: + return prepend; + default: + return null; + } + } + + static Method from(StoreType storeType) { + switch (storeType) { + case set: + return set; + case add: + return add; + case replace: + return replace; + default: + return null; + } + } + } +} diff --git a/src/main/java/net/spy/memcached/compat/log/SLF4JLogger.java b/src/main/java/net/spy/memcached/compat/log/SLF4JLogger.java index 65427a306..9f57eec32 100644 --- a/src/main/java/net/spy/memcached/compat/log/SLF4JLogger.java +++ b/src/main/java/net/spy/memcached/compat/log/SLF4JLogger.java @@ -22,6 +22,14 @@ package net.spy.memcached.compat.log; +import static org.slf4j.spi.LocationAwareLogger.DEBUG_INT; +import static org.slf4j.spi.LocationAwareLogger.ERROR_INT; +import static org.slf4j.spi.LocationAwareLogger.INFO_INT; +import static org.slf4j.spi.LocationAwareLogger.TRACE_INT; +import static org.slf4j.spi.LocationAwareLogger.WARN_INT; + +import org.slf4j.spi.LocationAwareLogger; + /** * Logging Implementation using the SLF4J * logging facade. @@ -38,7 +46,10 @@ */ public class SLF4JLogger extends AbstractLogger { + private static final String FQCN = AbstractLogger.class.getName(); + private final org.slf4j.Logger logger; + private final LocationAwareLogger locationAwareLogger; /** * Get an instance of the SLF4JLogger. @@ -46,6 +57,11 @@ public class SLF4JLogger extends AbstractLogger { public SLF4JLogger(String name) { super(name); logger = org.slf4j.LoggerFactory.getLogger(name); + if(logger instanceof LocationAwareLogger) { + locationAwareLogger = (LocationAwareLogger) logger; + } else { + locationAwareLogger = null; + } } @Override @@ -78,22 +94,40 @@ public void log(Level level, Object message, Throwable e) { switch(level) { case TRACE: - logger.trace(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, TRACE_INT, message.toString(), null, e); + } else { + logger.trace(message.toString(), e); + } break; case DEBUG: - logger.debug(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, DEBUG_INT, message.toString(), null, e); + } else { + logger.debug(message.toString(), e); + } break; case INFO: - logger.info(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, INFO_INT, message.toString(), null, e); + } else { + logger.info(message.toString(), e); + } break; case WARN: - logger.warn(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, WARN_INT, message.toString(), null, e); + } else { + logger.warn(message.toString(), e); + } break; case ERROR: - logger.error(message.toString(), e); - break; case FATAL: - logger.error(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, ERROR_INT, message.toString(), null, e); + } else { + logger.error(message.toString(), e); + } break; default: logger.error("Unhandled Logging Level: " + level diff --git a/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java b/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java index fd0b4e970..b7e9ec4c3 100644 --- a/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java +++ b/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java @@ -23,10 +23,13 @@ package net.spy.memcached.internal; +import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.compat.SpyObject; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -47,9 +50,9 @@ public abstract class AbstractListenableFuture implements ListenableFuture { /** - * The {@link ExecutorService} in which the notifications will be handled. + * The {@link Executor} in which the notifications will be handled. */ - private final ExecutorService service; + private final Executor service; /** * Holds the list of listeners which will be notified upon completion. @@ -61,17 +64,19 @@ public abstract class AbstractListenableFuture * * @param executor the executor in which the callbacks will be executed in. */ - protected AbstractListenableFuture(ExecutorService executor) { + protected AbstractListenableFuture(Executor executor) { service = executor; listeners = new ArrayList>>(); } + public abstract void setTimeoutListeners(Method method, List timeoutListeners); + /** * Returns the current executor. * * @return the current executor service. */ - protected ExecutorService executor() { + protected Executor executor() { return service; } @@ -108,9 +113,9 @@ protected Future addToListeners( * @param future the future to hand over. * @param listener the listener to notify. */ - protected void notifyListener(final ExecutorService executor, + protected void notifyListener(final Executor executor, final Future future, final GenericCompletionListener listener) { - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { try { @@ -172,4 +177,4 @@ protected Future removeFromListeners( } return this; } -} \ No newline at end of file +} diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index 0f956c3a1..f35b4b1c6 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -23,19 +23,26 @@ package net.spy.memcached.internal; +import static java.util.Collections.unmodifiableCollection; +import static net.spy.memcached.TimeoutListener.Method.getBulk; +import static net.spy.memcached.TimeoutListener.Method.getBulkSome; + import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; @@ -56,17 +63,21 @@ public class BulkGetFuture private final Map> rvMap; private final Collection ops; private final CountDownLatch latch; + private final String name; private OperationStatus status; private boolean cancelled = false; private boolean timeout = false; + private Collection timeoutOps; + private List timeoutListeners; public BulkGetFuture(Map> m, Collection getOps, - CountDownLatch l, ExecutorService service) { + CountDownLatch l, Executor service, String name) { super(service); rvMap = m; ops = getOps; latch = l; status = null; + this.name = name; } public boolean cancel(boolean ign) { @@ -104,8 +115,16 @@ public Map getSome(long to, TimeUnit unit) Map ret = internalGet(to, unit, timedoutOps); if (timedoutOps.size() > 0) { timeout = true; + this.timeoutOps = timedoutOps; + for (TimeoutListener timeoutListener : timeoutListeners) { + try { + timeoutListener.onTimeout(getBulkSome, this); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); + } + } LoggerFactory.getLogger(getClass()).warn( - new CheckedOperationTimeoutException("Operation timed out: ", + new CheckedOperationTimeoutException("Operation timed out[" + name + "]: ", timedoutOps).getMessage()); } return ret; @@ -124,7 +143,15 @@ public Map get(long to, TimeUnit unit) Map ret = internalGet(to, unit, timedoutOps); if (timedoutOps.size() > 0) { this.timeout = true; - throw new CheckedOperationTimeoutException("Operation timed out.", + this.timeoutOps = timedoutOps; + for (TimeoutListener timeoutListener : timeoutListeners) { + try { + timeoutListener.onTimeout(getBulk, this); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); + } + } + throw new CheckedOperationTimeoutException("Operation timed out[" + name + "].", timedoutOps); } return ret; @@ -183,6 +210,10 @@ public OperationStatus getStatus() { return status; } + public Collection getOperations() { + return unmodifiableCollection(ops); + } + public void setStatus(OperationStatus s) { status = s; } @@ -225,4 +256,12 @@ public void signalComplete() { notifyListeners(); } + @Override + public void setTimeoutListeners(Method method, List timeoutListeners) { + this.timeoutListeners = timeoutListeners; + } + + public Collection getTimeoutOps() { + return timeoutOps; + } } diff --git a/src/main/java/net/spy/memcached/internal/EnqueueTimeoutException.java b/src/main/java/net/spy/memcached/internal/EnqueueTimeoutException.java new file mode 100644 index 000000000..820dbc172 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/EnqueueTimeoutException.java @@ -0,0 +1,17 @@ +package net.spy.memcached.internal; + +import net.spy.memcached.ops.Operation; + +public class EnqueueTimeoutException extends IllegalStateException { + + private final Operation op; + + public EnqueueTimeoutException(String s, Operation op) { + super(s); + this.op = op; + } + + public Operation getOp() { + return op; + } +} diff --git a/src/main/java/net/spy/memcached/internal/GetFuture.java b/src/main/java/net/spy/memcached/internal/GetFuture.java index a18ab0b56..b4b554f39 100644 --- a/src/main/java/net/spy/memcached/internal/GetFuture.java +++ b/src/main/java/net/spy/memcached/internal/GetFuture.java @@ -24,13 +24,18 @@ package net.spy.memcached.internal; +import static net.spy.memcached.TimeoutListener.Method.get; + +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationStatus; @@ -48,7 +53,7 @@ public class GetFuture private final OperationFuture> rv; public GetFuture(CountDownLatch l, long opTimeout, String key, - ExecutorService service) { + Executor service) { super(service); this.rv = new OperationFuture>(key, l, opTimeout, service); } @@ -70,6 +75,10 @@ public T get(long duration, TimeUnit units) throws InterruptedException, return v == null ? null : v.get(); } + public Operation getOperation() { + return rv.getOperation(); + } + public OperationStatus getStatus() { return rv.getStatus(); } @@ -109,4 +118,8 @@ public void signalComplete() { notifyListeners(); } + @Override + public void setTimeoutListeners(Method method, List timeoutListeners) { + rv.setTimeoutListeners(method, timeoutListeners); + } } diff --git a/src/main/java/net/spy/memcached/internal/OperationFuture.java b/src/main/java/net/spy/memcached/internal/OperationFuture.java index 3a7f8cca8..0cb2dff5c 100644 --- a/src/main/java/net/spy/memcached/internal/OperationFuture.java +++ b/src/main/java/net/spy/memcached/internal/OperationFuture.java @@ -23,9 +23,11 @@ package net.spy.memcached.internal; +import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -33,6 +35,8 @@ import java.util.concurrent.atomic.AtomicReference; import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; @@ -61,6 +65,8 @@ public class OperationFuture private final long timeout; private Operation op; private final String key; + private Method method; + private List timeoutListeners; private Long cas; /** @@ -72,8 +78,7 @@ public class OperationFuture * @param l the latch to be used counting down the OperationFuture * @param opTimeout the timeout within which the operation needs to be done */ - public OperationFuture(String k, CountDownLatch l, long opTimeout, - ExecutorService service) { + public OperationFuture(String k, CountDownLatch l, long opTimeout, Executor service) { this(k, l, new AtomicReference(null), opTimeout, service); } @@ -88,7 +93,7 @@ public OperationFuture(String k, CountDownLatch l, long opTimeout, * @param opTimeout the timeout within which the operation needs to be done */ public OperationFuture(String k, CountDownLatch l, AtomicReference oref, - long opTimeout, ExecutorService service) { + long opTimeout, Executor service) { super(service); latch = l; @@ -99,6 +104,12 @@ public OperationFuture(String k, CountDownLatch l, AtomicReference oref, cas = null; } + @Override + public void setTimeoutListeners(Method method, List timeoutListeners) { + this.method = method; + this.timeoutListeners = timeoutListeners; + } + /** * Cancel this operation, if possible. * @@ -164,8 +175,14 @@ public T get(long duration, TimeUnit units) throws InterruptedException, if (op != null) { // op can be null on a flush op.timeOut(); } - throw new CheckedOperationTimeoutException( - "Timed out waiting for operation", op); + for (TimeoutListener listener : timeoutListeners) { + try { + listener.onTimeout(method, this); + } catch (Exception e) { + getLogger().error("Error execute timeout listeners", e); + } + } + throw new CheckedOperationTimeoutException("Timed out waiting for operation", op); } else { // continuous timeout counter will be reset MemcachedConnection.opSucceeded(op); @@ -231,6 +248,11 @@ public Long getCas() { } return cas; } + + public Operation getOperation() { + return op; + } + /** * Get the current status of this operation. * diff --git a/src/main/java/net/spy/memcached/ops/Operation.java b/src/main/java/net/spy/memcached/ops/Operation.java index b2b1f6a6b..36fa47f67 100644 --- a/src/main/java/net/spy/memcached/ops/Operation.java +++ b/src/main/java/net/spy/memcached/ops/Operation.java @@ -184,4 +184,10 @@ public interface Operation { * Sets the clone count for this operation. */ void setCloneCount(int count); + + long getCreateTimestamp(); + + long getStartWritingTimestamp(); + + long getFinishedReadTimestamp(); } diff --git a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java index 24d690899..8d8e6defa 100644 --- a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java @@ -23,6 +23,8 @@ package net.spy.memcached.protocol; +import static java.lang.System.currentTimeMillis; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -31,7 +33,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; import net.spy.memcached.MemcachedNode; import net.spy.memcached.compat.SpyObject; @@ -70,6 +71,10 @@ public abstract class BaseOperationImpl extends SpyObject implements Operation { new HashSet(); private long writeCompleteTimestamp; + private final long createTime = currentTimeMillis(); + private long startWriteTimestamp; + private long finishReadTimestamp; + /** * If the operation gets cloned, the reference is used to cascade cancellations * and timeouts. @@ -164,12 +169,18 @@ protected final synchronized void transitionState(OperationState newState) { cmd = null; } if (state == OperationState.COMPLETE) { + if (finishReadTimestamp == 0) { + finishReadTimestamp = currentTimeMillis(); + } callback.complete(); } } public final void writing() { transitionState(OperationState.WRITING); + if(startWriteTimestamp == 0){ + startWriteTimestamp = currentTimeMillis(); + } } public final void writeComplete() { @@ -275,4 +286,16 @@ public int getCloneCount() { public void setCloneCount(int count) { cloneCount = count; } + + public long getCreateTimestamp() { + return createTime; + } + + public long getStartWritingTimestamp() { + return startWriteTimestamp; + } + + public long getFinishedReadTimestamp() { + return finishReadTimestamp; + } } diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java index 52deeb04b..eb4e1f365 100644 --- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java +++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -40,6 +40,7 @@ import net.spy.memcached.MemcachedConnection; import net.spy.memcached.MemcachedNode; import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.internal.EnqueueTimeoutException; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; import net.spy.memcached.protocol.binary.TapAckOperationImpl; @@ -359,8 +360,8 @@ public final void addOp(Operation op) { return; } if (!inputQueue.offer(op, opQueueMaxBlockTime, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("Timed out waiting to add " + op - + "(max wait=" + opQueueMaxBlockTime + "ms)"); + throw new EnqueueTimeoutException("Timed out waiting to add " + op + + "(max wait=" + opQueueMaxBlockTime + "ms)", op); } } catch (InterruptedException e) { // Restore the interrupted status diff --git a/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java b/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java new file mode 100644 index 000000000..363057979 --- /dev/null +++ b/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java @@ -0,0 +1,159 @@ +package net.spy.memcached; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static net.spy.memcached.TestConfig.PORT_NUMBER; +import static net.spy.memcached.TimeoutListener.Method.add; +import static net.spy.memcached.TimeoutListener.Method.append; +import static net.spy.memcached.TimeoutListener.Method.decr; +import static net.spy.memcached.TimeoutListener.Method.delete; +import static net.spy.memcached.TimeoutListener.Method.get; +import static net.spy.memcached.TimeoutListener.Method.getAndTouch; +import static net.spy.memcached.TimeoutListener.Method.getBulk; +import static net.spy.memcached.TimeoutListener.Method.getBulkSome; +import static net.spy.memcached.TimeoutListener.Method.incr; +import static net.spy.memcached.TimeoutListener.Method.prepend; +import static net.spy.memcached.TimeoutListener.Method.replace; +import static net.spy.memcached.TimeoutListener.Method.set; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +import junit.framework.TestCase; +import net.spy.memcached.TimeoutListener.Method; + +public class MemcachedTimeoutListenerTest extends TestCase { + + private MemcachedClient client = null; + private Set methodSet = new HashSet(); + + @Override + protected void setUp() throws IOException { + client = new MemcachedClient(new BinaryConnectionFactory(), AddrUtil.getAddresses("1.1.1.1:" + PORT_NUMBER)); + client.addTimeoutListener(new TimeoutListener() { + + public void onTimeout(Method method, Future future) { + methodSet.add(method); + } + }); + } + + @Override + protected void tearDown() { + if (client != null) { + try { + client.shutdown(); + } catch (NullPointerException e) { + // This is a workaround for a disagreement betweewn how things + // should work in eclipse and buildr. My plan is to upgrade to + // junit4 all around and write some tests that are a bit easier + // to follow. + + // The actual problem here is a client that isn't properly + // initialized is attempting to be shut down. + } + } + } + + public void testTimeoutListener() throws ExecutionException, InterruptedException { + try { + client.get("test"); + fail(); + } catch (OperationTimeoutException e) { + } + assertTrue(methodSet.contains(get)); + + try { + client.set("test", 1, 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(set)); + + try { + client.getBulk("test1", "test2"); + fail(); + } catch (OperationTimeoutException e) { + } + assertTrue(methodSet.contains(getBulk)); + + client.asyncGetBulk("test3").getSome(1, MILLISECONDS); + assertTrue(methodSet.contains(getBulkSome)); + + try { + client.getAndTouch("test1", 1); + fail(); + } catch (OperationTimeoutException e) { + } + assertTrue(methodSet.contains(getAndTouch)); + + try { + client.incr("test1", 1); + fail(); + } catch (OperationTimeoutException e) { + } + assertTrue(methodSet.contains(incr)); + + try { + client.decr("test1", 1); + fail(); + } catch (OperationTimeoutException e) { + } + assertTrue(methodSet.contains(decr)); + + try { + client.append("test1", 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(append)); + + try { + client.prepend("test1", 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(prepend)); + + try { + client.add("test1", 1, "value").get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(add)); + + try { + client.replace("test1", 1, "value").get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(replace)); + + try { + client.delete("test1").get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(delete)); + + assertTrue(methodSet.remove(incr)); + assertTrue(methodSet.remove(decr)); + + try { + client.asyncIncr("test1", 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(incr)); + + try { + client.asyncDecr("test1", 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(decr)); + } +} diff --git a/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java b/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java index 86d89ff21..2c803253f 100644 --- a/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java +++ b/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java @@ -23,11 +23,15 @@ package net.spy.memcached.internal; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; + /** * A very basic {@link ListenableFuture} to verify and test basic * add, remove and notification behavior. @@ -96,4 +100,7 @@ public DummyListenableFuture removeListener( return this; } + public void setTimeoutListeners(Method method, List timeoutListeners) { + throw new UnsupportedOperationException(); + } }