diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index e9ef46eff0004bffb5a9a42f5e9104aa50b7bb58..6a5b31e7d9ab1442c07a47bc799d3b960b9b044d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -350,7 +350,7 @@ public class BrokerOuterAPI { return changedList; } - public TopicConfigSerializeWrapper getAllTopicConfig( + public TopicConfigAndMappingSerializeWrapper getAllTopicConfig( final String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); @@ -359,7 +359,7 @@ public class BrokerOuterAPI { assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { - return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); + return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigAndMappingSerializeWrapper.class); } default: break; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index e4f36973cd333c8f460e7d1be3d681f56e9284f5..2d39538317459db230e1df65e296c6f87a02cd63 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -64,6 +64,7 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; @@ -545,7 +546,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements // final GetAllTopicConfigResponseHeader responseHeader = // (GetAllTopicConfigResponseHeader) response.readCustomHeader(); - String content = this.brokerController.getTopicConfigManager().encode(); + TopicConfigAndMappingSerializeWrapper topicConfigAndMappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper(); + + topicConfigAndMappingSerializeWrapper.setDataVersion(this.brokerController.getTopicConfigManager().getDataVersion()); + topicConfigAndMappingSerializeWrapper.setTopicConfigTable(this.brokerController.getTopicConfigManager().getTopicConfigTable()); + + topicConfigAndMappingSerializeWrapper.setMappingDataVersion(this.brokerController.getTopicQueueMappingManager().getDataVersion()); + topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable()); + + + String content = topicConfigAndMappingSerializeWrapper.toJson(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 7b5e5645a0b0f383863116892efd27e7e22c0604..00293187c8b8182e91d7526f987e12b4dfd87ef6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -21,6 +21,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper; @@ -56,7 +57,7 @@ public class SlaveSynchronize { String masterAddrBak = this.masterAddr; if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { - TopicConfigSerializeWrapper topicWrapper = + TopicConfigAndMappingSerializeWrapper topicWrapper = this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); if (!this.brokerController.getTopicConfigManager().getDataVersion() .equals(topicWrapper.getDataVersion())) { @@ -67,9 +68,17 @@ public class SlaveSynchronize { this.brokerController.getTopicConfigManager().getTopicConfigTable() .putAll(topicWrapper.getTopicConfigTable()); this.brokerController.getTopicConfigManager().persist(); - - log.info("Update slave topic config from master, {}", masterAddrBak); } + if (topicWrapper.getTopicQueueMappingDetailMap() != null + && !topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion())) { + this.brokerController.getTopicQueueMappingManager().getDataVersion() + .assignNewOne(topicWrapper.getMappingDataVersion()); + this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable().clear(); + this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable() + .putAll(topicWrapper.getTopicQueueMappingDetailMap()); + this.brokerController.getTopicQueueMappingManager().persist(); + } + log.info("Update slave topic config from master, {}", masterAddrBak); } catch (Exception e) { log.error("SyncTopicConfig Exception, {}", masterAddrBak, e); } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java index 47892e657e604b771ec40a4f008d861294f58a75..830a8a43d9859b21eea07b2a29e7ef27cac18592 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.common.protocol.body; +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo; import java.util.Map; @@ -25,6 +27,11 @@ import java.util.concurrent.ConcurrentHashMap; public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeWrapper { private Map topicQueueMappingInfoMap = new ConcurrentHashMap(); + private Map topicQueueMappingDetailMap = new ConcurrentHashMap(); + + private DataVersion mappingDataVersion = new DataVersion(); + + public Map getTopicQueueMappingInfoMap() { return topicQueueMappingInfoMap; } @@ -33,6 +40,22 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW this.topicQueueMappingInfoMap = topicQueueMappingInfoMap; } + public Map getTopicQueueMappingDetailMap() { + return topicQueueMappingDetailMap; + } + + public void setTopicQueueMappingDetailMap(Map topicQueueMappingDetailMap) { + this.topicQueueMappingDetailMap = topicQueueMappingDetailMap; + } + + public DataVersion getMappingDataVersion() { + return mappingDataVersion; + } + + public void setMappingDataVersion(DataVersion mappingDataVersion) { + this.mappingDataVersion = mappingDataVersion; + } + public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) { if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) { return (TopicConfigAndMappingSerializeWrapper) wrapper;