提交 4e9b0974 编写于 作者: D dongeforever

Check the correctness of logic items

上级 f05cfb92
......@@ -7,7 +7,7 @@ public class LogicQueueMappingItem {
private String bname;
private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset, included
private long endOffset; // the end of the physical offset, excluded
private long endOffset = -1; // the end of the physical offset, excluded
private long timeOfStart = -1; // mutable
private long timeOfEnd = -1; // mutable
......@@ -162,6 +162,72 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
if (oldItems == null || oldItems.isEmpty()) {
if (newItems == null || newItems.isEmpty() || newItems.size() < oldItems.size()) {
throw new RuntimeException("The new item list is smaller than old ones");
int iold = 0, inew = 0;
while (iold < oldItems.size() && inew < newItems.size()) {
LogicQueueMappingItem newItem = newItems.get(inew);
LogicQueueMappingItem oldItem = oldItems.get(iold);
if (newItem.getGen() < oldItem.getGen()) {
} else if (oldItem.getGen() < newItem.getGen()){
throw new RuntimeException("The gen is not correct for old item");
} else {
assert oldItem.getBname().equals(newItem.getBname());
assert oldItem.getQueueId() == newItem.getQueueId();
assert oldItem.getStartOffset() == newItem.getStartOffset();
if (oldItem.getLogicOffset() != -1) {
assert oldItem.getLogicOffset() == newItem.getLogicOffset();
public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> items) {
if (items == null
|| items.isEmpty()) {
int lastGen = -1;
long lastOffset = -1;
for (int i = items.size() - 1; i >=0 ; i--) {
LogicQueueMappingItem item = items.get(i);
if (item.getStartOffset() < 0
|| item.getGen() < 0
|| item.getQueueId() < 0) {
throw new RuntimeException("The field is illegal, should not be negative");
if (lastGen != -1 && item.getGen() >= lastGen) {
throw new RuntimeException("The gen dose not increase monotonically");
if (item.getEndOffset() != -1
&& item.getEndOffset() < item.getStartOffset()) {
throw new RuntimeException("The endOffset is smaller than the start offset");
if (lastOffset != -1 && item.getLogicOffset() != -1) {
if (item.getLogicOffset() >= lastOffset) {
throw new RuntimeException("The base logic offset dose not increase monotonically");
if (item.computeMaxStaticQueueOffset() >= lastOffset) {
throw new RuntimeException("The max logic offset dose not increase monotonically");
lastGen = item.getGen();
lastOffset = item.getLogicOffset();
public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
......@@ -178,6 +244,7 @@ public class TopicQueueMappingUtils {
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
String leaderBrokerName = getLeaderBroker(entry.getValue());
if (!leaderBrokerName.equals(mappingDetail.getBname())) {
//not the leader
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册