diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 87c01a5b3055470be8d1772457afde58d9e55a95..c3e4efa252c1c8fb4e3d9b2d0214e2b5b965d736 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -16,7 +16,9 @@ */ package org.apache.rocketmq.client; +import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.UtilAll; @@ -95,7 +97,6 @@ public class ClientConfig { } } - public String withNamespace(String resource) { return NamespaceUtil.wrapNamespace(this.getNamespace(), resource); } @@ -124,9 +125,21 @@ public class ClientConfig { if (StringUtils.isEmpty(this.getNamespace())) { return queue; } - return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId()); } + + public Collection queuesWithNamespace(Collection queues) { + if (StringUtils.isEmpty(this.getNamespace())) { + return queues; + } + Iterator iter = queues.iterator(); + while (iter.hasNext()) { + MessageQueue queue = iter.next(); + queue.setTopic(withNamespace(queue.getTopic())); + } + return queues; + } + public void resetClientConfig(final ClientConfig cc) { this.namesrvAddr = cc.namesrvAddr; this.clientIP = cc.clientIP; @@ -170,6 +183,7 @@ public class ClientConfig { /** * Domain name mode access way does not support the delimiter(;), and only one domain name can be set. + * * @param namesrvAddr name server address */ public void setNamesrvAddr(String namesrvAddr) { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 1858fa1780c3bede4788860db7e74b08f75cdcbd..543e9cffb825a7ced2eba66914dace206fc280c0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -176,17 +176,22 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void subscribe(String topic, String subExpression) throws MQClientException { - this.defaultLitePullConsumerImpl.subscribe(topic, subExpression); + this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression); + } + + @Override + public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { + this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector); } @Override public void unsubscribe(String topic) { - this.defaultLitePullConsumerImpl.unsubscribe(topic); + this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic)); } @Override public void assign(Collection messageQueues) { - defaultLitePullConsumerImpl.assign(messageQueues); + defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); } @Override @@ -201,17 +206,17 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException { - this.defaultLitePullConsumerImpl.seek(messageQueue, offset); + this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset); } @Override public void pause(Collection messageQueues) { - this.defaultLitePullConsumerImpl.pause(messageQueues); + this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues)); } @Override public void resume(Collection messageQueues) { - this.defaultLitePullConsumerImpl.resume(messageQueues); + this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues)); } @Override @@ -221,7 +226,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException { - return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, timestamp); + return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp); + } + + public void registerTopicMessageQueueChangeListener(String topic, + TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException { + this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener); } @Override @@ -390,5 +400,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public void setPullDelayTimeMills(long pullDelayTimeMills) { this.pullDelayTimeMills = pullDelayTimeMills; } - } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java index ece08af61b5b7f0f1cf58210b58ca43885883b55..87b9dd354b0d34b8bfc0442b93088d8db194caad 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java @@ -36,13 +36,20 @@ public interface LitePullConsumer { void shutdown(); /** - * Subscribe some topic + * Subscribe some topic with subExpression * * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if * null or * expression,meaning subscribe all */ void subscribe(final String topic, final String subExpression) throws MQClientException; + /** + * Subscribe some topic with selector. + * + * @param selector message selector({@link MessageSelector}), can be null. + */ + void subscribe(final String topic, final MessageSelector selector) throws MQClientException; + /** * Unsubscribe consumption some topic * diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java new file mode 100644 index 0000000000000000000000000000000000000000..fa6fd134e230118654eac42d68ee5296305c0e06 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import java.util.Set; +import org.apache.rocketmq.common.message.MessageQueue; + +public interface TopicMessageQueueChangeListener { + /** + * This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is + * expanded or shrunk. + * + * @param messageQueues + */ + void onChanged(String topic, Set messageQueues); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index b21fd01e130e2fd03367b2cc67dbcddf75004716..8dcaa30e980bd635a2e9390b9fc76c528f282beb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -138,7 +138,6 @@ public class AssignedMessageQueue { Map.Entry next = it.next(); if (next.getKey().getTopic().equals(topic)) { if (!assigned.contains(next.getKey())) { - System.out.printf("MessageQueue-%s is removed %n", next.getKey()); next.getValue().getProcessQueue().setDropped(true); it.remove(); } @@ -167,7 +166,6 @@ public class AssignedMessageQueue { if (!this.assignedMessageQueueState.containsKey(messageQueue)) { MessageQueueStat messageQueueStat; if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) { - System.out.printf("MessageQueue-%s is added %n", messageQueue); messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue)); } else { ProcessQueue processQueue = new ProcessQueue(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 07ef1cfa8e7dd01da665f5e917b5428c6796e31e..e17aae65aca0cce856e7491c89b913a571a32b27 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.ArrayList; import java.util.Iterator; @@ -28,12 +29,16 @@ import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.PullResult; @@ -127,6 +132,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + private final ScheduledExecutorService scheduledExecutorService; + + private Map topicMessageQueueChangeListenerMap = new HashMap(); + + private Map> messageQueuesForTopic = new HashMap>(); + private long consumeRequestFlowControlTimes = 0L; private long queueFlowControlTimes = 0L; @@ -138,6 +149,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) { this.defaultLitePullConsumer = defaultLitePullConsumer; this.rpcHook = rpcHook; + this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( + this.defaultLitePullConsumer.getPullThreadNumbers(), + new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup()) + ); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "MonitorMessageQueueChangeThread"); + } + }); } private void checkServiceState() { @@ -266,12 +287,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { mQClientFactory.start(); - final String group = this.defaultLitePullConsumer.getConsumerGroup(); - - this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( - this.defaultLitePullConsumer.getPullThreadNumbers(), - new ThreadFactoryImpl("PullMsgThread-" + group) - ); if (subscriptionType == SubscriptionType.SUBSCRIBE) { updateTopicSubscribeInfoWhenSubscriptionChanged(); } @@ -279,8 +294,25 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { updateAssignPullTask(assignedMessageQueue.messageQueues()); } + scheduledExecutorService.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + try { + fetchTopicMessageQueuesAndCompare(); + } catch (Exception e) { + log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e); + } + } + }, 1000 * 20, 1000 * 30, TimeUnit.MILLISECONDS); + log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; + for (String topic : topicMessageQueueChangeListenerMap.keySet()) { + Set messageQueues = fetchMessageQueues(topic); + messageQueuesForTopic.put(topic, messageQueues); + } + this.mQClientFactory.checkClientInBroker(); break; case RUNNING: case START_FAILED: @@ -339,7 +371,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } - } public PullAPIWrapper getPullAPIWrapper() { @@ -353,7 +384,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { break; case CLUSTERING: /* - * Retry topic support in the future. + * Retry topic will be support in the future. */ break; default: @@ -410,7 +441,28 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { updateTopicSubscribeInfoWhenSubscriptionChanged(); } } catch (Exception e) { - throw new MQClientException("subscription exception", e); + throw new MQClientException("subscribe exception", e); + } + } + + public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { + try { + setSubscriptionType(SubscriptionType.SUBSCRIBE); + if (messageSelector == null) { + subscribe(topic, SubscriptionData.SUB_ALL); + return; + } + SubscriptionData subscriptionData = FilterAPI.build(topic, + messageSelector.getExpression(), messageSelector.getExpressionType()); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); + assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); + if (serviceState == ServiceState.RUNNING) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + } catch (Exception e) { + throw new MQClientException("subscribe exception", e); } } @@ -421,6 +473,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } public synchronized void assign(Collection messageQueues) { + if (messageQueues == null || messageQueues.isEmpty()) { + throw new IllegalArgumentException("Message queues can not be null or empty."); + } setSubscriptionType(SubscriptionType.ASSIGN); assignedMessageQueue.updateAssignedMessageQueue(messageQueues); if (serviceState == ServiceState.RUNNING) { @@ -461,6 +516,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { List messages = consumeRequest.getMessageExts(); long offset = consumeRequest.getProcessQueue().removeMessage(messages); assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); + //If namespace not null , reset Topic without namespace. + this.resetTopic(messages); return messages; } } catch (InterruptedException ignore) { @@ -587,8 +644,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private void submitConsumeRequest(ConsumeRequest consumeRequest) { try { consumeRequestCache.put(consumeRequest); - } catch (InterruptedException ex) { - log.error("Submit consumeRequest error", ex); + } catch (InterruptedException e) { + log.error("Submit consumeRequest error", e); } } @@ -649,14 +706,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); if (processQueue == null && processQueue.isDropped()) { - log.info("the message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); + log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); return; } if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((consumeRequestFlowControlTimes++ % 1000) == 0) - log.warn("the consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); + log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); return; } @@ -667,7 +724,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( - "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", + "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); } return; @@ -677,7 +734,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( - "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", + "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); } return; @@ -687,21 +744,27 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( - "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", + "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes); } return; } - String subExpression = null; - if (subscriptionType == SubscriptionType.SUBSCRIBE) { - String topic = this.messageQueue.getTopic(); - subExpression = rebalanceImpl.getSubscriptionInner().get(topic).getSubString(); - } long offset = nextPullOffset(messageQueue); long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills(); try { - PullResult pullResult = pull(messageQueue, subExpression, offset, nextPullBatchNums()); + + SubscriptionData subscriptionData; + if (subscriptionType == SubscriptionType.SUBSCRIBE) { + String topic = this.messageQueue.getTopic(); + subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); + } else{ + String topic = this.messageQueue.getTopic(); + subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), + topic, SubscriptionData.SUB_ALL); + } + + PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchNums()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) { @@ -710,7 +773,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } break; case OFFSET_ILLEGAL: - log.warn("the pull request offset illegal, {}", pullResult.toString()); + log.warn("The pull request offset illegal, {}", pullResult.toString()); break; case NO_NEW_MSG: pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG; @@ -721,7 +784,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); } catch (Throwable e) { pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION; - e.printStackTrace(); log.error("An error occurred in pull message process.", e); } @@ -746,58 +808,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums) + private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return pull(mq, subExpression, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis()); + return pull(mq, subscriptionData, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis()); } - private PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) + private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout); } - private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return pull(mq, messageSelector, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis()); - } - - private PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector); - return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout); - } - - private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression) - throws MQClientException { - - if (null == mq) { - throw new MQClientException("mq is null", null); - } - - try { - return FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(), - mq.getTopic(), subExpression); - } catch (Exception e) { - throw new MQClientException("parse subscription error", e); - } - } - - private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector) - throws MQClientException { - - if (null == mq) { - throw new MQClientException("mq is null", null); - } - - try { - return FilterAPI.build(mq.getTopic(), - messageSelector.getExpression(), messageSelector.getExpressionType()); - } catch (Exception e) { - throw new MQClientException("parse subscription error", e); - } - } - private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) @@ -815,8 +835,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { throw new MQClientException("maxNums <= 0", null); } - this.subscriptionAutomatically(mq.getTopic()); - int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; @@ -837,13 +855,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { null ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); - //If namespace not null , reset Topic without namespace. - this.resetTopic(pullResult.getMsgFoundList()); return pullResult; } - - public void resetTopic(List msgList) { + private void resetTopic(List msgList) { if (null == msgList || msgList.size() == 0) { return; } @@ -857,17 +872,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } - public void subscriptionAutomatically(final String topic) { - if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) { - try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(), - topic, SubscriptionData.SUB_ALL); - this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); - } catch (Exception ignore) { - } - } - } - public void updateConsumeOffset(MessageQueue mq, long offset) { checkServiceState(); this.offsetStore.updateOffset(mq, offset, false); @@ -981,13 +985,59 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public Set fetchMessageQueues(String topic) throws MQClientException { checkServiceState(); - // check if has info in memory, otherwise invoke api. - Set result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); - if (null == result) { - result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic); + Set result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic); + return parseMessageQueues(result); + } + + private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException { + for (Map.Entry entry : topicMessageQueueChangeListenerMap.entrySet()) { + String topic = entry.getKey(); + TopicMessageQueueChangeListener topicMessageQueueChangeListener = entry.getValue(); + Set oldMessageQueues = messageQueuesForTopic.get(topic); + Set newMessageQueues = fetchMessageQueues(topic); + boolean isChanged = !isSetEqual(newMessageQueues, oldMessageQueues); + if (isChanged) { + messageQueuesForTopic.put(topic, newMessageQueues); + if (topicMessageQueueChangeListener != null) { + topicMessageQueueChangeListener.onChanged(topic, newMessageQueues); + } + } } + } - return parseMessageQueues(result); + private boolean isSetEqual(Set set1, Set set2) { + if (set1 == null && set2 == null) { + return true; + } + + if (set1 == null || set2 == null || set1.size() != set2.size() + || set1.size() == 0 || set2.size() == 0) { + return false; + } + + Iterator iter = set2.iterator(); + boolean isEqual = true; + while (iter.hasNext()) { + if (!set1.contains(iter.next())) { + isEqual = false; + } + } + return isEqual; + } + + public synchronized void registerTopicMessageQueueChangeListener(String topic, + TopicMessageQueueChangeListener listener) throws MQClientException { + if (topic == null || listener == null) { + throw new MQClientException("Topic or listener is null", null); + } + if (topicMessageQueueChangeListenerMap.containsKey(topic)) { + log.warn("Topic {} had been registered, new listener will overwrite the old one", topic); + } + topicMessageQueueChangeListenerMap.put(topic, listener); + if (this.serviceState == ServiceState.RUNNING) { + Set messageQueues = fetchMessageQueues(topic); + messageQueuesForTopic.put(topic, messageQueues); + } } private Set parseMessageQueues(Set queueSet) {