提交 a71ee3b9 编写于 作者: D dongeforever

Add test for logicOffset = -1

上级 4f96f72c
......@@ -137,7 +137,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
//no need to care the broker name
long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
//if the logic offset is -1, just let it go
//maybe we need a dynamic config
//return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
responseHeader.setQueueId(mappingContext.getGlobalId());
responseHeader.setQueueOffset(staticLogicOffset);
......
......@@ -23,13 +23,17 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.CommandUtil;
......@@ -187,4 +191,45 @@ public class MQAdminTestUtils {
MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
}
//for test only
public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
assert !brokerConfigMap.isEmpty();
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
remappingStaticTopicWithNegativeLogicOffset(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
}
//for test only
public static void remappingStaticTopicWithNegativeLogicOffset(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt);
MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step5: write the non-target brokers
for (String broker: brokerConfigMap.keySet()) {
if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
continue;
}
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
}
......@@ -52,6 +52,7 @@ public class StaticTopicIT extends BaseConf {
defaultMQAdminExt.start();
}
@Test
public void testNoTargetBrokers() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
......@@ -357,6 +358,85 @@ public class StaticTopicIT extends BaseConf {
}
}
@Test
public void testRemappingWithNegativeLogicOffset() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
}
//System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
//System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
//produce the messages
{
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
//Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
}
}
//remapping the static topic with -1 logic offset
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name);
MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
Assert.assertEquals(broker2Name, mappingOne.getBname());
Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
}
}
//leave the time to refresh the metadata
Thread.sleep(500);
producer.setDebug(true);
{
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
Assert.assertEquals(destBrokerName, broker2Name);
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
//the max offset should still be msgEachQueue
Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
}
}
}
@After
public void tearDown() {
System.setProperty("rocketmq.client.rebalance.waitInterval", "20000");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册