From 527382e4ce5cc03435b199f4b3c618be5590210c Mon Sep 17 00:00:00 2001 From: dongeforever Date: Mon, 6 Dec 2021 16:44:50 +0800 Subject: [PATCH] Clean the items more than second gen --- .../processor/AdminBrokerProcessor.java | 5 +- .../topic/TopicQueueMappingManager.java | 50 ++++++++++++++++++- .../statictopic/TopicQueueMappingDetail.java | 13 +++++ .../statictopic/TopicQueueMappingInfo.java | 12 +++++ .../namesrv/routeinfo/RouteInfoManager.java | 1 + 5 files changed, 78 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a286818c..20dfc851 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -353,13 +353,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements } this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); + this.brokerController.getTopicQueueMappingManager().delete(topic); + this.brokerController.getMessageStore() .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) { this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic()); } - //TODO delete the topic route - //this.brokerController.getTopicQueueMappingManager() + response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 9be37176..c484bcf3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -33,7 +33,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -61,6 +63,7 @@ public class TopicQueueMappingManager extends ConfigManager { public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception { boolean locked = false; boolean updated = false; + TopicQueueMappingDetail oldDetail = null; try { if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { locked = true; @@ -74,7 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager { TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items); }); - TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic()); + oldDetail = topicQueueMappingTable.get(newDetail.getTopic()); if (oldDetail == null) { topicQueueMappingTable.put(newDetail.getTopic(), newDetail); updated = true; @@ -115,11 +118,23 @@ public class TopicQueueMappingManager extends ConfigManager { } if (updated) { this.persist(); + log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force); } } } + public void delete(final String topic) { + TopicQueueMappingDetail old = this.topicQueueMappingTable.remove(topic); + if (old != null) { + log.info("delete topic queue mapping OK, topic queue mapping: {}", old); + this.dataVersion.nextVersion(); + this.persist(); + } else { + log.warn("delete topic queue mapping failed, topic: {} not exists", topic); + } + } + public TopicQueueMappingDetail getTopicQueueMapping(String topic) { return topicQueueMappingTable.get(topic); } @@ -177,6 +192,9 @@ public class TopicQueueMappingManager extends ConfigManager { //it is not static topic return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); } + + assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName()); + //If not find mappingItem, it encounters some errors Integer globalId = requestHeader.getQueueId(); if (globalId < 0 && !selectOneWhenMiss) { @@ -224,4 +242,34 @@ public class TopicQueueMappingManager extends ConfigManager { } + public void cleanItemListMoreThanSecondGen() { + for(String topic : topicQueueMappingTable.keySet()) { + TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); + if (mappingDetail == null + || mappingDetail.getHostedQueues().isEmpty()) { + continue; + } + if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) { + log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); + continue; + } + Iterator>> it = mappingDetail.getHostedQueues().entrySet().iterator(); + while (it.hasNext()) { + Map.Entry> entry = it.next(); + Integer queueId = entry.getKey(); + List items = entry.getValue(); + if (items.size() <= 2) { + continue; + } + LogicQueueMappingItem leaderItem = items.get(items.size() - 1); + LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2); + if (!leaderItem.getBname().equals(mappingDetail.getBname()) + && !secLeaderItem.getBname().equals(mappingDetail.getBname())) { + it.remove(); + log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items); + } + } + } + } + } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index 1749b8ed..f0f27a5c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -128,4 +128,17 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { .append(hostedQueues) .toHashCode(); } + + @Override + public String toString() { + return "TopicQueueMappingDetail{" + + "hostedQueues=" + hostedQueues + + ", topic='" + topic + '\'' + + ", totalQueues=" + totalQueues + + ", bname='" + bname + '\'' + + ", epoch=" + epoch + + ", dirty=" + dirty + + ", currIdMap=" + currIdMap + + '}'; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java index 53041aae..a6a7eb59 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java @@ -124,4 +124,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable { .append(currIdMap) .toHashCode(); } + + @Override + public String toString() { + return "TopicQueueMappingInfo{" + + "topic='" + topic + '\'' + + ", totalQueues=" + totalQueues + + ", bname='" + bname + '\'' + + ", epoch=" + epoch + + ", dirty=" + dirty + + ", currIdMap=" + currIdMap + + '}'; + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 60a0f811..a02d3f1f 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -169,6 +169,7 @@ public class RouteInfoManager { topicQueueMappingInfoTable.put(entry.getKey(), new HashMap()); } //Note asset brokerName equal entry.getValue().getBname() + //here use the mappingDetail.bname topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); } } -- GitLab