From 45043e2571609347b37641c1683d14e86eeee873 Mon Sep 17 00:00:00 2001 From: huangying Date: Fri, 18 Oct 2019 12:13:30 +0800 Subject: [PATCH] =?UTF-8?q?[#1540]=204.5.2=E7=89=88=E6=9C=AC=20=E4=BA=8B?= =?UTF-8?q?=E5=8A=A1=E6=B6=88=E6=81=AF=E8=AE=BE=E7=BD=AE=E5=BB=B6=E8=BF=9F?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E5=90=8E=E5=AF=BC=E8=87=B4RMQ=5FSYS=5FTRANS?= =?UTF-8?q?=5FHALF=5FTOPIC=E7=88=86=E6=8E=89=E7=9A=84=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/producer/DefaultMQProducerImpl.java | 6 ++ .../producer/TransactionMQProducerTest.java | 94 +++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 client/src/test/java/org/apache/rocketmq/client/producer/TransactionMQProducerTest.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 62aaef3b..8e3c331a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1179,6 +1179,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } + + // 事务消息屏蔽DelayTimeLevel参数 + if(msg.getDelayTimeLevel() != 0) { + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + } + Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/TransactionMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/TransactionMQProducerTest.java new file mode 100644 index 00000000..007c15bf --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/producer/TransactionMQProducerTest.java @@ -0,0 +1,94 @@ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @program: rocketmq + * @description: 事务消息测试类 + * @author: TonyStark888 + * @create: 2019-10-18 11:22 + **/ +public class TransactionMQProducerTest { + private TransactionMQProducer producer; + private String producerGroupPrefix = "Transaction_PID"; + + @Before + public void init() throws Exception { + String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + producer = new TransactionMQProducer(producerGroupTemp); + producer.setNamesrvAddr("10.0.133.29:9876"); + producer.setTransactionListener(new DelayTimeLevelTransactionListener()); + producer.start(); + } + + @After + public void terminate() { + producer.shutdown(); + } + + @Test + public void testSendMessage() throws InterruptedException, RemotingException, MQBrokerException { + try { + Message message = new Message("TransactionTopic", "transactionTest", "msg-1", ("HelloTime").getBytes()); + SendResult result = producer.sendMessageInTransaction(message, "HelloTime"); + System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId()); + + // 挂起5分钟,等待事务回查 + Thread.sleep(5 * 60 * 1000L); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("Transaction Message Send Error"); + } + } + + @Test + public void testSendMessage_DelayTimeLevel() throws RemotingException, InterruptedException, MQBrokerException { + try { + Message message = new Message("TransactionTopic", "transactionTest", "msg-1", ("HelloDelayTime").getBytes()); + message.setDelayTimeLevel(1); + SendResult result = producer.sendMessageInTransaction(message, "HelloDelayTime"); + System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId()); + + // 挂起5分钟,等待事务回查 + Thread.sleep(5 * 60 * 1000L); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("Transaction Message Send Error"); + } + } +} + +/** + * 简便起见,直接返回Commit + */ +class DelayTimeLevelTransactionListener implements TransactionListener { + /** + * 执行本地事务 + * + * @param message + * @param o + * @return + */ + @Override + public LocalTransactionState executeLocalTransaction(Message message, Object o) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + /** + * 回查本地事务结果 + * + * @param messageExt + * @return + */ + @Override + public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { + return LocalTransactionState.COMMIT_MESSAGE; + } +} -- GitLab