diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b07778499b70a2055216c42b01f17cc34b0ea6dc..d4ed1ec4df15c55594c3c8d68ea9c059528658e3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 9dd8ee3ce0aafd71e00615698590c6f24fa6a3d0..81461f5e9b2c0d6e43f764881cb6eb542d7241bb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -607,8 +607,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { } int sysFlag = 0; + boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; + msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); @@ -678,10 +680,19 @@ public class DefaultMQProducerImpl implements MQProducerInner { SendResult sendResult = null; switch (communicationMode) { case ASYNC: + Message tmpMessage = msg; + if (msgBodyCompressed) { + //If msg body was compressed, msgbody should be reset using prevBody. + //Clone new message using commpressed message body and recover origin massage. + //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 + tmpMessage = MessageAccessor.cloneMessage(msg); + msg.setBody(prevBody); + } + sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), - msg, + tmpMessage, requestHeader, timeout, communicationMode, diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index ded22ada914b163589b909a24d0f1b3a6b02e46c..d3c6cc8b117236d3d106fb1e2b04e01e3ffd646e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -69,6 +69,7 @@ public class DefaultMQProducerTest { private DefaultMQProducer producer; private Message message; private Message zeroMsg; + private Message bigMessage; private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; @@ -77,8 +78,10 @@ public class DefaultMQProducerTest { String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); producer = new DefaultMQProducer(producerGroupTemp); producer.setNamesrvAddr("127.0.0.1:9876"); + producer.setCompressMsgBodyOverHowmuch(16); message = new Message(topic, new byte[] {'a'}); zeroMsg = new Message(topic, new byte[] {}); + bigMessage = new Message(topic, "This is a very huge message!".getBytes()); producer.start(); @@ -146,6 +149,52 @@ public class DefaultMQProducerTest { assertThat(sendResult.getQueueOffset()).isEqualTo(456L); } + @Test + public void testSendMessageSync_WithBodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + SendResult sendResult = producer.send(bigMessage); + + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + } + + @Test + public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + producer.send(message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + } + + @Override + public void onException(Throwable e) { + } + }); + + } + + @Test + public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + producer.send(bigMessage, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + } + + @Override + public void onException(Throwable e) { + } + }); + + } + @Test public void testSendMessageSync_SuccessWithHook() throws Throwable { when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java index 4cac404b9bd08e4546b2b16ef2a6b955216c5e36..1b7e2bba320fb28ec032f0c500ac9ca3b2c4b57a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java @@ -89,4 +89,11 @@ public class MessageAccessor { return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP); } + public static Message cloneMessage(final Message msg) { + Message newMsg = new Message(msg.getTopic(), msg.getBody()); + newMsg.setFlag(msg.getFlag()); + newMsg.setProperties(msg.getProperties()); + return newMsg; + } + }