From 489b1d8b7829990bd5b6ecef1ab5aeae685a1ec4 Mon Sep 17 00:00:00 2001 From: yukon Date: Wed, 20 Sep 2017 17:18:43 +0800 Subject: [PATCH] Use LinkedBlockingQueue for better performance --- .../impl/netty/NettyRemotingAbstract.java | 3 +-- .../rpc/impl/client/SimpleClientImpl.java | 12 ++++++---- .../rpc/impl/server/SimpleServerImpl.java | 11 ++++++---- .../rpc/impl/service/RpcInstanceAbstract.java | 12 ++++++---- .../rpc/impl/service/RpcProxyCommon.java | 22 +++++++++++++------ 5 files changed, 39 insertions(+), 21 deletions(-) diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 4c22e7ce..a5c21180 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { clientConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, - new ArrayBlockingQueue(10000), + new LinkedBlockingQueue(10000), "PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); } diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java index 787e8c1b..4483ca3b 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.rpc.impl.client; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingService; @@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien super(rpcCommonConfig); this.remotingClient = remotingClient; this.rpcCommonConfig = rpcCommonConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true); + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "clientCallServiceThread", true); } public void initialize() { diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java index e076cbe6..1fdda499 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.rpc.impl.server; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.RemotingService; @@ -40,9 +40,12 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe public SimpleServerImpl(final RpcCommonConfig remotingConfig) { this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig)); this.rpcCommonConfig = remotingConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(remotingConfig.getServiceThreadBlockQueueSize()), + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(remotingConfig.getServiceThreadBlockQueueSize()), "serverCallServiceThread", true); } diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java index 2b1288c0..8c0ddf27 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.rpc.impl.service; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.external.ThreadUtils; @@ -45,9 +45,13 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { this.rpcCommonConfig = rpcCommonConfig; this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats); - this.invokeServiceThreadPool = new ThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, - new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), newThreadFactory("rpcInvokeServiceThread", true)); + this.invokeServiceThreadPool = new ThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), + newThreadFactory("rpcInvokeServiceThread", true)); } diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java index c5d9a3c3..2487f79d 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java @@ -21,11 +21,11 @@ import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.ByteBuffer; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingService; @@ -68,12 +68,20 @@ public abstract class RpcProxyCommon { public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) { this.rpcCommonConfig = rpcCommonConfig; this.serviceStats = new ServiceStats(); - this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), "promiseExecutorService", true); - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), "callServiceThread", true); + this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "promiseExecutorService", true); + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "callServiceThread", true); } private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args, -- GitLab