diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java index 53ffa51a20f3ef63c8c1dd6cae9251f2a0660d63..499b6a87fb1cedeae5c3f56f3faf3a097d23568b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java @@ -112,7 +112,7 @@ public class ClientMetadata { for (Map.Entry entry : mappingInfos) { TopicQueueMappingInfo info = entry.getValue(); if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) { - maxTotalNums = entry.getValue().getTotalQueues(); + maxTotalNums = info.getTotalQueues(); } for (Map.Entry idEntry : entry.getValue().getCurrIdMap().entrySet()) { int globalId = idEntry.getKey(); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 7ac7ce86f1bd1dedf5651cb1ec911f5c84093200..dc888769890b15c352582d2bcb45728ee0cf3ce7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -18,6 +18,9 @@ package org.apache.rocketmq.common.statictopic; import com.google.common.collect.ImmutableList; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.File; import java.util.AbstractMap; @@ -29,6 +32,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; public class TopicQueueMappingUtils { @@ -306,6 +311,75 @@ public class TopicQueueMappingUtils { } } + public Map createTopicConfigMapping(String topic, int queueNum, Set targetBrokers, Map brokerConfigMap) { + Map globalIdMap = new HashMap(); + Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry(System.currentTimeMillis(), queueNum); + if (!brokerConfigMap.isEmpty()) { + maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + } + if (queueNum < globalIdMap.size()) { + throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size())); + } + //check the queue number + if (queueNum == globalIdMap.size()) { + throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing"); + } + //the check is ok, now do the mapping allocation + Map brokerNumMap = new HashMap(); + for (String broker: targetBrokers) { + brokerNumMap.put(broker, 0); + } + final Map oldIdToBroker = new HashMap(); + for (Map.Entry entry : globalIdMap.entrySet()) { + String leaderbroker = entry.getValue().getBname(); + oldIdToBroker.put(entry.getKey(), leaderbroker); + if (!brokerNumMap.containsKey(leaderbroker)) { + brokerNumMap.put(leaderbroker, 1); + } else { + brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1); + } + } + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap); + allocator.upToNum(queueNum); + Map newIdToBroker = allocator.getIdToBroker(); + + //construct the topic configAndMapping + long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); + for (Map.Entry e : newIdToBroker.entrySet()) { + Integer queueId = e.getKey(); + String broker = e.getValue(); + if (globalIdMap.containsKey(queueId)) { + //ignore the exited + continue; + } + TopicConfigAndQueueMapping configMapping; + if (!brokerConfigMap.containsKey(broker)) { + configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1)); + configMapping.setWriteQueueNums(1); + configMapping.setReadQueueNums(1); + brokerConfigMap.put(broker, configMapping); + } else { + configMapping = brokerConfigMap.get(broker); + configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); + configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1); + } + LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); + configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); + } + + // set the topic config + for (Map.Entry entry : brokerConfigMap.entrySet()) { + TopicConfigAndQueueMapping configMapping = entry.getValue(); + configMapping.getMappingDetail().setEpoch(newEpoch); + configMapping.getMappingDetail().setTotalQueues(queueNum); + } + //double check the config + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + + return brokerConfigMap; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 5ba6cfab0a740a46a20d69a104b74094ad81993f..158f03f74f067094090039a1c82d63e26106c37b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -161,6 +161,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } }); + Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 70300568bea4eb2d4d86ed62c1d2ee0e5b74ce53..3ae0f8c6e90037831268560b0bd5c1426bf53af6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.tools.admin; import java.io.UnsupportedEncodingException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -28,6 +29,9 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.RollbackStats; @@ -220,6 +224,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { return defaultMQAdminExtImpl.examineTopicConfig(addr, topic); } + @Override + public Map getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.defaultMQAdminExtImpl.getTopicConfigMap(clientMetadata, topic); + } + @Override public TopicStatsTable examineTopicStats( String topic) throws RemotingException, MQClientException, InterruptedException, @@ -665,6 +674,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force); } + @Override public void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 06b456fa8de0a98300966c3eef7d1f88e4938f44..31be976fd6deda3054c8c2fe74d205b4abf661e4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.tools.admin; import java.io.UnsupportedEncodingException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -29,6 +30,9 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.admin.MQAdminExtInner; @@ -42,6 +46,9 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; @@ -82,6 +89,9 @@ import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; +import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -1100,6 +1110,27 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force); } + @Override + public Map getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + TopicRouteData routeData = examineTopicRouteInfo(topic); + clientMetadata.freshTopicRoute(topic, routeData); + Map brokerConfigMap = new HashMap<>(); + + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } + } + } + return brokerConfigMap; + } + @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 888dad8f755a53cdc908e4ae460c1a206aff9730..f94b90d44c3d2dfd85f7e3ee646db41c52b15673 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.RollbackStats; @@ -340,6 +342,9 @@ public interface MQAdminExt extends MQAdmin { void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + Map getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException; }