提交 9c3b26cf 编写于 作者: K King 提交者: Heng Du

Polish lite pull consumer (#1359)

* fix unsubscribe code

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* polish commit consumed offset

* pass checkstyle

* pass checkstyle

* polish LiteMQPullConsumer

* add flow control and polish commit logic

* fix bug

* polish code

* fix commit consumed offset back

* refactor litePullConsumer

* development save

* development save

* Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.

* Polish lite pull consumer

* polish lite pull consumer

* polish lite pull consumer

* fix seek

* fix seek function

* polish lite pull consumer

* add apache header

* add test

* polish test
上级 83098eb6
......@@ -32,10 +32,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
private DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/**
* Do the same thing for the same Group, the application must be set,and guarantee Globally unique
......@@ -47,7 +46,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private long brokerSuspendMaxTimeMillis = 1000 * 20;
/**
* Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
* recommended to modify
......@@ -134,10 +132,15 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
private int pullThresholdSizeForQueue = 100;
/**
* The socket timeout in milliseconds
* The poll timeout in milliseconds
*/
private long pollTimeoutMillis = 1000 * 5;
/**
* Message pull delay in milliseconds
*/
private long pullDelayTimeMills = 0;
public DefaultLitePullConsumer() {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
}
......@@ -163,7 +166,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this,rpcHook);
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
@Override
......@@ -217,13 +220,13 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
}
@Override
public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException{
public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
}
@Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException{
return this.defaultLitePullConsumerImpl.searchOffset(messageQueue,timestamp);
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, timestamp);
}
@Override
......@@ -393,4 +396,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
}
public long getPullDelayTimeMills() {
return pullDelayTimeMills;
}
public void setPullDelayTimeMills(long pullDelayTimeMills) {
this.pullDelayTimeMills = pullDelayTimeMills;
}
}
......@@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue {
......@@ -36,7 +37,7 @@ public class AssignedMessageQueue {
this.rebalanceImpl = rebalanceImpl;
}
public Collection<MessageQueue> messageQueues(){
public Collection<MessageQueue> messageQueues() {
return assignedMessageQueueState.keySet();
}
......@@ -52,6 +53,7 @@ public class AssignedMessageQueue {
for (MessageQueue messageQueue : messageQueues) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueStat.getPausedLatch().reset();
messageQueueStat.setPaused(true);
}
}
......@@ -62,6 +64,7 @@ public class AssignedMessageQueue {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueStat.setPaused(false);
messageQueueStat.getPausedLatch().reset();
}
}
}
......@@ -74,18 +77,18 @@ public class AssignedMessageQueue {
return null;
}
public long getNextOffset(MessageQueue messageQueue) {
public long getPullOffset(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) {
return messageQueueStat.getNextOffset();
return messageQueueStat.getPullOffset();
}
return -1;
}
public void updateNextOffset(MessageQueue messageQueue, long offset) {
public void updatePullOffset(MessageQueue messageQueue, long offset) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) {
messageQueueStat.setNextOffset(offset);
messageQueueStat.setPullOffset(offset);
}
}
......@@ -119,12 +122,21 @@ public class AssignedMessageQueue {
return -1;
}
public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) {
return messageQueueStat.getPausedLatch();
}
return null;
}
public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
}
......@@ -159,10 +171,11 @@ public class AssignedMessageQueue {
public class MessageQueueStat {
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private boolean paused = false;
private long nextOffset = -1;
private long consumeOffset = -1;
private volatile boolean paused = false;
private volatile long pullOffset = -1;
private volatile long consumeOffset = -1;
private volatile long seekOffset = -1;
private CountDownLatch2 pausedLatch = new CountDownLatch2(1);
public MessageQueueStat(MessageQueue messageQueue, ProcessQueue processQueue) {
this.messageQueue = messageQueue;
......@@ -185,12 +198,12 @@ public class AssignedMessageQueue {
this.paused = paused;
}
public long getNextOffset() {
return nextOffset;
public long getPullOffset() {
return pullOffset;
}
public void setNextOffset(long nextOffset) {
this.nextOffset = nextOffset;
public void setPullOffset(long pullOffset) {
this.pullOffset = pullOffset;
}
public ProcessQueue getProcessQueue() {
......@@ -216,5 +229,9 @@ public class AssignedMessageQueue {
public void setSeekOffset(long seekOffset) {
this.seekOffset = seekOffset;
}
public CountDownLatch2 getPausedLatch() {
return pausedLatch;
}
}
}
......@@ -16,27 +16,22 @@
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.Collections;
import java.util.TreeMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
......@@ -56,6 +51,7 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
......@@ -65,7 +61,11 @@ import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
......@@ -150,7 +150,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
private void checkServiceState() {
if (!(this.serviceState == ServiceState.RUNNING))
if (this.serviceState != ServiceState.RUNNING)
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
}
......@@ -347,6 +347,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
public PullAPIWrapper getPullAPIWrapper() {
return pullAPIWrapper;
}
private void copySubscription() throws MQClientException {
......@@ -440,16 +445,24 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public List<MessageExt> poll(long timeout) {
try {
checkServiceState();
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit();
}
long endTime = System.currentTimeMillis() + timeout;
ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if ((endTime - System.currentTimeMillis()) <= 0)
break;
if (endTime - System.currentTimeMillis() > 0) {
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0)
break;
}
}
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
List<MessageExt> messages = consumeRequest.getMessageExts();
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
......@@ -471,14 +484,33 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
if (offset < minOffset(messageQueue) || offset > maxOffset(messageQueue))
throw new MQClientException("Seek offset illegal", null);
if (!assignedMessageQueue.messageQueues().contains(messageQueue))
throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null);
long minOffset = minOffset(messageQueue);
long maxOffset = maxOffset(messageQueue);
if (offset < minOffset || offset > maxOffset)
throw new MQClientException("Seek offset illegal, seek offset = " + offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null);
try {
assignedMessageQueue.pause(Collections.singletonList(messageQueue));
CountDownLatch2 pausedLatch = assignedMessageQueue.getPausedLatch(messageQueue);
if (pausedLatch != null) {
pausedLatch.await(2, TimeUnit.SECONDS);
}
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (processQueue != null) {
processQueue.clear();
}
Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
while (iter.hasNext()) {
if (iter.next().getMessageQueue().equals(messageQueue))
iter.remove();
}
assignedMessageQueue.setSeekOffset(messageQueue, offset);
updateConsumeOffset(messageQueue, offset);
updateConsumeOffsetToBroker(messageQueue, offset, false);
assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
} catch (Exception e) {
log.error("Seek offset failed.", e);
} finally {
assignedMessageQueue.resume(Collections.singletonList(messageQueue));
}
}
......@@ -545,7 +577,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) {
assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
assignedMessageQueue.updatePullOffset(remoteQueue, nextPullOffset);
}
}
......@@ -568,12 +600,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (seekOffset != -1) {
offset = seekOffset;
assignedMessageQueue.setSeekOffset(remoteQueue, -1);
assignedMessageQueue.updateNextOffset(remoteQueue,offset);
assignedMessageQueue.updatePullOffset(remoteQueue, offset);
} else {
offset = assignedMessageQueue.getNextOffset(remoteQueue);
offset = assignedMessageQueue.getPullOffset(remoteQueue);
if (offset == -1) {
offset = fetchConsumeOffset(remoteQueue, false);
assignedMessageQueue.updateNextOffset(remoteQueue, offset);
assignedMessageQueue.updatePullOffset(remoteQueue, offset);
assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
}
}
......@@ -596,78 +628,82 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
@Override
public void run() {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (processQueue == null && processQueue.isDropped()) {
log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
if (!this.isCancelled()) {
if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0)
log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
return;
}
if (assignedMessageQueue.isPaused(messageQueue)) {
CountDownLatch2 pasuedLatch = assignedMessageQueue.getPausedLatch(messageQueue);
if (pasuedLatch != null)
pasuedLatch.countDown();
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
log.debug("Message Queue: {} has been paused!", messageQueue);
return;
}
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
if (processQueue == null && processQueue.isDropped()) {
log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
return;
}
if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0)
log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
return;
}
return;
}
if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
}
return;
}
return;
}
if (!this.isCancelled()) {
if (assignedMessageQueue.isPaused(messageQueue)) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
log.debug("Message Queue: {} has been paused!", messageQueue);
if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
}
return;
}
if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
}
return;
}
String subExpression = null;
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic();
subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
}
long offset = nextPullOffset(messageQueue);
long pullDelayTimeMills = 0;
long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills();
try {
PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums());
switch (pullResult.getPullStatus()) {
case FOUND:
processQueue.putMessage(pullResult.getMsgFoundList());
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
pullDelayTimeMills = 0;
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
processQueue.putMessage(pullResult.getMsgFoundList());
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
break;
case NO_NEW_MSG:
pullDelayTimeMills = 100;
case OFFSET_ILLEGAL:
//TODO
log.warn("the pull request offset illegal, {}", pullResult.toString());
break;
default:
......@@ -1037,7 +1073,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final List<MessageExt> messageExts;
private final MessageQueue messageQueue;
private final ProcessQueue processQueue;
private long startConsumeTimeMillis;
public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
final ProcessQueue processQueue) {
......@@ -1058,12 +1093,5 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return processQueue;
}
public long getStartConsumeTimeMillis() {
return startConsumeTimeMillis;
}
public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
this.startConsumeTimeMillis = startConsumeTimeMillis;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
......@@ -10,7 +26,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.Set;
public class RebalanceLitePullImpl extends RebalanceImpl {
public class RebalanceLitePullImpl extends RebalanceImpl {
private final DefaultLitePullConsumerImpl litePullConsumerImpl;
......@@ -19,8 +35,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
}
public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.litePullConsumerImpl = litePullConsumerImpl;
}
......@@ -37,7 +53,6 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
}
}
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.litePullConsumerImpl.getOffsetStore().persist(mq);
......@@ -64,5 +79,4 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultLitePullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private MQAdminImpl mQAdminImpl;
private RebalanceImpl rebalanceImpl;
private OffsetStore offsetStore;
private DefaultLitePullConsumer litePullConsumer;
private DefaultLitePullConsumerImpl litePullConsumerImpl;
private String consumerGroup = "LitePullConsumerGroup";
private String topic = "LitePullConsumerTest";
private String brokerName = "BrokerA";
@Before
public void init() throws Exception {
String groupName = consumerGroup + System.currentTimeMillis();
litePullConsumer = new DefaultLitePullConsumer(groupName);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
field = RebalanceService.class.getDeclaredField("waitInterval");
field.setAccessible(true);
field.set(rebalanceService, 100);
litePullConsumer.start();
field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(litePullConsumerImpl, mQClientFactory);
PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper();
field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pullAPIWrapper, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQAdminImpl);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(rebalanceImpl, mQClientFactory);
offsetStore = spy(litePullConsumerImpl.getOffsetStore());
field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
field.setAccessible(true);
field.set(litePullConsumerImpl, offsetStore);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
return pullResult;
}
});
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class));
}
@After
public void terminate() {
litePullConsumer.shutdown();
}
@Test
public void testAssign_PollMessageSuccess() {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.setPullDelayTimeMills(60 * 1000);
litePullConsumer.assign(Collections.singletonList(messageQueue));
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
}
@Test
public void testSubscribe_PollMessageSuccess() throws MQClientException {
litePullConsumer.setPullDelayTimeMills(60 * 1000);
litePullConsumer.subscribe(topic, "*");
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
}
@Test
public void testSubscriptionType_AssignAndSubscribeExclusive() throws MQClientException {
try {
litePullConsumer.subscribe(topic, "*");
litePullConsumer.assign(Collections.singletonList(createMessageQueue()));
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("Cannot select two subscription types at the same time.");
}
}
@Test
public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws MQClientException {
try {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumer();
litePullConsumer.fetchMessageQueues(topic);
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("The consumer not running.");
}
}
@Test
public void testSeek_SeekOffsetIllegal() throws MQClientException {
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
try {
litePullConsumer.seek(messageQueue, -1);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("min offset = 0");
}
try {
litePullConsumer.seek(messageQueue, 1000);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("max offset = 100");
}
}
@Test
public void testSeek_MessageQueueNotInAssignList() {
try {
litePullConsumer.seek(createMessageQueue(), 0);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("The message queue is not in assigned list");
}
}
private MessageQueue createMessageQueue() {
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
return messageQueue;
}
private DefaultLitePullConsumer createLitePullConsumer() {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
return litePullConsumer;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
}
......@@ -16,23 +16,33 @@
*/
package org.apache.rocketmq.example.simple;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
public class LitePullConsumerTest {
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("test");
litePullConsumer.setNamesrvAddr("localhost:9876");
litePullConsumer.setAutoCommit(true);
litePullConsumer.subscribe("test41","TagA" );
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("test400");
List<MessageQueue> list = new ArrayList<>(mqSet);
Collection<MessageQueue> assginMq = Collections.singletonList(list.get(0));
litePullConsumer.assign(assginMq);
int size = 0;
litePullConsumer.seek(list.get(0), 26);
int i = 0;
while (true) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
if (messageExts != null) {
size += messageExts.size();
}
litePullConsumer.commitSync();
System.out.printf("%s %d %n", messageExts, size);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册