提交 81884c8d 编写于 作者: Y yukon

[ROCKETMQ-312] Use independent thread pool for QueryMessageProcessor

Author: yukon <yukon@apache.org>

Closes #192 from zhouxinyu/ROCKETMQ-312.
上级 5a2e7109
...@@ -114,6 +114,7 @@ public class BrokerController { ...@@ -114,6 +114,7 @@ public class BrokerController {
private final SlaveSynchronize slaveSynchronize; private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue; private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue; private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue; private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue; private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final FilterServerManager filterServerManager; private final FilterServerManager filterServerManager;
...@@ -126,6 +127,7 @@ public class BrokerController { ...@@ -126,6 +127,7 @@ public class BrokerController {
private TopicConfigManager topicConfigManager; private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor; private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor; private ExecutorService pullMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor; private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor; private ExecutorService clientManageExecutor;
private ExecutorService consumerManageExecutor; private ExecutorService consumerManageExecutor;
...@@ -163,8 +165,8 @@ public class BrokerController { ...@@ -163,8 +165,8 @@ public class BrokerController {
this.slaveSynchronize = new SlaveSynchronize(this); this.slaveSynchronize = new SlaveSynchronize(this);
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()); this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity()); this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
...@@ -191,6 +193,10 @@ public class BrokerController { ...@@ -191,6 +193,10 @@ public class BrokerController {
return pullThreadPoolQueue; return pullThreadPoolQueue;
} }
public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
return queryThreadPoolQueue;
}
public boolean initialize() throws CloneNotSupportedException { public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load(); boolean result = this.topicConfigManager.load();
...@@ -237,6 +243,14 @@ public class BrokerController { ...@@ -237,6 +243,14 @@ public class BrokerController {
this.pullThreadPoolQueue, this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_")); new ThreadFactoryImpl("PullMessageThread_"));
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
this.adminBrokerExecutor = this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_")); "AdminBrokerThread_"));
...@@ -404,11 +418,11 @@ public class BrokerController { ...@@ -404,11 +418,11 @@ public class BrokerController {
* QueryMessageProcessor * QueryMessageProcessor
*/ */
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
/** /**
* ClientManageProcessor * ClientManageProcessor
...@@ -494,9 +508,14 @@ public class BrokerController { ...@@ -494,9 +508,14 @@ public class BrokerController {
return this.headSlowTimeMills(this.pullThreadPoolQueue); return this.headSlowTimeMills(this.pullThreadPoolQueue);
} }
public long headSlowTimeMills4QueryThreadPoolQueue() {
return this.headSlowTimeMills(this.queryThreadPoolQueue);
}
public void printWaterMark() { public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
} }
public MessageStore getMessageStore() { public MessageStore getMessageStore() {
......
...@@ -1205,11 +1205,17 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -1205,11 +1205,17 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo.put("pullThreadPoolQueueCapacity", runtimeInfo.put("pullThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
runtimeInfo.put("queryThreadPoolQueueSize", String.valueOf(this.brokerController.getQueryThreadPoolQueue().size()));
runtimeInfo.put("queryThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue())); runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue())); runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue()));
runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue()));
runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime())); runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
......
...@@ -59,6 +59,8 @@ public class BrokerConfig { ...@@ -59,6 +59,8 @@ public class BrokerConfig {
*/ */
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
private int adminBrokerThreadPoolNums = 16; private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32; private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32;
...@@ -73,6 +75,7 @@ public class BrokerConfig { ...@@ -73,6 +75,7 @@ public class BrokerConfig {
private boolean fetchNamesrvAddrByAddressServer = false; private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000; private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000; private int pullThreadPoolQueueCapacity = 100000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000; private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000;
...@@ -306,6 +309,14 @@ public class BrokerConfig { ...@@ -306,6 +309,14 @@ public class BrokerConfig {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums; this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
} }
public int getQueryMessageThreadPoolNums() {
return queryMessageThreadPoolNums;
}
public void setQueryMessageThreadPoolNums(final int queryMessageThreadPoolNums) {
this.queryMessageThreadPoolNums = queryMessageThreadPoolNums;
}
public int getAdminBrokerThreadPoolNums() { public int getAdminBrokerThreadPoolNums() {
return adminBrokerThreadPoolNums; return adminBrokerThreadPoolNums;
} }
...@@ -394,6 +405,14 @@ public class BrokerConfig { ...@@ -394,6 +405,14 @@ public class BrokerConfig {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity; this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
} }
public int getQueryThreadPoolQueueCapacity() {
return queryThreadPoolQueueCapacity;
}
public void setQueryThreadPoolQueueCapacity(final int queryThreadPoolQueueCapacity) {
this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
}
public boolean isBrokerTopicEnable() { public boolean isBrokerTopicEnable() {
return brokerTopicEnable; return brokerTopicEnable;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册