diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a286818c7f5b03fd39aeb86168023f775a3b5f09..20dfc8516b698a67fba292808250b23ea6431358 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -353,13 +353,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements } this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); + this.brokerController.getTopicQueueMappingManager().delete(topic); + this.brokerController.getMessageStore() .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) { this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic()); } - //TODO delete the topic route - //this.brokerController.getTopicQueueMappingManager() + response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 9be37176a22fa75e24fe2378b6d517d5ce492fcf..c484bcf316a152f36e0e46fdf3c1399252f2a04f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -33,7 +33,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -61,6 +63,7 @@ public class TopicQueueMappingManager extends ConfigManager { public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception { boolean locked = false; boolean updated = false; + TopicQueueMappingDetail oldDetail = null; try { if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { locked = true; @@ -74,7 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager { TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items); }); - TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic()); + oldDetail = topicQueueMappingTable.get(newDetail.getTopic()); if (oldDetail == null) { topicQueueMappingTable.put(newDetail.getTopic(), newDetail); updated = true; @@ -115,11 +118,23 @@ public class TopicQueueMappingManager extends ConfigManager { } if (updated) { this.persist(); + log.info("Update topic queue mapping from [{}] to [{}], force {}", oldDetail, newDetail, force); } } } + public void delete(final String topic) { + TopicQueueMappingDetail old = this.topicQueueMappingTable.remove(topic); + if (old != null) { + log.info("delete topic queue mapping OK, topic queue mapping: {}", old); + this.dataVersion.nextVersion(); + this.persist(); + } else { + log.warn("delete topic queue mapping failed, topic: {} not exists", topic); + } + } + public TopicQueueMappingDetail getTopicQueueMapping(String topic) { return topicQueueMappingTable.get(topic); } @@ -177,6 +192,9 @@ public class TopicQueueMappingManager extends ConfigManager { //it is not static topic return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); } + + assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName()); + //If not find mappingItem, it encounters some errors Integer globalId = requestHeader.getQueueId(); if (globalId < 0 && !selectOneWhenMiss) { @@ -224,4 +242,34 @@ public class TopicQueueMappingManager extends ConfigManager { } + public void cleanItemListMoreThanSecondGen() { + for(String topic : topicQueueMappingTable.keySet()) { + TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); + if (mappingDetail == null + || mappingDetail.getHostedQueues().isEmpty()) { + continue; + } + if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) { + log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail); + continue; + } + Iterator>> it = mappingDetail.getHostedQueues().entrySet().iterator(); + while (it.hasNext()) { + Map.Entry> entry = it.next(); + Integer queueId = entry.getKey(); + List items = entry.getValue(); + if (items.size() <= 2) { + continue; + } + LogicQueueMappingItem leaderItem = items.get(items.size() - 1); + LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2); + if (!leaderItem.getBname().equals(mappingDetail.getBname()) + && !secLeaderItem.getBname().equals(mappingDetail.getBname())) { + it.remove(); + log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items); + } + } + } + } + } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index 1749b8ed7cb97e55524b0c31ce8f0909808fd805..f0f27a5cd0c5bd79eb76c5ab032d224bd2ab3da2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -128,4 +128,17 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { .append(hostedQueues) .toHashCode(); } + + @Override + public String toString() { + return "TopicQueueMappingDetail{" + + "hostedQueues=" + hostedQueues + + ", topic='" + topic + '\'' + + ", totalQueues=" + totalQueues + + ", bname='" + bname + '\'' + + ", epoch=" + epoch + + ", dirty=" + dirty + + ", currIdMap=" + currIdMap + + '}'; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java index 53041aaee8a5a54a9f3b7718abd6d3927fa63fef..a6a7eb5975a13772f679ec623fbd2bfee63342db 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java @@ -124,4 +124,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable { .append(currIdMap) .toHashCode(); } + + @Override + public String toString() { + return "TopicQueueMappingInfo{" + + "topic='" + topic + '\'' + + ", totalQueues=" + totalQueues + + ", bname='" + bname + '\'' + + ", epoch=" + epoch + + ", dirty=" + dirty + + ", currIdMap=" + currIdMap + + '}'; + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 60a0f811ebee141b96f13cf0911661d5f956a6a9..a02d3f1f445990fdf0e49d6a312c69ab660d4cbc 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -169,6 +169,7 @@ public class RouteInfoManager { topicQueueMappingInfoTable.put(entry.getKey(), new HashMap()); } //Note asset brokerName equal entry.getValue().getBname() + //here use the mappingDetail.bname topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); } }