提交 ad90cc16 编写于 作者: D dongeforever

Finish the slave sync logic for topic queue mapping

上级 a9addc3c
...@@ -350,7 +350,7 @@ public class BrokerOuterAPI { ...@@ -350,7 +350,7 @@ public class BrokerOuterAPI {
return changedList; return changedList;
} }
public TopicConfigSerializeWrapper getAllTopicConfig( public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
final String addr) throws RemotingConnectException, RemotingSendRequestException, final String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException { RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
...@@ -359,7 +359,7 @@ public class BrokerOuterAPI { ...@@ -359,7 +359,7 @@ public class BrokerOuterAPI {
assert response != null; assert response != null;
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigAndMappingSerializeWrapper.class);
} }
default: default:
break; break;
......
...@@ -64,6 +64,7 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; ...@@ -64,6 +64,7 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
...@@ -545,7 +546,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -545,7 +546,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
// final GetAllTopicConfigResponseHeader responseHeader = // final GetAllTopicConfigResponseHeader responseHeader =
// (GetAllTopicConfigResponseHeader) response.readCustomHeader(); // (GetAllTopicConfigResponseHeader) response.readCustomHeader();
String content = this.brokerController.getTopicConfigManager().encode(); TopicConfigAndMappingSerializeWrapper topicConfigAndMappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigAndMappingSerializeWrapper.setDataVersion(this.brokerController.getTopicConfigManager().getDataVersion());
topicConfigAndMappingSerializeWrapper.setTopicConfigTable(this.brokerController.getTopicConfigManager().getTopicConfigTable());
topicConfigAndMappingSerializeWrapper.setMappingDataVersion(this.brokerController.getTopicQueueMappingManager().getDataVersion());
topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable());
String content = topicConfigAndMappingSerializeWrapper.toJson();
if (content != null && content.length() > 0) { if (content != null && content.length() > 0) {
try { try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
......
...@@ -21,6 +21,7 @@ import org.apache.rocketmq.broker.BrokerController; ...@@ -21,6 +21,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper; import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
...@@ -56,7 +57,7 @@ public class SlaveSynchronize { ...@@ -56,7 +57,7 @@ public class SlaveSynchronize {
String masterAddrBak = this.masterAddr; String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try { try {
TopicConfigSerializeWrapper topicWrapper = TopicConfigAndMappingSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion() if (!this.brokerController.getTopicConfigManager().getDataVersion()
.equals(topicWrapper.getDataVersion())) { .equals(topicWrapper.getDataVersion())) {
...@@ -67,9 +68,17 @@ public class SlaveSynchronize { ...@@ -67,9 +68,17 @@ public class SlaveSynchronize {
this.brokerController.getTopicConfigManager().getTopicConfigTable() this.brokerController.getTopicConfigManager().getTopicConfigTable()
.putAll(topicWrapper.getTopicConfigTable()); .putAll(topicWrapper.getTopicConfigTable());
this.brokerController.getTopicConfigManager().persist(); this.brokerController.getTopicConfigManager().persist();
log.info("Update slave topic config from master, {}", masterAddrBak);
} }
if (topicWrapper.getTopicQueueMappingDetailMap() != null
&& !topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion())) {
this.brokerController.getTopicQueueMappingManager().getDataVersion()
.assignNewOne(topicWrapper.getMappingDataVersion());
this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable().clear();
this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable()
.putAll(topicWrapper.getTopicQueueMappingDetailMap());
this.brokerController.getTopicQueueMappingManager().persist();
}
log.info("Update slave topic config from master, {}", masterAddrBak);
} catch (Exception e) { } catch (Exception e) {
log.error("SyncTopicConfig Exception, {}", masterAddrBak, e); log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
} }
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import java.util.Map; import java.util.Map;
...@@ -25,6 +27,11 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -25,6 +27,11 @@ import java.util.concurrent.ConcurrentHashMap;
public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeWrapper { public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeWrapper {
private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>(); private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
private Map<String/* topic */, TopicQueueMappingDetail> topicQueueMappingDetailMap = new ConcurrentHashMap<String, TopicQueueMappingDetail>();
private DataVersion mappingDataVersion = new DataVersion();
public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() { public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
return topicQueueMappingInfoMap; return topicQueueMappingInfoMap;
} }
...@@ -33,6 +40,22 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW ...@@ -33,6 +40,22 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW
this.topicQueueMappingInfoMap = topicQueueMappingInfoMap; this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
} }
public Map<String, TopicQueueMappingDetail> getTopicQueueMappingDetailMap() {
return topicQueueMappingDetailMap;
}
public void setTopicQueueMappingDetailMap(Map<String, TopicQueueMappingDetail> topicQueueMappingDetailMap) {
this.topicQueueMappingDetailMap = topicQueueMappingDetailMap;
}
public DataVersion getMappingDataVersion() {
return mappingDataVersion;
}
public void setMappingDataVersion(DataVersion mappingDataVersion) {
this.mappingDataVersion = mappingDataVersion;
}
public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) { public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) {
if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) { if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) {
return (TopicConfigAndMappingSerializeWrapper) wrapper; return (TopicConfigAndMappingSerializeWrapper) wrapper;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册