From 0c6ee5c50b2b6077ecbe06289e4b7d1a1ab50cca Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 17 Nov 2021 16:07:05 +0800 Subject: [PATCH] Add epoch, dirty to the topic mapping detail --- .../common/TopicQueueMappingDetail.java | 6 +++--- .../common/TopicQueueMappingInfo.java | 19 ++++++++++++++----- .../rocketmq/common/rpc/ClientMetadata.java | 2 ++ .../topic/UpdateStaticTopicSubCommand.java | 11 +++++++++++ 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index c5d6ebb0..9b677512 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -29,8 +29,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // make sure this value is not null private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); - public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int gen) { - super(topic, totalQueues, bname, gen); + public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) { + super(topic, totalQueues, bname, epoch); buildIdMap(); } @@ -118,7 +118,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public TopicQueueMappingInfo cloneAsMappingInfo() { - TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.gen); + TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch); topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0); topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1); diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java index 7f4a201d..b2e85911 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java @@ -28,19 +28,28 @@ public class TopicQueueMappingInfo extends RemotingSerializable { String topic; // redundant field int totalQueues; String bname; //identify the hosted broker name - int gen; //important to fence the old dirty data + int epoch; //important to fence the old dirty data + boolean dirty; //indicate if the data is dirty //register to broker to construct the route transient ConcurrentMap currIdMap = new ConcurrentHashMap(); //register to broker to help detect remapping failure transient ConcurrentMap prevIdMap = new ConcurrentHashMap(); - public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int gen) { + public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) { this.topic = topic; this.totalQueues = totalQueues; this.bname = bname; - this.gen = gen; + this.epoch = epoch; + this.dirty = false; } + public boolean isDirty() { + return dirty; + } + + public void setDirty(boolean dirty) { + this.dirty = dirty; + } public int getTotalQueues() { return totalQueues; @@ -58,8 +67,8 @@ public class TopicQueueMappingInfo extends RemotingSerializable { return topic; } - public int getGen() { - return gen; + public int getEpoch() { + return epoch; } public ConcurrentMap getCurrIdMap() { diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java index 23fbc6f5..e2dd0762 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java @@ -57,9 +57,11 @@ public class ClientMetadata { return new ConcurrentHashMap(); } ConcurrentMap mqEndPoints = new ConcurrentHashMap(); + int totalNums = 0; for (Map.Entry entry : route.getTopicQueueMappingByBroker().entrySet()) { String brokerName = entry.getKey(); + //TODO check the epoch of if (entry.getValue().getTotalQueues() > totalNums) { if (totalNums != 0) { log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 1ca9fd5e..dc237807 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; @@ -74,6 +75,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand { defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); try { + + String topic = commandLine.getOptionValue('t').trim(); + int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); + String cluster = commandLine.getOptionValue('c').trim(); + + //first check the topic route + { + TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + + } TopicConfig topicConfig = new TopicConfig(); topicConfig.setReadQueueNums(8); topicConfig.setWriteQueueNums(8); -- GitLab