提交 8353551d 编写于 作者: Q qqeasonchen

add unit test of reply

上级 e7b9169b
......@@ -19,12 +19,16 @@ package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
......@@ -33,6 +37,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
......@@ -46,11 +51,23 @@ import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Assert;
......@@ -66,8 +83,10 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
......@@ -123,6 +142,10 @@ public class DefaultMQPushConsumerTest {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(mQClientFactory.getDefaultMQProducer().getDefaultMQProducerImpl(), mQClientFactory);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
......@@ -256,6 +279,64 @@ public class DefaultMQPushConsumerTest {
}
}
@Test
public void testReplyMessageSync() throws RemotingException, InterruptedException, MQBrokerException, MQClientException, NoSuchFieldException {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendResult sendResult = pushConsumer.reply(createRequestMessage(), new byte[]{'a'}, 3000);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("12345");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Test
public void testReplyMessageASync() throws RemotingException, InterruptedException, MQBrokerException, MQClientException, NoSuchFieldException {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenAnswer(new Answer<Object>() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
CommunicationMode communicationMode = mock.getArgument(5);
SendCallback sendCallback = mock.getArgument(6);
SendResult sendResult = createSendResult(SendStatus.SEND_OK);
switch (communicationMode) {
case SYNC:
return sendCallback;
case ASYNC:
case ONEWAY:
if (sendCallback != null) {
sendCallback.onSuccess(sendResult);
}
}
return null;
}
});
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicBoolean replySuccess = new AtomicBoolean(false);
pushConsumer.reply(createRequestMessage(), new byte[] {'a'}, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("12345");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
countDownLatch.countDown();
replySuccess.set(true);
}
@Override public void onException(Throwable e) {
countDownLatch.countDown();
}
}, 3000);
countDownLatch.await(3000, TimeUnit.MILLISECONDS);
assertTrue(replySuccess.get());
}
private DefaultMQPushConsumer createPushConsumer() {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
......@@ -294,4 +375,51 @@ public class DefaultMQPushConsumerTest {
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("12345");
sendResult.setOffsetMsgId("12345");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private Message createRequestMessage() {
Message requestMessage = new Message();
Map map = new HashMap<String, String>();
map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO, "reply_to_who");
map.put(MessageConst.PROPERTY_CLUSTER, "DefaultCluster");
map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000");
MessageAccessor.setProperties(requestMessage, map);
return requestMessage;
}
}
......@@ -49,6 +49,16 @@ public class MessageUtilsTest {
}
}
@Test
public void testCreateReplyMessage_reqMsgIsNull() throws MQClientException {
try {
Message msg = MessageUtil.createReplyMessage(null, new byte[] {'a'});
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("create reply message fail, requestMessage cannot be null.");
}
}
private Message createReplyMessage(String clusterName) {
Message requestMessage = new Message();
Map map = new HashMap<String, String>();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册