From 401843579e7d70ef900a02808bebb5c29277b2cd Mon Sep 17 00:00:00 2001 From: dongeforever Date: Sat, 20 Nov 2021 12:51:20 +0800 Subject: [PATCH] Polish the static topic command --- .../broker/processor/AdminBrokerProcessor.java | 8 ++++---- .../broker/processor/PullMessageProcessor.java | 6 +++--- .../broker/processor/SendMessageProcessor.java | 2 +- .../statictopic/LogicQueueMappingItem.java | 9 ++++++++- .../statictopic/TopicQueueMappingUtils.java | 10 ++++++++++ .../topic/RemappingStaticTopicSubCommand.java | 18 ++++++++++++++---- .../topic/UpdateStaticTopicSubCommand.java | 18 +++++++++++++----- 7 files changed, 53 insertions(+), 18 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 9048584c..3eceb491 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -653,7 +653,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements assert i == mappingItems.size() - 1; offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp); if (offset > 0) { - offset = item.computeStaticQueueOffset(offset); + offset = item.computeStaticQueueOffsetUpToEnd(offset); } } else { requestHeader.setPhysical(true); @@ -670,7 +670,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) { continue; } else { - offset = item.computeStaticQueueOffset(offsetResponseHeader.getOffset()); + offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset()); } } @@ -722,7 +722,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements } long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); - offset = mappingItem.computeStaticQueueOffset(offset); + offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); @@ -774,7 +774,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements throw rpcResponse.getException(); } GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); - long offset = mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset()); + long offset = mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset()); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index df4e4e29..24d3f079 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -180,16 +180,16 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements && nextBeginOffset >= mappingItem.getEndOffset()) { nextBeginOffset = mappingItem.getEndOffset(); } - responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset)); + responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); } //handle min offset - responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); + responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); //handle max offset { if (mappingItem.checkIfEndOffsetDecided()) { responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId()))); } else { - responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset())); + responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset())); } } //set the offsetDelta diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 2fe0f6f3..d254b8a7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } //no need to care the broker name - long staticLogicOffset = mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset()); + long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset()); if (staticLogicOffset < 0) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 01686fde..c855dfdc 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -22,7 +22,7 @@ public class LogicQueueMappingItem { this.timeOfEnd = timeOfEnd; } - public long computeStaticQueueOffset(long physicalQueueOffset) { + public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) { if (physicalQueueOffset < startOffset) { return logicOffset; } @@ -33,6 +33,13 @@ public class LogicQueueMappingItem { return logicOffset + (physicalQueueOffset - startOffset); } + public long computeStaticQueueOffset(long physicalQueueOffset) { + if (physicalQueueOffset < startOffset) { + return logicOffset; + } + return logicOffset + (physicalQueueOffset - startOffset); + } + public long computePhysicalQueueOffset(long staticQueueOffset) { return (staticQueueOffset - logicOffset) + startOffset; } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index eb82cada..7ac7ce86 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -296,6 +296,16 @@ public class TopicQueueMappingUtils { } } + public static long blockSeqRoundUp(long offset, long blockSeqSize) { + long num = offset/blockSeqSize; + long left = offset % blockSeqSize; + if (left < blockSeqSize/2) { + return (num + 1) * blockSeqSize; + } else { + return (num + 2) * blockSeqSize; + } + } + } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index c46c28ae..aa6d1342 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -81,9 +81,14 @@ public class RemappingStaticTopicSubCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); - opt = new Option("f", "mapFile", true, "The map file name"); + opt = new Option("mf", "mapFile", true, "The mapping data file name "); opt.setRequired(false); options.addOption(opt); + + opt = new Option("fr", "forceReplace", true, "Force replace the old mapping"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -111,7 +116,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand { throw new RuntimeException("The Cluster info is empty"); } clientMetadata.refreshClusterInfo(clusterInfo); - doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false); + + boolean force = false; + if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) { + force = true; + } + doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force); return; }catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); @@ -161,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); } - newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000)); + newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), 10000)); TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); //fresh the new leader mapInConfig.getMappingDetail().putMappingInfo(globalId, items); @@ -171,7 +181,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { for (String broker: brokersToMapIn) { String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 7c6abdcb..cdb4fc0c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -82,7 +82,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); - opt = new Option("f", "mapFile", true, "The map file name"); + opt = new Option("mf", "mapFile", true, "The mapping data file name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("fr", "forceReplace", true, "Force replace the old mapping"); opt.setRequired(false); options.addOption(opt); @@ -104,6 +108,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand { TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class); //double check the config TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap()); + boolean force = false; + if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) { + force = true; + } TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); @@ -112,7 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { throw new RuntimeException("The Cluster info is empty"); } clientMetadata.refreshClusterInfo(clusterInfo); - doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt); + doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force); return; }catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); @@ -121,13 +129,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } } - public void doUpdate(Map brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception { + public void doUpdate(Map brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception { //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : brokerConfigMap.entrySet()) { String broker = entry.getKey(); String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = entry.getValue(); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); } } @@ -288,7 +296,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { System.out.println("The new mapping data is written to file " + newMappingDataFile); } - doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt); + doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt, false); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); -- GitLab