未验证 提交 f1b856a0 编写于 作者: H Heng Du 提交者: GitHub

feat(pullconsumer) add pull sys flag (#1658)

feat(lite_pull_consumer) add support for consume from where

test(pull_consumer) add unit test for compute from where

test(PullSysFlag) add lite pull sys flag unit test

feat(pull_consumer) add the verification of ConsumerFromWhere value
上级 f8f6fbe4
......@@ -24,6 +24,8 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
......@@ -138,6 +140,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private long topicMetadataCheckIntervalMillis = 30 * 1000;
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* Backtracking consumption time with second precision. Time format is 20131223171201<br> Implying Seventeen twelve
* and 01 seconds on December 23, 2013 year<br> Default backtracking consumption time Half an hour ago.
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
/**
* Default constructor.
*/
......@@ -431,4 +441,25 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public ConsumeFromWhere getConsumeFromWhere() {
return consumeFromWhere;
}
public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
if (consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
&& consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
&& consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
throw new RuntimeException("Invalid ConsumeFromWhere Value", null);
}
this.consumeFromWhere = consumeFromWhere;
}
public String getConsumeTimestamp() {
return consumeTimestamp;
}
public void setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp;
}
}
......@@ -90,7 +90,7 @@ public class AssignedMessageQueue {
}
}
public long getConusmerOffset(MessageQueue messageQueue) {
public long getConsumerOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getConsumeOffset();
......
......@@ -597,7 +597,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void commitSync() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
......@@ -618,7 +618,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private synchronized void commitAll() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
......@@ -650,9 +650,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
private long fetchConsumeOffset(MessageQueue messageQueue, boolean fromStore) {
private long fetchConsumeOffset(MessageQueue messageQueue) {
checkServiceState();
return this.offsetStore.readOffset(messageQueue, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
long offset = this.rebalanceImpl.computePullFromWhere(messageQueue);
return offset;
}
public long committed(MessageQueue messageQueue) throws MQClientException {
......@@ -685,10 +686,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} else {
offset = assignedMessageQueue.getPullOffset(messageQueue);
if (offset == -1) {
offset = fetchConsumeOffset(messageQueue, false);
if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offset = 0;
}
offset = fetchConsumeOffset(messageQueue);
}
}
return offset;
......@@ -779,7 +777,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
......@@ -850,7 +847,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
throw new MQClientException("maxNums <= 0", null);
}
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true);
long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
......
......@@ -40,11 +40,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
/**
* This class will be removed in 2022, and a better implementation {@link RebalanceLitePullImpl} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
......
......@@ -16,16 +16,20 @@
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.Set;
public class RebalanceLitePullImpl extends RebalanceImpl {
private final DefaultLitePullConsumerImpl litePullConsumerImpl;
......@@ -72,7 +76,66 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
@Override
public long computePullFromWhere(MessageQueue mq) {
return 0;
ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere();
long result = -1;
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET: {
long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // First start, no offset
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
try {
long timestamp = UtilAll.parseDate(this.litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
}
return result;
}
@Override
......
......@@ -41,6 +41,7 @@ import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
......@@ -419,6 +420,50 @@ public class DefaultLitePullConsumerTest {
}
@Test
public void testComputePullFromWhereReturnedNotFound() throws Exception{
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
long offset = rebalanceImpl.computePullFromWhere(messageQueue);
assertThat(offset).isEqualTo(0);
}
@Test
public void testComputePullFromWhereReturned() throws Exception{
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(100L);
long offset = rebalanceImpl.computePullFromWhere(messageQueue);
assertThat(offset).isEqualTo(100);
}
@Test
public void testComputePullFromLast() throws Exception{
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
MessageQueue messageQueue = createMessageQueue();
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
when(mQClientFactory.getMQAdminImpl().maxOffset(any(MessageQueue.class))).thenReturn(100L);
long offset = rebalanceImpl.computePullFromWhere(messageQueue);
assertThat(offset).isEqualTo(100);
}
@Test
public void testComputePullByTimeStamp() throws Exception{
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
MessageQueue messageQueue = createMessageQueue();
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),anyLong())).thenReturn(100L);
long offset = rebalanceImpl.computePullFromWhere(messageQueue);
assertThat(offset).isEqualTo(100);
}
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
......
......@@ -21,6 +21,7 @@ public class PullSysFlag {
private final static int FLAG_SUSPEND = 0x1 << 1;
private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
private final static int FLAG_CLASS_FILTER = 0x1 << 3;
private final static int FLAG_LITE_PULL_MESSAGE = 0x1 << 4;
public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
final boolean subscription, final boolean classFilter) {
......@@ -45,6 +46,17 @@ public class PullSysFlag {
return flag;
}
public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
final boolean subscription, final boolean classFilter, final boolean litePull) {
int flag = buildSysFlag(commitOffset, suspend, subscription, classFilter);
if (litePull) {
flag |= FLAG_LITE_PULL_MESSAGE;
}
return flag;
}
public static int clearCommitOffsetFlag(final int sysFlag) {
return sysFlag & (~FLAG_COMMIT_OFFSET);
}
......@@ -64,4 +76,8 @@ public class PullSysFlag {
public static boolean hasClassFilterFlag(final int sysFlag) {
return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
}
public static boolean hasLitePullFlag(final int sysFlag) {
return (sysFlag & FLAG_LITE_PULL_MESSAGE) == FLAG_LITE_PULL_MESSAGE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.sysflag;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class PullSysFlagTest {
@Test
public void testLitePullFlag() {
int flag = PullSysFlag.buildSysFlag(false, false, false, false, true);
assertThat(PullSysFlag.hasLitePullFlag(flag)).isTrue();
}
@Test
public void testLitePullFlagFalse() {
int flag = PullSysFlag.buildSysFlag(false, false, false, false, false);
assertThat(PullSysFlag.hasLitePullFlag(flag)).isFalse();
}
}
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.example.simple;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class LitePullConsumerSubscribe {
......@@ -25,7 +26,8 @@ public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册