diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 757c9665cc9954669c71613adcf47e42cf54fb06..7f657130c4a11f1bbf18a10025a3f0ca5a07649f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -32,10 +32,9 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; - public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { - private DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; + private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; /** * Do the same thing for the same Group, the application must be set,and guarantee Globally unique @@ -47,7 +46,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon */ private long brokerSuspendMaxTimeMillis = 1000 * 20; - /** * Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not * recommended to modify @@ -134,10 +132,15 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon private int pullThresholdSizeForQueue = 100; /** - * The socket timeout in milliseconds + * The poll timeout in milliseconds */ private long pollTimeoutMillis = 1000 * 5; + /** + * Message pull delay in milliseconds + */ + private long pullDelayTimeMills = 0; + public DefaultLitePullConsumer() { this(null, MixAll.DEFAULT_CONSUMER_GROUP, null); } @@ -163,7 +166,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { this.namespace = namespace; this.consumerGroup = consumerGroup; - defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this,rpcHook); + defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); } @Override @@ -217,13 +220,13 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon } @Override - public Collection fetchMessageQueues(String topic) throws MQClientException{ + public Collection fetchMessageQueues(String topic) throws MQClientException { return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic)); } @Override - public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException{ - return this.defaultLitePullConsumerImpl.searchOffset(messageQueue,timestamp); + public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException { + return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, timestamp); } @Override @@ -393,4 +396,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend; } + public long getPullDelayTimeMills() { + return pullDelayTimeMills; + } + + public void setPullDelayTimeMills(long pullDelayTimeMills) { + this.pullDelayTimeMills = pullDelayTimeMills; + } + } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index a3c5da1e792f22f71b47278b7316028808c858b0..aa8379ecdd0132b40c6301030fa8db36bf884449 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.CountDownLatch2; import org.apache.rocketmq.common.message.MessageQueue; public class AssignedMessageQueue { @@ -36,7 +37,7 @@ public class AssignedMessageQueue { this.rebalanceImpl = rebalanceImpl; } - public Collection messageQueues(){ + public Collection messageQueues() { return assignedMessageQueueState.keySet(); } @@ -52,6 +53,7 @@ public class AssignedMessageQueue { for (MessageQueue messageQueue : messageQueues) { MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); if (assignedMessageQueueState.get(messageQueue) != null) { + messageQueueStat.getPausedLatch().reset(); messageQueueStat.setPaused(true); } } @@ -62,6 +64,7 @@ public class AssignedMessageQueue { MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); if (assignedMessageQueueState.get(messageQueue) != null) { messageQueueStat.setPaused(false); + messageQueueStat.getPausedLatch().reset(); } } } @@ -74,18 +77,18 @@ public class AssignedMessageQueue { return null; } - public long getNextOffset(MessageQueue messageQueue) { + public long getPullOffset(MessageQueue messageQueue) { MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); if (messageQueueStat != null) { - return messageQueueStat.getNextOffset(); + return messageQueueStat.getPullOffset(); } return -1; } - public void updateNextOffset(MessageQueue messageQueue, long offset) { + public void updatePullOffset(MessageQueue messageQueue, long offset) { MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); if (messageQueueStat != null) { - messageQueueStat.setNextOffset(offset); + messageQueueStat.setPullOffset(offset); } } @@ -119,12 +122,21 @@ public class AssignedMessageQueue { return -1; } + public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) { + MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); + if (messageQueueStat != null) { + return messageQueueStat.getPausedLatch(); + } + return null; + } + public void updateAssignedMessageQueue(Collection assigned) { synchronized (this.assignedMessageQueueState) { Iterator> it = this.assignedMessageQueueState.entrySet().iterator(); while (it.hasNext()) { Map.Entry next = it.next(); if (!assigned.contains(next.getKey())) { + next.getValue().getProcessQueue().setDropped(true); it.remove(); } } @@ -159,10 +171,11 @@ public class AssignedMessageQueue { public class MessageQueueStat { private MessageQueue messageQueue; private ProcessQueue processQueue; - private boolean paused = false; - private long nextOffset = -1; - private long consumeOffset = -1; + private volatile boolean paused = false; + private volatile long pullOffset = -1; + private volatile long consumeOffset = -1; private volatile long seekOffset = -1; + private CountDownLatch2 pausedLatch = new CountDownLatch2(1); public MessageQueueStat(MessageQueue messageQueue, ProcessQueue processQueue) { this.messageQueue = messageQueue; @@ -185,12 +198,12 @@ public class AssignedMessageQueue { this.paused = paused; } - public long getNextOffset() { - return nextOffset; + public long getPullOffset() { + return pullOffset; } - public void setNextOffset(long nextOffset) { - this.nextOffset = nextOffset; + public void setPullOffset(long pullOffset) { + this.pullOffset = pullOffset; } public ProcessQueue getProcessQueue() { @@ -216,5 +229,9 @@ public class AssignedMessageQueue { public void setSeekOffset(long seekOffset) { this.seekOffset = seekOffset; } + + public CountDownLatch2 getPausedLatch() { + return pausedLatch; + } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 95e218f0dd43223ed7f504b43152363933b9a6ee..74cf644b98aaa112042dcd8e5ad5cae1b288a8d3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -16,27 +16,22 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.util.Collections; import java.util.List; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.Collection; -import java.util.Collections; -import java.util.TreeMap; import java.util.HashSet; import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; @@ -56,6 +51,7 @@ import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.CountDownLatch2; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -65,7 +61,11 @@ import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -150,7 +150,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } private void checkServiceState() { - if (!(this.serviceState == ServiceState.RUNNING)) + if (this.serviceState != ServiceState.RUNNING) throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); } @@ -347,6 +347,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } + + } + + public PullAPIWrapper getPullAPIWrapper() { + return pullAPIWrapper; } private void copySubscription() throws MQClientException { @@ -440,16 +445,24 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public List poll(long timeout) { try { checkServiceState(); + if (timeout < 0) + throw new IllegalArgumentException("Timeout must not be negative"); + if (defaultLitePullConsumer.isAutoCommit()) { maybeAutoCommit(); } long endTime = System.currentTimeMillis() + timeout; + ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { - consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if ((endTime - System.currentTimeMillis()) <= 0) - break; + + if (endTime - System.currentTimeMillis() > 0) { + while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { + consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (endTime - System.currentTimeMillis() <= 0) + break; + } } + if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { List messages = consumeRequest.getMessageExts(); long offset = consumeRequest.getProcessQueue().removeMessage(messages); @@ -471,14 +484,33 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException { - if (offset < minOffset(messageQueue) || offset > maxOffset(messageQueue)) - throw new MQClientException("Seek offset illegal", null); + if (!assignedMessageQueue.messageQueues().contains(messageQueue)) + throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null); + long minOffset = minOffset(messageQueue); + long maxOffset = maxOffset(messageQueue); + if (offset < minOffset || offset > maxOffset) + throw new MQClientException("Seek offset illegal, seek offset = " + offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null); try { + assignedMessageQueue.pause(Collections.singletonList(messageQueue)); + CountDownLatch2 pausedLatch = assignedMessageQueue.getPausedLatch(messageQueue); + if (pausedLatch != null) { + pausedLatch.await(2, TimeUnit.SECONDS); + } + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + if (processQueue != null) { + processQueue.clear(); + } + Iterator iter = consumeRequestCache.iterator(); + while (iter.hasNext()) { + if (iter.next().getMessageQueue().equals(messageQueue)) + iter.remove(); + } assignedMessageQueue.setSeekOffset(messageQueue, offset); - updateConsumeOffset(messageQueue, offset); - updateConsumeOffsetToBroker(messageQueue, offset, false); + assignedMessageQueue.updateConsumeOffset(messageQueue, offset); } catch (Exception e) { log.error("Seek offset failed.", e); + } finally { + assignedMessageQueue.resume(Collections.singletonList(messageQueue)); } } @@ -545,7 +577,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) { - assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset); + assignedMessageQueue.updatePullOffset(remoteQueue, nextPullOffset); } } @@ -568,12 +600,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { if (seekOffset != -1) { offset = seekOffset; assignedMessageQueue.setSeekOffset(remoteQueue, -1); - assignedMessageQueue.updateNextOffset(remoteQueue,offset); + assignedMessageQueue.updatePullOffset(remoteQueue, offset); } else { - offset = assignedMessageQueue.getNextOffset(remoteQueue); + offset = assignedMessageQueue.getPullOffset(remoteQueue); if (offset == -1) { offset = fetchConsumeOffset(remoteQueue, false); - assignedMessageQueue.updateNextOffset(remoteQueue, offset); + assignedMessageQueue.updatePullOffset(remoteQueue, offset); assignedMessageQueue.updateConsumeOffset(remoteQueue, offset); } } @@ -596,78 +628,82 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { @Override public void run() { - ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - if (processQueue == null && processQueue.isDropped()) { - log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); - return; - } + if (!this.isCancelled()) { - if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); - if ((consumeRequestFlowControlTimes++ % 1000) == 0) - log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); - return; - } + if (assignedMessageQueue.isPaused(messageQueue)) { + CountDownLatch2 pasuedLatch = assignedMessageQueue.getPausedLatch(messageQueue); + if (pasuedLatch != null) + pasuedLatch.countDown(); + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS); + log.debug("Message Queue: {} has been paused!", messageQueue); + return; + } - long cachedMessageCount = processQueue.getMsgCount().get(); - long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); - if ((queueFlowControlTimes++ % 1000) == 0) { - log.warn( - "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", - defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); + if (processQueue == null && processQueue.isDropped()) { + log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); + return; } - return; - } - if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); - if ((queueFlowControlTimes++ % 1000) == 0) { - log.warn( - "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", - defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); + if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((consumeRequestFlowControlTimes++ % 1000) == 0) + log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); + return; } - return; - } - if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); - if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { - log.warn( - "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", - processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes); + long cachedMessageCount = processQueue.getMsgCount().get(); + long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); + + if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", + defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); + } + return; } - return; - } - if (!this.isCancelled()) { - if (assignedMessageQueue.isPaused(messageQueue)) { - scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS); - log.debug("Message Queue: {} has been paused!", messageQueue); + if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", + defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); + } return; } + + if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { + log.warn( + "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes); + } + return; + } + String subExpression = null; if (subscriptionType == SubscriptionType.SUBSCRIBE) { String topic = this.messageQueue.getTopic(); subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString(); } long offset = nextPullOffset(messageQueue); - long pullDelayTimeMills = 0; + long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills(); try { PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums()); switch (pullResult.getPullStatus()) { case FOUND: - processQueue.putMessage(pullResult.getMsgFoundList()); - submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue)); - pullDelayTimeMills = 0; + if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) { + processQueue.putMessage(pullResult.getMsgFoundList()); + submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue)); + } break; - case NO_NEW_MSG: - pullDelayTimeMills = 100; case OFFSET_ILLEGAL: - //TODO log.warn("the pull request offset illegal, {}", pullResult.toString()); break; default: @@ -1037,7 +1073,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private final List messageExts; private final MessageQueue messageQueue; private final ProcessQueue processQueue; - private long startConsumeTimeMillis; public ConsumeRequest(final List messageExts, final MessageQueue messageQueue, final ProcessQueue processQueue) { @@ -1058,12 +1093,5 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return processQueue; } - public long getStartConsumeTimeMillis() { - return startConsumeTimeMillis; - } - - public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) { - this.startConsumeTimeMillis = startConsumeTimeMillis; - } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java index 8148c7da5a02adbba5dd65b970856b7e700b8a2a..0b8ec67778f2d86034c3573a4b69de417f89c539 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.client.impl.consumer; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; @@ -10,7 +26,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; import java.util.Set; -public class RebalanceLitePullImpl extends RebalanceImpl { +public class RebalanceLitePullImpl extends RebalanceImpl { private final DefaultLitePullConsumerImpl litePullConsumerImpl; @@ -19,8 +35,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { } public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel, - AllocateMessageQueueStrategy allocateMessageQueueStrategy, - MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) { + AllocateMessageQueueStrategy allocateMessageQueueStrategy, + MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) { super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); this.litePullConsumerImpl = litePullConsumerImpl; } @@ -37,7 +53,6 @@ public class RebalanceLitePullImpl extends RebalanceImpl { } } - @Override public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { this.litePullConsumerImpl.getOffsetStore().persist(mq); @@ -64,5 +79,4 @@ public class RebalanceLitePullImpl extends RebalanceImpl { public void dispatchPullRequest(List pullRequestList) { } - } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..68144c785ea5cbe5dfe37e85a2594c61b524728f --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.consumer; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.MQAdminImpl; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.client.impl.consumer.RebalanceService; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.message.MessageClientExt; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultLitePullConsumerTest { + @Spy + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + + @Mock + private MQClientAPIImpl mQClientAPIImpl; + @Mock + private MQAdminImpl mQAdminImpl; + + private RebalanceImpl rebalanceImpl; + private OffsetStore offsetStore; + private DefaultLitePullConsumer litePullConsumer; + private DefaultLitePullConsumerImpl litePullConsumerImpl; + private String consumerGroup = "LitePullConsumerGroup"; + private String topic = "LitePullConsumerTest"; + private String brokerName = "BrokerA"; + + @Before + public void init() throws Exception { + String groupName = consumerGroup + System.currentTimeMillis(); + litePullConsumer = new DefaultLitePullConsumer(groupName); + litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + + Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); + field.setAccessible(true); + RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); + field = RebalanceService.class.getDeclaredField("waitInterval"); + field.setAccessible(true); + field.set(rebalanceService, 100); + + litePullConsumer.start(); + + field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); + field.setAccessible(true); + litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(litePullConsumerImpl, mQClientFactory); + + PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper(); + field = PullAPIWrapper.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(pullAPIWrapper, mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + field = MQClientInstance.class.getDeclaredField("mQAdminImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQAdminImpl); + + field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl"); + field.setAccessible(true); + rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl); + field = RebalanceImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(rebalanceImpl, mQClientFactory); + + offsetStore = spy(litePullConsumerImpl.getOffsetStore()); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore"); + field.setAccessible(true); + field.set(litePullConsumerImpl, offsetStore); + + when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[] {'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + return pullResult; + } + }); + + when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); + + doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); + + doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class)); + } + + @After + public void terminate() { + litePullConsumer.shutdown(); + } + + @Test + public void testAssign_PollMessageSuccess() { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setPullDelayTimeMills(60 * 1000); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + List result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } + + @Test + public void testSubscribe_PollMessageSuccess() throws MQClientException { + litePullConsumer.setPullDelayTimeMills(60 * 1000); + litePullConsumer.subscribe(topic, "*"); + Set messageQueueSet = new HashSet(); + messageQueueSet.add(createMessageQueue()); + litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); + litePullConsumer.setPollTimeoutMillis(20 * 1000); + List result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } + + @Test + public void testSubscriptionType_AssignAndSubscribeExclusive() throws MQClientException { + try { + litePullConsumer.subscribe(topic, "*"); + litePullConsumer.assign(Collections.singletonList(createMessageQueue())); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalStateException e) { + assertThat(e).hasMessageContaining("Cannot select two subscription types at the same time."); + } + } + + @Test + public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws MQClientException { + try { + DefaultLitePullConsumer litePullConsumer = createLitePullConsumer(); + litePullConsumer.fetchMessageQueues(topic); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalStateException e) { + assertThat(e).hasMessageContaining("The consumer not running."); + } + } + + @Test + public void testSeek_SeekOffsetIllegal() throws MQClientException { + when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); + when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L); + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + try { + litePullConsumer.seek(messageQueue, -1); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("min offset = 0"); + } + + try { + litePullConsumer.seek(messageQueue, 1000); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("max offset = 100"); + } + } + + @Test + public void testSeek_MessageQueueNotInAssignList() { + try { + litePullConsumer.seek(createMessageQueue(), 0); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("The message queue is not in assigned list"); + } + } + + private MessageQueue createMessageQueue() { + MessageQueue messageQueue = new MessageQueue(); + messageQueue.setBrokerName(brokerName); + messageQueue.setQueueId(0); + messageQueue.setTopic(topic); + return messageQueue; + } + + private DefaultLitePullConsumer createLitePullConsumer() { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + return litePullConsumer; + } + + private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, + List messageExtList) throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + for (MessageExt messageExt : messageExtList) { + outputStream.write(MessageDecoder.encode(messageExt, false)); + } + return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java index 488a499de17b6cf275bed16bb2ea2e26bce8c775..0430465cf59e87d087c34386ee26e22b24be430a 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java @@ -16,23 +16,33 @@ */ package org.apache.rocketmq.example.simple; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; - +import org.apache.rocketmq.common.message.MessageQueue; public class LitePullConsumerTest { public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("test"); - litePullConsumer.setNamesrvAddr("localhost:9876"); - litePullConsumer.setAutoCommit(true); - litePullConsumer.subscribe("test41","TagA" ); + litePullConsumer.setAutoCommit(false); litePullConsumer.start(); + Collection mqSet = litePullConsumer.fetchMessageQueues("test400"); + List list = new ArrayList<>(mqSet); + Collection assginMq = Collections.singletonList(list.get(0)); + litePullConsumer.assign(assginMq); + int size = 0; + litePullConsumer.seek(list.get(0), 26); - int i = 0; while (true) { List messageExts = litePullConsumer.poll(); - System.out.printf("%s%n", messageExts); + if (messageExts != null) { + size += messageExts.size(); + } + litePullConsumer.commitSync(); + System.out.printf("%s %d %n", messageExts, size); } }