diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 4c22e7ce5b8f9b1a55fce0efa5e44f8433c451d3..a5c21180af5997dfd62e66379c6ed2850115d53f 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { clientConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, - new ArrayBlockingQueue(10000), + new LinkedBlockingQueue(10000), "PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); } diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java index 787e8c1bba52a9a4e5c4a8524f6d3b8cce4deeef..4483ca3b6aacc8a4cbe48627da089ffe95b1b3c0 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.rpc.impl.client; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingService; @@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien super(rpcCommonConfig); this.remotingClient = remotingClient; this.rpcCommonConfig = rpcCommonConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true); + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "clientCallServiceThread", true); } public void initialize() { diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java index e076cbe6a99ad07c9964baec24b3ba59751a17d1..1fdda499082d87acbc070c6e33f09156b573ee21 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.rpc.impl.server; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.RemotingService; @@ -40,9 +40,12 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe public SimpleServerImpl(final RpcCommonConfig remotingConfig) { this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig)); this.rpcCommonConfig = remotingConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(remotingConfig.getServiceThreadBlockQueueSize()), + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(remotingConfig.getServiceThreadBlockQueueSize()), "serverCallServiceThread", true); } diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java index 2b1288c00bade2d9ce401636d6ab4cc2c7144fb0..8c0ddf2719707655ead7dd59e3443fde3cd71073 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.rpc.impl.service; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.external.ThreadUtils; @@ -45,9 +45,13 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { this.rpcCommonConfig = rpcCommonConfig; this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats); - this.invokeServiceThreadPool = new ThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, - new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), newThreadFactory("rpcInvokeServiceThread", true)); + this.invokeServiceThreadPool = new ThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), + newThreadFactory("rpcInvokeServiceThread", true)); } diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java index c5d9a3c3a918a370b218d57f8d78840e65f688cc..2487f79dc447c1441936ed511f1c36405a157408 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java @@ -21,11 +21,11 @@ import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.ByteBuffer; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingService; @@ -68,12 +68,20 @@ public abstract class RpcProxyCommon { public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) { this.rpcCommonConfig = rpcCommonConfig; this.serviceStats = new ServiceStats(); - this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), "promiseExecutorService", true); - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), "callServiceThread", true); + this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "promiseExecutorService", true); + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "callServiceThread", true); } private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,