From 9f4934fc9846ed151aa6bb20b7e971ee9a043a32 Mon Sep 17 00:00:00 2001 From: Shannon Date: Tue, 15 May 2018 16:59:27 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#66]=20duplicate=20compress=20message?= =?UTF-8?q?=20body=20if=20retry=20to=20send=20msg=20when=E2=80=A6=20(#294)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix issue : duplicate compress message body if retry to send msg when exception occurs in async sending. --- .../rocketmq/client/impl/MQClientAPIImpl.java | 1 + .../impl/producer/DefaultMQProducerImpl.java | 13 ++++- .../producer/DefaultMQProducerTest.java | 49 +++++++++++++++++++ .../common/message/MessageAccessor.java | 7 +++ 4 files changed, 69 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b0777849..d4ed1ec4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 9dd8ee3c..81461f5e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -607,8 +607,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { } int sysFlag = 0; + boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; + msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); @@ -678,10 +680,19 @@ public class DefaultMQProducerImpl implements MQProducerInner { SendResult sendResult = null; switch (communicationMode) { case ASYNC: + Message tmpMessage = msg; + if (msgBodyCompressed) { + //If msg body was compressed, msgbody should be reset using prevBody. + //Clone new message using commpressed message body and recover origin massage. + //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 + tmpMessage = MessageAccessor.cloneMessage(msg); + msg.setBody(prevBody); + } + sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), - msg, + tmpMessage, requestHeader, timeout, communicationMode, diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index ded22ada..d3c6cc8b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -69,6 +69,7 @@ public class DefaultMQProducerTest { private DefaultMQProducer producer; private Message message; private Message zeroMsg; + private Message bigMessage; private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; @@ -77,8 +78,10 @@ public class DefaultMQProducerTest { String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); producer = new DefaultMQProducer(producerGroupTemp); producer.setNamesrvAddr("127.0.0.1:9876"); + producer.setCompressMsgBodyOverHowmuch(16); message = new Message(topic, new byte[] {'a'}); zeroMsg = new Message(topic, new byte[] {}); + bigMessage = new Message(topic, "This is a very huge message!".getBytes()); producer.start(); @@ -146,6 +149,52 @@ public class DefaultMQProducerTest { assertThat(sendResult.getQueueOffset()).isEqualTo(456L); } + @Test + public void testSendMessageSync_WithBodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + SendResult sendResult = producer.send(bigMessage); + + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + } + + @Test + public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + producer.send(message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + } + + @Override + public void onException(Throwable e) { + } + }); + + } + + @Test + public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + producer.send(bigMessage, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + } + + @Override + public void onException(Throwable e) { + } + }); + + } + @Test public void testSendMessageSync_SuccessWithHook() throws Throwable { when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java index 4cac404b..1b7e2bba 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java @@ -89,4 +89,11 @@ public class MessageAccessor { return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP); } + public static Message cloneMessage(final Message msg) { + Message newMsg = new Message(msg.getTopic(), msg.getBody()); + newMsg.setFlag(msg.getFlag()); + newMsg.setProperties(msg.getProperties()); + return newMsg; + } + } -- GitLab