diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index bdcba39bdc55c55399640783720a397ac99cffe9..17f19cb1fe21995cd400fa55f2aa4094c01698cd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -137,7 +137,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement //no need to care the broker name long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset()); if (staticLogicOffset < 0) { - return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); + //if the logic offset is -1, just let it go + //maybe we need a dynamic config + //return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } responseHeader.setQueueId(mappingContext.getGlobalId()); responseHeader.setQueueOffset(staticLogicOffset); diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index 9de620504dc3ce560c75e200c4eebb6171f429db..7eaf6a0c3d4ea392675ccb831c662de72e59f8f9 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -23,13 +23,17 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ForkJoinPool; import org.apache.log4j.Logger; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminUtils; import org.apache.rocketmq.tools.command.CommandUtil; @@ -187,4 +191,45 @@ public class MQAdminTestUtils { MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt); } + //for test only + public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception { + Map brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + assert !brokerConfigMap.isEmpty(); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers); + MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt); + remappingStaticTopicWithNegativeLogicOffset(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt); + } + + //for test only + public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, int blockSeqSize, boolean force, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + + ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt); + MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata); + // now do the remapping + //Step1: let the new leader can be write without the logicOffset + for (String broker: brokersToMapIn) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } + //Step2: forbid the write of old leader + for (String broker: brokersToMapOut) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } + + //Step5: write the non-target brokers + for (String broker: brokerConfigMap.keySet()) { + if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) { + continue; + } + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } + } + + + } diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index aeddef6dd697f3beade06f8fe8b42daed19670a4..9b348dd85928578f8154b69df3aa2d01db53d34a 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -52,6 +52,7 @@ public class StaticTopicIT extends BaseConf { defaultMQAdminExt.start(); } + @Test public void testNoTargetBrokers() throws Exception { String topic = "static" + MQRandomUtils.getRandomTopic(); @@ -357,6 +358,85 @@ public class StaticTopicIT extends BaseConf { } } + @Test + public void testRemappingWithNegativeLogicOffset() throws Exception { + String topic = "static" + MQRandomUtils.getRandomTopic(); + RMQNormalProducer producer = getProducer(nsAddr, topic); + int queueNum = 10; + int msgEachQueue = 100; + //create static topic + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker1Name); + MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); + } + //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name)); + //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name)); + + //produce the messages + { + List messageQueueList = producer.getMessageQueue(); + for (int i = 0; i < queueNum; i++) { + MessageQueue messageQueue = messageQueueList.get(i); + Assert.assertEquals(i, messageQueue.getQueueId()); + Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); + } + for(MessageQueue messageQueue: messageQueueList) { + producer.send(msgEachQueue, messageQueue); + } + Assert.assertEquals(0, producer.getSendErrorMsg().size()); + //leave the time to build the cq + Thread.sleep(100); + for(MessageQueue messageQueue: messageQueueList) { + //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); + Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); + } + } + + //remapping the static topic with -1 logic offset + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker2Name); + MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt); + Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); + Assert.assertEquals(queueNum, globalIdMap.size()); + for (TopicQueueMappingOne mappingOne: globalIdMap.values()) { + Assert.assertEquals(broker2Name, mappingOne.getBname()); + Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset()); + } + } + //leave the time to refresh the metadata + Thread.sleep(500); + producer.setDebug(true); + { + ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); + List messageQueueList = producer.getMessageQueue(); + for (int i = 0; i < queueNum; i++) { + MessageQueue messageQueue = messageQueueList.get(i); + Assert.assertEquals(i, messageQueue.getQueueId()); + String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue); + Assert.assertEquals(destBrokerName, broker2Name); + } + + for(MessageQueue messageQueue: messageQueueList) { + producer.send(msgEachQueue, messageQueue); + } + Assert.assertEquals(0, producer.getSendErrorMsg().size()); + Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size()); + //leave the time to build the cq + Thread.sleep(100); + for(MessageQueue messageQueue: messageQueueList) { + Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); + //the max offset should still be msgEachQueue + Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); + } + } + } + + @After public void tearDown() { System.setProperty("rocketmq.client.rebalance.waitInterval", "20000");