提交 d8c446e8 编写于 作者: F fuyou001 提交者: von gosling

[ISSUE #314] Heartbeat handler use independently thread pool (#315)

上级 76233ed8
...@@ -16,6 +16,21 @@ ...@@ -16,6 +16,21 @@
*/ */
package org.apache.rocketmq.broker; package org.apache.rocketmq.broker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientHousekeepingService; import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.client.ConsumerManager;
...@@ -85,21 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; ...@@ -85,21 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BrokerController { public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
...@@ -129,6 +129,7 @@ public class BrokerController { ...@@ -129,6 +129,7 @@ public class BrokerController {
private final BlockingQueue<Runnable> pullThreadPoolQueue; private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue; private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue; private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue; private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final FilterServerManager filterServerManager; private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager; private final BrokerStatsManager brokerStatsManager;
...@@ -143,6 +144,7 @@ public class BrokerController { ...@@ -143,6 +144,7 @@ public class BrokerController {
private ExecutorService queryMessageExecutor; private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor; private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor; private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor; private ExecutorService consumerManageExecutor;
private boolean updateMasterHAServerAddrPeriodically = false; private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats; private BrokerStats brokerStats;
...@@ -186,6 +188,7 @@ public class BrokerController { ...@@ -186,6 +188,7 @@ public class BrokerController {
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity()); this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
...@@ -280,6 +283,15 @@ public class BrokerController { ...@@ -280,6 +283,15 @@ public class BrokerController {
this.clientManagerThreadPoolQueue, this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_")); new ThreadFactoryImpl("ClientManageThread_"));
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_",true));
this.consumerManageExecutor = this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_")); "ConsumerManageThread_"));
...@@ -501,11 +513,11 @@ public class BrokerController { ...@@ -501,11 +513,11 @@ public class BrokerController {
* ClientManageProcessor * ClientManageProcessor
*/ */
ClientManageProcessor clientProcessor = new ClientManageProcessor(this); ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
...@@ -729,10 +741,6 @@ public class BrokerController { ...@@ -729,10 +741,6 @@ public class BrokerController {
if (this.fileWatchService != null) { if (this.fileWatchService != null) {
this.fileWatchService.shutdown(); this.fileWatchService.shutdown();
} }
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}
} }
private void unregisterBrokerAll() { private void unregisterBrokerAll() {
...@@ -990,6 +998,10 @@ public class BrokerController { ...@@ -990,6 +998,10 @@ public class BrokerController {
return this.configuration; return this.configuration;
} }
public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
return heartbeatThreadPoolQueue;
}
public TransactionalMessageCheckService getTransactionalMessageCheckService() { public TransactionalMessageCheckService getTransactionalMessageCheckService() {
return transactionalMessageCheckService; return transactionalMessageCheckService;
} }
......
...@@ -89,6 +89,9 @@ public class BrokerFastFailure { ...@@ -89,6 +89,9 @@ public class BrokerFastFailure {
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(), cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()); this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
} }
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) { void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
......
...@@ -63,6 +63,7 @@ public class BrokerConfig { ...@@ -63,6 +63,7 @@ public class BrokerConfig {
private int adminBrokerThreadPoolNums = 16; private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32; private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());
private int flushConsumerOffsetInterval = 1000 * 5; private int flushConsumerOffsetInterval = 1000 * 5;
...@@ -77,6 +78,7 @@ public class BrokerConfig { ...@@ -77,6 +78,7 @@ public class BrokerConfig {
private int queryThreadPoolQueueCapacity = 20000; private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000; private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int filterServerNums = 0; private int filterServerNums = 0;
...@@ -108,6 +110,7 @@ public class BrokerConfig { ...@@ -108,6 +110,7 @@ public class BrokerConfig {
private boolean brokerFastFailureEnable = true; private boolean brokerFastFailureEnable = true;
private long waitTimeMillsInSendQueue = 200; private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000; private long waitTimeMillsInPullQueue = 5 * 1000;
private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
private long startAcceptSendRequestTimeStamp = 0L; private long startAcceptSendRequestTimeStamp = 0L;
...@@ -643,6 +646,30 @@ public class BrokerConfig { ...@@ -643,6 +646,30 @@ public class BrokerConfig {
this.forceRegister = forceRegister; this.forceRegister = forceRegister;
} }
public int getHeartbeatThreadPoolQueueCapacity() {
return heartbeatThreadPoolQueueCapacity;
}
public void setHeartbeatThreadPoolQueueCapacity(int heartbeatThreadPoolQueueCapacity) {
this.heartbeatThreadPoolQueueCapacity = heartbeatThreadPoolQueueCapacity;
}
public int getHeartbeatThreadPoolNums() {
return heartbeatThreadPoolNums;
}
public void setHeartbeatThreadPoolNums(int heartbeatThreadPoolNums) {
this.heartbeatThreadPoolNums = heartbeatThreadPoolNums;
}
public long getWaitTimeMillsInHeartbeatQueue() {
return waitTimeMillsInHeartbeatQueue;
}
public void setWaitTimeMillsInHeartbeatQueue(long waitTimeMillsInHeartbeatQueue) {
this.waitTimeMillsInHeartbeatQueue = waitTimeMillsInHeartbeatQueue;
}
public int getRegisterNameServerPeriod() { public int getRegisterNameServerPeriod() {
return registerNameServerPeriod; return registerNameServerPeriod;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册