diff --git a/src/main/java/org/msgpack/rpc/loop/EventLoop.java b/src/main/java/org/msgpack/rpc/loop/EventLoop.java index 7a500b7..2cc8487 100644 --- a/src/main/java/org/msgpack/rpc/loop/EventLoop.java +++ b/src/main/java/org/msgpack/rpc/loop/EventLoop.java @@ -82,13 +82,21 @@ static public EventLoop start(ExecutorService workerExecutor, ExecutorService io static public EventLoop start( ExecutorService workerExecutor, ExecutorService ioExecutor, ScheduledExecutorService scheduledExecutor, MessagePack messagePack) { - return getFactory().make(workerExecutor, ioExecutor, scheduledExecutor, messagePack); + return start(workerExecutor, ioExecutor, scheduledExecutor, messagePack, + 2 * Runtime.getRuntime().availableProcessors()); + } + + static public EventLoop start( + ExecutorService workerExecutor, ExecutorService ioExecutor, + ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount) { + return getFactory().make(workerExecutor, ioExecutor, scheduledExecutor, messagePack, workerCount); } private ExecutorService workerExecutor; private ExecutorService ioExecutor; private ScheduledExecutorService scheduledExecutor; private MessagePack messagePack; + private int workerCount; public MessagePack getMessagePack() { return messagePack; @@ -99,11 +107,12 @@ public void setMessagePack(MessagePack messagePack) { } public EventLoop(ExecutorService workerExecutor, ExecutorService ioExecutor, - ScheduledExecutorService scheduledExecutor, MessagePack messagePack) { + ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount) { this.workerExecutor = workerExecutor; this.scheduledExecutor = scheduledExecutor; this.ioExecutor = ioExecutor; this.messagePack = messagePack; + this.workerCount = workerCount; } public ExecutorService getWorkerExecutor() { @@ -118,6 +127,10 @@ public ScheduledExecutorService getScheduledExecutor() { return scheduledExecutor; } + public int getWorkerCount() { + return workerCount; + } + public void shutdown() { scheduledExecutor.shutdown(); ioExecutor.shutdown(); diff --git a/src/main/java/org/msgpack/rpc/loop/EventLoopFactory.java b/src/main/java/org/msgpack/rpc/loop/EventLoopFactory.java index 584a164..7eacfc3 100644 --- a/src/main/java/org/msgpack/rpc/loop/EventLoopFactory.java +++ b/src/main/java/org/msgpack/rpc/loop/EventLoopFactory.java @@ -24,7 +24,7 @@ public interface EventLoopFactory { public EventLoop make(ExecutorService workerExecutor, ExecutorService ioExecutor, - ScheduledExecutorService scheduledExecutor, MessagePack messagePack); + ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount); // TODO Map EventLoopConfig } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java index cf71a25..5768af4 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java @@ -35,8 +35,8 @@ public class NettyEventLoop extends EventLoop { public NettyEventLoop(ExecutorService workerExecutor, ExecutorService ioExecutor, - ScheduledExecutorService scheduledExecutor, MessagePack messagePack) { - super(workerExecutor, ioExecutor, scheduledExecutor, messagePack); + ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount) { + super(workerExecutor, ioExecutor, scheduledExecutor, messagePack, workerCount); } private ClientSocketChannelFactory clientFactory = null; @@ -45,7 +45,7 @@ public NettyEventLoop(ExecutorService workerExecutor, public synchronized ClientSocketChannelFactory getClientFactory() { if (clientFactory == null) { clientFactory = new NioClientSocketChannelFactory(getIoExecutor(), - getWorkerExecutor()); // TODO: workerCount + getWorkerExecutor(), getWorkerCount()); } return clientFactory; } @@ -53,7 +53,7 @@ public synchronized ClientSocketChannelFactory getClientFactory() { public synchronized ServerSocketChannelFactory getServerFactory() { if (serverFactory == null) { serverFactory = new NioServerSocketChannelFactory(getIoExecutor(), - getWorkerExecutor()); // TODO: workerCount + getWorkerExecutor(), getWorkerCount()); // messages will be dispatched to worker thread on server. // see useThread(true) in NettyTcpClientTransport(). } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoopFactory.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoopFactory.java index 1e715aa..bd82234 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoopFactory.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoopFactory.java @@ -30,8 +30,8 @@ public NettyEventLoopFactory() { public EventLoop make(ExecutorService workerExecutor, ExecutorService ioExecutor, - ScheduledExecutorService scheduledExecutor, MessagePack messagePack) { + ScheduledExecutorService scheduledExecutor, MessagePack messagePack, int workerCount) { return new NettyEventLoop(workerExecutor, ioExecutor, - scheduledExecutor, messagePack); + scheduledExecutor, messagePack, workerCount); } }