diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 6ca46dca5e8cd44ad870d72417fea5f718877197..4ef34f932b78995fb06ecf8928a971faf1319332 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; +import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService; import org.apache.rocketmq.broker.topic.TopicQueueMappingManager; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService; @@ -64,11 +65,9 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl; import org.apache.rocketmq.broker.util.ServiceProvider; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.Configuration; import org.apache.rocketmq.common.DataVersion; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -76,7 +75,6 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; @@ -87,9 +85,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.common.TlsMode; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; @@ -163,9 +158,6 @@ public class BrokerController { private final BrokerOuterAPI brokerOuterAPI; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "BrokerControllerScheduledThread")); - //the topic queue mapping is costly, so use an independent executor - private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "BrokerControllerScheduledThread-TopicQueueMapping")); private final SlaveSynchronize slaveSynchronize; private final BlockingQueue sendThreadPoolQueue; private final BlockingQueue ackThreadPoolQueue; @@ -202,6 +194,7 @@ public class BrokerController { private InetSocketAddress storeHost; private BrokerFastFailure brokerFastFailure; private Configuration configuration; + private TopicQueueMappingCleanService topicQueueMappingCleanService; private FileWatchService fileWatchService; private TransactionalMessageCheckService transactionalMessageCheckService; private TransactionalMessageService transactionalMessageService; @@ -501,21 +494,6 @@ public class BrokerController { } }, 1, 5, TimeUnit.SECONDS); - this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> { - try { - this.topicQueueMappingManager.cleanItemListMoreThanSecondGen(); - } catch (Throwable t) { - log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t); - } - }, 1, 5, TimeUnit.MINUTES); - - this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> { - try { - this.topicQueueMappingManager.cleanItemExpired(); - } catch (Throwable t) { - log.error("ScheduledTask cleanItemExpired failed", t); - } - }, 1, 5, TimeUnit.MINUTES); if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { @@ -539,6 +517,8 @@ public class BrokerController { } } + this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this); + if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { @@ -897,6 +877,10 @@ public class BrokerController { this.fastRemotingServer.shutdown(); } + if (this.topicQueueMappingCleanService != null) { + this.topicQueueMappingCleanService.shutdown(); + } + if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } @@ -911,12 +895,6 @@ public class BrokerController { } catch (InterruptedException e) { } - this.scheduledForTopicQueueMapping.shutdown(); - try { - this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS); - } catch (Throwable ignored) { - } - this.unregisterBrokerAll(); if (this.sendMessageExecutor != null) { @@ -1020,6 +998,10 @@ public class BrokerController { this.fastRemotingServer.start(); } + if (this.topicQueueMappingCleanService != null) { + this.topicQueueMappingCleanService.start(); + } + if (this.fileWatchService != null) { this.fileWatchService.start(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 20dfc8516b698a67fba292808250b23ea6431358..270c953aebd3f886007ecf621774ddb39d91b20b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -323,7 +323,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements try { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force); + this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force, false, true); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); response.setCode(ResponseCode.SUCCESS); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java new file mode 100644 index 0000000000000000000000000000000000000000..91fd60d16ccf559c8dddfacb12350574100288ca --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.out.BrokerOuterAPI; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.admin.TopicOffset; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.rpc.RpcClient; +import org.apache.rocketmq.common.rpc.RpcRequest; +import org.apache.rocketmq.common.rpc.RpcResponse; +import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.store.config.MessageStoreConfig; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TopicQueueMappingCleanService extends ServiceThread { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + private TopicQueueMappingManager topicQueueMappingManager; + private BrokerOuterAPI brokerOuterAPI; + private RpcClient rpcClient; + private MessageStoreConfig messageStoreConfig; + private BrokerConfig brokerConfig; + + public TopicQueueMappingCleanService(BrokerController brokerController) { + this.topicQueueMappingManager = brokerController.getTopicQueueMappingManager(); + this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient(); + this.messageStoreConfig = brokerController.getMessageStoreConfig(); + this.brokerConfig = brokerController.getBrokerConfig(); + this.brokerOuterAPI = brokerController.getBrokerOuterAPI(); + } + + @Override + public String getServiceName() { + return TopicQueueMappingCleanService.class.getSimpleName(); + } + + @Override + public void run() { + log.info("Start topic queue mapping clean service thread!"); + while (!this.isStopped()) { + try { + cleanItemExpired(); + } catch (Throwable t) { + log.error("topic queue mapping cleanItemExpired failed", t); + } + try { + cleanItemListMoreThanSecondGen(); + } catch (Throwable t) { + log.error("topic queue mapping cleanItemListMoreThanSecondGen failed", t); + } + try { + this.waitForRunning(5L * 60 * 1000); + } catch (Throwable ignore) { + + } + } + log.info("End topic queue mapping clean service thread!"); + } + + + + public void cleanItemExpired() { + String when = messageStoreConfig.getDeleteWhen(); + if (!UtilAll.isItTimeToDo(when)) { + return; + } + boolean changed = false; + long start = System.currentTimeMillis(); + try { + for(String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) { + try { + if (isStopped()) { + break; + } + TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic); + if (mappingDetail == null + || mappingDetail.getHostedQueues().isEmpty()) { + continue; + } + if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) { + log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); + continue; + } + Set brokers = new HashSet<>(); + for (List items: mappingDetail.getHostedQueues().values()) { + if (items.size() < 2) { + continue; + } + LogicQueueMappingItem earlistItem = items.get(0); + brokers.add(earlistItem.getBname()); + } + Map statsTable = new HashMap<>(); + for (String broker: brokers) { + GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); + header.setTopic(topic); + header.setBname(broker); + try { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); + RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); + } catch (Throwable rt) { + log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); + } + } + for (List items: mappingDetail.getHostedQueues().values()) { + if (items.size() < 2) { + continue; + } + LogicQueueMappingItem earlistItem = items.get(0); + TopicStatsTable topicStats = statsTable.get(earlistItem.getBname()); + if (topicStats == null) { + continue; + } + TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId())); + if (topicOffset == null) { + //this may should not happen + log.warn("Get null topicOffset for {}", earlistItem); + continue; + } + //ignore the maxOffset < 0, which may in case of some error + if (topicOffset.getMaxOffset() == topicOffset.getMinOffset() + || topicOffset.getMaxOffset() == 0) { + List newItems = new ArrayList<>(items); + boolean result = newItems.remove(earlistItem); + this.topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, true, false); + changed = changed || result; + log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset); + } + } + } catch (Throwable tt) { + log.error("Try CleanItemExpired failed for {}", topic, tt); + } finally { + UtilAll.sleep(10); + } + } + } catch (Throwable t) { + log.error("Try cleanItemExpired failed", t); + } finally { + if (changed) { + this.topicQueueMappingManager.getDataVersion().nextVersion(); + this.topicQueueMappingManager.persist(); + log.info("CleanItemExpired changed"); + } + log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start); + } + } + + public void cleanItemListMoreThanSecondGen() { + String when = messageStoreConfig.getDeleteWhen(); + if (!UtilAll.isItTimeToDo(when)) { + return; + } + boolean changed = false; + long start = System.currentTimeMillis(); + try { + ClientMetadata clientMetadata = new ClientMetadata(); + for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) { + try { + if (isStopped()) { + break; + } + TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic); + if (mappingDetail == null + || mappingDetail.getHostedQueues().isEmpty()) { + continue; + } + if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) { + log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); + continue; + } + Map qid2CurrLeaderBroker = new HashMap<>(); + for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { + Integer qId = entry.getKey(); + List items = entry.getValue(); + if (items.isEmpty()) { + continue; + } + LogicQueueMappingItem leaderItem = items.get(items.size() - 1); + if (!leaderItem.getBname().equals(mappingDetail.getBname())) { + qid2CurrLeaderBroker.put(qId, leaderItem.getBname()); + } + } + if (qid2CurrLeaderBroker.isEmpty()) { + continue; + } + //find the topic route + TopicRouteData topicRouteData = brokerOuterAPI.getTopicRouteInfoFromNameServer(topic, brokerConfig.getForwardTimeout()); + clientMetadata.freshTopicRoute(topic, topicRouteData); + Map qid2RealLeaderBroker = new HashMap<>(); + //fine the real leader + for (Map.Entry entry : qid2CurrLeaderBroker.entrySet()) { + qid2RealLeaderBroker.put(entry.getKey(), clientMetadata.getBrokerNameFromMessageQueue(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, entry.getKey()))); + } + + //find the mapping detail of real leader + Map mappingDetailMap = new HashMap<>(); + for (Map.Entry entry : qid2RealLeaderBroker.entrySet()) { + if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(entry.getValue())) { + continue; + } + String broker = entry.getValue(); + GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); + header.setTopic(topic); + header.setBname(broker); + try { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null); + RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + TopicQueueMappingDetail mappingDetailRemote = ((TopicConfigAndQueueMapping) rpcResponse.getBody()).getMappingDetail(); + if (broker.equals(mappingDetailRemote.getBname())) { + mappingDetailMap.put(broker, mappingDetailRemote); + } + } catch (Throwable rt) { + log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); + } + } + //check all the info + Set ids2delete = new HashSet<>(); + for (Map.Entry entry : qid2CurrLeaderBroker.entrySet()) { + Integer qId = entry.getKey(); + String currLeaderBroker = entry.getValue(); + String realLeaderBroker = qid2RealLeaderBroker.get(qId); + TopicQueueMappingDetail remoteMappingDetail = mappingDetailMap.get(realLeaderBroker); + if (remoteMappingDetail == null + || remoteMappingDetail.getTotalQueues() != mappingDetail.getTotalQueues() + || remoteMappingDetail.getEpoch() != mappingDetail.getEpoch()) { + continue; + } + List items = remoteMappingDetail.getHostedQueues().get(qId); + if (items.isEmpty()) { + continue; + } + LogicQueueMappingItem leaderItem = items.get(items.size() - 1); + if (!realLeaderBroker.equals(leaderItem.getBname())) { + continue; + } + //all the check is ok + if (!realLeaderBroker.equals(currLeaderBroker)) { + ids2delete.add(qId); + } + } + for (Integer qid : ids2delete) { + List items = mappingDetail.getHostedQueues().remove(qid); + changed = true; + if (items != null) { + log.info("Remove the ItemListMoreThanSecondGen topic {} qid {} items {}", topic, qid, items); + } + } + } catch (Throwable tt) { + log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt); + } finally { + UtilAll.sleep(10); + } + } + } catch (Throwable t) { + log.error("Try cleanItemListMoreThanSecondGen failed", t); + } finally { + if (changed) { + this.topicQueueMappingManager.getDataVersion().nextVersion(); + this.topicQueueMappingManager.persist(); + } + log.info("Try cleanItemListMoreThanSecondGen cost {} ms", System.currentTimeMillis() - start); + } + } + + + + +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index c442040cef1e5d28866243f74f5bef2ae433b4e7..e76d25e3e891e2bd0c569b292b0def43c483ee19 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -22,27 +22,22 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.admin.TopicOffset; -import org.apache.rocketmq.common.admin.TopicStatsTable; -import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcResponse; +import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.store.DefaultMessageStore; import java.util.HashMap; import java.util.HashSet; @@ -74,7 +69,7 @@ public class TopicQueueMappingManager extends ConfigManager { this.brokerController = brokerController; } - public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception { + public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force, boolean isClean, boolean flush) throws Exception { boolean locked = false; boolean updated = false; TopicQueueMappingDetail oldDetail = null; @@ -124,7 +119,7 @@ public class TopicQueueMappingManager extends ConfigManager { newDetail.getHostedQueues().put(globalId, oldItems); } } else { - TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual); + TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual, isClean); } } topicQueueMappingTable.put(newDetail.getTopic(), newDetail); @@ -133,7 +128,8 @@ public class TopicQueueMappingManager extends ConfigManager { if (locked) { this.lock.unlock(); } - if (updated) { + if (updated && flush) { + this.dataVersion.nextVersion(); this.persist(); log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force); } @@ -258,162 +254,4 @@ public class TopicQueueMappingManager extends ConfigManager { } } - - public void cleanItemListMoreThanSecondGen() { - String when = this.brokerController.getMessageStoreConfig().getDeleteWhen(); - if (!UtilAll.isItTimeToDo(when)) { - return; - } - - for(String topic : topicQueueMappingTable.keySet()) { - try { - TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); - if (mappingDetail == null - || mappingDetail.getHostedQueues().isEmpty()) { - continue; - } - if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) { - log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); - continue; - } - Set brokers = new HashSet<>(); - for (List items : mappingDetail.getHostedQueues().values()) { - if (items.size() < 2) { - continue; - } - LogicQueueMappingItem leaderItem = items.get(items.size() - 1); - if (!leaderItem.equals(mappingDetail.getBname())) { - brokers.add(leaderItem.getBname()); - } - } - if (brokers.isEmpty()) { - continue; - } - Map configAndQueueMappingMap = new HashMap<>(); - for (String broker: brokers) { - GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); - header.setTopic(topic); - header.setBname(broker); - try { - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null); - RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); - if (rpcResponse.getException() != null) { - throw rpcResponse.getException(); - } - configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody()); - } catch (Throwable rt) { - log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); - } - } - - Iterator>> it = mappingDetail.getHostedQueues().entrySet().iterator(); - while (it.hasNext()) { - Map.Entry> entry = it.next(); - Integer queueId = entry.getKey(); - List items = entry.getValue(); - if (items.size() < 2) { - continue; - } - LogicQueueMappingItem leaderItem = items.get(items.size() - 1); - - TopicConfigAndQueueMapping configAndQueueMapping = configAndQueueMappingMap.get(leaderItem.getBname()); - if (configAndQueueMapping == null) { - continue; - } - List itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId); - //TODO - } - } catch (Throwable tt) { - log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt); - } finally { - UtilAll.sleep(10); - } - } - } - - - public void cleanItemExpired() { - String when = this.brokerController.getMessageStoreConfig().getDeleteWhen(); - if (!UtilAll.isItTimeToDo(when)) { - return; - } - boolean changed = false; - long start = System.currentTimeMillis(); - try { - for(String topic : topicQueueMappingTable.keySet()) { - try { - TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); - if (mappingDetail == null - || mappingDetail.getHostedQueues().isEmpty()) { - continue; - } - if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) { - log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); - continue; - } - Set brokers = new HashSet<>(); - for (List items: mappingDetail.getHostedQueues().values()) { - if (items.size() < 2) { - continue; - } - LogicQueueMappingItem earlistItem = items.get(0); - brokers.add(earlistItem.getBname()); - } - Map statsTable = new HashMap<>(); - for (String broker: brokers) { - GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); - header.setTopic(topic); - header.setBname(broker); - try { - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); - RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); - if (rpcResponse.getException() != null) { - throw rpcResponse.getException(); - } - statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); - } catch (Throwable rt) { - log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); - } - } - for (List items: mappingDetail.getHostedQueues().values()) { - if (items.size() < 2) { - continue; - } - LogicQueueMappingItem earlistItem = items.get(0); - TopicStatsTable topicStats = statsTable.get(earlistItem.getBname()); - if (topicStats == null) { - continue; - } - TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId())); - if (topicOffset == null) { - //this may should not happen - log.warn("Get null topicOffset for {}", earlistItem); - continue; - } - if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) { - //TODO be careful of the concurrent problem - //Should use the lock - boolean result = items.remove(earlistItem); - changed = changed || result; - log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset); - } - } - } catch (Throwable tt) { - log.error("Try CleanItemExpired failed for {}", topic, tt); - } finally { - UtilAll.sleep(10); - } - } - } catch (Throwable t) { - log.error("Try cleanItemExpired failed", t); - } finally { - if (changed) { - this.dataVersion.nextVersion(); - this.persist(); - log.info("CleanItemExpired changed"); - } - log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start); - } - } - } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java index 3c3f4881c64a97b41255ff21e6b3822972b3a401..6b4faab5d50532c4c167c2b89c9fad7492d35429 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java @@ -93,7 +93,7 @@ public class TopicQueueMappingManagerTest { Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size()); for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) { for (int i = 0; i < 10; i++) { - topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false); + topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false, true); } } topicQueueMappingManager.persist(); diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 13e34ac25f097439f390a4b4b41180ca04567e50..58928eac7b99b44357b7786bac381987111ba7ff 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -85,8 +85,8 @@ public class MixAll { public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; public static final String REPLY_MESSAGE_FLAG = "reply"; private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); - public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__"; - public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logical_queue_broker_not_exist__"; + public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logic_broker__"; + public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logic_broker_none__"; public static String getWSAddr() { String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 8aa157488682926e8039c627c151d45a4d4f18f5..3ef45e05168fc56274d11f096a8a17920779b7af 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -193,7 +193,7 @@ public class TopicQueueMappingUtils { return new AbstractMap.SimpleEntry(maxEpoch, maxNum); } - public static void makeSureLogicQueueMappingItemImmutable(List oldItems, List newItems, boolean epochEqual) { + public static void makeSureLogicQueueMappingItemImmutable(List oldItems, List newItems, boolean epochEqual, boolean isCLean) { if (oldItems == null || oldItems.isEmpty()) { return; } @@ -205,10 +205,15 @@ public class TopicQueueMappingUtils { LogicQueueMappingItem newItem = newItems.get(inew); LogicQueueMappingItem oldItem = oldItems.get(iold); if (newItem.getGen() < oldItem.getGen()) { + //the old one may have been deleted inew++; - continue; } else if (oldItem.getGen() < newItem.getGen()){ - throw new RuntimeException("The gen is not correct for old item"); + //the new one may be the "delete one from " + if (isCLean) { + iold++; + } else { + throw new RuntimeException("The new item-list has less items than old item-list"); + } } else { assert oldItem.getBname().equals(newItem.getBname()); assert oldItem.getQueueId() == newItem.getQueueId();