提交 3d685903 编写于 作者: D dongeforever

Polish the code

上级 daf47490
......@@ -112,7 +112,7 @@ public class ClientMetadata {
for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
TopicQueueMappingInfo info = entry.getValue();
if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
maxTotalNums = entry.getValue().getTotalQueues();
maxTotalNums = info.getTotalQueues();
}
for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
int globalId = idEntry.getKey();
......
......@@ -18,6 +18,9 @@ package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.File;
import java.util.AbstractMap;
......@@ -29,6 +32,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
public class TopicQueueMappingUtils {
......@@ -306,6 +311,75 @@ public class TopicQueueMappingUtils {
}
}
public Map<String, TopicConfigAndQueueMapping> createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
if (!brokerConfigMap.isEmpty()) {
maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
}
//check the queue number
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
for (String broker: targetBrokers) {
brokerNumMap.put(broker, 0);
}
final Map<Integer, String> oldIdToBroker = new HashMap<Integer, String>();
for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
String leaderbroker = entry.getValue().getBname();
oldIdToBroker.put(entry.getKey(), leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
} else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!brokerConfigMap.containsKey(broker)) {
configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
configMapping.setWriteQueueNums(1);
configMapping.setReadQueueNums(1);
brokerConfigMap.put(broker, configMapping);
} else {
configMapping = brokerConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
// set the topic config
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
TopicConfigAndQueueMapping configMapping = entry.getValue();
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
}
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
return brokerConfigMap;
}
}
......@@ -161,6 +161,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.tools.admin;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
......@@ -28,6 +29,9 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
......@@ -220,6 +224,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
}
@Override
public Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQAdminExtImpl.getTopicConfigMap(clientMetadata, topic);
}
@Override
public TopicStatsTable examineTopicStats(
String topic) throws RemotingException, MQClientException, InterruptedException,
......@@ -665,6 +674,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.tools.admin;
import java.io.UnsupportedEncodingException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......@@ -29,6 +30,9 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
......@@ -42,6 +46,9 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
......@@ -82,6 +89,9 @@ import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -1100,6 +1110,27 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override
public Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicRouteData routeData = examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
}
}
return brokerConfigMap;
}
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
......
......@@ -27,6 +27,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
......@@ -340,6 +342,9 @@ public interface MQAdminExt extends MQAdmin {
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册