提交 eaa5ba52 编写于 作者: Y yukon

[ROCKETMQ-52] Polish unit tests for rocketmq-client

上级 6f60b4e8
...@@ -20,29 +20,37 @@ import java.io.ByteArrayOutputStream; ...@@ -20,29 +20,37 @@ import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullMessageService; import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After; import org.junit.After;
...@@ -62,8 +70,8 @@ import static org.mockito.ArgumentMatchers.anyLong; ...@@ -62,8 +70,8 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest { public class DefaultMQPushConsumerTest {
...@@ -75,6 +83,7 @@ public class DefaultMQPushConsumerTest { ...@@ -75,6 +83,7 @@ public class DefaultMQPushConsumerTest {
@Mock @Mock
private MQClientAPIImpl mQClientAPIImpl; private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper; private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer; private DefaultMQPushConsumer pushConsumer;
@Before @Before
...@@ -91,14 +100,21 @@ public class DefaultMQPushConsumerTest { ...@@ -91,14 +100,21 @@ public class DefaultMQPushConsumerTest {
return null; return null;
} }
}); });
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*"); pushConsumer.subscribe(topic, "*");
pushConsumer.start(); pushConsumer.start();
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true); field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory); field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true); field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl); field.set(mQClientFactory, mQClientAPIImpl);
...@@ -107,9 +123,35 @@ public class DefaultMQPushConsumerTest { ...@@ -107,9 +123,35 @@ public class DefaultMQPushConsumerTest {
field.setAccessible(true); field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper); field.set(pushConsumerImpl, pullAPIWrapper);
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
mQClientFactory.start(); mQClientFactory.start();
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
return null;
}
}).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class));
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
Set<MessageQueue> messageQueueSet = new HashSet<>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
doReturn(messageQueueSet).when(mQClientAPIImpl).lockBatchMQ(anyString(), any(LockBatchRequestBody.class), anyLong());
doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class));
} }
@After @After
...@@ -129,25 +171,31 @@ public class DefaultMQPushConsumerTest { ...@@ -129,25 +171,31 @@ public class DefaultMQPushConsumerTest {
return null; return null;
} }
})); }));
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable { PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
PullMessageRequestHeader requestHeader = mock.getArgument(1); pullMessageService.executePullRequestImmediately(createPullRequest());
MessageClientExt messageClientExt = new MessageClientExt(); countDownLatch.await();
messageClientExt.setTopic(topic); assertThat(messageExts[0].getTopic()).isEqualTo(topic);
messageClientExt.setQueueId(0); assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
messageClientExt.setMsgId("123"); }
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234"); @Test
messageClientExt.setBornHost(new InetSocketAddress(8080)); public void testPullMessage_SuccessWithOrderlyService() throws InterruptedException, RemotingException, MQBrokerException {
messageClientExt.setStoreHost(new InetSocketAddress(8080)); final CountDownLatch countDownLatch = new CountDownLatch(1);
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt)); final MessageExt[] messageExts = new MessageExt[1];
((PullCallback)mock.getArgument(4)).onSuccess(pullResult); pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
messageExts[0] = msgs.get(0);
countDownLatch.countDown();
return null; return null;
} }
}).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)); }));
pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest()); pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await(); countDownLatch.await();
assertThat(messageExts[0].getTopic()).isEqualTo(topic); assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'}); assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
......
...@@ -59,7 +59,7 @@ public class MixAllTest { ...@@ -59,7 +59,7 @@ public class MixAllTest {
file.delete(); file.delete();
} }
file.createNewFile(); file.createNewFile();
try( PrintWriter out = new PrintWriter( fileName ) ){ try (PrintWriter out = new PrintWriter(fileName)) {
out.write("TestForMixAll"); out.write("TestForMixAll");
} }
String string = MixAll.file2String(fileName); String string = MixAll.file2String(fileName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册