From 46288abb33e7f4cf9ca59dbfbff12be96fd8494a Mon Sep 17 00:00:00 2001 From: King <794220751@qq.com> Date: Fri, 9 Aug 2019 11:26:53 +0800 Subject: [PATCH] Polish lite pull consumer and fix bug (#1373) * fix unsubscribe code * fix commit consumed offset * fix commit consumed offset * fix commit consumed offset * fix commit consumed offset * polish commit consumed offset * pass checkstyle * pass checkstyle * polish LiteMQPullConsumer * add flow control and polish commit logic * fix bug * polish code * fix commit consumed offset back * refactor litePullConsumer * development save * development save * Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl. * Polish lite pull consumer * polish lite pull consumer * polish lite pull consumer * fix seek * fix seek function * polish lite pull consumer * add apache header * add test * polish test * Make broadcast model work for litePullConsumer * Revert example/broadcast/PushConsumer.java * Add delay time when no new message * Enable long polling mode * Fix subscribe bug when rebalance * Delete useless consumeMessageHook --- .../consumer/DefaultLitePullConsumer.java | 15 +- .../impl/consumer/AssignedMessageQueue.java | 44 +++-- .../consumer/DefaultLitePullConsumerImpl.java | 159 +++++------------- .../example/broadcast/PushConsumer.java | 2 +- 4 files changed, 80 insertions(+), 140 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 7f657130..1858fa17 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -17,9 +17,7 @@ package org.apache.rocketmq.client.consumer; import java.util.Collection; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; @@ -69,10 +67,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon * Offset Storage */ private OffsetStore offsetStore; - /** - * Topic set you want to register - */ - private Set registerTopics = new HashSet(); + /** * Queue allocation algorithm */ @@ -372,14 +367,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon this.messageQueueListener = messageQueueListener; } - public Set getRegisterTopics() { - return registerTopics; - } - - public void setRegisterTopics(Set registerTopics) { - this.registerTopics = withNamespace(registerTopics); - } - public long getConsumerPullTimeoutMillis() { return consumerPullTimeoutMillis; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index aa8379ec..b21fd01e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.Collection; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.CountDownLatch2; import org.apache.rocketmq.common.message.MessageQueue; @@ -37,7 +38,7 @@ public class AssignedMessageQueue { this.rebalanceImpl = rebalanceImpl; } - public Collection messageQueues() { + public Set messageQueues() { return assignedMessageQueueState.keySet(); } @@ -130,6 +131,23 @@ public class AssignedMessageQueue { return null; } + public void updateAssignedMessageQueue(String topic, Collection assigned) { + synchronized (this.assignedMessageQueueState) { + Iterator> it = this.assignedMessageQueueState.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + if (!assigned.contains(next.getKey())) { + System.out.printf("MessageQueue-%s is removed %n", next.getKey()); + next.getValue().getProcessQueue().setDropped(true); + it.remove(); + } + } + } + addAssignedMessageQueue(assigned); + } + } + public void updateAssignedMessageQueue(Collection assigned) { synchronized (this.assignedMessageQueueState) { Iterator> it = this.assignedMessageQueueState.entrySet().iterator(); @@ -140,18 +158,22 @@ public class AssignedMessageQueue { it.remove(); } } + addAssignedMessageQueue(assigned); + } + } - for (MessageQueue messageQueue : assigned) { - if (!this.assignedMessageQueueState.containsKey(messageQueue)) { - MessageQueueStat messageQueueStat; - if (rebalanceImpl != null && rebalanceImpl.processQueueTable.get(messageQueue) != null) { - messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.processQueueTable.get(messageQueue)); - } else { - ProcessQueue processQueue = new ProcessQueue(); - messageQueueStat = new MessageQueueStat(messageQueue, processQueue); - } - this.assignedMessageQueueState.put(messageQueue, messageQueueStat); + private void addAssignedMessageQueue(Collection assigned) { + for (MessageQueue messageQueue : assigned) { + if (!this.assignedMessageQueueState.containsKey(messageQueue)) { + MessageQueueStat messageQueueStat; + if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) { + System.out.printf("MessageQueue-%s is added %n", messageQueue); + messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue)); + } else { + ProcessQueue processQueue = new ProcessQueue(); + messageQueueStat = new MessageQueueStat(messageQueue, processQueue); } + this.assignedMessageQueueState.put(messageQueue, messageQueueStat); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 74cf644b..07ef1cfa 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -37,15 +37,12 @@ import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.PullResult; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.hook.ConsumeMessageContext; -import org.apache.rocketmq.client.hook.ConsumeMessageHook; import org.apache.rocketmq.client.hook.FilterMessageHook; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientManager; @@ -55,15 +52,10 @@ import org.apache.rocketmq.common.CountDownLatch2; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; - -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; @@ -74,7 +66,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; public class DefaultLitePullConsumerImpl implements MQConsumerInner { @@ -85,8 +76,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private final RPCHook rpcHook; - private final ArrayList consumeMessageHookList = new ArrayList(); - private final ArrayList filterMessageHookList = new ArrayList(); private volatile ServiceState serviceState = ServiceState.CREATE_JUST; @@ -122,6 +111,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { * Delay some time when suspend pull service */ private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; + /** + * Delay some time when no new message + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG = 0; private DefaultLitePullConsumer defaultLitePullConsumer; @@ -143,10 +136,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private long nextAutoCommitDeadline = -1L; public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) { - this.defaultLitePullConsumer = defaultLitePullConsumer; this.rpcHook = rpcHook; - } private void checkServiceState() { @@ -162,8 +153,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } private void updateAssignedMessageQueue(String topic, Set assignedMessageQueue) { - this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue); - updatePullTask(topic, assignedMessageQueue); + this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue); } private void updatePullTask(String topic, Set mqNewSet) { @@ -187,9 +177,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { switch (messageModel) { case BROADCASTING: updateAssignedMessageQueue(topic, mqAll); + updatePullTask(topic, mqAll); break; case CLUSTERING: updateAssignedMessageQueue(topic, mqDivided); + updatePullTask(topic, mqDivided); break; default: break; @@ -356,13 +348,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private void copySubscription() throws MQClientException { try { - Set registerTopics = this.defaultLitePullConsumer.getRegisterTopics(); - if (registerTopics != null) { - for (final String topic : registerTopics) { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(), - topic, SubscriptionData.SUB_ALL); - this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); - } + switch (this.defaultLitePullConsumer.getMessageModel()) { + case BROADCASTING: + break; + case CLUSTERING: + /* + * Retry topic support in the future. + */ + break; + default: + break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); @@ -421,7 +416,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public synchronized void unsubscribe(final String topic) { this.rebalanceImpl.getSubscriptionInner().remove(topic); - //can be delete removePullTaskCallback(topic); assignedMessageQueue.removeAssignedMessageQueue(topic); } @@ -484,8 +478,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException { - if (!assignedMessageQueue.messageQueues().contains(messageQueue)) - throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null); + if (!assignedMessageQueue.messageQueues().contains(messageQueue)) { + if (subscriptionType == SubscriptionType.SUBSCRIBE) { + throw new MQClientException("The message queue is not in assigned list, may be rebalancing, message queue: " + messageQueue, null); + } else { + throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null); + } + } long minOffset = minOffset(messageQueue); long maxOffset = maxOffset(messageQueue); if (offset < minOffset || offset > maxOffset) @@ -552,6 +551,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } } + if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) + offsetStore.persistAll(assignedMessageQueue.messageQueues()); } catch (Exception e) { log.error("An error occurred when update consume offset synchronously.", e); } @@ -570,6 +571,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } } + if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) + offsetStore.persistAll(assignedMessageQueue.messageQueues()); } catch (Exception e) { log.error("An error occurred when update consume offset Automatically."); } @@ -605,6 +608,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { offset = assignedMessageQueue.getPullOffset(remoteQueue); if (offset == -1) { offset = fetchConsumeOffset(remoteQueue, false); + if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { + offset = 0; + } assignedMessageQueue.updatePullOffset(remoteQueue, offset); assignedMessageQueue.updateConsumeOffset(remoteQueue, offset); } @@ -706,6 +712,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {}", pullResult.toString()); break; + case NO_NEW_MSG: + pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG; + break; default: break; } @@ -745,7 +754,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); - return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout); } private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums) @@ -756,7 +765,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector); - return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout); } private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression) @@ -830,43 +839,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); //If namespace not null , reset Topic without namespace. this.resetTopic(pullResult.getMsgFoundList()); - if (!this.consumeMessageHookList.isEmpty()) { - ConsumeMessageContext consumeMessageContext = null; - consumeMessageContext = new ConsumeMessageContext(); - consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace()); - consumeMessageContext.setConsumerGroup(this.groupName()); - consumeMessageContext.setMq(mq); - consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); - consumeMessageContext.setSuccess(false); - this.executeHookBefore(consumeMessageContext); - consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); - consumeMessageContext.setSuccess(true); - this.executeHookAfter(consumeMessageContext); - } return pullResult; } - private void executeHookBefore(final ConsumeMessageContext context) { - if (!this.consumeMessageHookList.isEmpty()) { - for (ConsumeMessageHook hook : this.consumeMessageHookList) { - try { - hook.consumeMessageBefore(context); - } catch (Throwable ignored) { - } - } - } - } - - private void executeHookAfter(final ConsumeMessageContext context) { - if (!this.consumeMessageHookList.isEmpty()) { - for (ConsumeMessageHook hook : this.consumeMessageHookList) { - try { - hook.consumeMessageAfter(context); - } catch (Throwable ignored) { - } - } - } - } public void resetTopic(List msgList) { if (null == msgList || msgList.size() == 0) { @@ -920,25 +895,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { @Override public Set subscriptions() { - Set result = new HashSet(); - - Set topics = this.defaultLitePullConsumer.getRegisterTopics(); - if (topics != null) { - synchronized (topics) { - for (String t : topics) { - SubscriptionData ms = null; - try { - ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL); - } catch (Exception e) { - log.error("parse subscription error", e); - } - ms.setSubVersion(0L); - result.add(ms); - } - } - } + Set subSet = new HashSet(); - return result; + subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values()); + + return subSet; } @Override @@ -1000,41 +961,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return info; } - private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - sendMessageBack(msg, delayLevel, brokerName, this.defaultLitePullConsumer.getConsumerGroup()); - } - - private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - try { - String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) - : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); - - if (UtilAll.isBlank(consumerGroup)) { - consumerGroup = this.defaultLitePullConsumer.getConsumerGroup(); - } - - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000, - this.defaultLitePullConsumer.getMaxReconsumeTimes()); - } catch (Exception e) { - log.error("sendMessageBack Exception, " + this.defaultLitePullConsumer.getConsumerGroup(), e); - - Message newMsg = new Message(MixAll.getRetryTopic(this.defaultLitePullConsumer.getConsumerGroup()), msg.getBody()); - String originMsgId = MessageAccessor.getOriginMessageId(msg); - MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); - newMsg.setFlag(msg.getFlag()); - MessageAccessor.setProperties(newMsg, msg.getProperties()); - MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); - MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); - MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultLitePullConsumer.getMaxReconsumeTimes())); - newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - this.mQClientFactory.getDefaultMQProducer().send(newMsg); - } finally { - msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultLitePullConsumer.getNamespace())); - } - } - private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); @@ -1044,6 +970,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { return offsetStore; } + public void registerFilterMessageHook(final FilterMessageHook hook) { + this.filterMessageHookList.add(hook); + log.info("register FilterMessageHook Hook, {}", hook.hookName()); + } + public DefaultLitePullConsumer getDefaultLitePullConsumer() { return defaultLitePullConsumer; } diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java index fb1f9bbd..28e02341 100644 --- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java @@ -50,4 +50,4 @@ public class PushConsumer { consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } -} +} \ No newline at end of file -- GitLab