diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index f45674d6e461e623e08a769ef0fcd1372c1ee05e..e7ef46d0c61607e8d81eb478ecdcec373947e6f2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStatsManager; - public class BrokerController { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); @@ -131,6 +130,7 @@ public class BrokerController { private final BlockingQueue clientManagerThreadPoolQueue; private final BlockingQueue heartbeatThreadPoolQueue; private final BlockingQueue consumerManagerThreadPoolQueue; + private final BlockingQueue endTransactionThreadPoolQueue; private final FilterServerManager filterServerManager; private final BrokerStatsManager brokerStatsManager; private final List sendMessageHookList = new ArrayList(); @@ -146,6 +146,7 @@ public class BrokerController { private ExecutorService clientManageExecutor; private ExecutorService heartbeatExecutor; private ExecutorService consumerManageExecutor; + private ExecutorService endTransactionExecutor; private boolean updateMasterHAServerAddrPeriodically = false; private BrokerStats brokerStats; private InetSocketAddress storeHost; @@ -189,6 +190,7 @@ public class BrokerController { this.clientManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); + this.endTransactionThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity()); this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); @@ -289,8 +291,15 @@ public class BrokerController { 1000 * 60, TimeUnit.MILLISECONDS, this.heartbeatThreadPoolQueue, - new ThreadFactoryImpl("HeartbeatThread_",true)); + new ThreadFactoryImpl("HeartbeatThread_", true)); + this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getEndTransactionThreadPoolNums(), + this.brokerConfig.getEndTransactionThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.endTransactionThreadPoolQueue, + new ThreadFactoryImpl("EndTransactionThread_")); this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( @@ -536,8 +545,8 @@ public class BrokerController { /** * EndTransactionProcessor */ - this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); /** * Default @@ -598,10 +607,15 @@ public class BrokerController { return this.headSlowTimeMills(this.queryThreadPoolQueue); } + public long headSlowTimeMills4EndTransactionThreadPoolQueue() { + return this.headSlowTimeMills(this.endTransactionThreadPoolQueue); + } + public void printWaterMark() { LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue()); + LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue()); } public MessageStore getMessageStore() { @@ -741,6 +755,13 @@ public class BrokerController { if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } + if (this.transactionalMessageCheckService != null) { + this.transactionalMessageCheckService.shutdown(false); + } + + if (this.endTransactionExecutor != null) { + this.endTransactionExecutor.shutdown(); + } } private void unregisterBrokerAll() { @@ -1027,4 +1048,8 @@ public class BrokerController { AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } + + public BlockingQueue getEndTransactionThreadPoolQueue() { + return endTransactionThreadPoolQueue; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 0a8beca2c640d8ae32a1f8060de2df5a3c001cd6..a018f68f627f7a8c648a3b53d747bf17c44d25c7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -92,6 +92,9 @@ public class BrokerFastFailure { cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(), this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()); + + cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this + .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()); } void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 1a704a8c6bcc4ee207ce1d48b55b4e48081f10f6..356aafc46fb0637a7411c5042fc6170a33d2560c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { runtimeInfo.put("queryThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); + runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size())); + runtimeInfo.put("EndTransactionThreadPoolQueueCapacity", + String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity())); + runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java index 59be7a7e26cd3fc3cb46dfd315209633fd601128..8b9b63e4dcbf8cdf2593b72bcda7c8a4e2af7f17 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java @@ -125,23 +125,25 @@ public class ServiceProvider { public static T loadClass(String name, Class clazz) { final InputStream is = getResourceAsStream(getContextClassLoader(), name); - BufferedReader reader; - try { + if (is != null) { + BufferedReader reader; try { - reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); - } catch (java.io.UnsupportedEncodingException e) { - reader = new BufferedReader(new InputStreamReader(is)); - } - String serviceName = reader.readLine(); - reader.close(); - if (serviceName != null && !"".equals(serviceName)) { - return initService(getContextClassLoader(), serviceName, clazz); - } else { - LOG.warn("ServiceName is empty!"); - return null; + try { + reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); + } catch (java.io.UnsupportedEncodingException e) { + reader = new BufferedReader(new InputStreamReader(is)); + } + String serviceName = reader.readLine(); + reader.close(); + if (serviceName != null && !"".equals(serviceName)) { + return initService(getContextClassLoader(), serviceName, clazz); + } else { + LOG.warn("ServiceName is empty!"); + return null; + } + } catch (Exception e) { + LOG.warn("Error occurred when looking for resource file " + name, e); } - } catch (Exception e) { - LOG.error("Error occured when looking for resource file " + name, e); } return null; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index e1d9f90429512a46ce9950a1ce31a873a63cd5b8..7ace9d5b07d13bb850e2c51bc3eb4d9ee9d517ab 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -16,6 +16,22 @@ */ package org.apache.rocketmq.client.impl.producer; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -31,11 +47,13 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; @@ -65,23 +83,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import java.util.concurrent.RejectedExecutionException; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; public class DefaultMQProducerImpl implements MQProducerInner { private final InternalLogger log = ClientLogger.getLog(); @@ -120,10 +121,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (producer.getExecutorService() != null) { this.checkExecutor = producer.getExecutorService(); } else { - this.checkRequestQueue = new LinkedBlockingQueue(2000); + this.checkRequestQueue = new LinkedBlockingQueue(producer.getCheckRequestHoldMax()); this.checkExecutor = new ThreadPoolExecutor( - 1, - 1, + producer.getCheckThreadPoolMinSize(), + producer.getCheckThreadPoolMaxSize(), 1000 * 60, TimeUnit.MILLISECONDS, this.checkRequestQueue); @@ -131,8 +132,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public void destroyTransactionEnv() { - this.checkExecutor.shutdown(); - this.checkRequestQueue.clear(); + if (this.checkExecutor != null) { + this.checkExecutor.shutdown(); + } } public void registerSendMessageHook(final SendMessageHook hook) { @@ -243,13 +245,27 @@ public class DefaultMQProducerImpl implements MQProducerInner { return null == prev || !prev.ok(); } + /** + * This method will be removed in the version 5.0.0 and getCheckListener is recommended. + * @return + */ + @Override + @Deprecated + public TransactionCheckListener checkListener() { + if (this.defaultMQProducer instanceof TransactionMQProducer) { + TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; + return producer.getTransactionCheckListener(); + } + + return null; + } + @Override - public TransactionListener checkListener() { + public TransactionListener getCheckListener() { if (this.defaultMQProducer instanceof TransactionMQProducer) { TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; return producer.getTransactionListener(); } - return null; } @@ -264,12 +280,20 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void run() { - TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); - if (transactionCheckListener != null) { + TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); + TransactionListener transactionListener = getCheckListener(); + if (transactionCheckListener != null || transactionListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { - localTransactionState = transactionCheckListener.checkLocalTransaction(message); + if (transactionCheckListener != null) { + localTransactionState = transactionCheckListener.checkLocalTransactionState(message); + } else if (transactionListener != null) { + log.debug("Used new check API in transaction message"); + localTransactionState = transactionListener.checkLocalTransaction(message); + } else { + log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group); + } } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; @@ -280,7 +304,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { group, exception); } else { - log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); + log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group); } } @@ -1096,9 +1120,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public TransactionSendResult sendMessageInTransaction(final Message msg, - final TransactionListener tranExecuter, final Object arg) + final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { - if (null == tranExecuter) { + TransactionListener transactionListener = getCheckListener(); + if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } Validators.checkMessage(msg, this.defaultMQProducer); @@ -1124,7 +1149,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } - localTransactionState = tranExecuter.executeLocalTransaction(msg, arg); + if (null != localTransactionExecuter) { + localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); + } else if (transactionListener != null) { + log.debug("Used new transaction API"); + localTransactionState = transactionListener.executeLocalTransaction(msg, arg); + } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java index 52ebe1b57d35a6988a8aef738c1a081082775c2a..acfd7b1f2c1da313e874e3bd5daf1b710b4db557 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java @@ -16,18 +16,19 @@ */ package org.apache.rocketmq.client.impl.producer; +import java.util.Set; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; -import java.util.Set; - public interface MQProducerInner { Set getPublishTopicList(); boolean isPublishTopicNeedUpdate(final String topic); - TransactionListener checkListener(); + TransactionCheckListener checkListener(); + TransactionListener getCheckListener(); void checkTransactionState( final String addr, diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 065f068c7a76764e4b971807b4c4766cc7b9ff87..9732d0eb84458062d203db0af55df8e37043b1a3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -464,16 +464,31 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * This method is to send transactional messages. * * @param msg Transactional message to send. + * @param tranExecuter local transaction executor. * @param arg Argument used along with local transaction executor. * @return Transaction result. * @throws MQClientException if there is any client error. */ @Override - public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg) + public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, + final Object arg) throws MQClientException { throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } + /** + * This method is used to send transactional messages. + * @param msg Transactional message to send. + * @param arg Argument used along with local transaction executor. + * @return Transaction result. + * @throws MQClientException + */ + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, + Object arg) throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + /** * Create a topic on broker. * diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java new file mode 100644 index 0000000000000000000000000000000000000000..28789b91d54d83d9f474902008dc28fcbfbd96f0 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.common.message.Message; + +/** + * This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended. + */ +@Deprecated +public interface LocalTransactionExecuter { + LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 0776ee155cf62f0da241102fc655e15c6fdf277a..1af6005748992d2334e0833c415500a9fbe96a57 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -80,7 +80,11 @@ public interface MQProducer extends MQAdmin { void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException; - TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; + TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; + + TransactionSendResult sendMessageInTransaction(final Message msg, + final Object arg) throws MQClientException; //for batch SendResult send(final Collection msgs) throws MQClientException, RemotingException, MQBrokerException, diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java new file mode 100644 index 0000000000000000000000000000000000000000..2d7cf5819f6a13eebb1ddf6e9f525507f6709b4a --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.common.message.MessageExt; +/** + * This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended. + */ +@Deprecated +public interface TransactionCheckListener { + LocalTransactionState checkLocalTransactionState(final MessageExt msg); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java index c750e53845acdc5096905706ab5a36edbc4e49bf..233af69bc1dcee2bc929067c6cafb1eb876a613f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java @@ -37,4 +37,4 @@ public interface TransactionListener { * @return Transaction state */ LocalTransactionState checkLocalTransaction(final MessageExt msg); -} +} \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index c4f122c58483cb539736962a2666d8560bc1234c..8f6428b29c2ae700547d0b082a8d13d608e35bd1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -16,17 +16,21 @@ */ package org.apache.rocketmq.client.producer; +import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; -import java.util.concurrent.ExecutorService; - public class TransactionMQProducer extends DefaultMQProducer { - private TransactionListener transactionListener; + private TransactionCheckListener transactionCheckListener; + private int checkThreadPoolMinSize = 1; + private int checkThreadPoolMaxSize = 1; + private int checkRequestHoldMax = 2000; private ExecutorService executorService; + private TransactionListener transactionListener; + public TransactionMQProducer() { } @@ -50,21 +54,77 @@ public class TransactionMQProducer extends DefaultMQProducer { this.defaultMQProducerImpl.destroyTransactionEnv(); } + /** + * This method will be removed in the version 5.0.0, method sendMessageInTransaction(Message,Object)} + * is recommended. + */ + @Override + @Deprecated + public TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { + if (null == this.transactionCheckListener) { + throw new MQClientException("localTransactionBranchCheckListener is null", null); + } + + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); + } + @Override - public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { + public TransactionSendResult sendMessageInTransaction(final Message msg, + final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } - return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); } - public TransactionListener getTransactionListener() { - return transactionListener; + public TransactionCheckListener getTransactionCheckListener() { + return transactionCheckListener; } - public void setTransactionListener(TransactionListener transactionListener) { - this.transactionListener = transactionListener; + /** + * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended. + */ + @Deprecated + public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } + + public int getCheckThreadPoolMinSize() { + return checkThreadPoolMinSize; + } + + /** + * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended. + */ + @Deprecated + public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) { + this.checkThreadPoolMinSize = checkThreadPoolMinSize; + } + + public int getCheckThreadPoolMaxSize() { + return checkThreadPoolMaxSize; + } + + /** + * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended. + */ + @Deprecated + public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) { + this.checkThreadPoolMaxSize = checkThreadPoolMaxSize; + } + + public int getCheckRequestHoldMax() { + return checkRequestHoldMax; + } + + /** + * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended. + */ + @Deprecated + public void setCheckRequestHoldMax(int checkRequestHoldMax) { + this.checkRequestHoldMax = checkRequestHoldMax; } public ExecutorService getExecutorService() { @@ -74,4 +134,12 @@ public class TransactionMQProducer extends DefaultMQProducer { public void setExecutorService(ExecutorService executorService) { this.executorService = executorService; } + + public TransactionListener getTransactionListener() { + return transactionListener; + } + + public void setTransactionListener(TransactionListener transactionListener) { + this.transactionListener = transactionListener; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 442f456aa41e0b551b5b93557711243b3ddb7dad..f81af21651581b9a442c2eef24174ac970f7b4a3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -63,7 +63,12 @@ public class BrokerConfig { private int adminBrokerThreadPoolNums = 16; private int clientManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32; - private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors()); + private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors()); + + /** + * Thread numbers for EndTransactionProcessor + */ + private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2; private int flushConsumerOffsetInterval = 1000 * 5; @@ -79,6 +84,7 @@ public class BrokerConfig { private int clientManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000; private int heartbeatThreadPoolQueueCapacity = 50000; + private int endTransactionPoolQueueCapacity = 100000; private int filterServerNums = 0; @@ -111,6 +117,7 @@ public class BrokerConfig { private long waitTimeMillsInSendQueue = 200; private long waitTimeMillsInPullQueue = 5 * 1000; private long waitTimeMillsInHeartbeatQueue = 31 * 1000; + private long waitTimeMillsInTransactionQueue = 3 * 1000; private long startAcceptSendRequestTimeStamp = 0L; @@ -150,13 +157,13 @@ public class BrokerConfig { * that can be checked. */ @ImportantField - private long transactionTimeOut = 3 * 1000; + private long transactionTimeOut = 6 * 1000; /** * The maximum number of times the message was checked, if exceed this value, this message will be discarded. */ @ImportantField - private int transactionCheckMax = 5; + private int transactionCheckMax = 15; /** * Transaction message check interval. @@ -701,4 +708,28 @@ public class BrokerConfig { public void setTransactionCheckInterval(long transactionCheckInterval) { this.transactionCheckInterval = transactionCheckInterval; } + + public int getEndTransactionThreadPoolNums() { + return endTransactionThreadPoolNums; + } + + public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) { + this.endTransactionThreadPoolNums = endTransactionThreadPoolNums; + } + + public int getEndTransactionPoolQueueCapacity() { + return endTransactionPoolQueueCapacity; + } + + public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) { + this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity; + } + + public long getWaitTimeMillsInTransactionQueue() { + return waitTimeMillsInTransactionQueue; + } + + public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) { + this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java index 96a2329cc30685b25e7ef5c68ecb614b1bc621a9..6c2712d6116d4d67d30857de8e75ddeac26ed14f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java +++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.common; public class MQVersion { - public static final int CURRENT_VERSION = Version.V4_3_0.ordinal(); + public static final int CURRENT_VERSION = Version.V4_3_1.ordinal(); public static String getVersionDesc(int value) { int length = Version.values().length; diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index ae97cc97d86eccda233f6c49077d75ed071aba18..20d186764ebccd87e148f4ca80ce2d0a6876b234 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -57,7 +57,7 @@ public class MixAll { public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); //http://jmenv.tbsite.net:8080/rocketmq/nsaddr //public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; - public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "AUTO_CREATE_TOPIC_KEY"; // Will be created at broker when isAutoCreateTopicEnable + public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable public static final String BENCHMARK_TOPIC = "BenchmarkTest"; public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; diff --git a/distribution/bin/mqfiltersrv b/distribution/bin/mqfiltersrv deleted file mode 100644 index 2fd0cbea8c525e400613674c06ffa1a8852c19ff..0000000000000000000000000000000000000000 --- a/distribution/bin/mqfiltersrv +++ /dev/null @@ -1,45 +0,0 @@ -#!/bin/sh - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -if [ -z "$ROCKETMQ_HOME" ] ; then - ## resolve links - $0 may be a link to maven's home - PRG="$0" - - # need this for relative symlinks - while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG="`dirname "$PRG"`/$link" - fi - done - - saveddir=`pwd` - - ROCKETMQ_HOME=`dirname "$PRG"`/.. - - # make it fully qualified - ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd` - - cd "$saveddir" -fi - -export ROCKETMQ_HOME - -sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.filtersrv.FiltersrvStartup $@ diff --git a/distribution/bin/mqfiltersrv.cmd b/distribution/bin/mqfiltersrv.cmd deleted file mode 100644 index 0503026a3976ca7eb63b48cd6f7e52f9eee2f80d..0000000000000000000000000000000000000000 --- a/distribution/bin/mqfiltersrv.cmd +++ /dev/null @@ -1,23 +0,0 @@ -@echo off -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. - -if not exist "%ROCKETMQ_HOME%\bin\runbroker.cmd" echo Please set the ROCKETMQ_HOME variable in your environment! & EXIT /B 1 - -call "%ROCKETMQ_HOME%\bin\runserver.cmd" org.apache.rocketmq.filtersrv.FiltersrvStartup %* - -IF %ERRORLEVEL% EQU 0 ( - ECHO "Filtersrv starts OK" -) \ No newline at end of file diff --git a/distribution/bin/mqfiltersrv.xml b/distribution/bin/mqfiltersrv.xml deleted file mode 100644 index dc36a8d8c1e21899e0d5f84ac10d6dfb2fa56287..0000000000000000000000000000000000000000 --- a/distribution/bin/mqfiltersrv.xml +++ /dev/null @@ -1,45 +0,0 @@ - - - - false - - ${JAVA_HOME} - - server - - org.apache.rocketmq.filtersrv.FiltersrvStartup - - - ${cpd}/../lib - ${cpd}/.. - - - - - - - <-Xms512m> - - <-Xmx1g> - -<-XX:NewSize>256M -<-XX:MaxNewSize>512M -<-XX:PermSize>128M -<-XX:MaxPermSize>128M - - diff --git a/distribution/conf/logback_filtersrv.xml b/distribution/conf/logback_filtersrv.xml deleted file mode 100644 index 71b9a939130a97adbb674f28d0abb2f2f43555ce..0000000000000000000000000000000000000000 --- a/distribution/conf/logback_filtersrv.xml +++ /dev/null @@ -1,87 +0,0 @@ - - - - - - ${user.home}/logs/rocketmqlogs/filtersrv_default.log - true - - ${user.home}/logs/rocketmqlogs/otherdays/filtersrv_default.%i.log.gz - 1 - 5 - - - 100MB - - - %d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n - UTF-8 - - - - - ${user.home}/logs/rocketmqlogs/filtersrv.log - true - - ${user.home}/logs/rocketmqlogs/otherdays/filtersrv.%i.log.gz - 1 - 5 - - - 100MB - - - %d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n - UTF-8 - - - - - 0 - - - - true - - %d{yyy-MM-dd HH\:mm\:ss,SSS} %p %t - %m%n - UTF-8 - - - - - - - - - - - - - - - - - - - - - - - diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 2d8a5fead374ddaec1c23ee4c7bc52af367a4755..d9fafdd08e88ab4fcdad9226698f573db2ca7553 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -17,15 +17,6 @@ package org.apache.rocketmq.example.benchmark; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.TransactionListener; -import org.apache.rocketmq.client.producer.TransactionMQProducer; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.remoting.common.RemotingHelper; - import java.io.UnsupportedEncodingException; import java.util.LinkedList; import java.util.Timer; @@ -33,18 +24,27 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.common.RemotingHelper; public class TransactionProducer { private static int threadCount; private static int messageSize; - private static boolean isCheck; - private static boolean isCheckFalse; + private static boolean ischeck; + private static boolean ischeckffalse; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; - isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]); - isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]); + ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]); + ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]); final Message msg = buildMessage(messageSize); @@ -73,8 +73,8 @@ public class TransactionProducer { Long[] end = snapshotList.getLast(); final long sendTps = - (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); + (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); System.out.printf( "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", @@ -92,14 +92,16 @@ public class TransactionProducer { } }, 10000, 10000); - final TransactionListener transactionListener = - new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark); + final TransactionCheckListener transactionCheckListener = + new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark); final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); producer.setInstanceName(Long.toString(System.currentTimeMillis())); - producer.setTransactionListener(transactionListener); + producer.setTransactionCheckListener(transactionCheckListener); producer.setDefaultTopicQueueNums(1000); producer.start(); + final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck); + for (int i = 0; i < threadCount; i++) { sendThreadPool.execute(new Runnable() { @Override @@ -109,7 +111,7 @@ public class TransactionProducer { // Thread.sleep(1000); final long beginTimestamp = System.currentTimeMillis(); SendResult sendResult = - producer.sendMessageInTransaction(msg, null); + producer.sendMessageInTransaction(msg, tranExecuter, null); if (sendResult != null) { statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); @@ -122,7 +124,8 @@ public class TransactionProducer { boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT); - if (updated) { break; } + if (updated) + break; prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); } @@ -150,37 +153,43 @@ public class TransactionProducer { } } +class TransactionExecuterBImpl implements LocalTransactionExecuter { -class TransactionListenerImpl implements TransactionListener { - private boolean isCheckFalse; + private boolean ischeck; + + public TransactionExecuterBImpl(boolean ischeck) { + this.ischeck = ischeck; + } + + @Override + public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { + if (ischeck) { + return LocalTransactionState.UNKNOW; + } + return LocalTransactionState.COMMIT_MESSAGE; + } +} + +class TransactionCheckListenerBImpl implements TransactionCheckListener { + private boolean ischeckffalse; private StatsBenchmarkTProducer statsBenchmarkTProducer; - private boolean isCheckLocal; - public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal, - StatsBenchmarkTProducer statsBenchmarkTProducer) { - this.isCheckFalse = isCheckFalse; - this.isCheckLocal = isCheckLocal; + public TransactionCheckListenerBImpl(boolean ischeckffalse, + StatsBenchmarkTProducer statsBenchmarkTProducer) { + this.ischeckffalse = ischeckffalse; this.statsBenchmarkTProducer = statsBenchmarkTProducer; } @Override - public LocalTransactionState checkLocalTransaction(MessageExt msg) { + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); - if (isCheckFalse) { + if (ischeckffalse) { return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } - - @Override - public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) { - if (isCheckLocal) { - return LocalTransactionState.UNKNOW; - } - return LocalTransactionState.COMMIT_MESSAGE; - } } class StatsBenchmarkTProducer { diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java index ce471d2ce920cf5ea42605e92c8c14745cf282ff..cb066d21d396b9644b993f175b9bd214424aea9e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java @@ -48,6 +48,8 @@ public class TransactionListenerImpl implements TransactionListener { return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; + default: + return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java index 6bf91840aeb48df6e9b9a093201d06c7732b1bfd..8aed59ea449066d048f05081e924484a5421c71f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java @@ -131,7 +131,7 @@ public class ConsumeMessageCommand implements SubCommand { try { /* Group name must be set before consumer start */ if (commandLine.hasOption('g')) { - String consumerGroup = commandLine.getOptionValue('b').trim(); + String consumerGroup = commandLine.getOptionValue('g').trim(); defaultMQPullConsumer.setConsumerGroup(consumerGroup); }