提交 c673cb73 编写于 作者: D dongeforever

Use timestamp as the epoch to prevent some unknown problem

上级 a5af2cf5
...@@ -29,7 +29,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -29,7 +29,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// make sure this value is not null // make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>(); private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) { public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) {
super(topic, totalQueues, bname, epoch); super(topic, totalQueues, bname, epoch);
buildIdMap(); buildIdMap();
} }
......
...@@ -28,14 +28,14 @@ public class TopicQueueMappingInfo extends RemotingSerializable { ...@@ -28,14 +28,14 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
String topic; // redundant field String topic; // redundant field
int totalQueues; int totalQueues;
String bname; //identify the hosted broker name String bname; //identify the hosted broker name
int epoch; //important to fence the old dirty data long epoch; //important to fence the old dirty data
boolean dirty; //indicate if the data is dirty boolean dirty; //indicate if the data is dirty
//register to broker to construct the route //register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>(); transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure //register to broker to help detect remapping failure
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>(); transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) { public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) {
this.topic = topic; this.topic = topic;
this.totalQueues = totalQueues; this.totalQueues = totalQueues;
this.bname = bname; this.bname = bname;
...@@ -64,11 +64,11 @@ public class TopicQueueMappingInfo extends RemotingSerializable { ...@@ -64,11 +64,11 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
return topic; return topic;
} }
public int getEpoch() { public long getEpoch() {
return epoch; return epoch;
} }
public void setEpoch(int epoch) { public void setEpoch(long epoch) {
this.epoch = epoch; this.epoch = epoch;
} }
......
...@@ -89,8 +89,8 @@ public class TopicQueueMappingUtils { ...@@ -89,8 +89,8 @@ public class TopicQueueMappingUtils {
return new MappingAllocator(idToBroker, brokerNumMap); return new MappingAllocator(idToBroker, brokerNumMap);
} }
public static Map.Entry<Integer, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) { public static Map.Entry<Long, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
int epoch = -1; long epoch = -1;
int queueNum = 0; int queueNum = 0;
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) { for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
if (mappingDetail.getEpoch() > epoch) { if (mappingDetail.getEpoch() > epoch) {
...@@ -100,14 +100,14 @@ public class TopicQueueMappingUtils { ...@@ -100,14 +100,14 @@ public class TopicQueueMappingUtils {
queueNum = mappingDetail.getTotalQueues(); queueNum = mappingDetail.getTotalQueues();
} }
} }
return new AbstractMap.SimpleImmutableEntry<Integer, Integer>(epoch, queueNum); return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
} }
public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) { public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() { Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override @Override
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) { public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
return o2.getEpoch() - o1.getEpoch(); return (int)(o2.getEpoch() - o1.getEpoch());
} }
}); });
......
...@@ -35,7 +35,7 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW ...@@ -35,7 +35,7 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW
public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) { public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) {
if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) { if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) {
return (TopicConfigAndMappingSerializeWrapper)wrapper; return (TopicConfigAndMappingSerializeWrapper) wrapper;
} }
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper(); TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion()); mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion());
......
...@@ -134,7 +134,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -134,7 +134,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
} }
} }
Map.Entry<Integer, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(-1, queueNum); Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!existedTopicConfigMap.isEmpty()) { if (!existedTopicConfigMap.isEmpty()) {
//make sure it it not null //make sure it it not null
existedTopicConfigMap.forEach((key, value) -> { existedTopicConfigMap.forEach((key, value) -> {
...@@ -155,7 +155,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -155,7 +155,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList()); List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum //check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
final Map.Entry<Integer, Integer> tmpMaxEpochAndNum = maxEpochAndNum; final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
detailList.forEach( mappingDetail -> { detailList.forEach( mappingDetail -> {
if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) { if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
...@@ -200,7 +200,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -200,7 +200,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map<Integer, String> newIdToBroker = allocator.getIdToBroker(); Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping //construct the topic configAndMapping
int epoch = maxEpochAndNum.getKey() + 1; long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
newIdToBroker.forEach( (queueId, broker) -> { newIdToBroker.forEach( (queueId, broker) -> {
TopicConfigAndQueueMapping configMapping; TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) { if (!existedTopicConfigMap.containsKey(broker)) {
...@@ -218,6 +218,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -218,6 +218,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}); });
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) { for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey(); String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker); String addr = clientMetadata.findMasterBrokerAddr(broker);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册