diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index c5d6ebb0a44d790acf6efd0c4b76b130ef9d11be..9b6775122bf0f288f327d7c744a3aa223579d7c0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -29,8 +29,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // make sure this value is not null private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); - public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int gen) { - super(topic, totalQueues, bname, gen); + public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) { + super(topic, totalQueues, bname, epoch); buildIdMap(); } @@ -118,7 +118,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public TopicQueueMappingInfo cloneAsMappingInfo() { - TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.gen); + TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch); topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0); topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1); diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java index 7f4a201dba6f7de95e2c39503a653ccac4b3bfbd..b2e85911aa1bd55118b2a8d6f9372e33efccdd6e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java @@ -28,19 +28,28 @@ public class TopicQueueMappingInfo extends RemotingSerializable { String topic; // redundant field int totalQueues; String bname; //identify the hosted broker name - int gen; //important to fence the old dirty data + int epoch; //important to fence the old dirty data + boolean dirty; //indicate if the data is dirty //register to broker to construct the route transient ConcurrentMap currIdMap = new ConcurrentHashMap(); //register to broker to help detect remapping failure transient ConcurrentMap prevIdMap = new ConcurrentHashMap(); - public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int gen) { + public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) { this.topic = topic; this.totalQueues = totalQueues; this.bname = bname; - this.gen = gen; + this.epoch = epoch; + this.dirty = false; } + public boolean isDirty() { + return dirty; + } + + public void setDirty(boolean dirty) { + this.dirty = dirty; + } public int getTotalQueues() { return totalQueues; @@ -58,8 +67,8 @@ public class TopicQueueMappingInfo extends RemotingSerializable { return topic; } - public int getGen() { - return gen; + public int getEpoch() { + return epoch; } public ConcurrentMap getCurrIdMap() { diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java index 23fbc6f54d7ef0a5ae812c531099e691a552130a..e2dd0762185f6ce3df0cc7e91aad3dfc7bce8dcc 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java @@ -57,9 +57,11 @@ public class ClientMetadata { return new ConcurrentHashMap(); } ConcurrentMap mqEndPoints = new ConcurrentHashMap(); + int totalNums = 0; for (Map.Entry entry : route.getTopicQueueMappingByBroker().entrySet()) { String brokerName = entry.getKey(); + //TODO check the epoch of if (entry.getValue().getTotalQueues() > totalNums) { if (totalNums != 0) { log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 1ca9fd5efeabb3a83abb501e5475c90113c72e31..dc237807da32e37e45754f00e63dbe5fa20b5ebc 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; @@ -74,6 +75,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand { defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); try { + + String topic = commandLine.getOptionValue('t').trim(); + int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); + String cluster = commandLine.getOptionValue('c').trim(); + + //first check the topic route + { + TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + + } TopicConfig topicConfig = new TopicConfig(); topicConfig.setReadQueueNums(8); topicConfig.setWriteQueueNums(8);