...
 
Commits (2)
    https://gitcode.net/apacherocketmq/rocketmq/-/commit/4e9b097478e7de052a14cd883b47858e6a47b94e Check the correctness of logic items 2021-11-19T20:00:23+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/d18d787f92eb44020a482205e27bae4b9c725957 Add the check logic for admin process update-and-create static topic 2021-11-19T20:33:22+08:00 dongeforever dongeforever@apache.org
...@@ -323,6 +323,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -323,6 +323,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (TopicValidator.isSystemTopic(topic, response)) { if (TopicValidator.isSystemTopic(topic, response)) {
return response; return response;
} }
boolean force = false;
if (requestHeader.getForce() != null && requestHeader.getForce()) {
force = true;
}
TopicConfig topicConfig = new TopicConfig(topic); TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
...@@ -331,13 +335,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -331,13 +335,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
topicConfig.setPerm(requestHeader.getPerm()); topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail());
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(), force);
response.setCode(ResponseCode.SUCCESS); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS);
} catch (Exception e) {
log.error("Update static failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
}
return response; return response;
} }
......
...@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName; ...@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
...@@ -56,8 +57,47 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -56,8 +57,47 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController; this.brokerController = brokerController;
} }
public void updateTopicQueueMapping(TopicQueueMappingDetail topicQueueMappingDetail) { public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) {
topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(), topicQueueMappingDetail); lock.lock();
try {
if (newDetail == null) {
return;
}
newDetail.getHostedQueues().forEach((queueId, items) -> {
TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
});
TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
if (oldDetail == null) {
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
return;
}
if (force) {
oldDetail.getHostedQueues().forEach( (queueId, items) -> {
newDetail.getHostedQueues().putIfAbsent(queueId, items);
});
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
return;
}
//do more check
if (newDetail.getEpoch() <= oldDetail.getEpoch()) {
throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
}
for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
ImmutableList<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
ImmutableList<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
if (newItems == null) {
//keep the old
newDetail.getHostedQueues().put(globalId, oldItems);
} else {
TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems);
}
}
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
} finally {
lock.unlock();
}
} }
public TopicQueueMappingDetail getTopicQueueMapping(String topic) { public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
......
...@@ -80,11 +80,11 @@ public class MQAdminImpl { ...@@ -80,11 +80,11 @@ public class MQAdminImpl {
this.timeoutMillis = timeoutMillis; this.timeoutMillis = timeoutMillis;
} }
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException { public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, boolean force) throws MQClientException {
MQClientException exception = null; MQClientException exception = null;
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
try { try {
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, timeoutMillis); this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
break; break;
} catch (Exception e) { } catch (Exception e) {
if (2 == i) { if (2 == i) {
......
...@@ -2726,7 +2726,7 @@ public class MQClientAPIImpl { ...@@ -2726,7 +2726,7 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark()); throw new MQBrokerException(response.getCode(), response.getRemark());
} }
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
final long timeoutMillis) final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
...@@ -2738,7 +2738,7 @@ public class MQClientAPIImpl { ...@@ -2738,7 +2738,7 @@ public class MQClientAPIImpl {
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder()); requestHeader.setOrder(topicConfig.isOrder());
requestHeader.setForce(force);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC, requestHeader);
request.setBody(topicQueueMappingDetail.encode()); request.setBody(topicQueueMappingDetail.encode());
......
...@@ -23,6 +23,7 @@ package org.apache.rocketmq.common.protocol.header; ...@@ -23,6 +23,7 @@ package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class CreateTopicRequestHeader implements CommandCustomHeader { public class CreateTopicRequestHeader implements CommandCustomHeader {
...@@ -42,6 +43,9 @@ public class CreateTopicRequestHeader implements CommandCustomHeader { ...@@ -42,6 +43,9 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
@CFNotNull @CFNotNull
private Boolean order = false; private Boolean order = false;
@CFNullable
private Boolean force = false;
@Override @Override
public void checkFields() throws RemotingCommandException { public void checkFields() throws RemotingCommandException {
try { try {
...@@ -118,4 +122,12 @@ public class CreateTopicRequestHeader implements CommandCustomHeader { ...@@ -118,4 +122,12 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
public void setOrder(Boolean order) { public void setOrder(Boolean order) {
this.order = order; this.order = order;
} }
public Boolean getForce() {
return force;
}
public void setForce(Boolean force) {
this.force = force;
}
} }
...@@ -7,7 +7,7 @@ public class LogicQueueMappingItem { ...@@ -7,7 +7,7 @@ public class LogicQueueMappingItem {
private String bname; private String bname;
private long logicOffset; // the start of the logic offset private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset, included private long startOffset; // the start of the physical offset, included
private long endOffset; // the end of the physical offset, excluded private long endOffset = -1; // the end of the physical offset, excluded
private long timeOfStart = -1; // mutable private long timeOfStart = -1; // mutable
private long timeOfEnd = -1; // mutable private long timeOfEnd = -1; // mutable
......
...@@ -162,6 +162,72 @@ public class TopicQueueMappingUtils { ...@@ -162,6 +162,72 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum); return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
} }
public static void makeSureLogicQueueMappingItemImmutable(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
if (oldItems == null || oldItems.isEmpty()) {
return;
}
if (newItems == null || newItems.isEmpty() || newItems.size() < oldItems.size()) {
throw new RuntimeException("The new item list is smaller than old ones");
}
int iold = 0, inew = 0;
while (iold < oldItems.size() && inew < newItems.size()) {
LogicQueueMappingItem newItem = newItems.get(inew);
LogicQueueMappingItem oldItem = oldItems.get(iold);
if (newItem.getGen() < oldItem.getGen()) {
inew++;
continue;
} else if (oldItem.getGen() < newItem.getGen()){
throw new RuntimeException("The gen is not correct for old item");
} else {
assert oldItem.getBname().equals(newItem.getBname());
assert oldItem.getQueueId() == newItem.getQueueId();
assert oldItem.getStartOffset() == newItem.getStartOffset();
if (oldItem.getLogicOffset() != -1) {
assert oldItem.getLogicOffset() == newItem.getLogicOffset();
}
iold++;
inew++;
}
}
}
public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> items) {
if (items == null
|| items.isEmpty()) {
return;
}
int lastGen = -1;
long lastOffset = -1;
for (int i = items.size() - 1; i >=0 ; i--) {
LogicQueueMappingItem item = items.get(i);
if (item.getStartOffset() < 0
|| item.getGen() < 0
|| item.getQueueId() < 0) {
throw new RuntimeException("The field is illegal, should not be negative");
}
if (lastGen != -1 && item.getGen() >= lastGen) {
throw new RuntimeException("The gen dose not increase monotonically");
}
if (item.getEndOffset() != -1
&& item.getEndOffset() < item.getStartOffset()) {
throw new RuntimeException("The endOffset is smaller than the start offset");
}
if (lastOffset != -1 && item.getLogicOffset() != -1) {
if (item.getLogicOffset() >= lastOffset) {
throw new RuntimeException("The base logic offset dose not increase monotonically");
}
if (item.computeMaxStaticQueueOffset() >= lastOffset) {
throw new RuntimeException("The max logic offset dose not increase monotonically");
}
}
lastGen = item.getGen();
lastOffset = item.getLogicOffset();
}
}
public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) { public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() { Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override @Override
...@@ -178,6 +244,7 @@ public class TopicQueueMappingUtils { ...@@ -178,6 +244,7 @@ public class TopicQueueMappingUtils {
} }
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) { for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey(); Integer globalid = entry.getKey();
checkLogicQueueMappingItemOffset(entry.getValue());
String leaderBrokerName = getLeaderBroker(entry.getValue()); String leaderBrokerName = getLeaderBroker(entry.getValue());
if (!leaderBrokerName.equals(mappingDetail.getBname())) { if (!leaderBrokerName.equals(mappingDetail.getBname())) {
//not the leader //not the leader
......
...@@ -661,8 +661,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -661,8 +661,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
} }
@Override @Override
public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail); this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
} }
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr, @Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
......
...@@ -1096,8 +1096,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -1096,8 +1096,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
} }
@Override @Override
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException { public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail); this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
} }
@Override @Override
......
...@@ -337,7 +337,7 @@ public interface MQAdminExt extends MQAdmin { ...@@ -337,7 +337,7 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException, void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException; InterruptedException, MQClientException;
void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData, void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
......
...@@ -111,7 +111,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -111,7 +111,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty"); throw new RuntimeException("The Cluster info is empty");
} }
clientMetadata.refreshClusterInfo(clusterInfo); clientMetadata.refreshClusterInfo(clusterInfo);
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt); doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
return; return;
}catch (Exception e) { }catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
...@@ -123,19 +123,19 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -123,19 +123,19 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception { ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
// now do the remapping // now do the remapping
//Step1: let the new leader can be write without the logicOffset //Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) { for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker); String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
} }
//Step2: forbid the write of old leader //Step2: forbid the write of old leader
for (String broker: brokersToMapOut) { for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker); String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
} }
//Step3: decide the logic offset //Step3: decide the logic offset
for (String broker: brokersToMapOut) { for (String broker: brokersToMapOut) {
...@@ -171,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -171,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
for (String broker: brokersToMapIn) { for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker); String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
} }
} }
...@@ -353,7 +353,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -353,7 +353,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
System.out.println("The old mapping data is written to file " + newMappingDataFile); System.out.println("The old mapping data is written to file " + newMappingDataFile);
} }
doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt); doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
} catch (Exception e) { } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
...@@ -127,7 +127,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -127,7 +127,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
String broker = entry.getKey(); String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker); String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue(); TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
} }
} }
......