提交 489b1d8b 编写于 作者: Y yukon

Use LinkedBlockingQueue for better performance

上级 114b6ae0
...@@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler; ...@@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
...@@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { ...@@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(),
60, 60,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000), new LinkedBlockingQueue<Runnable>(10000),
"PublicExecutor", true); "PublicExecutor", true);
this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
} }
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
package org.apache.rocketmq.rpc.impl.client; package org.apache.rocketmq.rpc.impl.client;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.RemotingService; import org.apache.rocketmq.remoting.api.RemotingService;
...@@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien ...@@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien
super(rpcCommonConfig); super(rpcCommonConfig);
this.remotingClient = remotingClient; this.remotingClient = remotingClient;
this.rpcCommonConfig = rpcCommonConfig; this.rpcCommonConfig = rpcCommonConfig;
this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true); rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getServiceThreadKeepAliveTime(),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
"clientCallServiceThread", true);
} }
public void initialize() { public void initialize() {
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
package org.apache.rocketmq.rpc.impl.server; package org.apache.rocketmq.rpc.impl.server;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.RemotingServer;
import org.apache.rocketmq.remoting.api.RemotingService; import org.apache.rocketmq.remoting.api.RemotingService;
...@@ -40,9 +40,12 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe ...@@ -40,9 +40,12 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe
public SimpleServerImpl(final RpcCommonConfig remotingConfig) { public SimpleServerImpl(final RpcCommonConfig remotingConfig) {
this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig)); this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig));
this.rpcCommonConfig = remotingConfig; this.rpcCommonConfig = remotingConfig;
this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getServiceThreadKeepAliveTime(),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
"serverCallServiceThread", true); "serverCallServiceThread", true);
} }
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
package org.apache.rocketmq.rpc.impl.service; package org.apache.rocketmq.rpc.impl.service;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.external.ThreadUtils;
...@@ -45,9 +45,13 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { ...@@ -45,9 +45,13 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
this.rpcCommonConfig = rpcCommonConfig; this.rpcCommonConfig = rpcCommonConfig;
this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats); this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats);
this.invokeServiceThreadPool = new ThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), this.invokeServiceThreadPool = new ThreadPoolExecutor(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), newThreadFactory("rpcInvokeServiceThread", true)); rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
newThreadFactory("rpcInvokeServiceThread", true));
} }
......
...@@ -21,11 +21,11 @@ import java.lang.reflect.Method; ...@@ -21,11 +21,11 @@ import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType; import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingService; import org.apache.rocketmq.remoting.api.RemotingService;
...@@ -68,12 +68,20 @@ public abstract class RpcProxyCommon { ...@@ -68,12 +68,20 @@ public abstract class RpcProxyCommon {
public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) { public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) {
this.rpcCommonConfig = rpcCommonConfig; this.rpcCommonConfig = rpcCommonConfig;
this.serviceStats = new ServiceStats(); this.serviceStats = new ServiceStats();
this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "promiseExecutorService", true); rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), TimeUnit.MILLISECONDS,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "callServiceThread", true); new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
"promiseExecutorService", true);
this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
rpcCommonConfig.getServiceThreadKeepAliveTime(),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
"callServiceThread", true);
} }
private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args, private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册