提交 c6096a14 编写于 作者: K King 提交者: Heng Du

Polish lite pull consumer (#1381)

* fix unsubscribe code

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* polish commit consumed offset

* pass checkstyle

* pass checkstyle

* polish LiteMQPullConsumer

* add flow control and polish commit logic

* fix bug

* polish code

* fix commit consumed offset back

* refactor litePullConsumer

* development save

* development save

* Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.

* Polish lite pull consumer

* polish lite pull consumer

* polish lite pull consumer

* fix seek

* fix seek function

* polish lite pull consumer

* add apache header

* add test

* polish test

* Make broadcast model work for litePullConsumer

* Revert example/broadcast/PushConsumer.java

* Add delay time when no new message

* Enable long polling mode

* Fix subscribe bug when rebalance

* Delete useless consumeMessageHook

* Implement TopicMessageQueueChangeListener interface

* Lite pull consumer support namespace

* Make sql92 filter work
上级 46288abb
......@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.client;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
......@@ -95,7 +97,6 @@ public class ClientConfig {
}
}
public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}
......@@ -124,9 +125,21 @@ public class ClientConfig {
if (StringUtils.isEmpty(this.getNamespace())) {
return queue;
}
return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
}
public Collection<MessageQueue> queuesWithNamespace(Collection<MessageQueue> queues) {
if (StringUtils.isEmpty(this.getNamespace())) {
return queues;
}
Iterator<MessageQueue> iter = queues.iterator();
while (iter.hasNext()) {
MessageQueue queue = iter.next();
queue.setTopic(withNamespace(queue.getTopic()));
}
return queues;
}
public void resetClientConfig(final ClientConfig cc) {
this.namesrvAddr = cc.namesrvAddr;
this.clientIP = cc.clientIP;
......@@ -170,6 +183,7 @@ public class ClientConfig {
/**
* Domain name mode access way does not support the delimiter(;), and only one domain name can be set.
*
* @param namesrvAddr name server address
*/
public void setNamesrvAddr(String namesrvAddr) {
......
......@@ -176,17 +176,22 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(topic, subExpression);
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
@Override
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
}
@Override
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(topic);
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
@Override
public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(messageQueues);
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
}
@Override
......@@ -201,17 +206,17 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.defaultLitePullConsumerImpl.seek(messageQueue, offset);
this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
}
@Override
public void pause(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.pause(messageQueues);
this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
}
@Override
public void resume(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.resume(messageQueues);
this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
}
@Override
......@@ -221,7 +226,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, timestamp);
return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
}
public void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
}
@Override
......@@ -390,5 +400,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setPullDelayTimeMills(long pullDelayTimeMills) {
this.pullDelayTimeMills = pullDelayTimeMills;
}
}
......@@ -36,13 +36,20 @@ public interface LitePullConsumer {
void shutdown();
/**
* Subscribe some topic
* Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
* Subscribe some topic with selector.
*
* @param selector message selector({@link MessageSelector}), can be null.
*/
void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
/**
* Unsubscribe consumption some topic
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
public interface TopicMessageQueueChangeListener {
/**
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
* expanded or shrunk.
*
* @param messageQueues
*/
void onChanged(String topic, Set<MessageQueue> messageQueues);
}
......@@ -138,7 +138,6 @@ public class AssignedMessageQueue {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!assigned.contains(next.getKey())) {
System.out.printf("MessageQueue-%s is removed %n", next.getKey());
next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
......@@ -167,7 +166,6 @@ public class AssignedMessageQueue {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
MessageQueueStat messageQueueStat;
if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
System.out.printf("MessageQueue-%s is added %n", messageQueue);
messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
} else {
ProcessQueue processQueue = new ProcessQueue();
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
......@@ -28,12 +29,16 @@ import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
......@@ -127,6 +132,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
private final ScheduledExecutorService scheduledExecutorService;
private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
private long consumeRequestFlowControlTimes = 0L;
private long queueFlowControlTimes = 0L;
......@@ -138,6 +149,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNumbers(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MonitorMessageQueueChangeThread");
}
});
}
private void checkServiceState() {
......@@ -266,12 +287,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
mQClientFactory.start();
final String group = this.defaultLitePullConsumer.getConsumerGroup();
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNumbers(),
new ThreadFactoryImpl("PullMsgThread-" + group)
);
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
......@@ -279,8 +294,25 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
updateAssignPullTask(assignedMessageQueue.messageQueues());
}
scheduledExecutorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
try {
fetchTopicMessageQueuesAndCompare();
} catch (Exception e) {
log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
}
}
}, 1000 * 20, 1000 * 30, TimeUnit.MILLISECONDS);
log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
messageQueuesForTopic.put(topic, messageQueues);
}
this.mQClientFactory.checkClientInBroker();
break;
case RUNNING:
case START_FAILED:
......@@ -339,7 +371,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
public PullAPIWrapper getPullAPIWrapper() {
......@@ -353,7 +384,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
break;
case CLUSTERING:
/*
* Retry topic support in the future.
* Retry topic will be support in the future.
*/
break;
default:
......@@ -410,7 +441,28 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
throw new MQClientException("subscribe exception", e);
}
}
public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
try {
setSubscriptionType(SubscriptionType.SUBSCRIBE);
if (messageSelector == null) {
subscribe(topic, SubscriptionData.SUB_ALL);
return;
}
SubscriptionData subscriptionData = FilterAPI.build(topic,
messageSelector.getExpression(), messageSelector.getExpressionType());
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
if (serviceState == ServiceState.RUNNING) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
} catch (Exception e) {
throw new MQClientException("subscribe exception", e);
}
}
......@@ -421,6 +473,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public synchronized void assign(Collection<MessageQueue> messageQueues) {
if (messageQueues == null || messageQueues.isEmpty()) {
throw new IllegalArgumentException("Message queues can not be null or empty.");
}
setSubscriptionType(SubscriptionType.ASSIGN);
assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
if (serviceState == ServiceState.RUNNING) {
......@@ -461,6 +516,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
List<MessageExt> messages = consumeRequest.getMessageExts();
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
//If namespace not null , reset Topic without namespace.
this.resetTopic(messages);
return messages;
}
} catch (InterruptedException ignore) {
......@@ -587,8 +644,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
} catch (InterruptedException ex) {
log.error("Submit consumeRequest error", ex);
} catch (InterruptedException e) {
log.error("Submit consumeRequest error", e);
}
}
......@@ -649,14 +706,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (processQueue == null && processQueue.isDropped()) {
log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0)
log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
return;
}
......@@ -667,7 +724,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
"The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
}
return;
......@@ -677,7 +734,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
"The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
}
return;
......@@ -687,21 +744,27 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
"The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
}
return;
}
String subExpression = null;
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic();
subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
}
long offset = nextPullOffset(messageQueue);
long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills();
try {
PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums());
SubscriptionData subscriptionData;
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic();
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else{
String topic = this.messageQueue.getTopic();
subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
}
PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchNums());
switch (pullResult.getPullStatus()) {
case FOUND:
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
......@@ -710,7 +773,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {}", pullResult.toString());
log.warn("The pull request offset illegal, {}", pullResult.toString());
break;
case NO_NEW_MSG:
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG;
......@@ -721,7 +784,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Throwable e) {
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
e.printStackTrace();
log.error("An error occurred in pull message process.", e);
}
......@@ -746,58 +808,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return pull(mq, subExpression, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
return pull(mq, subscriptionData, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
}
private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
}
private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return pull(mq, messageSelector, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
}
private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
}
private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
throws MQClientException {
if (null == mq) {
throw new MQClientException("mq is null", null);
}
try {
return FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
}
private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
throws MQClientException {
if (null == mq) {
throw new MQClientException("mq is null", null);
}
try {
return FilterAPI.build(mq.getTopic(),
messageSelector.getExpression(), messageSelector.getExpressionType());
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
}
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums,
boolean block,
long timeout)
......@@ -815,8 +835,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
throw new MQClientException("maxNums <= 0", null);
}
this.subscriptionAutomatically(mq.getTopic());
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
......@@ -837,13 +855,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
//If namespace not null , reset Topic without namespace.
this.resetTopic(pullResult.getMsgFoundList());
return pullResult;
}
public void resetTopic(List<MessageExt> msgList) {
private void resetTopic(List<MessageExt> msgList) {
if (null == msgList || msgList.size() == 0) {
return;
}
......@@ -857,17 +872,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
public void subscriptionAutomatically(final String topic) {
if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
} catch (Exception ignore) {
}
}
}
public void updateConsumeOffset(MessageQueue mq, long offset) {
checkServiceState();
this.offsetStore.updateOffset(mq, offset, false);
......@@ -981,13 +985,59 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public Set<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
checkServiceState();
// check if has info in memory, otherwise invoke api.
Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
if (null == result) {
result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
Set<MessageQueue> result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
return parseMessageQueues(result);
}
private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException {
for (Map.Entry<String, TopicMessageQueueChangeListener> entry : topicMessageQueueChangeListenerMap.entrySet()) {
String topic = entry.getKey();
TopicMessageQueueChangeListener topicMessageQueueChangeListener = entry.getValue();
Set<MessageQueue> oldMessageQueues = messageQueuesForTopic.get(topic);
Set<MessageQueue> newMessageQueues = fetchMessageQueues(topic);
boolean isChanged = !isSetEqual(newMessageQueues, oldMessageQueues);
if (isChanged) {
messageQueuesForTopic.put(topic, newMessageQueues);
if (topicMessageQueueChangeListener != null) {
topicMessageQueueChangeListener.onChanged(topic, newMessageQueues);
}
}
}
}
return parseMessageQueues(result);
private boolean isSetEqual(Set<MessageQueue> set1, Set<MessageQueue> set2) {
if (set1 == null && set2 == null) {
return true;
}
if (set1 == null || set2 == null || set1.size() != set2.size()
|| set1.size() == 0 || set2.size() == 0) {
return false;
}
Iterator iter = set2.iterator();
boolean isEqual = true;
while (iter.hasNext()) {
if (!set1.contains(iter.next())) {
isEqual = false;
}
}
return isEqual;
}
public synchronized void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener listener) throws MQClientException {
if (topic == null || listener == null) {
throw new MQClientException("Topic or listener is null", null);
}
if (topicMessageQueueChangeListenerMap.containsKey(topic)) {
log.warn("Topic {} had been registered, new listener will overwrite the old one", topic);
}
topicMessageQueueChangeListenerMap.put(topic, listener);
if (this.serviceState == ServiceState.RUNNING) {
Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
messageQueuesForTopic.put(topic, messageQueues);
}
}
private Set<MessageQueue> parseMessageQueues(Set<MessageQueue> queueSet) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册