提交 9f4934fc 编写于 作者: S Shannon 提交者: von gosling

[ISSUE #66] duplicate compress message body if retry to send msg when… (#294)

* Fix issue : duplicate compress message body if retry to send msg when  exception occurs in async sending.
上级 c508cb33
......@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
......
......@@ -607,8 +607,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
......@@ -678,10 +680,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
msg.setBody(prevBody);
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
tmpMessage,
requestHeader,
timeout,
communicationMode,
......
......@@ -69,6 +69,7 @@ public class DefaultMQProducerTest {
private DefaultMQProducer producer;
private Message message;
private Message zeroMsg;
private Message bigMessage;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
......@@ -77,8 +78,10 @@ public class DefaultMQProducerTest {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setCompressMsgBodyOverHowmuch(16);
message = new Message(topic, new byte[] {'a'});
zeroMsg = new Message(topic, new byte[] {});
bigMessage = new Message(topic, "This is a very huge message!".getBytes());
producer.start();
......@@ -146,6 +149,52 @@ public class DefaultMQProducerTest {
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Test
public void testSendMessageSync_WithBodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendResult sendResult = producer.send(bigMessage);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Test
public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Override
public void onException(Throwable e) {
}
});
}
@Test
public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.send(bigMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Override
public void onException(Throwable e) {
}
});
}
@Test
public void testSendMessageSync_SuccessWithHook() throws Throwable {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
......
......@@ -89,4 +89,11 @@ public class MessageAccessor {
return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
}
public static Message cloneMessage(final Message msg) {
Message newMsg = new Message(msg.getTopic(), msg.getBody());
newMsg.setFlag(msg.getFlag());
newMsg.setProperties(msg.getProperties());
return newMsg;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册