提交 6593294f 编写于 作者: Y yukon

Polish thread poll create method

上级 489b1d8b
...@@ -77,7 +77,7 @@ ...@@ -77,7 +77,7 @@
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
<version>4.1.6.Final</version> <version>4.1.15.Final</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
......
...@@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting.external; ...@@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting.external;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
...@@ -43,25 +44,32 @@ public final class ThreadUtils { ...@@ -43,25 +44,32 @@ public final class ThreadUtils {
int maximumPoolSize, int maximumPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) { BlockingQueue<Runnable> workQueue,
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); String processName, boolean isDaemon) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon));
}
public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(workQueueCapacity),
newGenericThreadFactory(processName, isDaemon));
} }
public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon));
} }
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) { public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon)); return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon));
} }
public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName, public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
boolean isDaemon) { boolean isDaemon) {
return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon)); return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName, isDaemon));
}
public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
return newGenericThreadFactory("Remoting-" + processName, isDaemon);
} }
public static ThreadFactory newGenericThreadFactory(String processName) { public static ThreadFactory newGenericThreadFactory(String processName) {
......
...@@ -92,13 +92,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { ...@@ -92,13 +92,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) { NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
this.publicExecutor = ThreadUtils.newThreadPoolExecutor( this.publicExecutor = ThreadUtils.newFixedThreadPool(
clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(),
clientConfig.getClientAsyncCallbackExecutorThreads(), 10000, "Remoting-PublicExecutor", true);
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10000),
"PublicExecutor", true);
this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
} }
......
...@@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.client; ...@@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.client;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.RemotingService; import org.apache.rocketmq.remoting.api.RemotingService;
...@@ -48,13 +47,10 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien ...@@ -48,13 +47,10 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien
super(rpcCommonConfig); super(rpcCommonConfig);
this.remotingClient = remotingClient; this.remotingClient = remotingClient;
this.rpcCommonConfig = rpcCommonConfig; this.rpcCommonConfig = rpcCommonConfig;
this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadBlockQueueSize(),
rpcCommonConfig.getServiceThreadKeepAliveTime(), "RPC-ClientCallServiceThread", true);
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
"clientCallServiceThread", true);
} }
public void initialize() { public void initialize() {
......
...@@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.server; ...@@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.server;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.RemotingServer;
import org.apache.rocketmq.remoting.api.RemotingService; import org.apache.rocketmq.remoting.api.RemotingService;
...@@ -40,13 +39,10 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe ...@@ -40,13 +39,10 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe
public SimpleServerImpl(final RpcCommonConfig remotingConfig) { public SimpleServerImpl(final RpcCommonConfig remotingConfig) {
this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig)); this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig));
this.rpcCommonConfig = remotingConfig; this.rpcCommonConfig = remotingConfig;
this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), remotingConfig.getServiceThreadBlockQueueSize(),
rpcCommonConfig.getServiceThreadKeepAliveTime(), "RPC-ServerCallServiceThread", true);
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
"serverCallServiceThread", true);
} }
public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) { public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) {
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package org.apache.rocketmq.rpc.impl.service; package org.apache.rocketmq.rpc.impl.service;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.rpc.impl.command.RpcRequestCode; import org.apache.rocketmq.rpc.impl.command.RpcRequestCode;
...@@ -29,15 +27,13 @@ import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl; ...@@ -29,15 +27,13 @@ import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl;
import org.apache.rocketmq.rpc.impl.metrics.ThreadStats; import org.apache.rocketmq.rpc.impl.metrics.ThreadStats;
import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor; import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor;
import static org.apache.rocketmq.remoting.external.ThreadUtils.newThreadFactory;
public abstract class RpcInstanceAbstract extends RpcProxyCommon { public abstract class RpcInstanceAbstract extends RpcProxyCommon {
protected final RpcRequestProcessor rpcRequestProcessor; protected final RpcRequestProcessor rpcRequestProcessor;
protected final ThreadLocal<RpcProviderContext> threadLocalProviderContext = new ThreadLocal<RpcProviderContext>(); protected final ThreadLocal<RpcProviderContext> threadLocalProviderContext = new ThreadLocal<RpcProviderContext>();
protected final RpcCommonConfig rpcCommonConfig; protected final RpcCommonConfig rpcCommonConfig;
protected ThreadStats threadStats; protected ThreadStats threadStats;
private DefaultServiceAPIImpl defaultServiceAPI; private DefaultServiceAPIImpl defaultServiceAPI;
private ThreadPoolExecutor invokeServiceThreadPool; private ExecutorService invokeServiceThreadPool;
public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) { public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) {
super(rpcCommonConfig); super(rpcCommonConfig);
...@@ -45,13 +41,9 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { ...@@ -45,13 +41,9 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
this.rpcCommonConfig = rpcCommonConfig; this.rpcCommonConfig = rpcCommonConfig;
this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats); this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats);
this.invokeServiceThreadPool = new ThreadPoolExecutor( this.invokeServiceThreadPool = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
60, rpcCommonConfig.getServiceThreadBlockQueueSize(),"RPC-InvokeServiceThread", true);
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
newThreadFactory("rpcInvokeServiceThread", true));
} }
...@@ -81,11 +73,11 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { ...@@ -81,11 +73,11 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
public abstract void registerServiceListener(); public abstract void registerServiceListener();
public ThreadPoolExecutor getInvokeServiceThreadPool() { public ExecutorService getInvokeServiceThreadPool() {
return invokeServiceThreadPool; return invokeServiceThreadPool;
} }
public void setInvokeServiceThreadPool(ThreadPoolExecutor invokeServiceThreadPool) { public void setInvokeServiceThreadPool(ExecutorService invokeServiceThreadPool) {
this.invokeServiceThreadPool = invokeServiceThreadPool; this.invokeServiceThreadPool = invokeServiceThreadPool;
} }
......
...@@ -25,7 +25,6 @@ import java.util.concurrent.Callable; ...@@ -25,7 +25,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingService; import org.apache.rocketmq.remoting.api.RemotingService;
...@@ -68,20 +67,14 @@ public abstract class RpcProxyCommon { ...@@ -68,20 +67,14 @@ public abstract class RpcProxyCommon {
public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) { public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) {
this.rpcCommonConfig = rpcCommonConfig; this.rpcCommonConfig = rpcCommonConfig;
this.serviceStats = new ServiceStats(); this.serviceStats = new ServiceStats();
this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor( this.promiseExecutorService = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getServiceThreadBlockQueueSize(),
"Remoting-PromiseExecutorService", true);
this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getServiceThreadKeepAliveTime(), rpcCommonConfig.getServiceThreadBlockQueueSize(),
TimeUnit.MILLISECONDS, "Remoting-CallServiceThread", true);
new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
"promiseExecutorService", true);
this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getServiceThreadKeepAliveTime(),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
"callServiceThread", true);
} }
private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args, private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册