...
 
Commits (3)
    https://gitcode.net/apacherocketmq/rocketmq/-/commit/cf8b84659fbf5ca22c31fa336a615843f98fde01 Polish the lock and persist for updateTopicQueueMapping 2021-11-20T12:33:15+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/401843579e7d70ef900a02808bebb5c29277b2cd Polish the static topic command 2021-11-20T12:51:20+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/daf47490c3e26720762534462c41a77b15e97ea2 Polish the use of route data 2021-11-20T13:20:39+08:00 dongeforever dongeforever@apache.org
...@@ -343,7 +343,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -343,7 +343,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
} catch (Exception e) { } catch (Exception e) {
log.error("Update static failed for [{}]", request, e); log.error("Update static topic failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage()); response.setRemark(e.getMessage());
} }
...@@ -653,7 +653,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -653,7 +653,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert i == mappingItems.size() - 1; assert i == mappingItems.size() - 1;
offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp); offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
if (offset > 0) { if (offset > 0) {
offset = item.computeStaticQueueOffset(offset); offset = item.computeStaticQueueOffsetUpToEnd(offset);
} }
} else { } else {
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
...@@ -670,7 +670,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -670,7 +670,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
|| (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) { || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) {
continue; continue;
} else { } else {
offset = item.computeStaticQueueOffset(offsetResponseHeader.getOffset()); offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
} }
} }
...@@ -722,7 +722,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -722,7 +722,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
} }
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
offset = mappingItem.computeStaticQueueOffset(offset); offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
...@@ -774,7 +774,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -774,7 +774,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
throw rpcResponse.getException(); throw rpcResponse.getException();
} }
GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
long offset = mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset()); long offset = mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
......
...@@ -180,16 +180,16 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -180,16 +180,16 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& nextBeginOffset >= mappingItem.getEndOffset()) { && nextBeginOffset >= mappingItem.getEndOffset()) {
nextBeginOffset = mappingItem.getEndOffset(); nextBeginOffset = mappingItem.getEndOffset();
} }
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset)); responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
} }
//handle min offset //handle min offset
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
//handle max offset //handle max offset
{ {
if (mappingItem.checkIfEndOffsetDecided()) { if (mappingItem.checkIfEndOffsetDecided()) {
responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId()))); responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
} else { } else {
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset())); responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()));
} }
} }
//set the offsetDelta //set the offsetDelta
......
...@@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
//no need to care the broker name //no need to care the broker name
long staticLogicOffset = mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset()); long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) { if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
......
...@@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
...@@ -57,9 +58,15 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -57,9 +58,15 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController; this.brokerController = brokerController;
} }
public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) { public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
lock.lock(); boolean locked = false;
boolean updated = false;
try { try {
if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
locked = true;
} else {
return;
}
if (newDetail == null) { if (newDetail == null) {
return; return;
} }
...@@ -70,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -70,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic()); TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
if (oldDetail == null) { if (oldDetail == null) {
topicQueueMappingTable.put(newDetail.getTopic(), newDetail); topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
updated = true;
return; return;
} }
if (force) { if (force) {
...@@ -77,6 +85,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -77,6 +85,7 @@ public class TopicQueueMappingManager extends ConfigManager {
newDetail.getHostedQueues().putIfAbsent(queueId, items); newDetail.getHostedQueues().putIfAbsent(queueId, items);
}); });
topicQueueMappingTable.put(newDetail.getTopic(), newDetail); topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
updated = true;
return; return;
} }
//do more check //do more check
...@@ -94,8 +103,14 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -94,8 +103,14 @@ public class TopicQueueMappingManager extends ConfigManager {
} }
} }
topicQueueMappingTable.put(newDetail.getTopic(), newDetail); topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
} finally { updated = true;
lock.unlock(); } finally {
if (locked) {
this.lock.unlock();
}
if (updated) {
this.persist();
}
} }
} }
......
...@@ -10,8 +10,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; ...@@ -10,8 +10,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
...@@ -93,33 +96,46 @@ public class ClientMetadata { ...@@ -93,33 +96,46 @@ public class ClientMetadata {
|| route.getTopicQueueMappingByBroker().isEmpty()) { || route.getTopicQueueMappingByBroker().isEmpty()) {
return new ConcurrentHashMap<MessageQueue, String>(); return new ConcurrentHashMap<MessageQueue, String>();
} }
ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>(); ConcurrentMap<MessageQueue, TopicQueueMappingInfo> mqEndPoints = new ConcurrentHashMap<MessageQueue, TopicQueueMappingInfo>();
int totalNums = 0;
for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) { List<Map.Entry<String, TopicQueueMappingInfo>> mappingInfos = new ArrayList<Map.Entry<String, TopicQueueMappingInfo>>(route.getTopicQueueMappingByBroker().entrySet());
String brokerName = entry.getKey(); Collections.sort(mappingInfos, new Comparator<Map.Entry<String, TopicQueueMappingInfo>>() {
//TODO check the epoch of @Override
if (entry.getValue().getTotalQueues() > totalNums) { public int compare(Map.Entry<String, TopicQueueMappingInfo> o1, Map.Entry<String, TopicQueueMappingInfo> o2) {
if (totalNums != 0) { return (int) (o2.getValue().getEpoch() - o1.getValue().getEpoch());
log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues()); }
} });
totalNums = entry.getValue().getTotalQueues();
int maxTotalNums = 0;
long maxTotalNumOfEpoch = -1;
for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
TopicQueueMappingInfo info = entry.getValue();
if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
maxTotalNums = entry.getValue().getTotalQueues();
} }
for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) { for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
int globalId = idEntry.getKey(); int globalId = idEntry.getKey();
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId); MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
String oldBrokerName = mqEndPoints.put(mq, brokerName); TopicQueueMappingInfo oldInfo = mqEndPoints.get(mq);
log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName); if (oldInfo == null || oldInfo.getEpoch() <= info.getEpoch()) {
mqEndPoints.put(mq, info);
}
} }
} }
ConcurrentMap<MessageQueue, String> mqEndPointsOfBroker = new ConcurrentHashMap<MessageQueue, String>();
//accomplish the static logic queues //accomplish the static logic queues
for (int i = 0; i < totalNums; i++) { for (int i = 0; i < maxTotalNums; i++) {
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i); MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
if (!mqEndPoints.containsKey(mq)) { if (!mqEndPoints.containsKey(mq)) {
mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST); mqEndPointsOfBroker.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
} else {
mqEndPointsOfBroker.put(mq, mqEndPoints.get(mq).getBname());
} }
} }
return mqEndPoints; return mqEndPointsOfBroker;
} }
} }
...@@ -2,14 +2,14 @@ package org.apache.rocketmq.common.statictopic; ...@@ -2,14 +2,14 @@ package org.apache.rocketmq.common.statictopic;
public class LogicQueueMappingItem { public class LogicQueueMappingItem {
private int gen; //generation, mutable private final int gen; // immutable
private int queueId; private final int queueId; //, immutable
private String bname; private final String bname; //important, immutable
private long logicOffset; // the start of the logic offset private long logicOffset; // the start of the logic offset, important, can be changed by command only once
private long startOffset; // the start of the physical offset, included private final long startOffset; // the start of the physical offset, should always be 0, immutable
private long endOffset = -1; // the end of the physical offset, excluded private long endOffset = -1; // the end of the physical offset, excluded, revered -1, mutable
private long timeOfStart = -1; // mutable private long timeOfStart = -1; // mutable, reserved
private long timeOfEnd = -1; // mutable private long timeOfEnd = -1; // mutable, reserved
public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) { public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) {
this.gen = gen; this.gen = gen;
...@@ -22,7 +22,7 @@ public class LogicQueueMappingItem { ...@@ -22,7 +22,7 @@ public class LogicQueueMappingItem {
this.timeOfEnd = timeOfEnd; this.timeOfEnd = timeOfEnd;
} }
public long computeStaticQueueOffset(long physicalQueueOffset) { public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
if (physicalQueueOffset < startOffset) { if (physicalQueueOffset < startOffset) {
return logicOffset; return logicOffset;
} }
...@@ -33,6 +33,13 @@ public class LogicQueueMappingItem { ...@@ -33,6 +33,13 @@ public class LogicQueueMappingItem {
return logicOffset + (physicalQueueOffset - startOffset); return logicOffset + (physicalQueueOffset - startOffset);
} }
public long computeStaticQueueOffset(long physicalQueueOffset) {
if (physicalQueueOffset < startOffset) {
return logicOffset;
}
return logicOffset + (physicalQueueOffset - startOffset);
}
public long computePhysicalQueueOffset(long staticQueueOffset) { public long computePhysicalQueueOffset(long staticQueueOffset) {
return (staticQueueOffset - logicOffset) + startOffset; return (staticQueueOffset - logicOffset) + startOffset;
} }
......
...@@ -45,14 +45,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -45,14 +45,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public void buildIdMap() { public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0); this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
} }
public ConcurrentMap<Integer, Integer> buildIdMap(int level) { public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
//level 0 means current leader in this broker //level 0 means current leader in this broker
//level 1 means previous leader in this broker //level 1 means previous leader in this broker, reserved for
assert level == LEVEL_0 || level == LEVEL_1; assert level == LEVEL_0 ;
if (hostedQueues == null || hostedQueues.isEmpty()) { if (hostedQueues == null || hostedQueues.isEmpty()) {
return new ConcurrentHashMap<Integer, Integer>(); return new ConcurrentHashMap<Integer, Integer>();
...@@ -67,12 +66,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -67,12 +66,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
if (bname.equals(curr.getBname())) { if (bname.equals(curr.getBname())) {
tmpIdMap.put(globalId, curr.getQueueId()); tmpIdMap.put(globalId, curr.getQueueId());
} }
} else if (level == LEVEL_1
&& items.size() >= 2) {
LogicQueueMappingItem prev = items.get(items.size() - 1);
if (bname.equals(prev.getBname())) {
tmpIdMap.put(globalId, prev.getQueueId());
}
} }
} }
return tmpIdMap; return tmpIdMap;
...@@ -120,8 +113,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -120,8 +113,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public TopicQueueMappingInfo cloneAsMappingInfo() { public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch); TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0); topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
return topicQueueMappingInfo; return topicQueueMappingInfo;
} }
......
...@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap; ...@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingInfo extends RemotingSerializable { public class TopicQueueMappingInfo extends RemotingSerializable {
public static final int LEVEL_0 = 0; public static final int LEVEL_0 = 0;
public static final int LEVEL_1 = 1;
String topic; // redundant field String topic; // redundant field
int totalQueues; int totalQueues;
...@@ -32,8 +31,6 @@ public class TopicQueueMappingInfo extends RemotingSerializable { ...@@ -32,8 +31,6 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
boolean dirty; //indicate if the data is dirty boolean dirty; //indicate if the data is dirty
//register to broker to construct the route //register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>(); transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) { public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) {
this.topic = topic; this.topic = topic;
...@@ -79,8 +76,4 @@ public class TopicQueueMappingInfo extends RemotingSerializable { ...@@ -79,8 +76,4 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
public ConcurrentMap<Integer, Integer> getCurrIdMap() { public ConcurrentMap<Integer, Integer> getCurrIdMap() {
return currIdMap; return currIdMap;
} }
public ConcurrentMap<Integer, Integer> getPrevIdMap() {
return prevIdMap;
}
} }
...@@ -296,6 +296,16 @@ public class TopicQueueMappingUtils { ...@@ -296,6 +296,16 @@ public class TopicQueueMappingUtils {
} }
} }
public static long blockSeqRoundUp(long offset, long blockSeqSize) {
long num = offset/blockSeqSize;
long left = offset % blockSeqSize;
if (left < blockSeqSize/2) {
return (num + 1) * blockSeqSize;
} else {
return (num + 2) * blockSeqSize;
}
}
} }
...@@ -168,6 +168,7 @@ public class RouteInfoManager { ...@@ -168,6 +168,7 @@ public class RouteInfoManager {
if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) { if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>()); topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
} }
//Note asset brokerName equal entry.getValue().getBname()
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
} }
} }
......
...@@ -81,9 +81,14 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -81,9 +81,14 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
opt.setRequired(true); opt.setRequired(true);
options.addOption(opt); options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name"); opt = new Option("mf", "mapFile", true, "The mapping data file name ");
opt.setRequired(false); opt.setRequired(false);
options.addOption(opt); options.addOption(opt);
opt = new Option("fr", "forceReplace", true, "Force replace the old mapping");
opt.setRequired(false);
options.addOption(opt);
return options; return options;
} }
...@@ -111,7 +116,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -111,7 +116,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty"); throw new RuntimeException("The Cluster info is empty");
} }
clientMetadata.refreshClusterInfo(clusterInfo); clientMetadata.refreshClusterInfo(clusterInfo);
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
boolean force = false;
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true;
}
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
return; return;
}catch (Exception e) { }catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
...@@ -161,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -161,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
} }
newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000)); newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), 10000));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader //fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items); mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
...@@ -171,7 +181,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -171,7 +181,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
for (String broker: brokersToMapIn) { for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker); String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
} }
} }
......
...@@ -82,7 +82,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -82,7 +82,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
opt.setRequired(true); opt.setRequired(true);
options.addOption(opt); options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name"); opt = new Option("mf", "mapFile", true, "The mapping data file name");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("fr", "forceReplace", true, "Force replace the old mapping");
opt.setRequired(false); opt.setRequired(false);
options.addOption(opt); options.addOption(opt);
...@@ -104,6 +108,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -104,6 +108,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class); TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config //double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap()); TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
boolean force = false;
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true;
}
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
...@@ -112,7 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -112,7 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty"); throw new RuntimeException("The Cluster info is empty");
} }
clientMetadata.refreshClusterInfo(clusterInfo); clientMetadata.refreshClusterInfo(clusterInfo);
doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt); doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
return; return;
}catch (Exception e) { }catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
...@@ -121,13 +129,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -121,13 +129,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
} }
} }
public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception { public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
//If some succeed, and others fail, it will cause inconsistent data //If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) { for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey(); String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker); String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue(); TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
} }
} }
...@@ -288,7 +296,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -288,7 +296,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
System.out.println("The new mapping data is written to file " + newMappingDataFile); System.out.println("The new mapping data is written to file " + newMappingDataFile);
} }
doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt); doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
} catch (Exception e) { } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......