提交 efa8e457 编写于 作者: D duheng 提交者: von gosling

[ISSUE #396]Use separated thread pool and add monitor tools for transactional message (#397)

* Use separate threadpool and add monitor tools for transaction

* Modify log level
上级 7cae5839
......@@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
......@@ -131,6 +130,7 @@ public class BrokerController {
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
......@@ -146,6 +146,7 @@ public class BrokerController {
private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
private ExecutorService endTransactionExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
......@@ -189,6 +190,7 @@ public class BrokerController {
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
......@@ -289,8 +291,15 @@ public class BrokerController {
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_",true));
new ThreadFactoryImpl("HeartbeatThread_", true));
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
......@@ -536,8 +545,8 @@ public class BrokerController {
/**
* EndTransactionProcessor
*/
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
/**
* Default
......@@ -598,10 +607,15 @@ public class BrokerController {
return this.headSlowTimeMills(this.queryThreadPoolQueue);
}
public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
}
public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
}
public MessageStore getMessageStore() {
......@@ -741,6 +755,14 @@ public class BrokerController {
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown();
}
if (this.endTransactionExecutor != null) {
this.endTransactionExecutor.shutdown();
}
}
private void unregisterBrokerAll() {
......@@ -1027,4 +1049,8 @@ public class BrokerController {
AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
}
}
......@@ -92,6 +92,9 @@ public class BrokerFastFailure {
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
......
......@@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo.put("queryThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity()));
runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
......
......@@ -125,6 +125,7 @@ public class ServiceProvider {
public static <T> T loadClass(String name, Class<?> clazz) {
final InputStream is = getResourceAsStream(getContextClassLoader(), name);
if (is != null) {
BufferedReader reader;
try {
try {
......@@ -141,7 +142,8 @@ public class ServiceProvider {
return null;
}
} catch (Exception e) {
LOG.error("Error occured when looking for resource file " + name, e);
LOG.warn("Error occurred when looking for resource file " + name, e);
}
}
return null;
}
......
......@@ -63,7 +63,12 @@ public class BrokerConfig {
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());
private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
/**
* Thread numbers for EndTransactionProcessor
*/
private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
private int flushConsumerOffsetInterval = 1000 * 5;
......@@ -79,6 +84,7 @@ public class BrokerConfig {
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
private int filterServerNums = 0;
......@@ -111,6 +117,7 @@ public class BrokerConfig {
private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
private long waitTimeMillsInTransactionQueue = 3 * 1000;
private long startAcceptSendRequestTimeStamp = 0L;
......@@ -156,7 +163,7 @@ public class BrokerConfig {
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/
@ImportantField
private int transactionCheckMax = 5;
private int transactionCheckMax = 15;
/**
* Transaction message check interval.
......@@ -701,4 +708,28 @@ public class BrokerConfig {
public void setTransactionCheckInterval(long transactionCheckInterval) {
this.transactionCheckInterval = transactionCheckInterval;
}
public int getEndTransactionThreadPoolNums() {
return endTransactionThreadPoolNums;
}
public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {
this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
}
public int getEndTransactionPoolQueueCapacity() {
return endTransactionPoolQueueCapacity;
}
public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {
this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
}
public long getWaitTimeMillsInTransactionQueue() {
return waitTimeMillsInTransactionQueue;
}
public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册