提交 40d86269 编写于 作者: D dongeforever

Finish the topicRoute2endPoints for static topic

上级 a64ad010
......@@ -61,13 +61,15 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
......@@ -77,21 +79,17 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
private final static InternalLogger log = ClientLogger.getLog();
private final ClientConfig clientConfig;
private final int instanceIndex;
private final String clientId;
......@@ -103,6 +101,7 @@ public class MQClientInstance {
private final MQClientAPIImpl mQClientAPIImpl;
private final MQAdminImpl mQAdminImpl;
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
......@@ -162,8 +161,42 @@ public class MQClientInstance {
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
if (route.getTopicQueueMappingByBroker() == null
|| route.getTopicQueueMappingByBroker().isEmpty()) {
return new ConcurrentHashMap<>();
}
ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<>();
int totalNums = 0;
for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
String brokerName = entry.getKey();
if (entry.getValue().getTotalQueues() > totalNums) {
if (totalNums != 0) {
log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
}
totalNums = entry.getValue().getTotalQueues();
}
for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
int globalId = idEntry.getKey();
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
String oldBrokerName = mqEndPoints.put(mq, brokerName);
log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
}
}
//accomplish the static logic queues
for (int i = 0; i < totalNums; i++) {
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
if (!mqEndPoints.containsKey(mq)) {
mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
}
}
return mqEndPoints;
}
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
// TODO should check the usage of raw route, it is better to remove such field
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
......@@ -177,28 +210,13 @@ public class MQClientInstance {
}
info.setOrderTopic(true);
} else if (route.getOrderTopicConf() == null && route.getLogicalQueuesInfo() != null) {
} else if (route.getOrderTopicConf() == null
&& route.getTopicQueueMappingByBroker() != null
&& !route.getTopicQueueMappingByBroker().isEmpty()) {
info.setOrderTopic(false);
List<MessageQueue> messageQueueList = info.getMessageQueueList();
LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
boolean someWritable = false;
for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
if (logicalQueueRouteData.isWritable()) {
someWritable = true;
break;
}
}
if (!someWritable) {
continue;
}
MessageQueue mq = new MessageQueue();
mq.setQueueId(entry.getKey());
mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
mq.setTopic(topic);
messageQueueList.add(mq);
}
Collections.sort(messageQueueList, new Comparator<MessageQueue>() {
ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
info.getMessageQueueList().addAll(mqEndPoints.keySet());
Collections.sort(info.getMessageQueueList(), new Comparator<MessageQueue>() {
@Override public int compare(MessageQueue o1, MessageQueue o2) {
return MixAll.compareInteger(o1.getQueueId(), o2.getQueueId());
}
......@@ -239,26 +257,10 @@ public class MQClientInstance {
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
if (route.getLogicalQueuesInfo() != null) {
LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
boolean someReadable = false;
for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
if (logicalQueueRouteData.isReadable()) {
someReadable = true;
break;
}
}
if (!someReadable) {
continue;
}
MessageQueue mq = new MessageQueue();
mq.setQueueId(entry.getKey());
mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
mq.setTopic(topic);
mqList.add(mq);
}
return mqList;
if (route.getTopicQueueMappingByBroker() != null
&& !route.getTopicQueueMappingByBroker().isEmpty()) {
ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, route);
return mqEndPoints.keySet();
}
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
......@@ -656,11 +658,6 @@ public class MQClientInstance {
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
return this.updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer, null);
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer, Set<Integer> logicalQueueIdsFilter) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
......@@ -676,7 +673,7 @@ public class MQClientInstance {
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout(), true, logicalQueueIdsFilter);
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
......@@ -688,31 +685,19 @@ public class MQClientInstance {
}
if (changed) {
TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
if (logicalQueueIdsFilter != null && cloneTopicRouteData.getLogicalQueuesInfo() != null) {
TopicRouteData curTopicRouteData = this.topicRouteTable.get(topic);
if (curTopicRouteData != null) {
LogicalQueuesInfo curLogicalQueuesInfo = curTopicRouteData.getLogicalQueuesInfo();
if (curLogicalQueuesInfo != null) {
LogicalQueuesInfo cloneLogicalQueuesInfo = cloneTopicRouteData.getLogicalQueuesInfo();
curLogicalQueuesInfo.readLock().lock();
try {
for (Entry<Integer, List<LogicalQueueRouteData>> entry : curLogicalQueuesInfo.entrySet()) {
if (!cloneLogicalQueuesInfo.containsKey(entry.getKey())) {
cloneLogicalQueuesInfo.put(entry.getKey(), entry.getValue());
}
}
} finally {
curLogicalQueuesInfo.readLock().unlock();
}
}
}
}
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update endpoint map
{
ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
if (mqEndPoints != null && !mqEndPoints.isEmpty()) {
topicEndPointsTable.put(topic, mqEndPoints);
}
}
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
......@@ -739,6 +724,7 @@ public class MQClientInstance {
}
}
}
TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
......@@ -865,13 +851,6 @@ public class MQClientInstance {
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
if (olddata == null || nowdata == null)
return true;
LogicalQueuesInfo oldLogicalQueuesInfo = olddata.getLogicalQueuesInfo();
LogicalQueuesInfo newLogicalQueuesInfo = nowdata.getLogicalQueuesInfo();
if (oldLogicalQueuesInfo != null && newLogicalQueuesInfo != null) {
return oldLogicalQueuesInfo.keySet().equals(newLogicalQueuesInfo.keySet());
} else if (oldLogicalQueuesInfo != null || newLogicalQueuesInfo != null) {
return true;
}
TopicRouteData old = new TopicRouteData(olddata);
TopicRouteData now = new TopicRouteData(nowdata);
Collections.sort(old.getQueueDatas());
......
......@@ -17,9 +17,13 @@
package org.apache.rocketmq.client.impl.producer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
......@@ -116,4 +120,5 @@ public class TopicPublishInfo {
public void setTopicRouteData(final TopicRouteData topicRouteData) {
this.topicRouteData = topicRouteData;
}
}
......@@ -88,6 +88,8 @@ public class MixAll {
public static final String REPLY_MESSAGE_FLAG = "reply";
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__";
public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__logical_queue_broker_not_exist__";
public static final Type TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA = new TypeReference<List<LogicalQueueRouteData>>() {
}.getType();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册