diff --git a/remoting-core/pom.xml b/remoting-core/pom.xml index 702b826a89a8946d427eb17fc35f79503bca7f82..997011b329b97335361d52841fafbd3978326f3a 100644 --- a/remoting-core/pom.xml +++ b/remoting-core/pom.xml @@ -77,7 +77,7 @@ io.netty netty-all - 4.1.6.Final + 4.1.15.Final com.alibaba diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java index 1a80d20eab7aa1b55abe9d4f32a84343b2f3c6f6..5a5008918fbe53d203f76ceca6906a267d0b7c27 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting.external; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -43,25 +44,32 @@ public final class ThreadUtils { int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue workQueue, String processName, boolean isDaemon) { - return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); + BlockingQueue workQueue, + String processName, boolean isDaemon) { + return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon)); + } + + public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) { + return new ThreadPoolExecutor( + nThreads, + nThreads, + 0, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(workQueueCapacity), + newGenericThreadFactory(processName, isDaemon)); } public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { - return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); + return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) { - return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon)); + return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon)); } public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName, boolean isDaemon) { - return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon)); - } - - public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) { - return newGenericThreadFactory("Remoting-" + processName, isDaemon); + return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName, isDaemon)); } public static ThreadFactory newGenericThreadFactory(String processName) { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index a5c21180af5997dfd62e66379c6ed2850115d53f..82b17f4816504afa2deab12f24bcf09a891379cc 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -92,13 +92,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) { this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); - this.publicExecutor = ThreadUtils.newThreadPoolExecutor( + this.publicExecutor = ThreadUtils.newFixedThreadPool( clientConfig.getClientAsyncCallbackExecutorThreads(), - clientConfig.getClientAsyncCallbackExecutorThreads(), - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue(10000), - "PublicExecutor", true); + 10000, "Remoting-PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); } diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java index 4483ca3b6aacc8a4cbe48627da089ffe95b1b3c0..35931b2f62565367f9200a4f1f200600f4b66840 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.client; import java.util.Properties; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingService; @@ -48,13 +47,10 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien super(rpcCommonConfig); this.remotingClient = remotingClient; this.rpcCommonConfig = rpcCommonConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + this.callServiceThreadPool = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), - "clientCallServiceThread", true); + rpcCommonConfig.getServiceThreadBlockQueueSize(), + "RPC-ClientCallServiceThread", true); } public void initialize() { diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java index 1fdda499082d87acbc070c6e33f09156b573ee21..469e0c7029567b26a18c1ae031d31cb8a258c48c 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.server; import java.util.Properties; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.RemotingService; @@ -40,13 +39,10 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe public SimpleServerImpl(final RpcCommonConfig remotingConfig) { this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig)); this.rpcCommonConfig = remotingConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + this.callServiceThreadPool = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(remotingConfig.getServiceThreadBlockQueueSize()), - "serverCallServiceThread", true); + remotingConfig.getServiceThreadBlockQueueSize(), + "RPC-ServerCallServiceThread", true); } public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) { diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java index 8c0ddf2719707655ead7dd59e3443fde3cd71073..7ece4a8626d8e084da14cc9c52ea9f96b31f10dc 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java @@ -18,8 +18,6 @@ package org.apache.rocketmq.rpc.impl.service; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.rpc.impl.command.RpcRequestCode; @@ -29,15 +27,13 @@ import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl; import org.apache.rocketmq.rpc.impl.metrics.ThreadStats; import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor; -import static org.apache.rocketmq.remoting.external.ThreadUtils.newThreadFactory; - public abstract class RpcInstanceAbstract extends RpcProxyCommon { protected final RpcRequestProcessor rpcRequestProcessor; protected final ThreadLocal threadLocalProviderContext = new ThreadLocal(); protected final RpcCommonConfig rpcCommonConfig; protected ThreadStats threadStats; private DefaultServiceAPIImpl defaultServiceAPI; - private ThreadPoolExecutor invokeServiceThreadPool; + private ExecutorService invokeServiceThreadPool; public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) { super(rpcCommonConfig); @@ -45,13 +41,9 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { this.rpcCommonConfig = rpcCommonConfig; this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats); - this.invokeServiceThreadPool = new ThreadPoolExecutor( - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + this.invokeServiceThreadPool = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), - newThreadFactory("rpcInvokeServiceThread", true)); + rpcCommonConfig.getServiceThreadBlockQueueSize(),"RPC-InvokeServiceThread", true); } @@ -81,11 +73,11 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { public abstract void registerServiceListener(); - public ThreadPoolExecutor getInvokeServiceThreadPool() { + public ExecutorService getInvokeServiceThreadPool() { return invokeServiceThreadPool; } - public void setInvokeServiceThreadPool(ThreadPoolExecutor invokeServiceThreadPool) { + public void setInvokeServiceThreadPool(ExecutorService invokeServiceThreadPool) { this.invokeServiceThreadPool = invokeServiceThreadPool; } diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java index 2487f79dc447c1441936ed511f1c36405a157408..ac8c208ffe1c37d672e9501e4eb5af34ddb259b8 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java @@ -25,7 +25,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingService; @@ -68,20 +67,14 @@ public abstract class RpcProxyCommon { public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) { this.rpcCommonConfig = rpcCommonConfig; this.serviceStats = new ServiceStats(); - this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor( + this.promiseExecutorService = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadBlockQueueSize(), + "Remoting-PromiseExecutorService", true); + this.callServiceThreadPool = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), - "promiseExecutorService", true); - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(rpcCommonConfig.getServiceThreadBlockQueueSize()), - "callServiceThread", true); + rpcCommonConfig.getServiceThreadBlockQueueSize(), + "Remoting-CallServiceThread", true); } private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,