From e93536a9aeb42f9695767fe28d43b1a229265a67 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 23 Nov 2021 20:21:28 +0800 Subject: [PATCH] Add IT Test for static topic --- .../apache/rocketmq/test/base/BaseConf.java | 16 + .../rocketmq/test/smoke/LogicalQueueIT.java | 1170 ----------------- .../rocketmq/test/smoke/StaticTopicIT.java | 76 ++ .../tools/admin/DefaultMQAdminExtTest.java | 8 +- 4 files changed, 93 insertions(+), 1177 deletions(-) delete mode 100644 test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java create mode 100644 test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 79469e1f..2d59f31d 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -19,8 +19,10 @@ package org.apache.rocketmq.test.base; import com.google.common.collect.ImmutableList; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -120,6 +122,13 @@ public class BaseConf { return group; } + public static DefaultMQAdminExt getAdmin(String nsAddr) { + final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500); + mqAdminExt.setNamesrvAddr(nsAddr); + mqClients.add(mqAdminExt); + return mqAdminExt; + } + public static RMQNormalProducer getProducer(String nsAddr, String topic) { return getProducer(nsAddr, topic, false); } @@ -197,6 +206,13 @@ public class BaseConf { shutdown(mqClients); } + public static Set getBrokers() { + Set brokers = new HashSet<>(); + brokers.add(broker1Name); + brokers.add(broker2Name); + return brokers; + } + public static void shutdown(List mqClients) { mqClients.forEach(mqClient -> ForkJoinPool.commonPool().execute(() -> { if (mqClient instanceof AbstractMQProducer) { diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java deleted file mode 100644 index b1e9013f..00000000 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java +++ /dev/null @@ -1,1170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.test.smoke; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.reflect.FieldUtils; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.client.consumer.PullCallback; -import org.apache.rocketmq.client.consumer.PullResult; -import org.apache.rocketmq.client.consumer.PullStatus; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendResultForLogicalQueue; -import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.constant.PermName; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; -import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.namesrv.NamesrvController; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.store.CommitLog; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.MappedFileQueue; -import org.apache.rocketmq.test.base.BaseConf; -import org.apache.rocketmq.test.base.IntegrationTestBase; -import org.apache.rocketmq.test.util.MQRandomUtils; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; -import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand; -import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand; -import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.FixMethodOrder; -import org.junit.Test; -import org.junit.runners.MethodSorters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Optional.ofNullable; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; -import static org.awaitility.Awaitility.waitAtMost; - -@FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class LogicalQueueIT { - private static final Logger logger = LoggerFactory.getLogger(LogicalQueueIT.class); - public static String nsAddr; - private static String broker1Name; - private static String broker2Name; - private static String clusterName; - private static int brokerNum; - private final static int QUEUE_NUMBERS = 8; - private static NamesrvController namesrvController; - private static BrokerController brokerController1; - private static BrokerController brokerController2; - private static Map brokerControllerMap; - private final static List mqClients = new ArrayList<>(); - - private static DefaultMQProducer producer; - private static DefaultMQPullConsumer consumer; - private static DefaultMQAdminExt mqAdminExt; - private static volatile String topic = null; - private static final String placeholderTopic = "placeholder"; - private static final int MSG_SENT_TIMES = 3; - private static final int COMMIT_LOG_FILE_SIZE = 512 * 1024; - - @BeforeClass - public static void beforeClass() throws Exception { - System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - namesrvController = IntegrationTestBase.createAndStartNamesrv(); - nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort(); - - int oldCommitLogSize = IntegrationTestBase.COMMIT_LOG_SIZE; - IntegrationTestBase.COMMIT_LOG_SIZE = COMMIT_LOG_FILE_SIZE; - brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr); - brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr); - IntegrationTestBase.COMMIT_LOG_SIZE = oldCommitLogSize; - - clusterName = brokerController1.getBrokerConfig().getBrokerClusterName(); - broker1Name = brokerController1.getBrokerConfig().getBrokerName(); - broker2Name = brokerController2.getBrokerConfig().getBrokerName(); - brokerNum = 2; - brokerControllerMap = ImmutableList.of(brokerController1, brokerController2).stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity())); - - BaseConf.waitBrokerRegistered(nsAddr, clusterName); - - producer = new DefaultMQProducer(MQRandomUtils.getRandomConsumerGroup()); - mqClients.add(producer); - producer.setNamesrvAddr(nsAddr); - producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE); - producer.setSendMsgTimeout(1000); - producer.start(); - - consumer = new DefaultMQPullConsumer(BaseConf.initConsumerGroup()); - mqClients.add(consumer); - consumer.setNamesrvAddr(nsAddr); - consumer.setConsumerPullTimeoutMillis(1000); - consumer.start(); - - mqAdminExt = new DefaultMQAdminExt(1000); - mqClients.add(mqAdminExt); - mqAdminExt.setNamesrvAddr(nsAddr); - mqAdminExt.start(); - - mqAdminExt.createTopic(clusterName, placeholderTopic, 1); - } - - @AfterClass - public static void afterClass() { - BaseConf.shutdown(mqClients); - brokerControllerMap.forEach((s, brokerController) -> brokerController.shutdown()); - ofNullable(namesrvController).ifPresent(obj -> ForkJoinPool.commonPool().execute(obj::shutdown)); - } - - @Before - public void setUp() throws Exception { - topic = "tt-" + MQRandomUtils.getRandomTopic(); - logger.info("use topic: {}", topic); - mqAdminExt.createTopic(clusterName, topic, QUEUE_NUMBERS); - assertThat(mqAdminExt.examineTopicRouteInfo(topic).getBrokerDatas()).hasSize(brokerNum); - await().atMost(5, TimeUnit.SECONDS).until(() -> !mqAdminExt.examineTopicStats(topic).getOffsetTable().isEmpty()); - - consumer.setRegisterTopics(Collections.singleton(topic)); - // consumer.setMessageQueueListener & consumer.registerMessageQueueListener are useless in DefaultMQPullConsumer, they will never work, so do not need to test it - - new UpdateTopicLogicalQueueMappingCommand().execute(mqAdminExt, topic, brokerControllerMap.values().stream().map(BrokerController::getBrokerAddr).collect(Collectors.toSet())); - } - - private static String getCurrentMethodName() { - // 0: getStackTrace - // 1: getCurrentMethodName - // 2: __realMethod__ - return Thread.currentThread().getStackTrace()[2].getMethodName(); - } - - @Test - public void test001_SendPullSync() throws Exception { - String methodName = getCurrentMethodName(); - - List publishMessageQueues = producer.fetchPublishMessageQueues(topic); - assertThat(publishMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS); - Set queueIds = IntStream.range(0, brokerNum * QUEUE_NUMBERS).boxed().collect(Collectors.toSet()); - for (MessageQueue messageQueue : publishMessageQueues) { - assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(queueIds.remove(messageQueue.getQueueId())).isTrue(); - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, messageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), messageQueue); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId()); - } - } - assertThat(queueIds).isEmpty(); - - List subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList()); - assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS); - subscribeMessageQueues.sort(Comparator.comparingInt(MessageQueue::getQueueId)); - queueIds.addAll(IntStream.range(0, brokerNum * QUEUE_NUMBERS).boxed().collect(Collectors.toSet())); - for (MessageQueue messageQueue : subscribeMessageQueues) { - assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(queueIds.remove(messageQueue.getQueueId())).isTrue(); - long offset = mqAdminExt.minOffset(messageQueue); - PullResult pullResult = consumer.pull(messageQueue, "*", offset, 10); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES); - offset = -1; - for (int i = 0; i < MSG_SENT_TIMES; i++) { - MessageExt msg = pullResult.getMsgFoundList().get(i); - assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId()); - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, messageQueue.getQueueId(), i)); - if (i > 0) { - assertThat(msg.getQueueOffset()).isEqualTo(offset + i); - } else { - offset = msg.getQueueOffset(); - } - } - assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(offset + MSG_SENT_TIMES); - } - assertThat(queueIds).isEmpty(); - } - - @Test - public void test002_SendPullAsync() throws Exception { - String methodName = getCurrentMethodName(); - - List publishMessageQueues = producer.fetchPublishMessageQueues(topic); - for (MessageQueue messageQueue : publishMessageQueues) { - for (int i = 0; i < MSG_SENT_TIMES; i++) { - CompletableFuture future = new CompletableFuture<>(); - producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, messageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), messageQueue, new SendCallback() { - @Override public void onSuccess(SendResult sendResult) { - future.complete(sendResult); - } - - @Override public void onException(Throwable e) { - future.completeExceptionally(e); - } - }); - SendResult sendResult = future.get(); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId()); - } - } - - List subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList()); - for (MessageQueue messageQueue : subscribeMessageQueues) { - long offset = mqAdminExt.minOffset(messageQueue); - CompletableFuture future = new CompletableFuture<>(); - consumer.pull(messageQueue, "*", offset, 10, new PullCallback() { - @Override public void onSuccess(PullResult pullResult) { - future.complete(pullResult); - } - - @Override public void onException(Throwable e) { - future.completeExceptionally(e); - } - }); - PullResult pullResult = future.get(); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES); - offset = -1; - Iterator it = pullResult.getMsgFoundList().iterator(); - for (int i = 0; i < MSG_SENT_TIMES; i++) { - MessageExt msg = it.next(); - assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId()); - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, messageQueue.getQueueId(), i)); - if (i > 0) { - assertThat(msg.getQueueOffset()).isEqualTo(offset + i); - } else { - offset = msg.getQueueOffset(); - } - } - } - } - - @Test - public void test003_MigrateOnceWithoutData() throws Exception { - final String methodName = getCurrentMethodName(); - - final int logicalQueueIdx = 1; - - TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - List logicalQueueRouteDataList1 = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx); - LogicalQueueRouteData lastLogicalQueueRouteData1 = logicalQueueRouteDataList1.get(logicalQueueRouteDataList1.size() - 1); - String newBrokerName; - if (lastLogicalQueueRouteData1.getBrokerName().equals(broker1Name)) { - newBrokerName = broker2Name; - } else { - newBrokerName = broker1Name; - } - - MessageQueue migratedMessageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx); - - new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, newBrokerName, null); - - topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - assertThat(topicRouteInfo.getLogicalQueuesInfo()).isNotNull(); - for (Map.Entry> entry : topicRouteInfo.getLogicalQueuesInfo().entrySet()) { - List logicalQueueRouteDataList2 = entry.getValue(); - if (entry.getKey() == logicalQueueIdx) { - assertThat(logicalQueueRouteDataList2).hasSize(logicalQueueRouteDataList1.size() + 1); - LogicalQueueRouteData lastLogicalQueueRouteData2 = logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 2); - assertThat(lastLogicalQueueRouteData2.getMessageQueue()).isEqualTo(lastLogicalQueueRouteData1.getMessageQueue()); - assertThat(lastLogicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L); - assertThat(lastLogicalQueueRouteData2.getMessagesCount()).isEqualTo(0L); - assertThat(lastLogicalQueueRouteData2.isWritable()).isFalse(); - assertThat(lastLogicalQueueRouteData2.isReadable()).isFalse(); - assertThat(lastLogicalQueueRouteData2.isExpired()).isTrue(); - assertThat(lastLogicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(0L); - - LogicalQueueRouteData lastLogicalQueueRouteData3 = logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 1); - assertThat(lastLogicalQueueRouteData3.getBrokerName()).isEqualTo(newBrokerName); - assertThat(lastLogicalQueueRouteData3.getOffsetMax()).isLessThan(0L); - assertThat(lastLogicalQueueRouteData3.isWritable()).isTrue(); - assertThat(lastLogicalQueueRouteData3.isReadable()).isTrue(); - assertThat(lastLogicalQueueRouteData3.isExpired()).isFalse(); - assertThat(lastLogicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(0L); - } else { - assertThat(logicalQueueRouteDataList2).hasSize(1); - LogicalQueueRouteData logicalQueueRouteData = logicalQueueRouteDataList2.get(0); - assertThat(logicalQueueRouteData.getOffsetMax()).isLessThan(0L); - assertThat(logicalQueueRouteData.isWritable()).isTrue(); - assertThat(logicalQueueRouteData.isReadable()).isTrue(); - assertThat(logicalQueueRouteData.isExpired()).isFalse(); - assertThat(logicalQueueRouteData.getLogicalQueueDelta()).isEqualTo(0L); - } - } - - List subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList()); - assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS); - for (MessageQueue mq : subscribeMessageQueues) { - assertThat(mqAdminExt.minOffset(mq)).isEqualTo(0L); - } - - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId()); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName); - assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS); - } - - for (int i = 0; i < MSG_SENT_TIMES; i++) { - CompletableFuture future = new CompletableFuture<>(); - producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue, new SendCallback() { - @Override public void onSuccess(SendResult sendResult) { - future.complete(sendResult); - } - - @Override public void onException(Throwable e) { - future.completeExceptionally(e); - } - }); - SendResult sendResult = future.get(); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId()); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName); - assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS); - } - - assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(2 * MSG_SENT_TIMES); - - waitAtMost(5, TimeUnit.SECONDS).until(() -> mqAdminExt.maxOffset(migratedMessageQueue) == 2 * MSG_SENT_TIMES); - - PullResult pullResult = consumer.pull(migratedMessageQueue, "*", 0L, 2 * MSG_SENT_TIMES); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMinOffset()).isEqualTo(0); - assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 * MSG_SENT_TIMES); - List msgFoundList = pullResult.getMsgFoundList(); - assertThat(msgFoundList).hasSize(2 * MSG_SENT_TIMES); - Iterator it = pullResult.getMsgFoundList().iterator(); - long offset = 0L; - for (String prefix : new String[] {"sync", "async"}) { - for (int i = 0; i < MSG_SENT_TIMES; i++) { - MessageExt msg = it.next(); - assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId()); - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%s-%d-%d", methodName, prefix, migratedMessageQueue.getQueueId(), i)); - assertThat(msg.getQueueOffset()).isEqualTo(offset); - offset++; - } - } - - offset = pullResult.getNextBeginOffset(); - CompletableFuture future = new CompletableFuture<>(); - consumer.pull(migratedMessageQueue, "*", offset, 10, new PullCallback() { - @Override public void onSuccess(PullResult pullResult) { - future.complete(pullResult); - } - - @Override public void onException(Throwable e) { - future.completeExceptionally(e); - } - }); - pullResult = future.get(); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG); - assertThat(pullResult.getMinOffset()).isEqualTo(0); - assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 * MSG_SENT_TIMES); - assertThat(pullResult.getMsgFoundList()).isNull(); - } - - @Test - public void test004_MigrateOnceWithData() throws Exception { - final String methodName = getCurrentMethodName(); - - final int logicalQueueIdx = 1; - - TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - List logicalQueueRouteDataList1 = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx); - LogicalQueueRouteData lastLogicalQueueRouteData1 = logicalQueueRouteDataList1.get(logicalQueueRouteDataList1.size() - 1); - String newBrokerName; - if (lastLogicalQueueRouteData1.getBrokerName().equals(broker1Name)) { - newBrokerName = broker2Name; - } else { - newBrokerName = broker1Name; - } - - MessageQueue migratedMessageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx); - - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId()); - } - assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(MSG_SENT_TIMES); - - waitAtMost(5, TimeUnit.SECONDS).until(() -> mqAdminExt.maxOffset(migratedMessageQueue) == MSG_SENT_TIMES); - - { - long offset = 0L; - PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 2 * MSG_SENT_TIMES); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMinOffset()).isEqualTo(0); - assertThat(pullResult.getMaxOffset()).isEqualTo(MSG_SENT_TIMES); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(MSG_SENT_TIMES); - List msgFoundList = pullResult.getMsgFoundList(); - assertThat(msgFoundList).hasSize(MSG_SENT_TIMES); - Iterator it = pullResult.getMsgFoundList().iterator(); - for (int i = 0; i < MSG_SENT_TIMES; i++) { - MessageExt msg = it.next(); - assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId()); - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i)); - assertThat(msg.getQueueOffset()).isEqualTo(offset); - offset++; - } - } - - new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, newBrokerName, null); - - topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - assertThat(topicRouteInfo.getLogicalQueuesInfo()).isNotNull(); - for (Map.Entry> entry : topicRouteInfo.getLogicalQueuesInfo().entrySet()) { - List logicalQueueRouteDataList2 = entry.getValue(); - if (entry.getKey() == logicalQueueIdx) { - assertThat(logicalQueueRouteDataList2).hasSize(logicalQueueRouteDataList1.size() + 1); - LogicalQueueRouteData lastLogicalQueueRouteData2 = logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 2); - assertThat(lastLogicalQueueRouteData2.getMessageQueue()).isEqualTo(lastLogicalQueueRouteData1.getMessageQueue()); - assertThat(lastLogicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L); - assertThat(lastLogicalQueueRouteData2.getMessagesCount()).isEqualTo(MSG_SENT_TIMES); - assertThat(lastLogicalQueueRouteData2.isWritable()).isFalse(); - assertThat(lastLogicalQueueRouteData2.isReadable()).isTrue(); - assertThat(lastLogicalQueueRouteData2.isExpired()).isFalse(); - assertThat(lastLogicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(0L); - - LogicalQueueRouteData lastLogicalQueueRouteData3 = logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 1); - assertThat(lastLogicalQueueRouteData3.getBrokerName()).isEqualTo(newBrokerName); - assertThat(lastLogicalQueueRouteData3.getOffsetMax()).isLessThan(0L); - assertThat(lastLogicalQueueRouteData3.isWritable()).isTrue(); - assertThat(lastLogicalQueueRouteData3.isReadable()).isTrue(); - assertThat(lastLogicalQueueRouteData3.isExpired()).isFalse(); - assertThat(lastLogicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(MSG_SENT_TIMES); - } else { - assertThat(logicalQueueRouteDataList2).hasSize(1); - LogicalQueueRouteData logicalQueueRouteData = logicalQueueRouteDataList2.get(0); - assertThat(logicalQueueRouteData.getOffsetMax()).isLessThan(0L); - assertThat(logicalQueueRouteData.isWritable()).isTrue(); - assertThat(logicalQueueRouteData.isReadable()).isTrue(); - assertThat(logicalQueueRouteData.isExpired()).isFalse(); - assertThat(logicalQueueRouteData.getLogicalQueueDelta()).isEqualTo(0L); - } - } - assertThat(migratedMessageQueue).isNotNull(); - - List subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList()); - assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS); - for (MessageQueue mq : subscribeMessageQueues) { - assertThat(mqAdminExt.minOffset(mq)).isEqualTo(0L); - } - - for (int i = 0; i < MSG_SENT_TIMES; i++) { - CompletableFuture future = new CompletableFuture<>(); - producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue, new SendCallback() { - @Override public void onSuccess(SendResult sendResult) { - future.complete(sendResult); - } - - @Override public void onException(Throwable e) { - future.completeExceptionally(e); - } - }); - SendResult sendResult = future.get(); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId()); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName); - assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS); - } - - assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(2 * MSG_SENT_TIMES); - - waitAtMost(5, TimeUnit.SECONDS).until(() -> mqAdminExt.maxOffset(migratedMessageQueue) == 2 * MSG_SENT_TIMES); - - long offset = 0L; - PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 2 * MSG_SENT_TIMES); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMinOffset()).isEqualTo(0); - assertThat(pullResult.getMaxOffset()).isEqualTo(MSG_SENT_TIMES); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(MSG_SENT_TIMES); - List msgFoundList = pullResult.getMsgFoundList(); - assertThat(msgFoundList).hasSize(MSG_SENT_TIMES); - Iterator it = pullResult.getMsgFoundList().iterator(); - for (int i = 0; i < MSG_SENT_TIMES; i++) { - MessageExt msg = it.next(); - assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId()); - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i)); - assertThat(msg.getQueueOffset()).isEqualTo(offset); - offset++; - } - - offset = pullResult.getNextBeginOffset(); - CompletableFuture pullResultFuture = new CompletableFuture<>(); - consumer.pull(migratedMessageQueue, "*", offset, 2 * MSG_SENT_TIMES, new PullCallback() { - @Override public void onSuccess(PullResult pullResult) { - pullResultFuture.complete(pullResult); - } - - @Override public void onException(Throwable e) { - pullResultFuture.completeExceptionally(e); - } - }); - pullResult = pullResultFuture.get(); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMinOffset()).isEqualTo(MSG_SENT_TIMES); - assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 * MSG_SENT_TIMES); - msgFoundList = pullResult.getMsgFoundList(); - assertThat(msgFoundList).hasSize(MSG_SENT_TIMES); - it = pullResult.getMsgFoundList().iterator(); - for (int i = 0; i < MSG_SENT_TIMES; i++) { - MessageExt msg = it.next(); - assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId()); - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(), i)); - assertThat(msg.getQueueOffset()).isEqualTo(offset); - offset++; - } - - offset = pullResult.getNextBeginOffset(); - pullResult = consumer.pull(migratedMessageQueue, "*", offset, 10); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG); - assertThat(pullResult.getMinOffset()).isEqualTo(MSG_SENT_TIMES); - assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 * MSG_SENT_TIMES); - assertThat(pullResult.getMsgFoundList()).isNull(); - } - - @Test - public void test005_MigrateWithDataBackAndForth() throws Exception { - final String methodName = getCurrentMethodName(); - - final int logicalQueueIdx = 1; - - MessageQueue migratedMessageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx); - - BrokerController brokerController; - - TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - LogicalQueueRouteData lastLogicalQueueRouteData; - { - List logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx); - lastLogicalQueueRouteData = logicalQueueRouteDataList.get(logicalQueueRouteDataList.size() - 1); - } - final String fromBrokerName, toBrokerName, fromBrokerAddr, toBrokerAddr; - if (lastLogicalQueueRouteData.getBrokerName().equals(broker1Name)) { - fromBrokerName = broker1Name; - fromBrokerAddr = brokerController1.getBrokerAddr(); - toBrokerName = broker2Name; - toBrokerAddr = brokerController2.getBrokerAddr(); - } else { - fromBrokerName = broker2Name; - fromBrokerAddr = brokerController2.getBrokerAddr(); - toBrokerName = broker1Name; - toBrokerAddr = brokerController1.getBrokerAddr(); - } - - int msgIdx = 0; - - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(fromBrokerName); - assertThat(sendResult2.getOrigQueueId()).isEqualTo(logicalQueueIdx); - } - - rotateBrokerCommitLog(brokerControllerMap.get(fromBrokerName)); - - new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, toBrokerName, null); - - { - LogicalQueuesInfo info; - List logicalQueueRouteDataList; - info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr, topic); - logicalQueueRouteDataList = info.get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).hasSize(2); - info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr, topic); - logicalQueueRouteDataList = info.get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).hasSize(1); - } - - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(toBrokerName); - assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS); - } - - new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, fromBrokerName, null); - // now will reuse queue with a ReadOnly one - - { - LogicalQueuesInfo info; - List logicalQueueRouteDataList; - info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr, topic); - logicalQueueRouteDataList = info.get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).hasSize(3); - info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr, topic); - logicalQueueRouteDataList = info.get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).hasSize(2); - } - - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(fromBrokerName); - assertThat(sendResult2.getOrigQueueId()).isEqualTo(logicalQueueIdx); - } - - LogicalQueueRouteData logicalQueueRouteData1; - LogicalQueueRouteData logicalQueueRouteData2; - { - List logicalQueueRouteDataList; - topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).hasSize(3); - logicalQueueRouteData1 = logicalQueueRouteDataList.get(0); - assertThat(logicalQueueRouteData1.getLogicalQueueDelta()).isEqualTo(0); - assertThat(logicalQueueRouteData1.isReadable()).isTrue(); - assertThat(logicalQueueRouteData1.isWritable()).isFalse(); - assertThat(logicalQueueRouteData1.isExpired()).isFalse(); - assertThat(logicalQueueRouteData1.isWriteOnly()).isFalse(); - assertThat(logicalQueueRouteData1.getBrokerName()).isEqualTo(fromBrokerName); - assertThat(logicalQueueRouteData1.getOffsetMax()).isGreaterThanOrEqualTo(0L); - assertThat(logicalQueueRouteData1.getMessagesCount()).isEqualTo(MSG_SENT_TIMES); - assertThat(logicalQueueRouteData1.getFirstMsgTimeMillis()).isGreaterThan(0L); - assertThat(logicalQueueRouteData1.getLastMsgTimeMillis()).isGreaterThan(0L); - logicalQueueRouteData2 = logicalQueueRouteDataList.get(1); - assertThat(logicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(MSG_SENT_TIMES); - assertThat(logicalQueueRouteData2.isReadable()).isTrue(); - assertThat(logicalQueueRouteData2.isWritable()).isFalse(); - assertThat(logicalQueueRouteData2.isExpired()).isFalse(); - assertThat(logicalQueueRouteData2.isWriteOnly()).isFalse(); - assertThat(logicalQueueRouteData2.getBrokerName()).isEqualTo(toBrokerName); - assertThat(logicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L); - assertThat(logicalQueueRouteData2.getMessagesCount()).isEqualTo(MSG_SENT_TIMES); - assertThat(logicalQueueRouteData2.getFirstMsgTimeMillis()).isGreaterThan(0L); - assertThat(logicalQueueRouteData2.getLastMsgTimeMillis()).isGreaterThan(0L); - LogicalQueueRouteData logicalQueueRouteData3 = logicalQueueRouteDataList.get(2); - assertThat(logicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(2 * MSG_SENT_TIMES); - assertThat(logicalQueueRouteData3.isReadable()).isTrue(); - assertThat(logicalQueueRouteData3.isWritable()).isTrue(); - assertThat(logicalQueueRouteData3.isExpired()).isFalse(); - assertThat(logicalQueueRouteData3.isWriteOnly()).isFalse(); - assertThat(logicalQueueRouteData3.getBrokerName()).isEqualTo(fromBrokerName); - assertThat(logicalQueueRouteData3.getOffsetMax()).isLessThan(0L); - } - - msgIdx = 0; - forLoop: - for (long offset = 0L; ; ) { - PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES); - switch (pullResult.getPullStatus()) { - case NO_NEW_MSG: - assertThat(offset).isGreaterThanOrEqualTo(3L * MSG_SENT_TIMES); - break forLoop; - case OFFSET_ILLEGAL: - offset = pullResult.getNextBeginOffset(); - break; - default: - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMsgFoundList()).isNotNull(); - assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES); - for (MessageExt msg : pullResult.getMsgFoundList()) { - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx)); - msgIdx++; - assertThat(msg.getQueueOffset()).isEqualTo(offset); - offset++; - } - offset = pullResult.getNextBeginOffset(); - break; - } - } - - waitAtMost(5, TimeUnit.SECONDS).until(() -> maxOffsetUncommitted(logicalQueueRouteData1.getMessageQueue()) == mqAdminExt.maxOffset(logicalQueueRouteData1.getMessageQueue())); - waitAtMost(5, TimeUnit.SECONDS).until(() -> maxOffsetUncommitted(logicalQueueRouteData2.getMessageQueue()) == mqAdminExt.maxOffset(logicalQueueRouteData2.getMessageQueue())); - - // now verify after commit log cleaned, toBroker's first queue route data will be expired too - brokerController = brokerControllerMap.get(logicalQueueRouteData2.getBrokerName()); - rotateBrokerCommitLog(brokerController); - deleteCommitLogFiles(brokerController, 1); - - { - topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - List logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).hasSize(2); - assertThat(logicalQueueRouteDataList.get(0)).isEqualToIgnoringGivenFields(new LogicalQueueRouteData(logicalQueueIdx, 0, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.ReadOnly, 0, 3, -1, -1, fromBrokerAddr), "firstMsgTimeMillis", "lastMsgTimeMillis"); - assertThat(logicalQueueRouteDataList.get(1)).isEqualToComparingFieldByField(new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr)); - } - - // try pull again, since there is an expired queue route in the middle. - { - int msgCount = 0; - Queue wantMsgIdx = new LinkedList<>(); - wantMsgIdx.addAll(IntStream.range(0, MSG_SENT_TIMES).boxed().collect(Collectors.toList())); - wantMsgIdx.addAll(IntStream.range(2 * MSG_SENT_TIMES, 3 * MSG_SENT_TIMES).boxed().collect(Collectors.toList())); - forLoop: - for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; ) { - PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES); - switch (pullResult.getPullStatus()) { - case NO_NEW_MSG: - assertThat(msgCount).as("offset=%d", offset).isEqualTo(2 * MSG_SENT_TIMES); - break forLoop; - case OFFSET_ILLEGAL: - offset = pullResult.getNextBeginOffset(); - break; - case FOUND: - msgCount += pullResult.getMsgFoundList().size(); - boolean first = true; - for (MessageExt msg : pullResult.getMsgFoundList()) { - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).as("offset=%d", offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, wantMsgIdx.poll())); - if (first) { - assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset); - first = false; - } else { - assertThat(msg.getQueueOffset()).isGreaterThan(offset); - } - offset = msg.getQueueOffset(); - } - offset = pullResult.getNextBeginOffset(); - break; - default: - Assert.fail(String.format(Locale.ENGLISH, "unexpected pull offset=%d status: %s", offset, pullResult)); - } - } - } - - // rotate first queue route to expired, and pull it - brokerController = brokerControllerMap.get(logicalQueueRouteData1.getBrokerName()); - rotateBrokerCommitLog(brokerController); - deleteCommitLogFiles(brokerController, 2); - - { - List logicalQueueRouteDataList; - topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr))); - } - - { - int msgCount = 0; - Queue wantMsgIdx = new LinkedList<>(); - wantMsgIdx.addAll(IntStream.range(2 * MSG_SENT_TIMES, 3 * MSG_SENT_TIMES).boxed().collect(Collectors.toList())); - forLoop: - for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; ) { - PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES); - switch (pullResult.getPullStatus()) { - case NO_NEW_MSG: - if (msgCount != MSG_SENT_TIMES) { - Assert.fail(String.format(Locale.ENGLISH, "want %d msg but got %d", MSG_SENT_TIMES, msgCount)); - } - break forLoop; - case OFFSET_ILLEGAL: - offset = pullResult.getNextBeginOffset(); - break; - case FOUND: - msgCount += pullResult.getMsgFoundList().size(); - boolean first = true; - for (MessageExt msg : pullResult.getMsgFoundList()) { - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).as("offset=%d", offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, wantMsgIdx.poll())); - if (first) { - assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset); - first = false; - } else { - assertThat(msg.getQueueOffset()).isGreaterThan(offset); - } - offset = msg.getQueueOffset(); - } - offset = pullResult.getNextBeginOffset(); - break; - default: - Assert.fail(String.format(Locale.ENGLISH, "unexpected pull offset=%d status: %s", offset, pullResult)); - } - } - } - - brokerController = brokerControllerMap.get(fromBrokerName); - rotateBrokerCommitLog(brokerController); - deleteCommitLogFiles(brokerController, 1); - - { - forLoop: - for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; ) { - PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES); - // commit log rotate and cleaned, so there is no message. - switch (pullResult.getPullStatus()) { - case NO_MATCHED_MSG: - case NO_NEW_MSG: - assertThat(pullResult.getNextBeginOffset()).isEqualTo(3 * MSG_SENT_TIMES); - break forLoop; - case OFFSET_ILLEGAL: - offset = pullResult.getNextBeginOffset(); - break; - default: - Assert.fail(String.format(Locale.ENGLISH, "unexpected pull offset=%d status: %s", offset, pullResult)); - } - } - } - - { - LogicalQueuesInfo logicalQueuesInfo = mqAdminExt.queryTopicLogicalQueueMapping(brokerController.getBrokerAddr(), topic); - List logicalQueueRouteDataList = logicalQueuesInfo.get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr))); - } - - // try migrate to this broker which has a expired queue, expect it will reuse the expired one, pull it to verify if delta works well - new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, toBrokerName, null); - - { - List logicalQueueRouteDataList; - topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).isEqualTo(Arrays.asList( - new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Expired, MSG_SENT_TIMES, 2 * MSG_SENT_TIMES, 0, 0, fromBrokerAddr) - , new LogicalQueueRouteData(logicalQueueIdx, 3 * MSG_SENT_TIMES, new MessageQueue(topic, toBrokerName, QUEUE_NUMBERS), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr) - )); - - LogicalQueuesInfo info; - info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr, topic); - logicalQueueRouteDataList = info.get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).isEqualTo(Arrays.asList( - new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Expired, MSG_SENT_TIMES, 2 * MSG_SENT_TIMES, 0, 0, fromBrokerAddr) - , new LogicalQueueRouteData(logicalQueueIdx, 3 * MSG_SENT_TIMES, new MessageQueue(topic, toBrokerName, QUEUE_NUMBERS), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr) - )); - info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr, topic); - logicalQueueRouteDataList = info.get(logicalQueueIdx); - assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new LogicalQueueRouteData(logicalQueueIdx, 3 * MSG_SENT_TIMES, new MessageQueue(topic, toBrokerName, QUEUE_NUMBERS), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr))); - } - - msgIdx = 3 * MSG_SENT_TIMES; - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(toBrokerName); - assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS); - } - - { - int msgCount = 0; - Queue wantMsgIdx = new LinkedList<>(); - wantMsgIdx.addAll(IntStream.range(3 * MSG_SENT_TIMES, 4 * MSG_SENT_TIMES).boxed().collect(Collectors.toList())); - LOOP: - for (long offset = 0L; ; ) { - PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES); - switch (pullResult.getPullStatus()) { - case NO_NEW_MSG: - assertThat(msgCount).as("msgCount with offset=%d", offset).isEqualTo(MSG_SENT_TIMES); - break LOOP; - case OFFSET_ILLEGAL: - assertThat(pullResult.getNextBeginOffset()).isNotEqualTo(Long.MIN_VALUE); - offset = pullResult.getNextBeginOffset(); - break; - case FOUND: - msgCount += pullResult.getMsgFoundList().size(); - boolean first = true; - for (MessageExt msg : pullResult.getMsgFoundList()) { - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).as("offset=%d", offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, wantMsgIdx.poll())); - if (first) { - assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset); - first = false; - } else { - assertThat(msg.getQueueOffset()).isGreaterThan(offset); - } - offset = msg.getQueueOffset(); - } - offset = pullResult.getNextBeginOffset(); - break; - default: - Assert.fail(String.format(Locale.ENGLISH, "unexpected pull offset=%d status: %s", offset, pullResult)); - } - } - } - } - - @Test - public void test006_LogicalQueueNumChanged() throws Exception { - String methodName = getCurrentMethodName(); - int logicalQueueNum = brokerNum * QUEUE_NUMBERS; - - List publishMessageQueues; - publishMessageQueues = producer.fetchPublishMessageQueues(topic); - assertThat(publishMessageQueues).hasSize(logicalQueueNum); - List subscribeMessageQueues; - subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList()); - assertThat(subscribeMessageQueues).hasSize(logicalQueueNum); - - logicalQueueNum++; - new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, logicalQueueNum); - - int newAddLogicalQueueIdx = logicalQueueNum - 1; - MessageQueue newAddLogicalQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, newAddLogicalQueueIdx); - String newAddLogicalQueueBrokerName; - { - TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - LogicalQueuesInfo info = topicRouteInfo.getLogicalQueuesInfo(); - assertThat(info).isNotNull(); - List queueRouteDataList = info.get(newAddLogicalQueueIdx); - assertThat(queueRouteDataList).isNotNull(); - assertThat(queueRouteDataList).hasSize(1); - LogicalQueueRouteData queueRouteData = queueRouteDataList.get(0); - newAddLogicalQueueBrokerName = queueRouteData.getBrokerName(); - assertThat(queueRouteData.getState()).isEqualTo(MessageQueueRouteState.Normal); - assertThat(queueRouteData.getLogicalQueueDelta()).isEqualTo(0); - assertThat(queueRouteData.getLogicalQueueIndex()).isEqualTo(newAddLogicalQueueIdx); - } - - publishMessageQueues = producer.fetchPublishMessageQueues(topic); - assertThat(publishMessageQueues).hasSize(logicalQueueNum); - Set logicalQueueIds = IntStream.range(0, logicalQueueNum).boxed().collect(Collectors.toSet()); - Map> queueIds = Maps.newHashMap(); - for (String brokerName : Arrays.asList(broker1Name, broker2Name)) { - queueIds.put(brokerName, IntStream.range(0, QUEUE_NUMBERS).boxed().collect(Collectors.toSet())); - } - queueIds.get(newAddLogicalQueueBrokerName).add(QUEUE_NUMBERS); - for (MessageQueue messageQueue : publishMessageQueues) { - assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(logicalQueueIds.remove(messageQueue.getQueueId())).isTrue(); - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, messageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), messageQueue); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId()); - if (i == 0) { - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(queueIds.get(sendResult2.getOrigBrokerName()).remove(sendResult2.getOrigQueueId())).as("brokerName %s queueId %d", sendResult2.getOrigBrokerName(), sendResult2.getOrigQueueId()).isTrue(); - } - } - } - assertThat(logicalQueueIds).isEmpty(); - - subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList()); - assertThat(subscribeMessageQueues).hasSize(logicalQueueNum); - subscribeMessageQueues.sort(Comparator.comparingInt(MessageQueue::getQueueId)); - logicalQueueIds.addAll(IntStream.range(0, logicalQueueNum).boxed().collect(Collectors.toSet())); - for (MessageQueue messageQueue : subscribeMessageQueues) { - assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(logicalQueueIds.remove(messageQueue.getQueueId())).isTrue(); - long offset = mqAdminExt.minOffset(messageQueue); - assertThat(offset).isEqualTo(0); - PullResult pullResult = consumer.pull(messageQueue, "*", offset, 10); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES); - for (int i = 0; i < MSG_SENT_TIMES; i++) { - MessageExt msg = pullResult.getMsgFoundList().get(i); - assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId()); - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, messageQueue.getQueueId(), i)); - assertThat(msg.getQueueOffset()).isEqualTo(offset + i); - } - assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(offset + MSG_SENT_TIMES); - } - assertThat(logicalQueueIds).isEmpty(); - - // increase TopicConfig write queue first then increase logical queue, expect to reuse - String broker2Addr = brokerController2.getBrokerAddr(); - TopicConfig topicConfig = mqAdminExt.examineTopicConfig(broker2Addr, topic); - topicConfig.setWriteQueueNums(topicConfig.getWriteQueueNums() + 1); - topicConfig.setReadQueueNums(topicConfig.getReadQueueNums() + 1); - mqAdminExt.createAndUpdateTopicConfig(broker2Addr, topicConfig); - logicalQueueNum++; - new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, logicalQueueNum); - { - newAddLogicalQueueIdx = logicalQueueNum -1; - TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - LogicalQueuesInfo info = topicRouteInfo.getLogicalQueuesInfo(); - assertThat(info).isNotNull(); - List queueRouteDataList = info.get(newAddLogicalQueueIdx); - assertThat(queueRouteDataList).isNotNull(); - assertThat(queueRouteDataList).hasSize(1); - LogicalQueueRouteData queueRouteData = queueRouteDataList.get(0); - assertThat(queueRouteData.getState()).isEqualTo(MessageQueueRouteState.Normal); - assertThat(queueRouteData.getLogicalQueueDelta()).isEqualTo(0); - assertThat(queueRouteData.getLogicalQueueIndex()).isEqualTo(newAddLogicalQueueIdx); - assertThat(queueRouteData.getBrokerName()).isEqualTo(broker2Name); - assertThat(queueRouteData.getQueueId()).isEqualTo(topicConfig.getWriteQueueNums() -1); - } - - logicalQueueNum-=2; - new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, logicalQueueNum); - - try { - producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), newAddLogicalQueue); - Assert.fail("write to decreased logical queue success, want it failed"); - } catch (MQBrokerException e) { - assertThat(e.getResponseCode()).isEqualTo(ResponseCode.NO_PERMISSION); - } - { - int offset = 0; - PullResult pullResult = consumer.pull(newAddLogicalQueue, "*", offset, 10); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES); - for (int i = 0; i < MSG_SENT_TIMES; i++) { - MessageExt msg = pullResult.getMsgFoundList().get(i); - assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME); - assertThat(msg.getQueueId()).isEqualTo(newAddLogicalQueue.getQueueId()); - assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, newAddLogicalQueue.getQueueId(), i)); - assertThat(msg.getQueueOffset()).isEqualTo(offset + i); - } - } - - // rotate to remove new add queue's data, and try pull again - { - BrokerController brokerController = brokerControllerMap.get(newAddLogicalQueueBrokerName); - rotateBrokerCommitLog(brokerController); - deleteCommitLogFiles(brokerController, 1); - } - { - int offset = 0; - PullResult pullResult = consumer.pull(newAddLogicalQueue, "*", offset, 10); - assertThat(pullResult.getPullStatus()).isIn(PullStatus.NO_NEW_MSG, PullStatus.NO_MATCHED_MSG); - } - } - - @Test - public void test007_LogicalQueueWritableEvenBrokerDown() throws Exception { - final String methodName = getCurrentMethodName(); - - final int logicalQueueIdx = 1; - - BrokerController brokerController3 = IntegrationTestBase.createAndStartBroker(nsAddr); - String broker3Name = brokerController3.getBrokerConfig().getBrokerName(); - brokerControllerMap.put(broker3Name, brokerController3); - await().atMost(30, TimeUnit.SECONDS).until(() -> mqAdminExt.examineBrokerClusterInfo().getBrokerAddrTable().containsKey(broker3Name)); - mqAdminExt.createAndUpdateTopicConfig(brokerController3.getBrokerAddr(), new TopicConfig(topic, 0, 0, PermName.PERM_READ | PermName.PERM_WRITE)); - - new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, brokerController3.getBrokerConfig().getBrokerName(), null); - - MessageQueue migrateMessageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx); - { - for (int i = 0; i < MSG_SENT_TIMES; i++) { - SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, migrateMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migrateMessageQueue); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migrateMessageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migrateMessageQueue.getQueueId()); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(broker3Name); - assertThat(sendResult2.getOrigQueueId()).isEqualTo(0); - } - } - brokerController3.shutdown(); - brokerControllerMap.remove(broker3Name); - - assertThatThrownBy(() -> { - SendResult sendResult = producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), migrateMessageQueue); - logger.error("send should fail but got {}", sendResult); - }).isInstanceOf(RemotingException.class).hasMessageMatching("connect to [0-9.:]+ failed"); - - assertThatThrownBy(() -> { - new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, broker1Name, null); - }).hasRootCauseInstanceOf(RemotingConnectException.class).hasMessageContaining("migrateTopicLogicalQueuePrepare"); - - { - SendResult sendResult = producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), migrateMessageQueue); - assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migrateMessageQueue.getBrokerName()); - assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migrateMessageQueue.getQueueId()); - assertThat(sendResult.getQueueOffset()).isEqualTo(-1); - SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult; - assertThat(sendResult2.getOrigBrokerName()).isEqualTo(broker1Name); - assertThat(sendResult2.getOrigQueueId()).isIn( - /* CommitLog not rotated, will not reuse */QUEUE_NUMBERS, - /* CommitLog rotated in other test cases, will reuse */logicalQueueIdx - ); - } - } - - private static String getBrokerCommitLogFileName(BrokerController brokerController) throws IllegalAccessException { - DefaultMessageStore defaultMessageStore = (DefaultMessageStore) brokerController.getMessageStore(); - MappedFileQueue mfq = (MappedFileQueue) FieldUtils.readDeclaredField(defaultMessageStore.getCommitLog(), "mappedFileQueue", true); - return mfq.getLastMappedFile().getFileName(); - } - - private static void deleteCommitLogFiles(BrokerController brokerController, - int keepNum) throws IllegalAccessException { - CommitLog commitLog = ((DefaultMessageStore) brokerController.getMessageStore()).getCommitLog(); - commitLog.flush(); - MappedFileQueue mfq = (MappedFileQueue) FieldUtils.readDeclaredField(commitLog, "mappedFileQueue", true); - AtomicInteger count = new AtomicInteger(); - waitAtMost(5, TimeUnit.SECONDS).until(() -> { - count.getAndAdd(commitLog.deleteExpiredFile(0, 0, 5000, true, 1)); - return mfq.getMappedFiles().size() <= keepNum; - }); - brokerController.getTopicConfigManager().getLogicalQueueCleanHook().execute((DefaultMessageStore) brokerController.getMessageStore(), count.get()); - logger.info("deleteCommitLogFiles {} count {}", brokerController.getBrokerConfig().getBrokerName(), count.get()); - } - - private static void rotateBrokerCommitLog(BrokerController brokerController) throws IllegalAccessException { - CommitLog commitLog = ((DefaultMessageStore) brokerController.getMessageStore()).getCommitLog(); - commitLog.flush(); - String brokerName = brokerController.getBrokerConfig().getBrokerName(); - String fileName1 = getBrokerCommitLogFileName(brokerController); - logger.info("rotateBrokerCommitLog {} first {}", brokerName, fileName1); - int msgSize = 4 * 1024; - byte[] data = RandomStringUtils.randomAscii(msgSize).getBytes(StandardCharsets.UTF_8); - Message msg = new Message(placeholderTopic, data); - MessageQueue mq = new MessageQueue(placeholderTopic, brokerName, 0); - waitAtMost(5, TimeUnit.SECONDS).until(() -> { - for (int i = 0; i < 128; i++) { - producer.send(msg, mq); - } - commitLog.flush(); - String fileName2 = getBrokerCommitLogFileName(brokerController); - if (!fileName1.equals(fileName2)) { - logger.info("rotateBrokerCommitLog {} 4K msg last {}", brokerName, fileName2); - return true; - } - return false; - }); - } - - private long maxOffsetUncommitted(MessageQueue mq) throws IllegalAccessException, MQClientException { - DefaultMQAdminExtImpl defaultMQAdminExtImpl = (DefaultMQAdminExtImpl) FieldUtils.readDeclaredField(mqAdminExt, "defaultMQAdminExtImpl", true); - return defaultMQAdminExtImpl.maxOffset(mq, false); - } -} diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java new file mode 100644 index 00000000..9dd116dd --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -0,0 +1,76 @@ +package org.apache.rocketmq.test.smoke; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.util.MQRandomUtils; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.FixMethodOrder; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; + +import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig; + +@FixMethodOrder +public class StaticTopicIT extends BaseConf { + + private static Logger logger = Logger.getLogger(StaticTopicIT.class); + private DefaultMQAdminExt defaultMQAdminExt; + private ClientMetadata clientMetadata; + + @Before + public void setUp() throws Exception { + defaultMQAdminExt = getAdmin(nsAddr); + waitBrokerRegistered(nsAddr, clusterName); + clientMetadata = new ClientMetadata(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo == null + || clusterInfo.getClusterAddrTable().isEmpty()) { + throw new RuntimeException("The Cluster info is empty"); + } + clientMetadata.refreshClusterInfo(clusterInfo); + + } + + @Test + public void testCreateStaticTopic() throws Exception { + String topic = "static" + MQRandomUtils.getRandomTopic(); + int queueNum = 10; + Set brokers = getBrokers(); + //create topic + { + Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + Assert.assertTrue(brokerConfigMap.isEmpty()); + TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, getBrokers(), brokerConfigMap); + //If some succeed, and others fail, it will cause inconsistent data + for (Map.Entry entry : brokerConfigMap.entrySet()) { + String broker = entry.getKey(); + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = entry.getValue(); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false); + } + } + Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + Assert.assertEquals(queueNum, globalIdMap.size()); + + } + + @After + public void tearDown() { + super.shutdown(); + } + +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index f1f237dd..857c76ed 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -150,13 +150,7 @@ public class DefaultMQAdminExtTest { topicRouteData.setBrokerDatas(brokerDatas); topicRouteData.setQueueDatas(new ArrayList()); topicRouteData.setFilterServerTable(new HashMap>()); - LogicalQueuesInfo logicalQueuesInfoinfo = new LogicalQueuesInfo(); - logicalQueuesInfoinfo.put(0, Lists.newArrayList( - new LogicalQueueRouteData(0, 0, new MessageQueue(topic1, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 1000, 2000, 3000, broker1Addr), - new LogicalQueueRouteData(0, 1000, new MessageQueue(topic1, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr) - )); - topicRouteData.setLogicalQueuesInfo(logicalQueuesInfoinfo); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), any())).thenReturn(topicRouteData); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); HashMap result = new HashMap<>(); result.put("id", String.valueOf(MixAll.MASTER_ID)); -- GitLab