未验证 提交 b0a89596 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #2917 from ayanamist/fix-test

Fix test stability
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.consumer; package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
...@@ -45,7 +44,6 @@ import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyServic ...@@ -45,7 +44,6 @@ import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyServic
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullMessageService; import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.PullResultExt;
...@@ -88,7 +86,6 @@ public class DefaultMQPushConsumerTest { ...@@ -88,7 +86,6 @@ public class DefaultMQPushConsumerTest {
@Mock @Mock
private MQClientAPIImpl mQClientAPIImpl; private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalanceImpl rebalanceImpl; private RebalanceImpl rebalanceImpl;
private DefaultMQPushConsumer pushConsumer; private DefaultMQPushConsumer pushConsumer;
...@@ -98,6 +95,27 @@ public class DefaultMQPushConsumerTest { ...@@ -98,6 +95,27 @@ public class DefaultMQPushConsumerTest {
factoryTable.forEach((s, instance) -> instance.shutdown()); factoryTable.forEach((s, instance) -> instance.shutdown());
factoryTable.clear(); factoryTable.clear();
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
consumerGroup = "FooBarGroup" + System.currentTimeMillis(); consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setNamesrvAddr("127.0.0.1:9876");
...@@ -115,58 +133,24 @@ public class DefaultMQPushConsumerTest { ...@@ -115,58 +133,24 @@ public class DefaultMQPushConsumerTest {
// suppress updateTopicRouteInfoFromNameServer // suppress updateTopicRouteInfoFromNameServer
pushConsumer.changeInstanceNameToPID(); pushConsumer.changeInstanceNameToPID();
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
FieldUtils.writeDeclaredField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
mQClientFactory = spy(mQClientFactory);
factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
rebalanceImpl = spy(pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalanceImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), rebalanceImpl = spy(pushConsumerImpl.getRebalanceImpl());
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class));
.thenAnswer(new Answer<PullResult>() { FieldUtils.writeDeclaredField(pushConsumerImpl, "rebalanceImpl", rebalanceImpl, true);
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue()); messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class));
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
} }
@After @After
...@@ -292,7 +276,7 @@ public class DefaultMQPushConsumerTest { ...@@ -292,7 +276,7 @@ public class DefaultMQPushConsumerTest {
PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest()); pullMessageService.executePullRequestImmediately(createPullRequest());
assertThat(countDownLatch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
pushConsumer.shutdown(); pushConsumer.shutdown();
assertThat(messageConsumedFlag.get()).isTrue(); assertThat(messageConsumedFlag.get()).isTrue();
......
...@@ -21,7 +21,6 @@ import io.opentracing.mock.MockSpan; ...@@ -21,7 +21,6 @@ import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer; import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
...@@ -75,6 +74,7 @@ import org.mockito.junit.MockitoJUnitRunner; ...@@ -75,6 +74,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.waitAtMost;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
...@@ -105,6 +105,27 @@ public class DefaultMQConsumerWithOpenTracingTest { ...@@ -105,6 +105,27 @@ public class DefaultMQConsumerWithOpenTracingTest {
factoryTable.forEach((s, instance) -> instance.shutdown()); factoryTable.forEach((s, instance) -> instance.shutdown());
factoryTable.clear(); factoryTable.clear();
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[]{'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
consumerGroup = "FooBarGroup" + System.currentTimeMillis(); consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
...@@ -128,58 +149,20 @@ public class DefaultMQConsumerWithOpenTracingTest { ...@@ -128,58 +149,20 @@ public class DefaultMQConsumerWithOpenTracingTest {
// suppress updateTopicRouteInfoFromNameServer // suppress updateTopicRouteInfoFromNameServer
pushConsumer.changeInstanceNameToPID(); pushConsumer.changeInstanceNameToPID();
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
FieldUtils.writeDeclaredField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
mQClientFactory = spy(mQClientFactory);
factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[]{'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue()); messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
} }
@After @After
...@@ -209,7 +192,8 @@ public class DefaultMQConsumerWithOpenTracingTest { ...@@ -209,7 +192,8 @@ public class DefaultMQConsumerWithOpenTracingTest {
assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getTopic()).isEqualTo(topic);
assertThat(msg.getBody()).isEqualTo(new byte[]{'a'}); assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
assertThat(tracer.finishedSpans().size()).isEqualTo(1); // wait until consumeMessageAfter hook of tracer is done surely.
waitAtMost(1, TimeUnit.SECONDS).until(() -> tracer.finishedSpans().size() == 1);
MockSpan span = tracer.finishedSpans().get(0); MockSpan span = tracer.finishedSpans().get(0);
assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic); assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER); assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER);
......
...@@ -439,6 +439,12 @@ ...@@ -439,6 +439,12 @@
<version>3.10.0</version> <version>3.10.0</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<dependencyManagement> <dependencyManagement>
......
...@@ -43,7 +43,7 @@ public class ListDataCollectorImpl implements DataCollector { ...@@ -43,7 +43,7 @@ public class ListDataCollectorImpl implements DataCollector {
return datas; return datas;
} }
public void resetData() { public synchronized void resetData() {
datas.clear(); datas.clear();
unlockIncrement(); unlockIncrement();
} }
...@@ -67,7 +67,7 @@ public class ListDataCollectorImpl implements DataCollector { ...@@ -67,7 +67,7 @@ public class ListDataCollectorImpl implements DataCollector {
return Collections.frequency(datas, data) == 1; return Collections.frequency(datas, data) == 1;
} }
public Collection<Object> getAllDataWithoutDuplicate() { public synchronized Collection<Object> getAllDataWithoutDuplicate() {
return new HashSet<Object>(datas); return new HashSet<Object>(datas);
} }
...@@ -81,7 +81,7 @@ public class ListDataCollectorImpl implements DataCollector { ...@@ -81,7 +81,7 @@ public class ListDataCollectorImpl implements DataCollector {
return res; return res;
} }
public void removeData(Object data) { public synchronized void removeData(Object data) {
datas.remove(data); datas.remove(data);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册