提交 46288abb 编写于 作者: K King 提交者: Heng Du

Polish lite pull consumer and fix bug (#1373)

* fix unsubscribe code

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* polish commit consumed offset

* pass checkstyle

* pass checkstyle

* polish LiteMQPullConsumer

* add flow control and polish commit logic

* fix bug

* polish code

* fix commit consumed offset back

* refactor litePullConsumer

* development save

* development save

* Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.

* Polish lite pull consumer

* polish lite pull consumer

* polish lite pull consumer

* fix seek

* fix seek function

* polish lite pull consumer

* add apache header

* add test

* polish test

* Make broadcast model work for litePullConsumer

* Revert example/broadcast/PushConsumer.java

* Add delay time when no new message

* Enable long polling mode

* Fix subscribe bug when rebalance

* Delete useless consumeMessageHook
上级 9c3b26cf
......@@ -17,9 +17,7 @@
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
......@@ -69,10 +67,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
* Offset Storage
*/
private OffsetStore offsetStore;
/**
* Topic set you want to register
*/
private Set<String> registerTopics = new HashSet<String>();
/**
* Queue allocation algorithm
*/
......@@ -372,14 +367,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.messageQueueListener = messageQueueListener;
}
public Set<String> getRegisterTopics() {
return registerTopics;
}
public void setRegisterTopics(Set<String> registerTopics) {
this.registerTopics = withNamespace(registerTopics);
}
public long getConsumerPullTimeoutMillis() {
return consumerPullTimeoutMillis;
}
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.MessageQueue;
......@@ -37,7 +38,7 @@ public class AssignedMessageQueue {
this.rebalanceImpl = rebalanceImpl;
}
public Collection<MessageQueue> messageQueues() {
public Set<MessageQueue> messageQueues() {
return assignedMessageQueueState.keySet();
}
......@@ -130,6 +131,23 @@ public class AssignedMessageQueue {
return null;
}
public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!assigned.contains(next.getKey())) {
System.out.printf("MessageQueue-%s is removed %n", next.getKey());
next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
}
}
addAssignedMessageQueue(assigned);
}
}
public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
......@@ -140,12 +158,17 @@ public class AssignedMessageQueue {
it.remove();
}
}
addAssignedMessageQueue(assigned);
}
}
private void addAssignedMessageQueue(Collection<MessageQueue> assigned) {
for (MessageQueue messageQueue : assigned) {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
MessageQueueStat messageQueueStat;
if (rebalanceImpl != null && rebalanceImpl.processQueueTable.get(messageQueue) != null) {
messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.processQueueTable.get(messageQueue));
if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
System.out.printf("MessageQueue-%s is added %n", messageQueue);
messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
} else {
ProcessQueue processQueue = new ProcessQueue();
messageQueueStat = new MessageQueueStat(messageQueue, processQueue);
......@@ -154,7 +177,6 @@ public class AssignedMessageQueue {
}
}
}
}
public void removeAssignedMessageQueue(String topic) {
synchronized (this.assignedMessageQueueState) {
......
......@@ -37,15 +37,12 @@ import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
......@@ -55,15 +52,10 @@ import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
......@@ -74,7 +66,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
......@@ -85,8 +76,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final RPCHook rpcHook;
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
......@@ -122,6 +111,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
* Delay some time when suspend pull service
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
/**
* Delay some time when no new message
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG = 0;
private DefaultLitePullConsumer defaultLitePullConsumer;
......@@ -143,10 +136,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private long nextAutoCommitDeadline = -1L;
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
}
private void checkServiceState() {
......@@ -162,8 +153,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
updatePullTask(topic, assignedMessageQueue);
this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue);
}
private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
......@@ -187,9 +177,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
switch (messageModel) {
case BROADCASTING:
updateAssignedMessageQueue(topic, mqAll);
updatePullTask(topic, mqAll);
break;
case CLUSTERING:
updateAssignedMessageQueue(topic, mqDivided);
updatePullTask(topic, mqDivided);
break;
default:
break;
......@@ -356,13 +348,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private void copySubscription() throws MQClientException {
try {
Set<String> registerTopics = this.defaultLitePullConsumer.getRegisterTopics();
if (registerTopics != null) {
for (final String topic : registerTopics) {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
switch (this.defaultLitePullConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
/*
* Retry topic support in the future.
*/
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
......@@ -421,7 +416,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void unsubscribe(final String topic) {
this.rebalanceImpl.getSubscriptionInner().remove(topic);
//can be delete
removePullTaskCallback(topic);
assignedMessageQueue.removeAssignedMessageQueue(topic);
}
......@@ -484,8 +478,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
if (!assignedMessageQueue.messageQueues().contains(messageQueue))
if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
throw new MQClientException("The message queue is not in assigned list, may be rebalancing, message queue: " + messageQueue, null);
} else {
throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null);
}
}
long minOffset = minOffset(messageQueue);
long maxOffset = maxOffset(messageQueue);
if (offset < minOffset || offset > maxOffset)
......@@ -552,6 +551,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
}
if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING)
offsetStore.persistAll(assignedMessageQueue.messageQueues());
} catch (Exception e) {
log.error("An error occurred when update consume offset synchronously.", e);
}
......@@ -570,6 +571,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
}
if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING)
offsetStore.persistAll(assignedMessageQueue.messageQueues());
} catch (Exception e) {
log.error("An error occurred when update consume offset Automatically.");
}
......@@ -605,6 +608,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
offset = assignedMessageQueue.getPullOffset(remoteQueue);
if (offset == -1) {
offset = fetchConsumeOffset(remoteQueue, false);
if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offset = 0;
}
assignedMessageQueue.updatePullOffset(remoteQueue, offset);
assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
}
......@@ -706,6 +712,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {}", pullResult.toString());
break;
case NO_NEW_MSG:
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG;
break;
default:
break;
}
......@@ -745,7 +754,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
}
private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
......@@ -756,7 +765,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
}
private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
......@@ -830,43 +839,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
//If namespace not null , reset Topic without namespace.
this.resetTopic(pullResult.getMsgFoundList());
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
private void executeHookBefore(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageBefore(context);
} catch (Throwable ignored) {
}
}
}
}
private void executeHookAfter(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageAfter(context);
} catch (Throwable ignored) {
}
}
}
}
public void resetTopic(List<MessageExt> msgList) {
if (null == msgList || msgList.size() == 0) {
......@@ -920,25 +895,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
@Override
public Set<SubscriptionData> subscriptions() {
Set<SubscriptionData> result = new HashSet<SubscriptionData>();
Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
Set<String> topics = this.defaultLitePullConsumer.getRegisterTopics();
if (topics != null) {
synchronized (topics) {
for (String t : topics) {
SubscriptionData ms = null;
try {
ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);
} catch (Exception e) {
log.error("parse subscription error", e);
}
ms.setSubVersion(0L);
result.add(ms);
}
}
}
subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
return result;
return subSet;
}
@Override
......@@ -1000,41 +961,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return info;
}
private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
sendMessageBack(msg, delayLevel, brokerName, this.defaultLitePullConsumer.getConsumerGroup());
}
private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
if (UtilAll.isBlank(consumerGroup)) {
consumerGroup = this.defaultLitePullConsumer.getConsumerGroup();
}
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
this.defaultLitePullConsumer.getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultLitePullConsumer.getConsumerGroup(), e);
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultLitePullConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultLitePullConsumer.getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultLitePullConsumer.getNamespace()));
}
}
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
......@@ -1044,6 +970,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return offsetStore;
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
this.filterMessageHookList.add(hook);
log.info("register FilterMessageHook Hook, {}", hook.hookName());
}
public DefaultLitePullConsumer getDefaultLitePullConsumer() {
return defaultLitePullConsumer;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册