提交 d18d787f 编写于 作者: D dongeforever

Add the check logic for admin process update-and-create static topic

上级 4e9b0974
......@@ -323,6 +323,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (TopicValidator.isSystemTopic(topic, response)) {
return response;
}
boolean force = false;
if (requestHeader.getForce() != null && requestHeader.getForce()) {
force = true;
}
TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
......@@ -331,13 +335,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail());
try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(), force);
response.setCode(ResponseCode.SUCCESS);
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS);
} catch (Exception e) {
log.error("Update static failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
}
return response;
}
......
......@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
......@@ -56,8 +57,47 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController;
}
public void updateTopicQueueMapping(TopicQueueMappingDetail topicQueueMappingDetail) {
topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(), topicQueueMappingDetail);
public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) {
lock.lock();
try {
if (newDetail == null) {
return;
}
newDetail.getHostedQueues().forEach((queueId, items) -> {
TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
});
TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
if (oldDetail == null) {
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
return;
}
if (force) {
oldDetail.getHostedQueues().forEach( (queueId, items) -> {
newDetail.getHostedQueues().putIfAbsent(queueId, items);
});
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
return;
}
//do more check
if (newDetail.getEpoch() <= oldDetail.getEpoch()) {
throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
}
for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
ImmutableList<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
ImmutableList<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
if (newItems == null) {
//keep the old
newDetail.getHostedQueues().put(globalId, oldItems);
} else {
TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems);
}
}
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
} finally {
lock.unlock();
}
}
public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
......
......@@ -80,11 +80,11 @@ public class MQAdminImpl {
this.timeoutMillis = timeoutMillis;
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, boolean force) throws MQClientException {
MQClientException exception = null;
for (int i = 0; i < 3; i++) {
try {
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, timeoutMillis);
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
break;
} catch (Exception e) {
if (2 == i) {
......
......@@ -2726,7 +2726,7 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail,
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
......@@ -2738,7 +2738,7 @@ public class MQClientAPIImpl {
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
requestHeader.setForce(force);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC, requestHeader);
request.setBody(topicQueueMappingDetail.encode());
......
......@@ -23,6 +23,7 @@ package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class CreateTopicRequestHeader implements CommandCustomHeader {
......@@ -42,6 +43,9 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
@CFNotNull
private Boolean order = false;
@CFNullable
private Boolean force = false;
@Override
public void checkFields() throws RemotingCommandException {
try {
......@@ -118,4 +122,12 @@ public class CreateTopicRequestHeader implements CommandCustomHeader {
public void setOrder(Boolean order) {
this.order = order;
}
public Boolean getForce() {
return force;
}
public void setForce(Boolean force) {
this.force = force;
}
}
......@@ -162,7 +162,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
public static void makeSureLogicQueueMappingItemImmutable(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
if (oldItems == null || oldItems.isEmpty()) {
return;
}
......
......@@ -661,8 +661,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
......
......@@ -1096,8 +1096,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
@Override
......
......@@ -337,7 +337,7 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException,
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
......
......@@ -111,7 +111,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......@@ -123,19 +123,19 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
......@@ -171,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
}
......@@ -353,7 +353,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
System.out.println("The old mapping data is written to file " + newMappingDataFile);
}
doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt);
doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
......@@ -127,7 +127,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册