diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 8d9bcd207f3a75da60f2865fb17d2dd289dd759d..d3ecf50db4cdcb88f4eac89dca53c16d8869fa85 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -54,6 +54,7 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.MixAll; @@ -120,7 +121,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (producer.getExecutorService() != null) { this.checkExecutor = producer.getExecutorService(); } else { - this.checkRequestQueue = new LinkedBlockingQueue(2000); + this.checkRequestQueue = new LinkedBlockingQueue(producer.getCheckRequestHoldMax()); this.checkExecutor = new ThreadPoolExecutor( producer.getCheckThreadPoolMinSize(), producer.getCheckThreadPoolMaxSize(), @@ -244,7 +245,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { return null == prev || !prev.ok(); } + /** + * This method will be removed in the version 5.0.0 and getCheckListener is recommended. + * @return + */ @Override + @Deprecated public TransactionCheckListener checkListener() { if (this.defaultMQProducer instanceof TransactionMQProducer) { TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; @@ -255,7 +261,15 @@ public class DefaultMQProducerImpl implements MQProducerInner { } @Override + public TransactionListener getCheckListener() { + if (this.defaultMQProducer instanceof TransactionMQProducer) { + TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; + return producer.getTransactionListener(); + } + return null; + } + @Override public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { @@ -267,11 +281,19 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void run() { TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); - if (transactionCheckListener != null) { + TransactionListener transactionListener = getCheckListener(); + if (transactionCheckListener != null || transactionListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { - localTransactionState = transactionCheckListener.checkLocalTransactionState(message); + if (transactionCheckListener != null) { + localTransactionState = transactionCheckListener.checkLocalTransactionState(message); + } else if (transactionListener != null) { + log.info("Used new check API in transaction message"); + localTransactionState = transactionListener.checkLocalTransaction(message); + } else { + log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group); + } } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; @@ -282,7 +304,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { group, exception); } else { - log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); + log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group); } } @@ -1100,7 +1122,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { - if (null == localTransactionExecuter) { + TransactionListener transactionListener = getCheckListener(); + if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } Validators.checkMessage(msg, this.defaultMQProducer); @@ -1126,7 +1149,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } - localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); + if (null != localTransactionExecuter) { + localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); + } else if (transactionListener != null) { + log.info("Used new transaction API"); + transactionListener.executeLocalTransaction(msg, arg); + } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java index dfd485dd90909573eac012ff1e94f08f82d8e529..acfd7b1f2c1da313e874e3bd5daf1b710b4db557 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.producer; import java.util.Set; import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; @@ -27,6 +28,7 @@ public interface MQProducerInner { boolean isPublishTopicNeedUpdate(final String topic); TransactionCheckListener checkListener(); + TransactionListener getCheckListener(); void checkTransactionState( final String addr, diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 267dbe8f57b03f458b7dc1e17a358770bba53185..9732d0eb84458062d203db0af55df8e37043b1a3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -476,6 +476,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } + /** + * This method is used to send transactional messages. + * @param msg Transactional message to send. + * @param arg Argument used along with local transaction executor. + * @return Transaction result. + * @throws MQClientException + */ + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, + Object arg) throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + /** * Create a topic on broker. * diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java index 80b5546925fee85484151985aee052107f59e49e..28789b91d54d83d9f474902008dc28fcbfbd96f0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java @@ -18,6 +18,10 @@ package org.apache.rocketmq.client.producer; import org.apache.rocketmq.common.message.Message; +/** + * This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended. + */ +@Deprecated public interface LocalTransactionExecuter { LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 14caf6ffac9565bcac2c8ff0f8f3e6ba5cd13af8..1af6005748992d2334e0833c415500a9fbe96a57 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -83,6 +83,9 @@ public interface MQProducer extends MQAdmin { TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; + TransactionSendResult sendMessageInTransaction(final Message msg, + final Object arg) throws MQClientException; + //for batch SendResult send(final Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java index 1cf5c4d90a5147b6e9e57c9d055d842b1edc7fae..2d7cf5819f6a13eebb1ddf6e9f525507f6709b4a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java @@ -17,7 +17,10 @@ package org.apache.rocketmq.client.producer; import org.apache.rocketmq.common.message.MessageExt; - +/** + * This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended. + */ +@Deprecated public interface TransactionCheckListener { LocalTransactionState checkLocalTransactionState(final MessageExt msg); } diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/LocalTransactionExecuterImpl.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java similarity index 50% rename from example/src/main/java/org/apache/rocketmq/example/transaction/LocalTransactionExecuterImpl.java rename to client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java index a3b363fd61b65f2356cbdd67fca237147309953d..233af69bc1dcee2bc929067c6cafb1eb876a613f 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/LocalTransactionExecuterImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java @@ -14,14 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.example.transaction; +package org.apache.rocketmq.client.producer; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; -import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; -public class LocalTransactionExecuterImpl implements LocalTransactionExecuter { - @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { - return LocalTransactionState.UNKNOW; - } -} +public interface TransactionListener { + /** + * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. + * + * @param msg Half(prepare) message + * @param arg Custom business parameter + * @return Transaction state + */ + LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); + + /** + * When no response to prepare(half) message. broker will send check message to check the transaction status, and this + * method will be invoked to get local transaction status. + * + * @param msg Check message + * @return Transaction state + */ + LocalTransactionState checkLocalTransaction(final MessageExt msg); +} \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index 14fae65a71ef67fc6fc8f4ec51dc57357f3fd784..9fe26eb8b39822f6cebed9e4c81b187331733323 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -17,8 +17,11 @@ package org.apache.rocketmq.client.producer; import java.util.concurrent.ExecutorService; +import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.remoting.RPCHook; public class TransactionMQProducer extends DefaultMQProducer { @@ -29,6 +32,9 @@ public class TransactionMQProducer extends DefaultMQProducer { private ExecutorService executorService; + private TransactionListener transactionListener; + + public TransactionMQProducer() { } @@ -52,7 +58,12 @@ public class TransactionMQProducer extends DefaultMQProducer { this.defaultMQProducerImpl.destroyTransactionEnv(); } + + /** + * This method will be removed in the version 5.0.0, method sendMessageInTransaction(Message,Object)} is recommended. + */ @Override + @Deprecated public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { if (null == this.transactionCheckListener) { @@ -62,10 +73,23 @@ public class TransactionMQProducer extends DefaultMQProducer { return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); } + @Override + public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { + if (null == this.transactionListener) { + throw new MQClientException("TransactionListener is null", null); + } + + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); + } + public TransactionCheckListener getTransactionCheckListener() { return transactionCheckListener; } + /** + * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended. + */ + @Deprecated public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) { this.transactionCheckListener = transactionCheckListener; } @@ -74,6 +98,10 @@ public class TransactionMQProducer extends DefaultMQProducer { return checkThreadPoolMinSize; } + /** + * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended. + */ + @Deprecated public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) { this.checkThreadPoolMinSize = checkThreadPoolMinSize; } @@ -82,6 +110,10 @@ public class TransactionMQProducer extends DefaultMQProducer { return checkThreadPoolMaxSize; } + /** + * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended. + */ + @Deprecated public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) { this.checkThreadPoolMaxSize = checkThreadPoolMaxSize; } @@ -90,6 +122,10 @@ public class TransactionMQProducer extends DefaultMQProducer { return checkRequestHoldMax; } + /** + * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended. + */ + @Deprecated public void setCheckRequestHoldMax(int checkRequestHoldMax) { this.checkRequestHoldMax = checkRequestHoldMax; } @@ -101,4 +137,12 @@ public class TransactionMQProducer extends DefaultMQProducer { public void setExecutorService(ExecutorService executorService) { this.executorService = executorService; } + + public TransactionListener getTransactionListener() { + return transactionListener; + } + + public void setTransactionListener(TransactionListener transactionListener) { + this.transactionListener = transactionListener; + } } diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java similarity index 65% rename from example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java rename to example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java index fcab51936c2e6a21c1e7c5b01984ee2cecf1d7e0..cb066d21d396b9644b993f175b9bd214424aea9e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java @@ -16,18 +16,30 @@ */ package org.apache.rocketmq.example.transaction; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; -public class TransactionCheckListenerImpl implements TransactionCheckListener { +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class TransactionListenerImpl implements TransactionListener { + private AtomicInteger transactionIndex = new AtomicInteger(0); - private AtomicInteger localTrans = new AtomicInteger(0); + private ConcurrentHashMap localTrans = new ConcurrentHashMap<>(); + + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + int value = transactionIndex.getAndIncrement(); + int status = value % 3; + localTrans.put(msg.getTransactionId(), status); + return LocalTransactionState.UNKNOW; + } @Override - public LocalTransactionState checkLocalTransactionState(MessageExt msg) { - Integer status = localTrans.getAndIncrement() % 3; + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java index a4c3f351f5a453d5294d5207afe097362698003c..75c805b96987773cda3e885bdcaf4e247f556233 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java @@ -17,9 +17,8 @@ package org.apache.rocketmq.example.transaction; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -33,7 +32,7 @@ import java.util.concurrent.TimeUnit; public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { - TransactionCheckListener transactionListener = new TransactionCheckListenerImpl(); + TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override @@ -43,9 +42,9 @@ public class TransactionProducer { return thread; } }); - LocalTransactionExecuter localTransactionExecuter = new LocalTransactionExecuterImpl(); + producer.setExecutorService(executorService); - producer.setTransactionCheckListener(transactionListener); + producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; @@ -54,7 +53,7 @@ public class TransactionProducer { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.sendMessageInTransaction(msg, localTransactionExecuter,null); + SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10);