diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 52060ea7535c9e961696c61e64115ef57baf900d..782d29b6c023a595d8ae0f2d85995b40476b0c11 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -24,6 +24,8 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
@@ -138,6 +140,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private long topicMetadataCheckIntervalMillis = 30 * 1000;
+ private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+
+ /**
+ * Backtracking consumption time with second precision. Time format is 20131223171201
Implying Seventeen twelve
+ * and 01 seconds on December 23, 2013 year
Default backtracking consumption time Half an hour ago.
+ */
+ private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+
/**
* Default constructor.
*/
@@ -431,4 +441,25 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
+
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
+ }
+
+ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+ if (consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
+ && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
+ && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
+ throw new RuntimeException("Invalid ConsumeFromWhere Value", null);
+ }
+ this.consumeFromWhere = consumeFromWhere;
+ }
+
+ public String getConsumeTimestamp() {
+ return consumeTimestamp;
+ }
+
+ public void setConsumeTimestamp(String consumeTimestamp) {
+ this.consumeTimestamp = consumeTimestamp;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index c0c6f6030c2599457aef833f389e62a22fcee296..0b090e3df30099f9b59f7655216a2041a630ea15 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -90,7 +90,7 @@ public class AssignedMessageQueue {
}
}
- public long getConusmerOffset(MessageQueue messageQueue) {
+ public long getConsumerOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getConsumeOffset();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f44eea7f9a014cf103dc4d43a81a2003b9cdfdbd..cd4d4cfc3a7051abe0d4833e14af9549ab6778c5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -597,7 +597,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void commitSync() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
- long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
+ long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
@@ -618,7 +618,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private synchronized void commitAll() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
- long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
+ long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
@@ -650,9 +650,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
- private long fetchConsumeOffset(MessageQueue messageQueue, boolean fromStore) {
+ private long fetchConsumeOffset(MessageQueue messageQueue) {
checkServiceState();
- return this.offsetStore.readOffset(messageQueue, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ long offset = this.rebalanceImpl.computePullFromWhere(messageQueue);
+ return offset;
}
public long committed(MessageQueue messageQueue) throws MQClientException {
@@ -685,10 +686,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} else {
offset = assignedMessageQueue.getPullOffset(messageQueue);
if (offset == -1) {
- offset = fetchConsumeOffset(messageQueue, false);
- if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
- offset = 0;
- }
+ offset = fetchConsumeOffset(messageQueue);
}
}
return offset;
@@ -779,7 +777,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
-
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
@@ -850,7 +847,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
throw new MQClientException("maxNums <= 0", null);
}
- int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
+ int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true);
long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 146fce6e1e3b3c7c311074efdca9d718a5a261f3..b8972a92e8fdb3402b92c7c1b1a8015cb38a127b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -40,11 +40,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-/**
- * This class will be removed in 2022, and a better implementation {@link RebalanceLitePullImpl} is recommend to use
- * in the scenario of actively pulling messages.
- */
-@Deprecated
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
protected final ConcurrentMap processQueueTable = new ConcurrentHashMap(64);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 0b8ec67778f2d86034c3573a4b69de417f89c539..9d1ea7492eea2ffc662defea11f4fe5d1bf84f38 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -16,16 +16,20 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import java.util.List;
-import java.util.Set;
-
public class RebalanceLitePullImpl extends RebalanceImpl {
private final DefaultLitePullConsumerImpl litePullConsumerImpl;
@@ -72,7 +76,66 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
@Override
public long computePullFromWhere(MessageQueue mq) {
- return 0;
+ ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere();
+ long result = -1;
+ switch (consumeFromWhere) {
+ case CONSUME_FROM_LAST_OFFSET: {
+ long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ if (lastOffset >= 0) {
+ result = lastOffset;
+ } else if (-1 == lastOffset) {
+ if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // First start, no offset
+ result = 0L;
+ } else {
+ try {
+ result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+ } catch (MQClientException e) {
+ result = -1;
+ }
+ }
+ } else {
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_FIRST_OFFSET: {
+ long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ if (lastOffset >= 0) {
+ result = lastOffset;
+ } else if (-1 == lastOffset) {
+ result = 0L;
+ } else {
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_TIMESTAMP: {
+ long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ if (lastOffset >= 0) {
+ result = lastOffset;
+ } else if (-1 == lastOffset) {
+ if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ try {
+ result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+ } catch (MQClientException e) {
+ result = -1;
+ }
+ } else {
+ try {
+ long timestamp = UtilAll.parseDate(this.litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeTimestamp(),
+ UtilAll.YYYYMMDDHHMMSS).getTime();
+ result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+ } catch (MQClientException e) {
+ result = -1;
+ }
+ }
+ } else {
+ result = -1;
+ }
+ break;
+ }
+ }
+ return result;
}
@Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index d2cb0571ed9ed58f5e0087025cc461d24ea1d296..cc8d5e2bf78a8ba0c840c1ec4e1045b89f931009 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -419,6 +420,50 @@ public class DefaultLitePullConsumerTest {
}
+ @Test
+ public void testComputePullFromWhereReturnedNotFound() throws Exception{
+ DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+ defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(0);
+ }
+
+ @Test
+ public void testComputePullFromWhereReturned() throws Exception{
+ DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+ defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(100L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(100);
+ }
+
+
+ @Test
+ public void testComputePullFromLast() throws Exception{
+ DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+ defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
+ when(mQClientFactory.getMQAdminImpl().maxOffset(any(MessageQueue.class))).thenReturn(100L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(100);
+ }
+
+ @Test
+ public void testComputePullByTimeStamp() throws Exception{
+ DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+ defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
+ defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
+ when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),anyLong())).thenReturn(100L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(100);
+ }
+
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
index d476a35b749348b4b279003e252b1e83358c09ad..ce7558f2bcffd66b02b2b15b8a1e0a3306bce459 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -21,6 +21,7 @@ public class PullSysFlag {
private final static int FLAG_SUSPEND = 0x1 << 1;
private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
private final static int FLAG_CLASS_FILTER = 0x1 << 3;
+ private final static int FLAG_LITE_PULL_MESSAGE = 0x1 << 4;
public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
final boolean subscription, final boolean classFilter) {
@@ -45,6 +46,17 @@ public class PullSysFlag {
return flag;
}
+ public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
+ final boolean subscription, final boolean classFilter, final boolean litePull) {
+ int flag = buildSysFlag(commitOffset, suspend, subscription, classFilter);
+
+ if (litePull) {
+ flag |= FLAG_LITE_PULL_MESSAGE;
+ }
+
+ return flag;
+ }
+
public static int clearCommitOffsetFlag(final int sysFlag) {
return sysFlag & (~FLAG_COMMIT_OFFSET);
}
@@ -64,4 +76,8 @@ public class PullSysFlag {
public static boolean hasClassFilterFlag(final int sysFlag) {
return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
}
+
+ public static boolean hasLitePullFlag(final int sysFlag) {
+ return (sysFlag & FLAG_LITE_PULL_MESSAGE) == FLAG_LITE_PULL_MESSAGE;
+ }
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java b/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..60e18123c47ceae127e44c77d697cb7f822e6c2f
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class PullSysFlagTest {
+
+ @Test
+ public void testLitePullFlag() {
+ int flag = PullSysFlag.buildSysFlag(false, false, false, false, true);
+ assertThat(PullSysFlag.hasLitePullFlag(flag)).isTrue();
+ }
+
+ @Test
+ public void testLitePullFlagFalse() {
+ int flag = PullSysFlag.buildSysFlag(false, false, false, false, false);
+ assertThat(PullSysFlag.hasLitePullFlag(flag)).isFalse();
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
index 1bfe49d7365e0460e3ef4bc3b290596a1f66867b..e5c1a6134b4631e1e4dfb1dadf5ecccf1c530fd5 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.example.simple;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class LitePullConsumerSubscribe {
@@ -25,7 +26,8 @@ public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
- DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
+ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
+ litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {