...
 
Commits (4)
    https://gitcode.net/apacherocketmq/rocketmq/-/commit/3d68590377079e290c8c5f34b9a8834f70da02c4 Polish the code 2021-11-23T11:58:04+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/2f4bf29603aec75be48aa4bd7d2570b10ed3bf2c Polish the code structure for static topic command 2021-11-23T15:00:01+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/e93536a9aeb42f9695767fe28d43b1a229265a67 Add IT Test for static topic 2021-11-23T20:21:28+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/fa39815666187647b6990af7a8a98e1779eb065d Catch the exception 2021-11-23T21:16:26+08:00 dongeforever dongeforever@apache.org
......@@ -1880,7 +1880,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
if (topicConfig == null) {
log.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic());
response.setCode(ResponseCode.SYSTEM_ERROR);
//be care of the response code, should set "not-exist" explictly
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
return response;
}
......
......@@ -332,7 +332,7 @@ public class MQClientAPIImpl {
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
throws RemotingException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
......@@ -2708,7 +2708,7 @@ public class MQClientAPIImpl {
public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setWithMapping(true);
......@@ -2720,15 +2720,19 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: {
return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class);
}
//should check the exist
case ResponseCode.TOPIC_NOT_EXIST: {
//should return null?
break;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
throw new MQClientException(response.getCode(), response.getRemark());
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
......@@ -2753,6 +2757,6 @@ public class MQClientAPIImpl {
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
throw new MQClientException(response.getCode(), response.getRemark());
}
}
......@@ -91,6 +91,10 @@ public class ClientMetadata {
return brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID);
}
public ConcurrentMap<String, HashMap<Long, String>> getBrokerAddrTable() {
return brokerAddrTable;
}
public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
if (route.getTopicQueueMappingByBroker() == null
|| route.getTopicQueueMappingByBroker().isEmpty()) {
......@@ -112,7 +116,7 @@ public class ClientMetadata {
for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
TopicQueueMappingInfo info = entry.getValue();
if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
maxTotalNums = entry.getValue().getTotalQueues();
maxTotalNums = info.getTotalQueues();
}
for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
int globalId = idEntry.getKey();
......
......@@ -18,17 +18,23 @@ package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import java.io.File;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
public class TopicQueueMappingUtils {
......@@ -306,6 +312,167 @@ public class TopicQueueMappingUtils {
}
}
public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
}
//check the queue number
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
for (String broker: targetBrokers) {
brokerNumMap.put(broker, 0);
}
final Map<Integer, String> oldIdToBroker = new HashMap<Integer, String>();
for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
String leaderbroker = entry.getValue().getBname();
oldIdToBroker.put(entry.getKey(), leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
} else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!brokerConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
brokerConfigMap.put(broker, configMapping);
} else {
configMapping = brokerConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
// set the topic config
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
}
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, brokerConfigMap, new HashSet<String>(), new HashSet<String>());
}
public static TopicRemappingDetailWrapper remappingStaticTopic(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, Set<String> targetBrokers) {
final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
final Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
for (String broker: targetBrokers) {
brokerNumMap.put(broker, 0);
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
Integer queueId = entry.getKey();
TopicQueueMappingOne mappingOne = entry.getValue();
String leaderBroker = mappingOne.getBname();
if (expectedBrokerNumMap.containsKey(leaderBroker)) {
if (expectedBrokerNumMap.get(leaderBroker) > 0) {
expectedIdToBroker.put(queueId, leaderBroker);
expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
} else {
waitAssignQueues.add(queueId);
expectedBrokerNumMap.remove(leaderBroker);
}
} else {
waitAssignQueues.add(queueId);
}
}
for (Map.Entry<String, Integer> entry: expectedBrokerNumMap.entrySet()) {
String broker = entry.getKey();
Integer queueNum = entry.getValue();
for (int i = 0; i < queueNum; i++) {
Integer queueId = waitAssignQueues.poll();
assert queueId != null;
expectedIdToBroker.put(queueId, broker);
}
}
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
//Now construct the remapping info
Set<String> brokersToMapOut = new HashSet<String>();
Set<String> brokersToMapIn = new HashSet<String>();
for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
Integer queueId = mapEntry.getKey();
String broker = mapEntry.getValue();
TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
assert topicQueueMappingOne != null;
if (topicQueueMappingOne.getBname().equals(broker)) {
continue;
}
//remapping
final String mapInBroker = broker;
final String mapOutBroker = topicQueueMappingOne.getBname();
brokersToMapIn.add(mapInBroker);
brokersToMapOut.add(mapOutBroker);
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
List<LogicQueueMappingItem> items = new ArrayList<LogicQueueMappingItem>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
}
//double check
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut);
}
}
......@@ -18,7 +18,6 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
private final String topic;
private final String type;
private final long epoch;
private Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
......@@ -26,12 +25,13 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
private Set<String> brokerToMapOut = new HashSet<String>();
public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, Set<String> brokerToMapIn, Set<String> brokerToMapOut) {
this.topic = topic;
this.type = type;
this.epoch = epoch;
this.expectedIdToBroker = expectedIdToBroker;
this.brokerConfigMap = brokerConfigMap;
this.brokerToMapIn = brokerToMapIn;
this.brokerToMapOut = brokerToMapOut;
}
public String getTopic() {
......@@ -46,10 +46,6 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
return epoch;
}
public Map<Integer, String> getExpectedIdToBroker() {
return expectedIdToBroker;
}
public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
return brokerConfigMap;
}
......@@ -58,15 +54,7 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
return brokerToMapIn;
}
public void setBrokerToMapIn(Set<String> brokerToMapIn) {
this.brokerToMapIn = brokerToMapIn;
}
public Set<String> getBrokerToMapOut() {
return brokerToMapOut;
}
public void setBrokerToMapOut(Set<String> brokerToMapOut) {
this.brokerToMapOut = brokerToMapOut;
}
}
......@@ -161,6 +161,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
......
......@@ -19,8 +19,10 @@ package org.apache.rocketmq.test.base;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
......@@ -120,6 +122,13 @@ public class BaseConf {
return group;
}
public static DefaultMQAdminExt getAdmin(String nsAddr) {
final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
mqAdminExt.setNamesrvAddr(nsAddr);
mqClients.add(mqAdminExt);
return mqAdminExt;
}
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
return getProducer(nsAddr, topic, false);
}
......@@ -197,6 +206,13 @@ public class BaseConf {
shutdown(mqClients);
}
public static Set<String> getBrokers() {
Set<String> brokers = new HashSet<>();
brokers.add(broker1Name);
brokers.add(broker2Name);
return brokers;
}
public static void shutdown(List<Object> mqClients) {
mqClients.forEach(mqClient -> ForkJoinPool.commonPool().execute(() -> {
if (mqClient instanceof AbstractMQProducer) {
......
package org.apache.rocketmq.test.smoke;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
@FixMethodOrder
public class StaticTopicIT extends BaseConf {
private static Logger logger = Logger.getLogger(StaticTopicIT.class);
private DefaultMQAdminExt defaultMQAdminExt;
private ClientMetadata clientMetadata;
@Before
public void setUp() throws Exception {
defaultMQAdminExt = getAdmin(nsAddr);
waitBrokerRegistered(nsAddr, clusterName);
clientMetadata = new ClientMetadata();
defaultMQAdminExt.start();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
}
@Test
public void testCreateStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
int queueNum = 10;
Set<String> brokers = getBrokers();
//create topic
{
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertTrue(brokerConfigMap.isEmpty());
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, getBrokers(), brokerConfigMap);
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
}
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
}
@After
public void tearDown() {
super.shutdown();
}
}
......@@ -28,6 +28,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
......@@ -220,6 +222,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
}
@Override
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic);
}
@Override
public TopicStatsTable examineTopicStats(
String topic) throws RemotingException, MQClientException, InterruptedException,
......@@ -665,6 +672,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override
public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQAdminExtImpl.remappingStaticTopic(clientMetadata, topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, blockSeqSize, force);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.tools.admin;
import java.io.UnsupportedEncodingException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......@@ -29,6 +30,9 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
......@@ -42,6 +46,9 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
......@@ -82,6 +89,9 @@ import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -201,8 +211,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
......@@ -248,7 +257,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
}
......@@ -1100,6 +1109,130 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override
public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
for (String broker : brokerConfigMap.keySet()) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
@Override
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
try {
TopicRouteData routeData = examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
}
}
}
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
//if cannot get from nameserver, then check all the brokers
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
if (clusterInfo != null
&& clusterInfo.getBrokerAddrTable() != null) {
clientMetadata.refreshClusterInfo(clusterInfo);
}
}catch (MQBrokerException e) {
throw new MQClientException(e.getResponseCode(), e.getMessage());
}
for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
String bname = entry.getKey();
HashMap<Long, String> map = entry.getValue();
String addr = map.get(MixAll.MASTER_ID);
if (addr != null) {
try {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQClientException clientException) {
if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw clientException;
}
}
}
}
}
return brokerConfigMap;
}
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
......
......@@ -21,12 +21,18 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
......@@ -54,6 +60,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -102,7 +109,7 @@ public interface MQAdminExt extends MQAdmin {
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
TopicConfig examineTopicConfig(final String addr,
final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
......@@ -337,8 +344,11 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException;
Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException;
void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
......
......@@ -40,6 +40,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -121,7 +122,13 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
force = true;
}
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
for (String broker : wrapper.getBrokerConfigMap().keySet()) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......@@ -131,68 +138,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
}
public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), 10000));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
if (!commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
......@@ -250,120 +198,25 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
}
}
//get the existed topic config and mapping
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
}
}
}
brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
if (brokerConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping");
}
final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
long maxEpoch = maxEpochAndNum.getKey();
Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, maxEpoch, new HashMap<>(), brokerConfigMap);
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
Map<Integer, String> expectedIdToBroker = new HashMap<>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
globalIdMap.forEach((queueId, mappingOne) -> {
String leaderBroker = mappingOne.getBname();
if (expectedBrokerNumMap.containsKey(leaderBroker)) {
if (expectedBrokerNumMap.get(leaderBroker) > 0) {
expectedIdToBroker.put(queueId, leaderBroker);
expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
} else {
waitAssignQueues.add(queueId);
expectedBrokerNumMap.remove(leaderBroker);
}
} else {
waitAssignQueues.add(queueId);
}
});
expectedBrokerNumMap.forEach((broker, queueNum) -> {
for (int i = 0; i < queueNum; i++) {
Integer queueId = waitAssignQueues.poll();
assert queueId != null;
expectedIdToBroker.put(queueId, broker);
}
});
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
//Now construct the remapping info
Set<String> brokersToMapOut = new HashSet<>();
Set<String> brokersToMapIn = new HashSet<>();
for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
Integer queueId = mapEntry.getKey();
String broker = mapEntry.getValue();
TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
assert topicQueueMappingOne != null;
if (topicQueueMappingOne.getBname().equals(broker)) {
continue;
}
//remapping
String mapInBroker = broker;
String mapOutBroker = topicQueueMappingOne.getBname();
brokersToMapIn.add(mapInBroker);
brokersToMapOut.add(mapOutBroker);
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
List<LogicQueueMappingItem> items = new ArrayList<>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
brokerConfigMap.values().forEach(configMapping -> {
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
//double check
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(brokerConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, expectedIdToBroker, brokerConfigMap);
newWrapper.setBrokerToMapIn(brokersToMapIn);
newWrapper.setBrokerToMapOut(brokersToMapOut);
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The old mapping data is written to file " + newMappingDataFile);
}
doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
......@@ -120,6 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
return;
}catch (Exception e) {
......@@ -130,6 +131,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
//check it before
for (String broker : brokerConfigMap.keySet()) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
......@@ -158,7 +166,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> targetBrokers = new HashSet<>();
try {
......@@ -202,101 +209,30 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
//get the existed topic config and mapping
brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
}
}
}
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
}
//check the queue number
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
final Map<Integer, String> oldIdToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
String leaderbroker = value.getBname();
oldIdToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
} else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
});
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!brokerConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
brokerConfigMap.put(broker, configMapping);
} else {
configMapping = brokerConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
// set the topic config
brokerConfigMap.values().forEach(configMapping -> {
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
});
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//calculate the new data
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The new mapping data is written to file " + newMappingDataFile);
}
doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
doUpdate(newWrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
......@@ -150,13 +150,7 @@ public class DefaultMQAdminExtTest {
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
LogicalQueuesInfo logicalQueuesInfoinfo = new LogicalQueuesInfo();
logicalQueuesInfoinfo.put(0, Lists.newArrayList(
new LogicalQueueRouteData(0, 0, new MessageQueue(topic1, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 1000, 2000, 3000, broker1Addr),
new LogicalQueueRouteData(0, 1000, new MessageQueue(topic1, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
));
topicRouteData.setLogicalQueuesInfo(logicalQueuesInfoinfo);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), any())).thenReturn(topicRouteData);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
HashMap<String, String> result = new HashMap<>();
result.put("id", String.valueOf(MixAll.MASTER_ID));
......