diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index cd68552bd27bae4ff69632ffb0306006bcebc0df..0a6f0b45e9de20f282880b854ad209ec97f31cf3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -114,6 +114,7 @@ public class BrokerController { private final SlaveSynchronize slaveSynchronize; private final BlockingQueue sendThreadPoolQueue; private final BlockingQueue pullThreadPoolQueue; + private final BlockingQueue queryThreadPoolQueue; private final BlockingQueue clientManagerThreadPoolQueue; private final BlockingQueue consumerManagerThreadPoolQueue; private final FilterServerManager filterServerManager; @@ -126,6 +127,7 @@ public class BrokerController { private TopicConfigManager topicConfigManager; private ExecutorService sendMessageExecutor; private ExecutorService pullMessageExecutor; + private ExecutorService queryMessageExecutor; private ExecutorService adminBrokerExecutor; private ExecutorService clientManageExecutor; private ExecutorService consumerManageExecutor; @@ -163,8 +165,8 @@ public class BrokerController { this.slaveSynchronize = new SlaveSynchronize(this); this.sendThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity()); - this.pullThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getPullThreadPoolQueueCapacity()); + this.queryThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); @@ -191,6 +193,10 @@ public class BrokerController { return pullThreadPoolQueue; } + public BlockingQueue getQueryThreadPoolQueue() { + return queryThreadPoolQueue; + } + public boolean initialize() throws CloneNotSupportedException { boolean result = this.topicConfigManager.load(); @@ -237,6 +243,14 @@ public class BrokerController { this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); + this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getQueryMessageThreadPoolNums(), + this.brokerConfig.getQueryMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.queryThreadPoolQueue, + new ThreadFactoryImpl("QueryMessageThread_")); + this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_")); @@ -404,11 +418,11 @@ public class BrokerController { * QueryMessageProcessor */ NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); /** * ClientManageProcessor @@ -494,9 +508,14 @@ public class BrokerController { return this.headSlowTimeMills(this.pullThreadPoolQueue); } + public long headSlowTimeMills4QueryThreadPoolQueue() { + return this.headSlowTimeMills(this.queryThreadPoolQueue); + } + public void printWaterMark() { LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); + LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue()); } public MessageStore getMessageStore() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index abea4ec9e0dbbe351e0535384e68596853d28d1d..d69a78700c9e126260a256c18cf456ada6471d03 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1205,11 +1205,17 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { runtimeInfo.put("pullThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); + runtimeInfo.put("queryThreadPoolQueueSize", String.valueOf(this.brokerController.getQueryThreadPoolQueue().size())); + runtimeInfo.put("queryThreadPoolQueueCapacity", + String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); + runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue())); runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue())); + runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue())); + runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime())); runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 9a208a3fad05b3f3a555204532c13e6679b78c7f..c344a7ce6d97bd0e8ba75b07158d7b9370d4276c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -59,6 +59,8 @@ public class BrokerConfig { */ private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors(); + private int adminBrokerThreadPoolNums = 16; private int clientManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32; @@ -73,6 +75,7 @@ public class BrokerConfig { private boolean fetchNamesrvAddrByAddressServer = false; private int sendThreadPoolQueueCapacity = 10000; private int pullThreadPoolQueueCapacity = 100000; + private int queryThreadPoolQueueCapacity = 20000; private int clientManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000; @@ -306,6 +309,14 @@ public class BrokerConfig { this.pullMessageThreadPoolNums = pullMessageThreadPoolNums; } + public int getQueryMessageThreadPoolNums() { + return queryMessageThreadPoolNums; + } + + public void setQueryMessageThreadPoolNums(final int queryMessageThreadPoolNums) { + this.queryMessageThreadPoolNums = queryMessageThreadPoolNums; + } + public int getAdminBrokerThreadPoolNums() { return adminBrokerThreadPoolNums; } @@ -394,6 +405,14 @@ public class BrokerConfig { this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity; } + public int getQueryThreadPoolQueueCapacity() { + return queryThreadPoolQueueCapacity; + } + + public void setQueryThreadPoolQueueCapacity(final int queryThreadPoolQueueCapacity) { + this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity; + } + public boolean isBrokerTopicEnable() { return brokerTopicEnable; }