From efa8e457e84a7406923594b9516545661b56765e Mon Sep 17 00:00:00 2001 From: duheng <39583243+duhengforever@users.noreply.github.com> Date: Fri, 17 Aug 2018 11:20:05 +0800 Subject: [PATCH] [ISSUE #396]Use separated thread pool and add monitor tools for transactional message (#397) * Use separate threadpool and add monitor tools for transaction * Modify log level --- .../rocketmq/broker/BrokerController.java | 34 +++++++++++++++--- .../broker/latency/BrokerFastFailure.java | 3 ++ .../processor/AdminBrokerProcessor.java | 4 +++ .../rocketmq/broker/util/ServiceProvider.java | 32 +++++++++-------- .../apache/rocketmq/common/BrokerConfig.java | 35 +++++++++++++++++-- 5 files changed, 87 insertions(+), 21 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index f45674d6..a2069229 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStatsManager; - public class BrokerController { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); @@ -131,6 +130,7 @@ public class BrokerController { private final BlockingQueue clientManagerThreadPoolQueue; private final BlockingQueue heartbeatThreadPoolQueue; private final BlockingQueue consumerManagerThreadPoolQueue; + private final BlockingQueue endTransactionThreadPoolQueue; private final FilterServerManager filterServerManager; private final BrokerStatsManager brokerStatsManager; private final List sendMessageHookList = new ArrayList(); @@ -146,6 +146,7 @@ public class BrokerController { private ExecutorService clientManageExecutor; private ExecutorService heartbeatExecutor; private ExecutorService consumerManageExecutor; + private ExecutorService endTransactionExecutor; private boolean updateMasterHAServerAddrPeriodically = false; private BrokerStats brokerStats; private InetSocketAddress storeHost; @@ -189,6 +190,7 @@ public class BrokerController { this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); + this.endTransactionThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity()); this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); @@ -289,8 +291,15 @@ public class BrokerController { 1000 * 60, TimeUnit.MILLISECONDS, this.heartbeatThreadPoolQueue, - new ThreadFactoryImpl("HeartbeatThread_",true)); + new ThreadFactoryImpl("HeartbeatThread_", true)); + this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getEndTransactionThreadPoolNums(), + this.brokerConfig.getEndTransactionThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.endTransactionThreadPoolQueue, + new ThreadFactoryImpl("EndTransactionThread_")); this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( @@ -536,8 +545,8 @@ public class BrokerController { /** * EndTransactionProcessor */ - this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); /** * Default @@ -598,10 +607,15 @@ public class BrokerController { return this.headSlowTimeMills(this.queryThreadPoolQueue); } + public long headSlowTimeMills4EndTransactionThreadPoolQueue() { + return this.headSlowTimeMills(this.endTransactionThreadPoolQueue); + } + public void printWaterMark() { LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue()); + LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue()); } public MessageStore getMessageStore() { @@ -741,6 +755,14 @@ public class BrokerController { if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } + + if (this.transactionalMessageCheckService != null) { + this.transactionalMessageCheckService.shutdown(); + } + + if (this.endTransactionExecutor != null) { + this.endTransactionExecutor.shutdown(); + } } private void unregisterBrokerAll() { @@ -1027,4 +1049,8 @@ public class BrokerController { AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } + + public BlockingQueue getEndTransactionThreadPoolQueue() { + return endTransactionThreadPoolQueue; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 0a8beca2..a018f68f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -92,6 +92,9 @@ public class BrokerFastFailure { cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(), this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()); + + cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this + .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()); } void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 1a704a8c..356aafc4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { runtimeInfo.put("queryThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); + runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size())); + runtimeInfo.put("EndTransactionThreadPoolQueueCapacity", + String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity())); + runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java index 59be7a7e..8b9b63e4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java @@ -125,23 +125,25 @@ public class ServiceProvider { public static T loadClass(String name, Class clazz) { final InputStream is = getResourceAsStream(getContextClassLoader(), name); - BufferedReader reader; - try { + if (is != null) { + BufferedReader reader; try { - reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); - } catch (java.io.UnsupportedEncodingException e) { - reader = new BufferedReader(new InputStreamReader(is)); - } - String serviceName = reader.readLine(); - reader.close(); - if (serviceName != null && !"".equals(serviceName)) { - return initService(getContextClassLoader(), serviceName, clazz); - } else { - LOG.warn("ServiceName is empty!"); - return null; + try { + reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); + } catch (java.io.UnsupportedEncodingException e) { + reader = new BufferedReader(new InputStreamReader(is)); + } + String serviceName = reader.readLine(); + reader.close(); + if (serviceName != null && !"".equals(serviceName)) { + return initService(getContextClassLoader(), serviceName, clazz); + } else { + LOG.warn("ServiceName is empty!"); + return null; + } + } catch (Exception e) { + LOG.warn("Error occurred when looking for resource file " + name, e); } - } catch (Exception e) { - LOG.error("Error occured when looking for resource file " + name, e); } return null; } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 442f456a..963c88a1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -63,7 +63,12 @@ public class BrokerConfig { private int adminBrokerThreadPoolNums = 16; private int clientManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32; - private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors()); + private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors()); + + /** + * Thread numbers for EndTransactionProcessor + */ + private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2; private int flushConsumerOffsetInterval = 1000 * 5; @@ -79,6 +84,7 @@ public class BrokerConfig { private int clientManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000; private int heartbeatThreadPoolQueueCapacity = 50000; + private int endTransactionPoolQueueCapacity = 100000; private int filterServerNums = 0; @@ -111,6 +117,7 @@ public class BrokerConfig { private long waitTimeMillsInSendQueue = 200; private long waitTimeMillsInPullQueue = 5 * 1000; private long waitTimeMillsInHeartbeatQueue = 31 * 1000; + private long waitTimeMillsInTransactionQueue = 3 * 1000; private long startAcceptSendRequestTimeStamp = 0L; @@ -156,7 +163,7 @@ public class BrokerConfig { * The maximum number of times the message was checked, if exceed this value, this message will be discarded. */ @ImportantField - private int transactionCheckMax = 5; + private int transactionCheckMax = 15; /** * Transaction message check interval. @@ -701,4 +708,28 @@ public class BrokerConfig { public void setTransactionCheckInterval(long transactionCheckInterval) { this.transactionCheckInterval = transactionCheckInterval; } + + public int getEndTransactionThreadPoolNums() { + return endTransactionThreadPoolNums; + } + + public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) { + this.endTransactionThreadPoolNums = endTransactionThreadPoolNums; + } + + public int getEndTransactionPoolQueueCapacity() { + return endTransactionPoolQueueCapacity; + } + + public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) { + this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity; + } + + public long getWaitTimeMillsInTransactionQueue() { + return waitTimeMillsInTransactionQueue; + } + + public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) { + this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue; + } } -- GitLab