提交 40184357 编写于 作者: D dongeforever

Polish the static topic command

上级 cf8b8465
......@@ -653,7 +653,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert i == mappingItems.size() - 1;
offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
if (offset > 0) {
offset = item.computeStaticQueueOffset(offset);
offset = item.computeStaticQueueOffsetUpToEnd(offset);
}
} else {
requestHeader.setPhysical(true);
......@@ -670,7 +670,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
|| (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) {
continue;
} else {
offset = item.computeStaticQueueOffset(offsetResponseHeader.getOffset());
offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
}
}
......@@ -722,7 +722,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
}
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
offset = mappingItem.computeStaticQueueOffset(offset);
offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
......@@ -774,7 +774,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
throw rpcResponse.getException();
}
GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
long offset = mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset());
long offset = mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
......
......@@ -180,16 +180,16 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& nextBeginOffset >= mappingItem.getEndOffset()) {
nextBeginOffset = mappingItem.getEndOffset();
}
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset));
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
}
//handle min offset
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
//handle max offset
{
if (mappingItem.checkIfEndOffsetDecided()) {
responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
} else {
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset()));
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()));
}
}
//set the offsetDelta
......
......@@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
//no need to care the broker name
long staticLogicOffset = mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset());
long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
......
......@@ -22,7 +22,7 @@ public class LogicQueueMappingItem {
this.timeOfEnd = timeOfEnd;
}
public long computeStaticQueueOffset(long physicalQueueOffset) {
public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
if (physicalQueueOffset < startOffset) {
return logicOffset;
}
......@@ -33,6 +33,13 @@ public class LogicQueueMappingItem {
return logicOffset + (physicalQueueOffset - startOffset);
}
public long computeStaticQueueOffset(long physicalQueueOffset) {
if (physicalQueueOffset < startOffset) {
return logicOffset;
}
return logicOffset + (physicalQueueOffset - startOffset);
}
public long computePhysicalQueueOffset(long staticQueueOffset) {
return (staticQueueOffset - logicOffset) + startOffset;
}
......
......@@ -296,6 +296,16 @@ public class TopicQueueMappingUtils {
}
}
public static long blockSeqRoundUp(long offset, long blockSeqSize) {
long num = offset/blockSeqSize;
long left = offset % blockSeqSize;
if (left < blockSeqSize/2) {
return (num + 1) * blockSeqSize;
} else {
return (num + 2) * blockSeqSize;
}
}
}
......@@ -81,9 +81,14 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name");
opt = new Option("mf", "mapFile", true, "The mapping data file name ");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("fr", "forceReplace", true, "Force replace the old mapping");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......@@ -111,7 +116,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
boolean force = false;
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true;
}
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......@@ -161,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), 10000));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
......@@ -171,7 +181,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
......
......@@ -82,7 +82,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name");
opt = new Option("mf", "mapFile", true, "The mapping data file name");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("fr", "forceReplace", true, "Force replace the old mapping");
opt.setRequired(false);
options.addOption(opt);
......@@ -104,6 +108,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
boolean force = false;
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true;
}
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
......@@ -112,7 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......@@ -121,13 +129,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
}
public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
......@@ -288,7 +296,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}
doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt);
doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册