...
 
Commits (4)
    https://gitcode.net/apacherocketmq/rocketmq/-/commit/3d68590377079e290c8c5f34b9a8834f70da02c4 Polish the code 2021-11-23T11:58:04+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/2f4bf29603aec75be48aa4bd7d2570b10ed3bf2c Polish the code structure for static topic command 2021-11-23T15:00:01+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/e93536a9aeb42f9695767fe28d43b1a229265a67 Add IT Test for static topic 2021-11-23T20:21:28+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/fa39815666187647b6990af7a8a98e1779eb065d Catch the exception 2021-11-23T21:16:26+08:00 dongeforever dongeforever@apache.org
......@@ -1880,7 +1880,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
if (topicConfig == null) {
log.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic());
response.setCode(ResponseCode.SYSTEM_ERROR);
//be care of the response code, should set "not-exist" explictly
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
return response;
}
......
......@@ -332,7 +332,7 @@ public class MQClientAPIImpl {
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
throws RemotingException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
......@@ -2708,7 +2708,7 @@ public class MQClientAPIImpl {
public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setWithMapping(true);
......@@ -2720,15 +2720,19 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: {
return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class);
}
//should check the exist
case ResponseCode.TOPIC_NOT_EXIST: {
//should return null?
break;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
throw new MQClientException(response.getCode(), response.getRemark());
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
......@@ -2753,6 +2757,6 @@ public class MQClientAPIImpl {
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
throw new MQClientException(response.getCode(), response.getRemark());
}
}
......@@ -91,6 +91,10 @@ public class ClientMetadata {
return brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID);
}
public ConcurrentMap<String, HashMap<Long, String>> getBrokerAddrTable() {
return brokerAddrTable;
}
public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
if (route.getTopicQueueMappingByBroker() == null
|| route.getTopicQueueMappingByBroker().isEmpty()) {
......@@ -112,7 +116,7 @@ public class ClientMetadata {
for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
TopicQueueMappingInfo info = entry.getValue();
if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
maxTotalNums = entry.getValue().getTotalQueues();
maxTotalNums = info.getTotalQueues();
}
for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
int globalId = idEntry.getKey();
......
......@@ -18,17 +18,23 @@ package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import java.io.File;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
public class TopicQueueMappingUtils {
......@@ -306,6 +312,167 @@ public class TopicQueueMappingUtils {
}
}
public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
}
//check the queue number
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
for (String broker: targetBrokers) {
brokerNumMap.put(broker, 0);
}
final Map<Integer, String> oldIdToBroker = new HashMap<Integer, String>();
for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
String leaderbroker = entry.getValue().getBname();
oldIdToBroker.put(entry.getKey(), leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
} else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!brokerConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
brokerConfigMap.put(broker, configMapping);
} else {
configMapping = brokerConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
// set the topic config
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
}
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, brokerConfigMap, new HashSet<String>(), new HashSet<String>());
}
public static TopicRemappingDetailWrapper remappingStaticTopic(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, Set<String> targetBrokers) {
final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
final Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
for (String broker: targetBrokers) {
brokerNumMap.put(broker, 0);
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
Integer queueId = entry.getKey();
TopicQueueMappingOne mappingOne = entry.getValue();
String leaderBroker = mappingOne.getBname();
if (expectedBrokerNumMap.containsKey(leaderBroker)) {
if (expectedBrokerNumMap.get(leaderBroker) > 0) {
expectedIdToBroker.put(queueId, leaderBroker);
expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
} else {
waitAssignQueues.add(queueId);
expectedBrokerNumMap.remove(leaderBroker);
}
} else {
waitAssignQueues.add(queueId);
}
}
for (Map.Entry<String, Integer> entry: expectedBrokerNumMap.entrySet()) {
String broker = entry.getKey();
Integer queueNum = entry.getValue();
for (int i = 0; i < queueNum; i++) {
Integer queueId = waitAssignQueues.poll();
assert queueId != null;
expectedIdToBroker.put(queueId, broker);
}
}
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
//Now construct the remapping info
Set<String> brokersToMapOut = new HashSet<String>();
Set<String> brokersToMapIn = new HashSet<String>();
for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
Integer queueId = mapEntry.getKey();
String broker = mapEntry.getValue();
TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
assert topicQueueMappingOne != null;
if (topicQueueMappingOne.getBname().equals(broker)) {
continue;
}
//remapping
final String mapInBroker = broker;
final String mapOutBroker = topicQueueMappingOne.getBname();
brokersToMapIn.add(mapInBroker);
brokersToMapOut.add(mapOutBroker);
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
List<LogicQueueMappingItem> items = new ArrayList<LogicQueueMappingItem>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
}
//double check
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut);
}
}
......@@ -18,7 +18,6 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
private final String topic;
private final String type;
private final long epoch;
private Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
......@@ -26,12 +25,13 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
private Set<String> brokerToMapOut = new HashSet<String>();
public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, Set<String> brokerToMapIn, Set<String> brokerToMapOut) {
this.topic = topic;
this.type = type;
this.epoch = epoch;
this.expectedIdToBroker = expectedIdToBroker;
this.brokerConfigMap = brokerConfigMap;
this.brokerToMapIn = brokerToMapIn;
this.brokerToMapOut = brokerToMapOut;
}
public String getTopic() {
......@@ -46,10 +46,6 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
return epoch;
}
public Map<Integer, String> getExpectedIdToBroker() {
return expectedIdToBroker;
}
public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
return brokerConfigMap;
}
......@@ -58,15 +54,7 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
return brokerToMapIn;
}
public void setBrokerToMapIn(Set<String> brokerToMapIn) {
this.brokerToMapIn = brokerToMapIn;
}
public Set<String> getBrokerToMapOut() {
return brokerToMapOut;
}
public void setBrokerToMapOut(Set<String> brokerToMapOut) {
this.brokerToMapOut = brokerToMapOut;
}
}
......@@ -161,6 +161,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
......
......@@ -19,8 +19,10 @@ package org.apache.rocketmq.test.base;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
......@@ -120,6 +122,13 @@ public class BaseConf {
return group;
}
public static DefaultMQAdminExt getAdmin(String nsAddr) {
final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
mqAdminExt.setNamesrvAddr(nsAddr);
mqClients.add(mqAdminExt);
return mqAdminExt;
}
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
return getProducer(nsAddr, topic, false);
}
......@@ -197,6 +206,13 @@ public class BaseConf {
shutdown(mqClients);
}
public static Set<String> getBrokers() {
Set<String> brokers = new HashSet<>();
brokers.add(broker1Name);
brokers.add(broker2Name);
return brokers;
}
public static void shutdown(List<Object> mqClients) {
mqClients.forEach(mqClient -> ForkJoinPool.commonPool().execute(() -> {
if (mqClient instanceof AbstractMQProducer) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.smoke;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendResultForLogicalQueue;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Optional.ofNullable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.waitAtMost;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class LogicalQueueIT {
private static final Logger logger = LoggerFactory.getLogger(LogicalQueueIT.class);
public static String nsAddr;
private static String broker1Name;
private static String broker2Name;
private static String clusterName;
private static int brokerNum;
private final static int QUEUE_NUMBERS = 8;
private static NamesrvController namesrvController;
private static BrokerController brokerController1;
private static BrokerController brokerController2;
private static Map<String, BrokerController> brokerControllerMap;
private final static List<Object> mqClients = new ArrayList<>();
private static DefaultMQProducer producer;
private static DefaultMQPullConsumer consumer;
private static DefaultMQAdminExt mqAdminExt;
private static volatile String topic = null;
private static final String placeholderTopic = "placeholder";
private static final int MSG_SENT_TIMES = 3;
private static final int COMMIT_LOG_FILE_SIZE = 512 * 1024;
@BeforeClass
public static void beforeClass() throws Exception {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
namesrvController = IntegrationTestBase.createAndStartNamesrv();
nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
int oldCommitLogSize = IntegrationTestBase.COMMIT_LOG_SIZE;
IntegrationTestBase.COMMIT_LOG_SIZE = COMMIT_LOG_FILE_SIZE;
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr);
IntegrationTestBase.COMMIT_LOG_SIZE = oldCommitLogSize;
clusterName = brokerController1.getBrokerConfig().getBrokerClusterName();
broker1Name = brokerController1.getBrokerConfig().getBrokerName();
broker2Name = brokerController2.getBrokerConfig().getBrokerName();
brokerNum = 2;
brokerControllerMap = ImmutableList.of(brokerController1, brokerController2).stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
BaseConf.waitBrokerRegistered(nsAddr, clusterName);
producer = new DefaultMQProducer(MQRandomUtils.getRandomConsumerGroup());
mqClients.add(producer);
producer.setNamesrvAddr(nsAddr);
producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
producer.setSendMsgTimeout(1000);
producer.start();
consumer = new DefaultMQPullConsumer(BaseConf.initConsumerGroup());
mqClients.add(consumer);
consumer.setNamesrvAddr(nsAddr);
consumer.setConsumerPullTimeoutMillis(1000);
consumer.start();
mqAdminExt = new DefaultMQAdminExt(1000);
mqClients.add(mqAdminExt);
mqAdminExt.setNamesrvAddr(nsAddr);
mqAdminExt.start();
mqAdminExt.createTopic(clusterName, placeholderTopic, 1);
}
@AfterClass
public static void afterClass() {
BaseConf.shutdown(mqClients);
brokerControllerMap.forEach((s, brokerController) -> brokerController.shutdown());
ofNullable(namesrvController).ifPresent(obj -> ForkJoinPool.commonPool().execute(obj::shutdown));
}
@Before
public void setUp() throws Exception {
topic = "tt-" + MQRandomUtils.getRandomTopic();
logger.info("use topic: {}", topic);
mqAdminExt.createTopic(clusterName, topic, QUEUE_NUMBERS);
assertThat(mqAdminExt.examineTopicRouteInfo(topic).getBrokerDatas()).hasSize(brokerNum);
await().atMost(5, TimeUnit.SECONDS).until(() -> !mqAdminExt.examineTopicStats(topic).getOffsetTable().isEmpty());
consumer.setRegisterTopics(Collections.singleton(topic));
// consumer.setMessageQueueListener & consumer.registerMessageQueueListener are useless in DefaultMQPullConsumer, they will never work, so do not need to test it
new UpdateTopicLogicalQueueMappingCommand().execute(mqAdminExt, topic, brokerControllerMap.values().stream().map(BrokerController::getBrokerAddr).collect(Collectors.toSet()));
}
private static String getCurrentMethodName() {
// 0: getStackTrace
// 1: getCurrentMethodName
// 2: __realMethod__
return Thread.currentThread().getStackTrace()[2].getMethodName();
}
@Test
public void test001_SendPullSync() throws Exception {
String methodName = getCurrentMethodName();
List<MessageQueue> publishMessageQueues = producer.fetchPublishMessageQueues(topic);
assertThat(publishMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS);
Set<Integer> queueIds = IntStream.range(0, brokerNum * QUEUE_NUMBERS).boxed().collect(Collectors.toSet());
for (MessageQueue messageQueue : publishMessageQueues) {
assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(queueIds.remove(messageQueue.getQueueId())).isTrue();
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, messageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), messageQueue);
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
}
}
assertThat(queueIds).isEmpty();
List<MessageQueue> subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS);
subscribeMessageQueues.sort(Comparator.comparingInt(MessageQueue::getQueueId));
queueIds.addAll(IntStream.range(0, brokerNum * QUEUE_NUMBERS).boxed().collect(Collectors.toSet()));
for (MessageQueue messageQueue : subscribeMessageQueues) {
assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(queueIds.remove(messageQueue.getQueueId())).isTrue();
long offset = mqAdminExt.minOffset(messageQueue);
PullResult pullResult = consumer.pull(messageQueue, "*", offset, 10);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
offset = -1;
for (int i = 0; i < MSG_SENT_TIMES; i++) {
MessageExt msg = pullResult.getMsgFoundList().get(i);
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId());
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, messageQueue.getQueueId(), i));
if (i > 0) {
assertThat(msg.getQueueOffset()).isEqualTo(offset + i);
} else {
offset = msg.getQueueOffset();
}
}
assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(offset + MSG_SENT_TIMES);
}
assertThat(queueIds).isEmpty();
}
@Test
public void test002_SendPullAsync() throws Exception {
String methodName = getCurrentMethodName();
List<MessageQueue> publishMessageQueues = producer.fetchPublishMessageQueues(topic);
for (MessageQueue messageQueue : publishMessageQueues) {
for (int i = 0; i < MSG_SENT_TIMES; i++) {
CompletableFuture<SendResult> future = new CompletableFuture<>();
producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, messageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), messageQueue, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
future.complete(sendResult);
}
@Override public void onException(Throwable e) {
future.completeExceptionally(e);
}
});
SendResult sendResult = future.get();
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
}
}
List<MessageQueue> subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
for (MessageQueue messageQueue : subscribeMessageQueues) {
long offset = mqAdminExt.minOffset(messageQueue);
CompletableFuture<PullResult> future = new CompletableFuture<>();
consumer.pull(messageQueue, "*", offset, 10, new PullCallback() {
@Override public void onSuccess(PullResult pullResult) {
future.complete(pullResult);
}
@Override public void onException(Throwable e) {
future.completeExceptionally(e);
}
});
PullResult pullResult = future.get();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
offset = -1;
Iterator<MessageExt> it = pullResult.getMsgFoundList().iterator();
for (int i = 0; i < MSG_SENT_TIMES; i++) {
MessageExt msg = it.next();
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId());
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, messageQueue.getQueueId(), i));
if (i > 0) {
assertThat(msg.getQueueOffset()).isEqualTo(offset + i);
} else {
offset = msg.getQueueOffset();
}
}
}
}
@Test
public void test003_MigrateOnceWithoutData() throws Exception {
final String methodName = getCurrentMethodName();
final int logicalQueueIdx = 1;
TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
List<LogicalQueueRouteData> logicalQueueRouteDataList1 = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
LogicalQueueRouteData lastLogicalQueueRouteData1 = logicalQueueRouteDataList1.get(logicalQueueRouteDataList1.size() - 1);
String newBrokerName;
if (lastLogicalQueueRouteData1.getBrokerName().equals(broker1Name)) {
newBrokerName = broker2Name;
} else {
newBrokerName = broker1Name;
}
MessageQueue migratedMessageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx);
new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, newBrokerName, null);
topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
assertThat(topicRouteInfo.getLogicalQueuesInfo()).isNotNull();
for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : topicRouteInfo.getLogicalQueuesInfo().entrySet()) {
List<LogicalQueueRouteData> logicalQueueRouteDataList2 = entry.getValue();
if (entry.getKey() == logicalQueueIdx) {
assertThat(logicalQueueRouteDataList2).hasSize(logicalQueueRouteDataList1.size() + 1);
LogicalQueueRouteData lastLogicalQueueRouteData2 = logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 2);
assertThat(lastLogicalQueueRouteData2.getMessageQueue()).isEqualTo(lastLogicalQueueRouteData1.getMessageQueue());
assertThat(lastLogicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L);
assertThat(lastLogicalQueueRouteData2.getMessagesCount()).isEqualTo(0L);
assertThat(lastLogicalQueueRouteData2.isWritable()).isFalse();
assertThat(lastLogicalQueueRouteData2.isReadable()).isFalse();
assertThat(lastLogicalQueueRouteData2.isExpired()).isTrue();
assertThat(lastLogicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(0L);
LogicalQueueRouteData lastLogicalQueueRouteData3 = logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 1);
assertThat(lastLogicalQueueRouteData3.getBrokerName()).isEqualTo(newBrokerName);
assertThat(lastLogicalQueueRouteData3.getOffsetMax()).isLessThan(0L);
assertThat(lastLogicalQueueRouteData3.isWritable()).isTrue();
assertThat(lastLogicalQueueRouteData3.isReadable()).isTrue();
assertThat(lastLogicalQueueRouteData3.isExpired()).isFalse();
assertThat(lastLogicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(0L);
} else {
assertThat(logicalQueueRouteDataList2).hasSize(1);
LogicalQueueRouteData logicalQueueRouteData = logicalQueueRouteDataList2.get(0);
assertThat(logicalQueueRouteData.getOffsetMax()).isLessThan(0L);
assertThat(logicalQueueRouteData.isWritable()).isTrue();
assertThat(logicalQueueRouteData.isReadable()).isTrue();
assertThat(logicalQueueRouteData.isExpired()).isFalse();
assertThat(logicalQueueRouteData.getLogicalQueueDelta()).isEqualTo(0L);
}
}
List<MessageQueue> subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS);
for (MessageQueue mq : subscribeMessageQueues) {
assertThat(mqAdminExt.minOffset(mq)).isEqualTo(0L);
}
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName);
assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
}
for (int i = 0; i < MSG_SENT_TIMES; i++) {
CompletableFuture<SendResult> future = new CompletableFuture<>();
producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
future.complete(sendResult);
}
@Override public void onException(Throwable e) {
future.completeExceptionally(e);
}
});
SendResult sendResult = future.get();
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName);
assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
}
assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(2 * MSG_SENT_TIMES);
waitAtMost(5, TimeUnit.SECONDS).until(() -> mqAdminExt.maxOffset(migratedMessageQueue) == 2 * MSG_SENT_TIMES);
PullResult pullResult = consumer.pull(migratedMessageQueue, "*", 0L, 2 * MSG_SENT_TIMES);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMinOffset()).isEqualTo(0);
assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 * MSG_SENT_TIMES);
List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
assertThat(msgFoundList).hasSize(2 * MSG_SENT_TIMES);
Iterator<MessageExt> it = pullResult.getMsgFoundList().iterator();
long offset = 0L;
for (String prefix : new String[] {"sync", "async"}) {
for (int i = 0; i < MSG_SENT_TIMES; i++) {
MessageExt msg = it.next();
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%s-%d-%d", methodName, prefix, migratedMessageQueue.getQueueId(), i));
assertThat(msg.getQueueOffset()).isEqualTo(offset);
offset++;
}
}
offset = pullResult.getNextBeginOffset();
CompletableFuture<PullResult> future = new CompletableFuture<>();
consumer.pull(migratedMessageQueue, "*", offset, 10, new PullCallback() {
@Override public void onSuccess(PullResult pullResult) {
future.complete(pullResult);
}
@Override public void onException(Throwable e) {
future.completeExceptionally(e);
}
});
pullResult = future.get();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
assertThat(pullResult.getMinOffset()).isEqualTo(0);
assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 * MSG_SENT_TIMES);
assertThat(pullResult.getMsgFoundList()).isNull();
}
@Test
public void test004_MigrateOnceWithData() throws Exception {
final String methodName = getCurrentMethodName();
final int logicalQueueIdx = 1;
TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
List<LogicalQueueRouteData> logicalQueueRouteDataList1 = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
LogicalQueueRouteData lastLogicalQueueRouteData1 = logicalQueueRouteDataList1.get(logicalQueueRouteDataList1.size() - 1);
String newBrokerName;
if (lastLogicalQueueRouteData1.getBrokerName().equals(broker1Name)) {
newBrokerName = broker2Name;
} else {
newBrokerName = broker1Name;
}
MessageQueue migratedMessageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx);
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
}
assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(MSG_SENT_TIMES);
waitAtMost(5, TimeUnit.SECONDS).until(() -> mqAdminExt.maxOffset(migratedMessageQueue) == MSG_SENT_TIMES);
{
long offset = 0L;
PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 2 * MSG_SENT_TIMES);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMinOffset()).isEqualTo(0);
assertThat(pullResult.getMaxOffset()).isEqualTo(MSG_SENT_TIMES);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(MSG_SENT_TIMES);
List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
assertThat(msgFoundList).hasSize(MSG_SENT_TIMES);
Iterator<MessageExt> it = pullResult.getMsgFoundList().iterator();
for (int i = 0; i < MSG_SENT_TIMES; i++) {
MessageExt msg = it.next();
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i));
assertThat(msg.getQueueOffset()).isEqualTo(offset);
offset++;
}
}
new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, newBrokerName, null);
topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
assertThat(topicRouteInfo.getLogicalQueuesInfo()).isNotNull();
for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : topicRouteInfo.getLogicalQueuesInfo().entrySet()) {
List<LogicalQueueRouteData> logicalQueueRouteDataList2 = entry.getValue();
if (entry.getKey() == logicalQueueIdx) {
assertThat(logicalQueueRouteDataList2).hasSize(logicalQueueRouteDataList1.size() + 1);
LogicalQueueRouteData lastLogicalQueueRouteData2 = logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 2);
assertThat(lastLogicalQueueRouteData2.getMessageQueue()).isEqualTo(lastLogicalQueueRouteData1.getMessageQueue());
assertThat(lastLogicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L);
assertThat(lastLogicalQueueRouteData2.getMessagesCount()).isEqualTo(MSG_SENT_TIMES);
assertThat(lastLogicalQueueRouteData2.isWritable()).isFalse();
assertThat(lastLogicalQueueRouteData2.isReadable()).isTrue();
assertThat(lastLogicalQueueRouteData2.isExpired()).isFalse();
assertThat(lastLogicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(0L);
LogicalQueueRouteData lastLogicalQueueRouteData3 = logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 1);
assertThat(lastLogicalQueueRouteData3.getBrokerName()).isEqualTo(newBrokerName);
assertThat(lastLogicalQueueRouteData3.getOffsetMax()).isLessThan(0L);
assertThat(lastLogicalQueueRouteData3.isWritable()).isTrue();
assertThat(lastLogicalQueueRouteData3.isReadable()).isTrue();
assertThat(lastLogicalQueueRouteData3.isExpired()).isFalse();
assertThat(lastLogicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(MSG_SENT_TIMES);
} else {
assertThat(logicalQueueRouteDataList2).hasSize(1);
LogicalQueueRouteData logicalQueueRouteData = logicalQueueRouteDataList2.get(0);
assertThat(logicalQueueRouteData.getOffsetMax()).isLessThan(0L);
assertThat(logicalQueueRouteData.isWritable()).isTrue();
assertThat(logicalQueueRouteData.isReadable()).isTrue();
assertThat(logicalQueueRouteData.isExpired()).isFalse();
assertThat(logicalQueueRouteData.getLogicalQueueDelta()).isEqualTo(0L);
}
}
assertThat(migratedMessageQueue).isNotNull();
List<MessageQueue> subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS);
for (MessageQueue mq : subscribeMessageQueues) {
assertThat(mqAdminExt.minOffset(mq)).isEqualTo(0L);
}
for (int i = 0; i < MSG_SENT_TIMES; i++) {
CompletableFuture<SendResult> future = new CompletableFuture<>();
producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
future.complete(sendResult);
}
@Override public void onException(Throwable e) {
future.completeExceptionally(e);
}
});
SendResult sendResult = future.get();
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName);
assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
}
assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(2 * MSG_SENT_TIMES);
waitAtMost(5, TimeUnit.SECONDS).until(() -> mqAdminExt.maxOffset(migratedMessageQueue) == 2 * MSG_SENT_TIMES);
long offset = 0L;
PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 2 * MSG_SENT_TIMES);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMinOffset()).isEqualTo(0);
assertThat(pullResult.getMaxOffset()).isEqualTo(MSG_SENT_TIMES);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(MSG_SENT_TIMES);
List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
assertThat(msgFoundList).hasSize(MSG_SENT_TIMES);
Iterator<MessageExt> it = pullResult.getMsgFoundList().iterator();
for (int i = 0; i < MSG_SENT_TIMES; i++) {
MessageExt msg = it.next();
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i));
assertThat(msg.getQueueOffset()).isEqualTo(offset);
offset++;
}
offset = pullResult.getNextBeginOffset();
CompletableFuture<PullResult> pullResultFuture = new CompletableFuture<>();
consumer.pull(migratedMessageQueue, "*", offset, 2 * MSG_SENT_TIMES, new PullCallback() {
@Override public void onSuccess(PullResult pullResult) {
pullResultFuture.complete(pullResult);
}
@Override public void onException(Throwable e) {
pullResultFuture.completeExceptionally(e);
}
});
pullResult = pullResultFuture.get();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMinOffset()).isEqualTo(MSG_SENT_TIMES);
assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 * MSG_SENT_TIMES);
msgFoundList = pullResult.getMsgFoundList();
assertThat(msgFoundList).hasSize(MSG_SENT_TIMES);
it = pullResult.getMsgFoundList().iterator();
for (int i = 0; i < MSG_SENT_TIMES; i++) {
MessageExt msg = it.next();
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(), i));
assertThat(msg.getQueueOffset()).isEqualTo(offset);
offset++;
}
offset = pullResult.getNextBeginOffset();
pullResult = consumer.pull(migratedMessageQueue, "*", offset, 10);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
assertThat(pullResult.getMinOffset()).isEqualTo(MSG_SENT_TIMES);
assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 * MSG_SENT_TIMES);
assertThat(pullResult.getMsgFoundList()).isNull();
}
@Test
public void test005_MigrateWithDataBackAndForth() throws Exception {
final String methodName = getCurrentMethodName();
final int logicalQueueIdx = 1;
MessageQueue migratedMessageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx);
BrokerController brokerController;
TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
LogicalQueueRouteData lastLogicalQueueRouteData;
{
List<LogicalQueueRouteData> logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
lastLogicalQueueRouteData = logicalQueueRouteDataList.get(logicalQueueRouteDataList.size() - 1);
}
final String fromBrokerName, toBrokerName, fromBrokerAddr, toBrokerAddr;
if (lastLogicalQueueRouteData.getBrokerName().equals(broker1Name)) {
fromBrokerName = broker1Name;
fromBrokerAddr = brokerController1.getBrokerAddr();
toBrokerName = broker2Name;
toBrokerAddr = brokerController2.getBrokerAddr();
} else {
fromBrokerName = broker2Name;
fromBrokerAddr = brokerController2.getBrokerAddr();
toBrokerName = broker1Name;
toBrokerAddr = brokerController1.getBrokerAddr();
}
int msgIdx = 0;
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(fromBrokerName);
assertThat(sendResult2.getOrigQueueId()).isEqualTo(logicalQueueIdx);
}
rotateBrokerCommitLog(brokerControllerMap.get(fromBrokerName));
new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, toBrokerName, null);
{
LogicalQueuesInfo info;
List<LogicalQueueRouteData> logicalQueueRouteDataList;
info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr, topic);
logicalQueueRouteDataList = info.get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).hasSize(2);
info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr, topic);
logicalQueueRouteDataList = info.get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).hasSize(1);
}
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(toBrokerName);
assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
}
new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, fromBrokerName, null);
// now will reuse queue with a ReadOnly one
{
LogicalQueuesInfo info;
List<LogicalQueueRouteData> logicalQueueRouteDataList;
info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr, topic);
logicalQueueRouteDataList = info.get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).hasSize(3);
info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr, topic);
logicalQueueRouteDataList = info.get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).hasSize(2);
}
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(fromBrokerName);
assertThat(sendResult2.getOrigQueueId()).isEqualTo(logicalQueueIdx);
}
LogicalQueueRouteData logicalQueueRouteData1;
LogicalQueueRouteData logicalQueueRouteData2;
{
List<LogicalQueueRouteData> logicalQueueRouteDataList;
topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).hasSize(3);
logicalQueueRouteData1 = logicalQueueRouteDataList.get(0);
assertThat(logicalQueueRouteData1.getLogicalQueueDelta()).isEqualTo(0);
assertThat(logicalQueueRouteData1.isReadable()).isTrue();
assertThat(logicalQueueRouteData1.isWritable()).isFalse();
assertThat(logicalQueueRouteData1.isExpired()).isFalse();
assertThat(logicalQueueRouteData1.isWriteOnly()).isFalse();
assertThat(logicalQueueRouteData1.getBrokerName()).isEqualTo(fromBrokerName);
assertThat(logicalQueueRouteData1.getOffsetMax()).isGreaterThanOrEqualTo(0L);
assertThat(logicalQueueRouteData1.getMessagesCount()).isEqualTo(MSG_SENT_TIMES);
assertThat(logicalQueueRouteData1.getFirstMsgTimeMillis()).isGreaterThan(0L);
assertThat(logicalQueueRouteData1.getLastMsgTimeMillis()).isGreaterThan(0L);
logicalQueueRouteData2 = logicalQueueRouteDataList.get(1);
assertThat(logicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(MSG_SENT_TIMES);
assertThat(logicalQueueRouteData2.isReadable()).isTrue();
assertThat(logicalQueueRouteData2.isWritable()).isFalse();
assertThat(logicalQueueRouteData2.isExpired()).isFalse();
assertThat(logicalQueueRouteData2.isWriteOnly()).isFalse();
assertThat(logicalQueueRouteData2.getBrokerName()).isEqualTo(toBrokerName);
assertThat(logicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L);
assertThat(logicalQueueRouteData2.getMessagesCount()).isEqualTo(MSG_SENT_TIMES);
assertThat(logicalQueueRouteData2.getFirstMsgTimeMillis()).isGreaterThan(0L);
assertThat(logicalQueueRouteData2.getLastMsgTimeMillis()).isGreaterThan(0L);
LogicalQueueRouteData logicalQueueRouteData3 = logicalQueueRouteDataList.get(2);
assertThat(logicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(2 * MSG_SENT_TIMES);
assertThat(logicalQueueRouteData3.isReadable()).isTrue();
assertThat(logicalQueueRouteData3.isWritable()).isTrue();
assertThat(logicalQueueRouteData3.isExpired()).isFalse();
assertThat(logicalQueueRouteData3.isWriteOnly()).isFalse();
assertThat(logicalQueueRouteData3.getBrokerName()).isEqualTo(fromBrokerName);
assertThat(logicalQueueRouteData3.getOffsetMax()).isLessThan(0L);
}
msgIdx = 0;
forLoop:
for (long offset = 0L; ; ) {
PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES);
switch (pullResult.getPullStatus()) {
case NO_NEW_MSG:
assertThat(offset).isGreaterThanOrEqualTo(3L * MSG_SENT_TIMES);
break forLoop;
case OFFSET_ILLEGAL:
offset = pullResult.getNextBeginOffset();
break;
default:
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMsgFoundList()).isNotNull();
assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
for (MessageExt msg : pullResult.getMsgFoundList()) {
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx));
msgIdx++;
assertThat(msg.getQueueOffset()).isEqualTo(offset);
offset++;
}
offset = pullResult.getNextBeginOffset();
break;
}
}
waitAtMost(5, TimeUnit.SECONDS).until(() -> maxOffsetUncommitted(logicalQueueRouteData1.getMessageQueue()) == mqAdminExt.maxOffset(logicalQueueRouteData1.getMessageQueue()));
waitAtMost(5, TimeUnit.SECONDS).until(() -> maxOffsetUncommitted(logicalQueueRouteData2.getMessageQueue()) == mqAdminExt.maxOffset(logicalQueueRouteData2.getMessageQueue()));
// now verify after commit log cleaned, toBroker's first queue route data will be expired too
brokerController = brokerControllerMap.get(logicalQueueRouteData2.getBrokerName());
rotateBrokerCommitLog(brokerController);
deleteCommitLogFiles(brokerController, 1);
{
topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
List<LogicalQueueRouteData> logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).hasSize(2);
assertThat(logicalQueueRouteDataList.get(0)).isEqualToIgnoringGivenFields(new LogicalQueueRouteData(logicalQueueIdx, 0, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.ReadOnly, 0, 3, -1, -1, fromBrokerAddr), "firstMsgTimeMillis", "lastMsgTimeMillis");
assertThat(logicalQueueRouteDataList.get(1)).isEqualToComparingFieldByField(new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr));
}
// try pull again, since there is an expired queue route in the middle.
{
int msgCount = 0;
Queue<Integer> wantMsgIdx = new LinkedList<>();
wantMsgIdx.addAll(IntStream.range(0, MSG_SENT_TIMES).boxed().collect(Collectors.toList()));
wantMsgIdx.addAll(IntStream.range(2 * MSG_SENT_TIMES, 3 * MSG_SENT_TIMES).boxed().collect(Collectors.toList()));
forLoop:
for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; ) {
PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES);
switch (pullResult.getPullStatus()) {
case NO_NEW_MSG:
assertThat(msgCount).as("offset=%d", offset).isEqualTo(2 * MSG_SENT_TIMES);
break forLoop;
case OFFSET_ILLEGAL:
offset = pullResult.getNextBeginOffset();
break;
case FOUND:
msgCount += pullResult.getMsgFoundList().size();
boolean first = true;
for (MessageExt msg : pullResult.getMsgFoundList()) {
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).as("offset=%d", offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, wantMsgIdx.poll()));
if (first) {
assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset);
first = false;
} else {
assertThat(msg.getQueueOffset()).isGreaterThan(offset);
}
offset = msg.getQueueOffset();
}
offset = pullResult.getNextBeginOffset();
break;
default:
Assert.fail(String.format(Locale.ENGLISH, "unexpected pull offset=%d status: %s", offset, pullResult));
}
}
}
// rotate first queue route to expired, and pull it
brokerController = brokerControllerMap.get(logicalQueueRouteData1.getBrokerName());
rotateBrokerCommitLog(brokerController);
deleteCommitLogFiles(brokerController, 2);
{
List<LogicalQueueRouteData> logicalQueueRouteDataList;
topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr)));
}
{
int msgCount = 0;
Queue<Integer> wantMsgIdx = new LinkedList<>();
wantMsgIdx.addAll(IntStream.range(2 * MSG_SENT_TIMES, 3 * MSG_SENT_TIMES).boxed().collect(Collectors.toList()));
forLoop:
for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; ) {
PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES);
switch (pullResult.getPullStatus()) {
case NO_NEW_MSG:
if (msgCount != MSG_SENT_TIMES) {
Assert.fail(String.format(Locale.ENGLISH, "want %d msg but got %d", MSG_SENT_TIMES, msgCount));
}
break forLoop;
case OFFSET_ILLEGAL:
offset = pullResult.getNextBeginOffset();
break;
case FOUND:
msgCount += pullResult.getMsgFoundList().size();
boolean first = true;
for (MessageExt msg : pullResult.getMsgFoundList()) {
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).as("offset=%d", offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, wantMsgIdx.poll()));
if (first) {
assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset);
first = false;
} else {
assertThat(msg.getQueueOffset()).isGreaterThan(offset);
}
offset = msg.getQueueOffset();
}
offset = pullResult.getNextBeginOffset();
break;
default:
Assert.fail(String.format(Locale.ENGLISH, "unexpected pull offset=%d status: %s", offset, pullResult));
}
}
}
brokerController = brokerControllerMap.get(fromBrokerName);
rotateBrokerCommitLog(brokerController);
deleteCommitLogFiles(brokerController, 1);
{
forLoop:
for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; ) {
PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES);
// commit log rotate and cleaned, so there is no message.
switch (pullResult.getPullStatus()) {
case NO_MATCHED_MSG:
case NO_NEW_MSG:
assertThat(pullResult.getNextBeginOffset()).isEqualTo(3 * MSG_SENT_TIMES);
break forLoop;
case OFFSET_ILLEGAL:
offset = pullResult.getNextBeginOffset();
break;
default:
Assert.fail(String.format(Locale.ENGLISH, "unexpected pull offset=%d status: %s", offset, pullResult));
}
}
}
{
LogicalQueuesInfo logicalQueuesInfo = mqAdminExt.queryTopicLogicalQueueMapping(brokerController.getBrokerAddr(), topic);
List<LogicalQueueRouteData> logicalQueueRouteDataList = logicalQueuesInfo.get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr)));
}
// try migrate to this broker which has a expired queue, expect it will reuse the expired one, pull it to verify if delta works well
new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, toBrokerName, null);
{
List<LogicalQueueRouteData> logicalQueueRouteDataList;
topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
logicalQueueRouteDataList = topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).isEqualTo(Arrays.asList(
new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Expired, MSG_SENT_TIMES, 2 * MSG_SENT_TIMES, 0, 0, fromBrokerAddr)
, new LogicalQueueRouteData(logicalQueueIdx, 3 * MSG_SENT_TIMES, new MessageQueue(topic, toBrokerName, QUEUE_NUMBERS), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr)
));
LogicalQueuesInfo info;
info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr, topic);
logicalQueueRouteDataList = info.get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).isEqualTo(Arrays.asList(
new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new MessageQueue(topic, fromBrokerName, logicalQueueIdx), MessageQueueRouteState.Expired, MSG_SENT_TIMES, 2 * MSG_SENT_TIMES, 0, 0, fromBrokerAddr)
, new LogicalQueueRouteData(logicalQueueIdx, 3 * MSG_SENT_TIMES, new MessageQueue(topic, toBrokerName, QUEUE_NUMBERS), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr)
));
info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr, topic);
logicalQueueRouteDataList = info.get(logicalQueueIdx);
assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new LogicalQueueRouteData(logicalQueueIdx, 3 * MSG_SENT_TIMES, new MessageQueue(topic, toBrokerName, QUEUE_NUMBERS), MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr)));
}
msgIdx = 3 * MSG_SENT_TIMES;
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(toBrokerName);
assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
}
{
int msgCount = 0;
Queue<Integer> wantMsgIdx = new LinkedList<>();
wantMsgIdx.addAll(IntStream.range(3 * MSG_SENT_TIMES, 4 * MSG_SENT_TIMES).boxed().collect(Collectors.toList()));
LOOP:
for (long offset = 0L; ; ) {
PullResult pullResult = consumer.pull(migratedMessageQueue, "*", offset, 3 * MSG_SENT_TIMES);
switch (pullResult.getPullStatus()) {
case NO_NEW_MSG:
assertThat(msgCount).as("msgCount with offset=%d", offset).isEqualTo(MSG_SENT_TIMES);
break LOOP;
case OFFSET_ILLEGAL:
assertThat(pullResult.getNextBeginOffset()).isNotEqualTo(Long.MIN_VALUE);
offset = pullResult.getNextBeginOffset();
break;
case FOUND:
msgCount += pullResult.getMsgFoundList().size();
boolean first = true;
for (MessageExt msg : pullResult.getMsgFoundList()) {
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).as("offset=%d", offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx, wantMsgIdx.poll()));
if (first) {
assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset);
first = false;
} else {
assertThat(msg.getQueueOffset()).isGreaterThan(offset);
}
offset = msg.getQueueOffset();
}
offset = pullResult.getNextBeginOffset();
break;
default:
Assert.fail(String.format(Locale.ENGLISH, "unexpected pull offset=%d status: %s", offset, pullResult));
}
}
}
}
@Test
public void test006_LogicalQueueNumChanged() throws Exception {
String methodName = getCurrentMethodName();
int logicalQueueNum = brokerNum * QUEUE_NUMBERS;
List<MessageQueue> publishMessageQueues;
publishMessageQueues = producer.fetchPublishMessageQueues(topic);
assertThat(publishMessageQueues).hasSize(logicalQueueNum);
List<MessageQueue> subscribeMessageQueues;
subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
assertThat(subscribeMessageQueues).hasSize(logicalQueueNum);
logicalQueueNum++;
new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, logicalQueueNum);
int newAddLogicalQueueIdx = logicalQueueNum - 1;
MessageQueue newAddLogicalQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, newAddLogicalQueueIdx);
String newAddLogicalQueueBrokerName;
{
TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
LogicalQueuesInfo info = topicRouteInfo.getLogicalQueuesInfo();
assertThat(info).isNotNull();
List<LogicalQueueRouteData> queueRouteDataList = info.get(newAddLogicalQueueIdx);
assertThat(queueRouteDataList).isNotNull();
assertThat(queueRouteDataList).hasSize(1);
LogicalQueueRouteData queueRouteData = queueRouteDataList.get(0);
newAddLogicalQueueBrokerName = queueRouteData.getBrokerName();
assertThat(queueRouteData.getState()).isEqualTo(MessageQueueRouteState.Normal);
assertThat(queueRouteData.getLogicalQueueDelta()).isEqualTo(0);
assertThat(queueRouteData.getLogicalQueueIndex()).isEqualTo(newAddLogicalQueueIdx);
}
publishMessageQueues = producer.fetchPublishMessageQueues(topic);
assertThat(publishMessageQueues).hasSize(logicalQueueNum);
Set<Integer> logicalQueueIds = IntStream.range(0, logicalQueueNum).boxed().collect(Collectors.toSet());
Map<String, Set<Integer>> queueIds = Maps.newHashMap();
for (String brokerName : Arrays.asList(broker1Name, broker2Name)) {
queueIds.put(brokerName, IntStream.range(0, QUEUE_NUMBERS).boxed().collect(Collectors.toSet()));
}
queueIds.get(newAddLogicalQueueBrokerName).add(QUEUE_NUMBERS);
for (MessageQueue messageQueue : publishMessageQueues) {
assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(logicalQueueIds.remove(messageQueue.getQueueId())).isTrue();
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, messageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), messageQueue);
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
if (i == 0) {
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(queueIds.get(sendResult2.getOrigBrokerName()).remove(sendResult2.getOrigQueueId())).as("brokerName %s queueId %d", sendResult2.getOrigBrokerName(), sendResult2.getOrigQueueId()).isTrue();
}
}
}
assertThat(logicalQueueIds).isEmpty();
subscribeMessageQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
assertThat(subscribeMessageQueues).hasSize(logicalQueueNum);
subscribeMessageQueues.sort(Comparator.comparingInt(MessageQueue::getQueueId));
logicalQueueIds.addAll(IntStream.range(0, logicalQueueNum).boxed().collect(Collectors.toSet()));
for (MessageQueue messageQueue : subscribeMessageQueues) {
assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(logicalQueueIds.remove(messageQueue.getQueueId())).isTrue();
long offset = mqAdminExt.minOffset(messageQueue);
assertThat(offset).isEqualTo(0);
PullResult pullResult = consumer.pull(messageQueue, "*", offset, 10);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
for (int i = 0; i < MSG_SENT_TIMES; i++) {
MessageExt msg = pullResult.getMsgFoundList().get(i);
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId());
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, messageQueue.getQueueId(), i));
assertThat(msg.getQueueOffset()).isEqualTo(offset + i);
}
assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(offset + MSG_SENT_TIMES);
}
assertThat(logicalQueueIds).isEmpty();
// increase TopicConfig write queue first then increase logical queue, expect to reuse
String broker2Addr = brokerController2.getBrokerAddr();
TopicConfig topicConfig = mqAdminExt.examineTopicConfig(broker2Addr, topic);
topicConfig.setWriteQueueNums(topicConfig.getWriteQueueNums() + 1);
topicConfig.setReadQueueNums(topicConfig.getReadQueueNums() + 1);
mqAdminExt.createAndUpdateTopicConfig(broker2Addr, topicConfig);
logicalQueueNum++;
new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, logicalQueueNum);
{
newAddLogicalQueueIdx = logicalQueueNum -1;
TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
LogicalQueuesInfo info = topicRouteInfo.getLogicalQueuesInfo();
assertThat(info).isNotNull();
List<LogicalQueueRouteData> queueRouteDataList = info.get(newAddLogicalQueueIdx);
assertThat(queueRouteDataList).isNotNull();
assertThat(queueRouteDataList).hasSize(1);
LogicalQueueRouteData queueRouteData = queueRouteDataList.get(0);
assertThat(queueRouteData.getState()).isEqualTo(MessageQueueRouteState.Normal);
assertThat(queueRouteData.getLogicalQueueDelta()).isEqualTo(0);
assertThat(queueRouteData.getLogicalQueueIndex()).isEqualTo(newAddLogicalQueueIdx);
assertThat(queueRouteData.getBrokerName()).isEqualTo(broker2Name);
assertThat(queueRouteData.getQueueId()).isEqualTo(topicConfig.getWriteQueueNums() -1);
}
logicalQueueNum-=2;
new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, logicalQueueNum);
try {
producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), newAddLogicalQueue);
Assert.fail("write to decreased logical queue success, want it failed");
} catch (MQBrokerException e) {
assertThat(e.getResponseCode()).isEqualTo(ResponseCode.NO_PERMISSION);
}
{
int offset = 0;
PullResult pullResult = consumer.pull(newAddLogicalQueue, "*", offset, 10);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
for (int i = 0; i < MSG_SENT_TIMES; i++) {
MessageExt msg = pullResult.getMsgFoundList().get(i);
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
assertThat(msg.getQueueId()).isEqualTo(newAddLogicalQueue.getQueueId());
assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, newAddLogicalQueue.getQueueId(), i));
assertThat(msg.getQueueOffset()).isEqualTo(offset + i);
}
}
// rotate to remove new add queue's data, and try pull again
{
BrokerController brokerController = brokerControllerMap.get(newAddLogicalQueueBrokerName);
rotateBrokerCommitLog(brokerController);
deleteCommitLogFiles(brokerController, 1);
}
{
int offset = 0;
PullResult pullResult = consumer.pull(newAddLogicalQueue, "*", offset, 10);
assertThat(pullResult.getPullStatus()).isIn(PullStatus.NO_NEW_MSG, PullStatus.NO_MATCHED_MSG);
}
}
@Test
public void test007_LogicalQueueWritableEvenBrokerDown() throws Exception {
final String methodName = getCurrentMethodName();
final int logicalQueueIdx = 1;
BrokerController brokerController3 = IntegrationTestBase.createAndStartBroker(nsAddr);
String broker3Name = brokerController3.getBrokerConfig().getBrokerName();
brokerControllerMap.put(broker3Name, brokerController3);
await().atMost(30, TimeUnit.SECONDS).until(() -> mqAdminExt.examineBrokerClusterInfo().getBrokerAddrTable().containsKey(broker3Name));
mqAdminExt.createAndUpdateTopicConfig(brokerController3.getBrokerAddr(), new TopicConfig(topic, 0, 0, PermName.PERM_READ | PermName.PERM_WRITE));
new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, brokerController3.getBrokerConfig().getBrokerName(), null);
MessageQueue migrateMessageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx);
{
for (int i = 0; i < MSG_SENT_TIMES; i++) {
SendResult sendResult = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-%d-%d", methodName, migrateMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), migrateMessageQueue);
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migrateMessageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migrateMessageQueue.getQueueId());
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(broker3Name);
assertThat(sendResult2.getOrigQueueId()).isEqualTo(0);
}
}
brokerController3.shutdown();
brokerControllerMap.remove(broker3Name);
assertThatThrownBy(() -> {
SendResult sendResult = producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), migrateMessageQueue);
logger.error("send should fail but got {}", sendResult);
}).isInstanceOf(RemotingException.class).hasMessageMatching("connect to [0-9.:]+ failed");
assertThatThrownBy(() -> {
new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, broker1Name, null);
}).hasRootCauseInstanceOf(RemotingConnectException.class).hasMessageContaining("migrateTopicLogicalQueuePrepare");
{
SendResult sendResult = producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), migrateMessageQueue);
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migrateMessageQueue.getBrokerName());
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migrateMessageQueue.getQueueId());
assertThat(sendResult.getQueueOffset()).isEqualTo(-1);
SendResultForLogicalQueue sendResult2 = (SendResultForLogicalQueue) sendResult;
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(broker1Name);
assertThat(sendResult2.getOrigQueueId()).isIn(
/* CommitLog not rotated, will not reuse */QUEUE_NUMBERS,
/* CommitLog rotated in other test cases, will reuse */logicalQueueIdx
);
}
}
private static String getBrokerCommitLogFileName(BrokerController brokerController) throws IllegalAccessException {
DefaultMessageStore defaultMessageStore = (DefaultMessageStore) brokerController.getMessageStore();
MappedFileQueue mfq = (MappedFileQueue) FieldUtils.readDeclaredField(defaultMessageStore.getCommitLog(), "mappedFileQueue", true);
return mfq.getLastMappedFile().getFileName();
}
private static void deleteCommitLogFiles(BrokerController brokerController,
int keepNum) throws IllegalAccessException {
CommitLog commitLog = ((DefaultMessageStore) brokerController.getMessageStore()).getCommitLog();
commitLog.flush();
MappedFileQueue mfq = (MappedFileQueue) FieldUtils.readDeclaredField(commitLog, "mappedFileQueue", true);
AtomicInteger count = new AtomicInteger();
waitAtMost(5, TimeUnit.SECONDS).until(() -> {
count.getAndAdd(commitLog.deleteExpiredFile(0, 0, 5000, true, 1));
return mfq.getMappedFiles().size() <= keepNum;
});
brokerController.getTopicConfigManager().getLogicalQueueCleanHook().execute((DefaultMessageStore) brokerController.getMessageStore(), count.get());
logger.info("deleteCommitLogFiles {} count {}", brokerController.getBrokerConfig().getBrokerName(), count.get());
}
private static void rotateBrokerCommitLog(BrokerController brokerController) throws IllegalAccessException {
CommitLog commitLog = ((DefaultMessageStore) brokerController.getMessageStore()).getCommitLog();
commitLog.flush();
String brokerName = brokerController.getBrokerConfig().getBrokerName();
String fileName1 = getBrokerCommitLogFileName(brokerController);
logger.info("rotateBrokerCommitLog {} first {}", brokerName, fileName1);
int msgSize = 4 * 1024;
byte[] data = RandomStringUtils.randomAscii(msgSize).getBytes(StandardCharsets.UTF_8);
Message msg = new Message(placeholderTopic, data);
MessageQueue mq = new MessageQueue(placeholderTopic, brokerName, 0);
waitAtMost(5, TimeUnit.SECONDS).until(() -> {
for (int i = 0; i < 128; i++) {
producer.send(msg, mq);
}
commitLog.flush();
String fileName2 = getBrokerCommitLogFileName(brokerController);
if (!fileName1.equals(fileName2)) {
logger.info("rotateBrokerCommitLog {} 4K msg last {}", brokerName, fileName2);
return true;
}
return false;
});
}
private long maxOffsetUncommitted(MessageQueue mq) throws IllegalAccessException, MQClientException {
DefaultMQAdminExtImpl defaultMQAdminExtImpl = (DefaultMQAdminExtImpl) FieldUtils.readDeclaredField(mqAdminExt, "defaultMQAdminExtImpl", true);
return defaultMQAdminExtImpl.maxOffset(mq, false);
}
}
package org.apache.rocketmq.test.smoke;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
@FixMethodOrder
public class StaticTopicIT extends BaseConf {
private static Logger logger = Logger.getLogger(StaticTopicIT.class);
private DefaultMQAdminExt defaultMQAdminExt;
private ClientMetadata clientMetadata;
@Before
public void setUp() throws Exception {
defaultMQAdminExt = getAdmin(nsAddr);
waitBrokerRegistered(nsAddr, clusterName);
clientMetadata = new ClientMetadata();
defaultMQAdminExt.start();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
}
@Test
public void testCreateStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
int queueNum = 10;
Set<String> brokers = getBrokers();
//create topic
{
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertTrue(brokerConfigMap.isEmpty());
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, getBrokers(), brokerConfigMap);
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
}
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
}
@After
public void tearDown() {
super.shutdown();
}
}
......@@ -28,6 +28,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
......@@ -220,6 +222,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
}
@Override
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic);
}
@Override
public TopicStatsTable examineTopicStats(
String topic) throws RemotingException, MQClientException, InterruptedException,
......@@ -665,6 +672,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override
public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQAdminExtImpl.remappingStaticTopic(clientMetadata, topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, blockSeqSize, force);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.tools.admin;
import java.io.UnsupportedEncodingException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......@@ -29,6 +30,9 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
......@@ -42,6 +46,9 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
......@@ -82,6 +89,9 @@ import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -201,8 +211,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
......@@ -248,7 +257,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
}
......@@ -1100,6 +1109,130 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override
public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
for (String broker : brokerConfigMap.keySet()) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
@Override
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
try {
TopicRouteData routeData = examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
}
}
}
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
//if cannot get from nameserver, then check all the brokers
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
if (clusterInfo != null
&& clusterInfo.getBrokerAddrTable() != null) {
clientMetadata.refreshClusterInfo(clusterInfo);
}
}catch (MQBrokerException e) {
throw new MQClientException(e.getResponseCode(), e.getMessage());
}
for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
String bname = entry.getKey();
HashMap<Long, String> map = entry.getValue();
String addr = map.get(MixAll.MASTER_ID);
if (addr != null) {
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQClientException clientException) {
if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw clientException;
}
}
}
}
}
return brokerConfigMap;
}
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
......
......@@ -21,12 +21,18 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
......@@ -54,6 +60,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -102,7 +109,7 @@ public interface MQAdminExt extends MQAdmin {
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
TopicConfig examineTopicConfig(final String addr,
final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
......@@ -337,8 +344,11 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException;
Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException;
void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
......
......@@ -40,6 +40,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -121,7 +122,13 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true;
}
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
for (String broker : wrapper.getBrokerConfigMap().keySet()) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......@@ -131,68 +138,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
}
public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), 10000));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
if (!commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
......@@ -250,120 +198,25 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
}
}
//get the existed topic config and mapping
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
}
}
}
brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
if (brokerConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping");
}
final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
long maxEpoch = maxEpochAndNum.getKey();
Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, maxEpoch, new HashMap<>(), brokerConfigMap);
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
Map<Integer, String> expectedIdToBroker = new HashMap<>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
globalIdMap.forEach((queueId, mappingOne) -> {
String leaderBroker = mappingOne.getBname();
if (expectedBrokerNumMap.containsKey(leaderBroker)) {
if (expectedBrokerNumMap.get(leaderBroker) > 0) {
expectedIdToBroker.put(queueId, leaderBroker);
expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
} else {
waitAssignQueues.add(queueId);
expectedBrokerNumMap.remove(leaderBroker);
}
} else {
waitAssignQueues.add(queueId);
}
});
expectedBrokerNumMap.forEach((broker, queueNum) -> {
for (int i = 0; i < queueNum; i++) {
Integer queueId = waitAssignQueues.poll();
assert queueId != null;
expectedIdToBroker.put(queueId, broker);
}
});
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
//Now construct the remapping info
Set<String> brokersToMapOut = new HashSet<>();
Set<String> brokersToMapIn = new HashSet<>();
for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
Integer queueId = mapEntry.getKey();
String broker = mapEntry.getValue();
TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
assert topicQueueMappingOne != null;
if (topicQueueMappingOne.getBname().equals(broker)) {
continue;
}
//remapping
String mapInBroker = broker;
String mapOutBroker = topicQueueMappingOne.getBname();
brokersToMapIn.add(mapInBroker);
brokersToMapOut.add(mapOutBroker);
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
List<LogicQueueMappingItem> items = new ArrayList<>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
brokerConfigMap.values().forEach(configMapping -> {
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
//double check
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(brokerConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, expectedIdToBroker, brokerConfigMap);
newWrapper.setBrokerToMapIn(brokersToMapIn);
newWrapper.setBrokerToMapOut(brokersToMapOut);
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The old mapping data is written to file " + newMappingDataFile);
}
doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
......@@ -120,6 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
return;
}catch (Exception e) {
......@@ -130,6 +131,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
//check it before
for (String broker : brokerConfigMap.keySet()) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
......@@ -158,7 +166,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> targetBrokers = new HashSet<>();
try {
......@@ -202,101 +209,30 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
//get the existed topic config and mapping
brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
}
}
}
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
}
//check the queue number
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
final Map<Integer, String> oldIdToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
String leaderbroker = value.getBname();
oldIdToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
} else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
});
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!brokerConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
brokerConfigMap.put(broker, configMapping);
} else {
configMapping = brokerConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
// set the topic config
brokerConfigMap.values().forEach(configMapping -> {
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
});
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//calculate the new data
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}
doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
doUpdate(newWrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
......@@ -150,13 +150,7 @@ public class DefaultMQAdminExtTest {
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
LogicalQueuesInfo logicalQueuesInfoinfo = new LogicalQueuesInfo();
logicalQueuesInfoinfo.put(0, Lists.newArrayList(
new LogicalQueueRouteData(0, 0, new MessageQueue(topic1, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 1000, 2000, 3000, broker1Addr),
new LogicalQueueRouteData(0, 1000, new MessageQueue(topic1, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
));
topicRouteData.setLogicalQueuesInfo(logicalQueuesInfoinfo);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), any())).thenReturn(topicRouteData);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
HashMap<String, String> result = new HashMap<>();
result.put("id", String.valueOf(MixAll.MASTER_ID));
......