提交 0c6ee5c5 编写于 作者: D dongeforever

Add epoch, dirty to the topic mapping detail

上级 215c0e4d
...@@ -29,8 +29,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -29,8 +29,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// make sure this value is not null // make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>(); private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int gen) { public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) {
super(topic, totalQueues, bname, gen); super(topic, totalQueues, bname, epoch);
buildIdMap(); buildIdMap();
} }
...@@ -118,7 +118,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -118,7 +118,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public TopicQueueMappingInfo cloneAsMappingInfo() { public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.gen); TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0); topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1); topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
......
...@@ -28,19 +28,28 @@ public class TopicQueueMappingInfo extends RemotingSerializable { ...@@ -28,19 +28,28 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
String topic; // redundant field String topic; // redundant field
int totalQueues; int totalQueues;
String bname; //identify the hosted broker name String bname; //identify the hosted broker name
int gen; //important to fence the old dirty data int epoch; //important to fence the old dirty data
boolean dirty; //indicate if the data is dirty
//register to broker to construct the route //register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>(); transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure //register to broker to help detect remapping failure
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>(); transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int gen) { public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) {
this.topic = topic; this.topic = topic;
this.totalQueues = totalQueues; this.totalQueues = totalQueues;
this.bname = bname; this.bname = bname;
this.gen = gen; this.epoch = epoch;
this.dirty = false;
} }
public boolean isDirty() {
return dirty;
}
public void setDirty(boolean dirty) {
this.dirty = dirty;
}
public int getTotalQueues() { public int getTotalQueues() {
return totalQueues; return totalQueues;
...@@ -58,8 +67,8 @@ public class TopicQueueMappingInfo extends RemotingSerializable { ...@@ -58,8 +67,8 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
return topic; return topic;
} }
public int getGen() { public int getEpoch() {
return gen; return epoch;
} }
public ConcurrentMap<Integer, Integer> getCurrIdMap() { public ConcurrentMap<Integer, Integer> getCurrIdMap() {
......
...@@ -57,9 +57,11 @@ public class ClientMetadata { ...@@ -57,9 +57,11 @@ public class ClientMetadata {
return new ConcurrentHashMap<MessageQueue, String>(); return new ConcurrentHashMap<MessageQueue, String>();
} }
ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>(); ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>();
int totalNums = 0; int totalNums = 0;
for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) { for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
String brokerName = entry.getKey(); String brokerName = entry.getKey();
//TODO check the epoch of
if (entry.getValue().getTotalQueues() > totalNums) { if (entry.getValue().getTotalQueues() > totalNums) {
if (totalNums != 0) { if (totalNums != 0) {
log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues()); log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
......
...@@ -21,6 +21,7 @@ import org.apache.commons.cli.Option; ...@@ -21,6 +21,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
...@@ -74,6 +75,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -74,6 +75,16 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try { try {
String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
String cluster = commandLine.getOptionValue('c').trim();
//first check the topic route
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
}
TopicConfig topicConfig = new TopicConfig(); TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(8); topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8); topicConfig.setWriteQueueNums(8);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册