提交 67751d81 编写于 作者: K King 提交者: Heng Du

Add unit test for lite pull consumer (#1410)

* Add unit test for lite pull consumer.

* Add synchronized to poll function.
上级 2ab9e850
...@@ -327,7 +327,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -327,7 +327,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e); log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
} }
} }
}, 1000 * 20, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS); }, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
} }
private void operateAfterRunning() throws MQClientException { private void operateAfterRunning() throws MQClientException {
...@@ -491,7 +491,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -491,7 +491,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
public List<MessageExt> poll(long timeout) { public synchronized List<MessageExt> poll(long timeout) {
try { try {
checkServiceState(); checkServiceState();
if (timeout < 0) if (timeout < 0)
......
...@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.impl.consumer.PullResultExt; ...@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceService; import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
...@@ -83,6 +84,7 @@ public class DefaultLitePullConsumerTest { ...@@ -83,6 +84,7 @@ public class DefaultLitePullConsumerTest {
private String consumerGroup = "LitePullConsumerGroup"; private String consumerGroup = "LitePullConsumerGroup";
private String topic = "LitePullConsumerTest"; private String topic = "LitePullConsumerTest";
private String brokerName = "BrokerA"; private String brokerName = "BrokerA";
private boolean flag = false;
@Before @Before
public void init() throws Exception { public void init() throws Exception {
...@@ -155,7 +157,7 @@ public class DefaultLitePullConsumerTest { ...@@ -155,7 +157,7 @@ public class DefaultLitePullConsumerTest {
} }
@Test @Test
public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws Exception { public void testFetchMessageQueues_FetchMessageQueuesBeforeStart() throws Exception {
DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
try { try {
litePullConsumer.fetchMessageQueues(topic); litePullConsumer.fetchMessageQueues(topic);
...@@ -167,6 +169,22 @@ public class DefaultLitePullConsumerTest { ...@@ -167,6 +169,22 @@ public class DefaultLitePullConsumerTest {
} }
} }
@Test
public void testSeek_SeekOffsetSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
long offset = litePullConsumer.committed(messageQueue);
litePullConsumer.seek(messageQueue, offset);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl);
assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), offset);
litePullConsumer.shutdown();
}
@Test @Test
public void testSeek_SeekOffsetIllegal() throws Exception { public void testSeek_SeekOffsetIllegal() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
...@@ -190,21 +208,6 @@ public class DefaultLitePullConsumerTest { ...@@ -190,21 +208,6 @@ public class DefaultLitePullConsumerTest {
litePullConsumer.shutdown(); litePullConsumer.shutdown();
} }
@Test
public void testSeek_SeekOffsetSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.seek(messageQueue, 50);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl);
assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), 50);
litePullConsumer.shutdown();
}
@Test @Test
public void testSeek_MessageQueueNotInAssignList() throws Exception { public void testSeek_MessageQueueNotInAssignList() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
...@@ -216,6 +219,34 @@ public class DefaultLitePullConsumerTest { ...@@ -216,6 +219,34 @@ public class DefaultLitePullConsumerTest {
} finally { } finally {
litePullConsumer.shutdown(); litePullConsumer.shutdown();
} }
litePullConsumer = createSubscribeLitePullConsumer();
try {
litePullConsumer.seek(createMessageQueue(), 0);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("The message queue is not in assigned list, may be rebalancing");
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testOffsetForTimestamp_FailedAndSuccess() throws Exception {
MessageQueue messageQueue = createMessageQueue();
DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
try {
litePullConsumer.offsetForTimestamp(messageQueue, 123456L);
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("The consumer not running, please start it first.");
} finally {
litePullConsumer.shutdown();
}
doReturn(123L).when(mQAdminImpl).searchOffset(any(MessageQueue.class), anyLong());
litePullConsumer = createStartLitePullConsumer();
long offset = litePullConsumer.offsetForTimestamp(messageQueue, 123456L);
assertThat(offset).isEqualTo(123L);
} }
@Test @Test
...@@ -238,12 +269,120 @@ public class DefaultLitePullConsumerTest { ...@@ -238,12 +269,120 @@ public class DefaultLitePullConsumerTest {
} }
} }
private MessageQueue createMessageQueue() { @Test
MessageQueue messageQueue = new MessageQueue(); public void testRegisterTopicMessageQueueChangeListener_Success() throws Exception {
messageQueue.setBrokerName(brokerName); flag = false;
messageQueue.setQueueId(0); DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
messageQueue.setTopic(topic); doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
return messageQueue; litePullConsumer.setTopicMetadataCheckIntervalMillis(10);
litePullConsumer.registerTopicMessageQueueChangeListener(topic, new TopicMessageQueueChangeListener() {
@Override public void onChanged(String topic, Set<MessageQueue> messageQueues) {
flag = true;
}
});
Set<MessageQueue> set = new HashSet<MessageQueue>();
set.add(createMessageQueue());
doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
Thread.sleep(11 * 1000);
assertThat(flag).isTrue();
}
@Test
public void testFlowControl_Success() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.setPullThresholdForAll(-1);
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.setPollTimeoutMillis(500);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result).isEmpty();
} finally {
litePullConsumer.shutdown();
}
litePullConsumer = createStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.setPullThresholdForQueue(-1);
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.setPollTimeoutMillis(500);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result).isEmpty();
} finally {
litePullConsumer.shutdown();
}
litePullConsumer = createStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.setPullThresholdSizeForQueue(-1);
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.setPollTimeoutMillis(500);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result).isEmpty();
} finally {
litePullConsumer.shutdown();
}
litePullConsumer = createStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.setConsumeMaxSpan(-1);
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.setPollTimeoutMillis(500);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result).isEmpty();
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testCheckConfig_Exception() {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(MixAll.DEFAULT_CONSUMER_GROUP);
try {
litePullConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("consumerGroup can not equal");
} finally {
litePullConsumer.shutdown();
}
litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setMessageModel(null);
try {
litePullConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("messageModel is null");
} finally {
litePullConsumer.shutdown();
}
litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setAllocateMessageQueueStrategy(null);
try {
litePullConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("allocateMessageQueueStrategy is null");
} finally {
litePullConsumer.shutdown();
}
litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setConsumerTimeoutMillisWhenSuspend(1);
try {
litePullConsumer.start();
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis");
} finally {
litePullConsumer.shutdown();
}
} }
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception { private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
...@@ -310,33 +449,40 @@ public class DefaultLitePullConsumerTest { ...@@ -310,33 +449,40 @@ public class DefaultLitePullConsumerTest {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*"); litePullConsumer.subscribe(topic, "*");
litePullConsumer.start(); litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer); initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer; return litePullConsumer;
} }
private DefaultLitePullConsumer createBroadcastLitePullConsumer() throws Exception { private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.setMessageModel(MessageModel.BROADCASTING);
litePullConsumer.subscribe(topic, "*");
litePullConsumer.start(); litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer); initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer; return litePullConsumer;
} }
private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception { private DefaultLitePullConsumer createNotStartLitePullConsumer() {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
return litePullConsumer;
}
private DefaultLitePullConsumer createBroadcastLitePullConsumer() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.setMessageModel(MessageModel.BROADCASTING);
litePullConsumer.subscribe(topic, "*");
litePullConsumer.start(); litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer); initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer; return litePullConsumer;
} }
private DefaultLitePullConsumer createNotStartLitePullConsumer() { private MessageQueue createMessageQueue() {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); MessageQueue messageQueue = new MessageQueue();
return litePullConsumer; messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
return messageQueue;
} }
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册