提交 45043e25 编写于 作者: H huangying

[#1540] 4.5.2版本 事务消息设置延迟时间后导致RMQ_SYS_TRANS_HALF_TOPIC爆掉的问题修复

上级 034bebc1
......@@ -1179,6 +1179,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// 事务消息屏蔽DelayTimeLevel参数
if(msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
......
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @program: rocketmq
* @description: 事务消息测试类
* @author: TonyStark888
* @create: 2019-10-18 11:22
**/
public class TransactionMQProducerTest {
private TransactionMQProducer producer;
private String producerGroupPrefix = "Transaction_PID";
@Before
public void init() throws Exception {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
producer = new TransactionMQProducer(producerGroupTemp);
producer.setNamesrvAddr("10.0.133.29:9876");
producer.setTransactionListener(new DelayTimeLevelTransactionListener());
producer.start();
}
@After
public void terminate() {
producer.shutdown();
}
@Test
public void testSendMessage() throws InterruptedException, RemotingException, MQBrokerException {
try {
Message message = new Message("TransactionTopic", "transactionTest", "msg-1", ("HelloTime").getBytes());
SendResult result = producer.sendMessageInTransaction(message, "HelloTime");
System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());
// 挂起5分钟,等待事务回查
Thread.sleep(5 * 60 * 1000L);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("Transaction Message Send Error");
}
}
@Test
public void testSendMessage_DelayTimeLevel() throws RemotingException, InterruptedException, MQBrokerException {
try {
Message message = new Message("TransactionTopic", "transactionTest", "msg-1", ("HelloDelayTime").getBytes());
message.setDelayTimeLevel(1);
SendResult result = producer.sendMessageInTransaction(message, "HelloDelayTime");
System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());
// 挂起5分钟,等待事务回查
Thread.sleep(5 * 60 * 1000L);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("Transaction Message Send Error");
}
}
}
/**
* 简便起见,直接返回Commit
*/
class DelayTimeLevelTransactionListener implements TransactionListener {
/**
* 执行本地事务
*
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 回查本地事务结果
*
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return LocalTransactionState.COMMIT_MESSAGE;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册