提交 4f7e8312 编写于 作者: 斜阳

[ISSUE #2708] Fix unit test

上级 707d1b85
...@@ -16,17 +16,6 @@ ...@@ -16,17 +16,6 @@
*/ */
package org.apache.rocketmq.client.consumer; package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
...@@ -48,6 +37,7 @@ import org.apache.rocketmq.client.impl.consumer.PullMessageService; ...@@ -48,6 +37,7 @@ import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
...@@ -66,6 +56,18 @@ import org.mockito.invocation.InvocationOnMock; ...@@ -66,6 +56,18 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
...@@ -74,6 +76,7 @@ import static org.mockito.ArgumentMatchers.anyLong; ...@@ -74,6 +76,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
...@@ -87,6 +90,7 @@ public class DefaultMQPushConsumerTest { ...@@ -87,6 +90,7 @@ public class DefaultMQPushConsumerTest {
@Mock @Mock
private MQClientAPIImpl mQClientAPIImpl; private MQClientAPIImpl mQClientAPIImpl;
private RebalanceImpl rebalanceImpl; private RebalanceImpl rebalanceImpl;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer; private DefaultMQPushConsumer pushConsumer;
@Before @Before
...@@ -96,24 +100,24 @@ public class DefaultMQPushConsumerTest { ...@@ -96,24 +100,24 @@ public class DefaultMQPushConsumerTest {
factoryTable.clear(); factoryTable.clear();
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class), when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() { .thenAnswer(new Answer<PullResult>() {
@Override @Override
public PullResult answer(InvocationOnMock mock) throws Throwable { public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1); PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt(); MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic); messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0); messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123"); messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'}); messageClientExt.setBody(new byte[]{'a'});
messageClientExt.setOffsetMsgId("234"); messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080)); messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080)); messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt)); PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult); ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult; return pullResult;
} }
}); });
consumerGroup = "FooBarGroup" + System.currentTimeMillis(); consumerGroup = "FooBarGroup" + System.currentTimeMillis();
...@@ -124,12 +128,13 @@ public class DefaultMQPushConsumerTest { ...@@ -124,12 +128,13 @@ public class DefaultMQPushConsumerTest {
pushConsumer.registerMessageListener(new MessageListenerConcurrently() { pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) { ConsumeConcurrentlyContext context) {
return null; return null;
} }
}); });
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
// suppress updateTopicRouteInfoFromNameServer // suppress updateTopicRouteInfoFromNameServer
pushConsumer.changeInstanceNameToPID(); pushConsumer.changeInstanceNameToPID();
...@@ -142,7 +147,8 @@ public class DefaultMQPushConsumerTest { ...@@ -142,7 +147,8 @@ public class DefaultMQPushConsumerTest {
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
rebalanceImpl = spy(pushConsumerImpl.getRebalanceImpl()); rebalanceImpl = spy(pushConsumerImpl.getRebalanceImpl());
doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class)); // doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class));
doReturn(123L).when(rebalanceImpl).computePullFromWhereWithException(any(MessageQueue.class));
FieldUtils.writeDeclaredField(pushConsumerImpl, "rebalanceImpl", rebalanceImpl, true); FieldUtils.writeDeclaredField(pushConsumerImpl, "rebalanceImpl", rebalanceImpl, true);
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
...@@ -170,7 +176,7 @@ public class DefaultMQPushConsumerTest { ...@@ -170,7 +176,7 @@ public class DefaultMQPushConsumerTest {
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) { ConsumeConcurrentlyContext context) {
messageAtomic.set(msgs.get(0)); messageAtomic.set(msgs.get(0));
countDownLatch.countDown(); countDownLatch.countDown();
return null; return null;
...@@ -183,7 +189,7 @@ public class DefaultMQPushConsumerTest { ...@@ -183,7 +189,7 @@ public class DefaultMQPushConsumerTest {
MessageExt msg = messageAtomic.get(); MessageExt msg = messageAtomic.get();
assertThat(msg).isNotNull(); assertThat(msg).isNotNull();
assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getTopic()).isEqualTo(topic);
assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
} }
@Test @Test
...@@ -210,7 +216,7 @@ public class DefaultMQPushConsumerTest { ...@@ -210,7 +216,7 @@ public class DefaultMQPushConsumerTest {
MessageExt msg = messageAtomic.get(); MessageExt msg = messageAtomic.get();
assertThat(msg).isNotNull(); assertThat(msg).isNotNull();
assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getTopic()).isEqualTo(topic);
assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
} }
@Test @Test
...@@ -287,7 +293,7 @@ public class DefaultMQPushConsumerTest { ...@@ -287,7 +293,7 @@ public class DefaultMQPushConsumerTest {
pushConsumer.registerMessageListener(new MessageListenerConcurrently() { pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) { ConsumeConcurrentlyContext context) {
return null; return null;
} }
}); });
...@@ -313,11 +319,29 @@ public class DefaultMQPushConsumerTest { ...@@ -313,11 +319,29 @@ public class DefaultMQPushConsumerTest {
} }
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception { List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) { for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false)); outputStream.write(MessageDecoder.encode(messageExt, false));
} }
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
} }
@Test
public void testPullMessage_ExceptionOccursWhenComputePullFromWhere() throws MQClientException {
doThrow(MQClientException.class).when(rebalancePushImpl).computePullFromWhereWithException(any(MessageQueue.class));
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(
new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(),
(msgs, context) -> {
messageExts[0] = msgs.get(0);
return null;
}));
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true);
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
assertThat(messageExts[0]).isNull();
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册