diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9dbee82e70510cc87b887ff64d3ae58f045466ee..f45674d6e461e623e08a769ef0fcd1372c1ee05e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -16,6 +16,21 @@ */ package org.apache.rocketmq.broker; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.client.ClientHousekeepingService; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ConsumerManager; @@ -85,21 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; public class BrokerController { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -129,6 +129,7 @@ public class BrokerController { private final BlockingQueue pullThreadPoolQueue; private final BlockingQueue queryThreadPoolQueue; private final BlockingQueue clientManagerThreadPoolQueue; + private final BlockingQueue heartbeatThreadPoolQueue; private final BlockingQueue consumerManagerThreadPoolQueue; private final FilterServerManager filterServerManager; private final BrokerStatsManager brokerStatsManager; @@ -143,6 +144,7 @@ public class BrokerController { private ExecutorService queryMessageExecutor; private ExecutorService adminBrokerExecutor; private ExecutorService clientManageExecutor; + private ExecutorService heartbeatExecutor; private ExecutorService consumerManageExecutor; private boolean updateMasterHAServerAddrPeriodically = false; private BrokerStats brokerStats; @@ -186,6 +188,7 @@ public class BrokerController { this.queryThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); + this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); @@ -280,6 +283,15 @@ public class BrokerController { this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_")); + this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getHeartbeatThreadPoolNums(), + this.brokerConfig.getHeartbeatThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.heartbeatThreadPoolQueue, + new ThreadFactoryImpl("HeartbeatThread_",true)); + + this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( "ConsumerManageThread_")); @@ -501,11 +513,11 @@ public class BrokerController { * ClientManageProcessor */ ClientManageProcessor clientProcessor = new ClientManageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); + this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); @@ -729,10 +741,6 @@ public class BrokerController { if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } - - if (this.transactionalMessageCheckService != null) { - this.transactionalMessageCheckService.shutdown(false); - } } private void unregisterBrokerAll() { @@ -990,6 +998,10 @@ public class BrokerController { return this.configuration; } + public BlockingQueue getHeartbeatThreadPoolQueue() { + return heartbeatThreadPoolQueue; + } + public TransactionalMessageCheckService getTransactionalMessageCheckService() { return transactionalMessageCheckService; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 0159d323fe1efb7e7dfbce76c8567827402acf07..0a8beca2c640d8ae32a1f8060de2df5a3c001cd6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -89,6 +89,9 @@ public class BrokerFastFailure { cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(), this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()); + + cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(), + this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()); } void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index e4486da0d1569907920c703efc2202f92fe50c27..442f456aa41e0b551b5b93557711243b3ddb7dad 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -63,6 +63,7 @@ public class BrokerConfig { private int adminBrokerThreadPoolNums = 16; private int clientManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32; + private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors()); private int flushConsumerOffsetInterval = 1000 * 5; @@ -77,6 +78,7 @@ public class BrokerConfig { private int queryThreadPoolQueueCapacity = 20000; private int clientManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000; + private int heartbeatThreadPoolQueueCapacity = 50000; private int filterServerNums = 0; @@ -108,6 +110,7 @@ public class BrokerConfig { private boolean brokerFastFailureEnable = true; private long waitTimeMillsInSendQueue = 200; private long waitTimeMillsInPullQueue = 5 * 1000; + private long waitTimeMillsInHeartbeatQueue = 31 * 1000; private long startAcceptSendRequestTimeStamp = 0L; @@ -643,6 +646,30 @@ public class BrokerConfig { this.forceRegister = forceRegister; } + public int getHeartbeatThreadPoolQueueCapacity() { + return heartbeatThreadPoolQueueCapacity; + } + + public void setHeartbeatThreadPoolQueueCapacity(int heartbeatThreadPoolQueueCapacity) { + this.heartbeatThreadPoolQueueCapacity = heartbeatThreadPoolQueueCapacity; + } + + public int getHeartbeatThreadPoolNums() { + return heartbeatThreadPoolNums; + } + + public void setHeartbeatThreadPoolNums(int heartbeatThreadPoolNums) { + this.heartbeatThreadPoolNums = heartbeatThreadPoolNums; + } + + public long getWaitTimeMillsInHeartbeatQueue() { + return waitTimeMillsInHeartbeatQueue; + } + + public void setWaitTimeMillsInHeartbeatQueue(long waitTimeMillsInHeartbeatQueue) { + this.waitTimeMillsInHeartbeatQueue = waitTimeMillsInHeartbeatQueue; + } + public int getRegisterNameServerPeriod() { return registerNameServerPeriod; }