diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index f45674d6e461e623e08a769ef0fcd1372c1ee05e..a2069229110f974bf2ad76a9b46e3fed8bb54884 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStatsManager; - public class BrokerController { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); @@ -131,6 +130,7 @@ public class BrokerController { private final BlockingQueue clientManagerThreadPoolQueue; private final BlockingQueue heartbeatThreadPoolQueue; private final BlockingQueue consumerManagerThreadPoolQueue; + private final BlockingQueue endTransactionThreadPoolQueue; private final FilterServerManager filterServerManager; private final BrokerStatsManager brokerStatsManager; private final List sendMessageHookList = new ArrayList(); @@ -146,6 +146,7 @@ public class BrokerController { private ExecutorService clientManageExecutor; private ExecutorService heartbeatExecutor; private ExecutorService consumerManageExecutor; + private ExecutorService endTransactionExecutor; private boolean updateMasterHAServerAddrPeriodically = false; private BrokerStats brokerStats; private InetSocketAddress storeHost; @@ -189,6 +190,7 @@ public class BrokerController { this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); + this.endTransactionThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity()); this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); @@ -289,8 +291,15 @@ public class BrokerController { 1000 * 60, TimeUnit.MILLISECONDS, this.heartbeatThreadPoolQueue, - new ThreadFactoryImpl("HeartbeatThread_",true)); + new ThreadFactoryImpl("HeartbeatThread_", true)); + this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getEndTransactionThreadPoolNums(), + this.brokerConfig.getEndTransactionThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.endTransactionThreadPoolQueue, + new ThreadFactoryImpl("EndTransactionThread_")); this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( @@ -536,8 +545,8 @@ public class BrokerController { /** * EndTransactionProcessor */ - this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); /** * Default @@ -598,10 +607,15 @@ public class BrokerController { return this.headSlowTimeMills(this.queryThreadPoolQueue); } + public long headSlowTimeMills4EndTransactionThreadPoolQueue() { + return this.headSlowTimeMills(this.endTransactionThreadPoolQueue); + } + public void printWaterMark() { LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue()); + LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue()); } public MessageStore getMessageStore() { @@ -741,6 +755,14 @@ public class BrokerController { if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } + + if (this.transactionalMessageCheckService != null) { + this.transactionalMessageCheckService.shutdown(); + } + + if (this.endTransactionExecutor != null) { + this.endTransactionExecutor.shutdown(); + } } private void unregisterBrokerAll() { @@ -1027,4 +1049,8 @@ public class BrokerController { AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } + + public BlockingQueue getEndTransactionThreadPoolQueue() { + return endTransactionThreadPoolQueue; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 0a8beca2c640d8ae32a1f8060de2df5a3c001cd6..a018f68f627f7a8c648a3b53d747bf17c44d25c7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -92,6 +92,9 @@ public class BrokerFastFailure { cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(), this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()); + + cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this + .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()); } void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 1a704a8c6bcc4ee207ce1d48b55b4e48081f10f6..356aafc46fb0637a7411c5042fc6170a33d2560c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { runtimeInfo.put("queryThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); + runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size())); + runtimeInfo.put("EndTransactionThreadPoolQueueCapacity", + String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity())); + runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java index 59be7a7e26cd3fc3cb46dfd315209633fd601128..8b9b63e4dcbf8cdf2593b72bcda7c8a4e2af7f17 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java @@ -125,23 +125,25 @@ public class ServiceProvider { public static T loadClass(String name, Class clazz) { final InputStream is = getResourceAsStream(getContextClassLoader(), name); - BufferedReader reader; - try { + if (is != null) { + BufferedReader reader; try { - reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); - } catch (java.io.UnsupportedEncodingException e) { - reader = new BufferedReader(new InputStreamReader(is)); - } - String serviceName = reader.readLine(); - reader.close(); - if (serviceName != null && !"".equals(serviceName)) { - return initService(getContextClassLoader(), serviceName, clazz); - } else { - LOG.warn("ServiceName is empty!"); - return null; + try { + reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); + } catch (java.io.UnsupportedEncodingException e) { + reader = new BufferedReader(new InputStreamReader(is)); + } + String serviceName = reader.readLine(); + reader.close(); + if (serviceName != null && !"".equals(serviceName)) { + return initService(getContextClassLoader(), serviceName, clazz); + } else { + LOG.warn("ServiceName is empty!"); + return null; + } + } catch (Exception e) { + LOG.warn("Error occurred when looking for resource file " + name, e); } - } catch (Exception e) { - LOG.error("Error occured when looking for resource file " + name, e); } return null; } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 442f456aa41e0b551b5b93557711243b3ddb7dad..963c88a133a46a22b72feb0aa49510dcaa9ca25e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -63,7 +63,12 @@ public class BrokerConfig { private int adminBrokerThreadPoolNums = 16; private int clientManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32; - private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors()); + private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors()); + + /** + * Thread numbers for EndTransactionProcessor + */ + private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2; private int flushConsumerOffsetInterval = 1000 * 5; @@ -79,6 +84,7 @@ public class BrokerConfig { private int clientManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000; private int heartbeatThreadPoolQueueCapacity = 50000; + private int endTransactionPoolQueueCapacity = 100000; private int filterServerNums = 0; @@ -111,6 +117,7 @@ public class BrokerConfig { private long waitTimeMillsInSendQueue = 200; private long waitTimeMillsInPullQueue = 5 * 1000; private long waitTimeMillsInHeartbeatQueue = 31 * 1000; + private long waitTimeMillsInTransactionQueue = 3 * 1000; private long startAcceptSendRequestTimeStamp = 0L; @@ -156,7 +163,7 @@ public class BrokerConfig { * The maximum number of times the message was checked, if exceed this value, this message will be discarded. */ @ImportantField - private int transactionCheckMax = 5; + private int transactionCheckMax = 15; /** * Transaction message check interval. @@ -701,4 +708,28 @@ public class BrokerConfig { public void setTransactionCheckInterval(long transactionCheckInterval) { this.transactionCheckInterval = transactionCheckInterval; } + + public int getEndTransactionThreadPoolNums() { + return endTransactionThreadPoolNums; + } + + public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) { + this.endTransactionThreadPoolNums = endTransactionThreadPoolNums; + } + + public int getEndTransactionPoolQueueCapacity() { + return endTransactionPoolQueueCapacity; + } + + public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) { + this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity; + } + + public long getWaitTimeMillsInTransactionQueue() { + return waitTimeMillsInTransactionQueue; + } + + public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) { + this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue; + } }