diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 9048584c42de1ed29cc9b2da23f4543a99f13d97..3eceb49134778601413b1974db1eacc48623e0d6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -653,7 +653,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements assert i == mappingItems.size() - 1; offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp); if (offset > 0) { - offset = item.computeStaticQueueOffset(offset); + offset = item.computeStaticQueueOffsetUpToEnd(offset); } } else { requestHeader.setPhysical(true); @@ -670,7 +670,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) { continue; } else { - offset = item.computeStaticQueueOffset(offsetResponseHeader.getOffset()); + offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset()); } } @@ -722,7 +722,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements } long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); - offset = mappingItem.computeStaticQueueOffset(offset); + offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); @@ -774,7 +774,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements throw rpcResponse.getException(); } GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); - long offset = mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset()); + long offset = mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset()); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index df4e4e29aa8026c230baa2afec7eb37834da736b..24d3f0797d462d5805031bf9bd5305be248f5ae7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -180,16 +180,16 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements && nextBeginOffset >= mappingItem.getEndOffset()) { nextBeginOffset = mappingItem.getEndOffset(); } - responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset)); + responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); } //handle min offset - responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); + responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); //handle max offset { if (mappingItem.checkIfEndOffsetDecided()) { responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId()))); } else { - responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset())); + responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset())); } } //set the offsetDelta diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 2fe0f6f326c1ca91b7b3ce69b5267555f91de906..d254b8a7f1297800b22b827b2b73d736fce26182 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } //no need to care the broker name - long staticLogicOffset = mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset()); + long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset()); if (staticLogicOffset < 0) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 01686fde667e269432c41b0103649c3ac98ef994..c855dfdc0e516b7db9c40e7006f421f2d26f60b6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -22,7 +22,7 @@ public class LogicQueueMappingItem { this.timeOfEnd = timeOfEnd; } - public long computeStaticQueueOffset(long physicalQueueOffset) { + public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) { if (physicalQueueOffset < startOffset) { return logicOffset; } @@ -33,6 +33,13 @@ public class LogicQueueMappingItem { return logicOffset + (physicalQueueOffset - startOffset); } + public long computeStaticQueueOffset(long physicalQueueOffset) { + if (physicalQueueOffset < startOffset) { + return logicOffset; + } + return logicOffset + (physicalQueueOffset - startOffset); + } + public long computePhysicalQueueOffset(long staticQueueOffset) { return (staticQueueOffset - logicOffset) + startOffset; } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index eb82cadae10d6346037c78d3722d54a1630f1a7b..7ac7ce86f1bd1dedf5651cb1ec911f5c84093200 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -296,6 +296,16 @@ public class TopicQueueMappingUtils { } } + public static long blockSeqRoundUp(long offset, long blockSeqSize) { + long num = offset/blockSeqSize; + long left = offset % blockSeqSize; + if (left < blockSeqSize/2) { + return (num + 1) * blockSeqSize; + } else { + return (num + 2) * blockSeqSize; + } + } + } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index c46c28aea2305f6777beadee1bb07e38ab117d07..aa6d13428f850eec534fab2d592afb5693900256 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -81,9 +81,14 @@ public class RemappingStaticTopicSubCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); - opt = new Option("f", "mapFile", true, "The map file name"); + opt = new Option("mf", "mapFile", true, "The mapping data file name "); opt.setRequired(false); options.addOption(opt); + + opt = new Option("fr", "forceReplace", true, "Force replace the old mapping"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -111,7 +116,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand { throw new RuntimeException("The Cluster info is empty"); } clientMetadata.refreshClusterInfo(clusterInfo); - doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false); + + boolean force = false; + if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) { + force = true; + } + doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force); return; }catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); @@ -161,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); } - newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000)); + newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), 10000)); TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); //fresh the new leader mapInConfig.getMappingDetail().putMappingInfo(globalId, items); @@ -171,7 +181,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { for (String broker: brokersToMapIn) { String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 7c6abdcb476de4ccf3984cbb19bf5e66d5744ff1..cdb4fc0cffef4e62a8882e9ae4132193ed73a1a8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -82,7 +82,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); - opt = new Option("f", "mapFile", true, "The map file name"); + opt = new Option("mf", "mapFile", true, "The mapping data file name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("fr", "forceReplace", true, "Force replace the old mapping"); opt.setRequired(false); options.addOption(opt); @@ -104,6 +108,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand { TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class); //double check the config TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap()); + boolean force = false; + if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) { + force = true; + } TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); @@ -112,7 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { throw new RuntimeException("The Cluster info is empty"); } clientMetadata.refreshClusterInfo(clusterInfo); - doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt); + doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force); return; }catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); @@ -121,13 +129,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } } - public void doUpdate(Map brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception { + public void doUpdate(Map brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception { //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : brokerConfigMap.entrySet()) { String broker = entry.getKey(); String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = entry.getValue(); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); } } @@ -288,7 +296,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { System.out.println("The new mapping data is written to file " + newMappingDataFile); } - doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt); + doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt, false); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);