From 3e2c9202392c9a13259662023bbcb997c9770f7f Mon Sep 17 00:00:00 2001 From: dongeforever Date: Sat, 6 Nov 2021 17:39:57 +0800 Subject: [PATCH] Add update_static_topic code --- .../rocketmq/broker/BrokerController.java | 9 +++ .../processor/AdminBrokerProcessor.java | 75 +++++++++++-------- .../topic/TopicQueueMappingManager.java | 9 ++- .../route => }/LogicQueueMappingItem.java | 2 +- .../common/TopicConfigAndQueueMapping.java | 35 +++++++++ .../route => }/TopicQueueMappingInfo.java | 14 +++- .../rocketmq/common/protocol/RequestCode.java | 4 + .../protocol/body/TopicQueueMappingBody.java | 26 +++++++ .../TopicQueueMappingSerializeWrapper.java | 2 +- .../header/GetTopicConfigRequestHeader.java | 10 +++ 10 files changed, 149 insertions(+), 37 deletions(-) rename common/src/main/java/org/apache/rocketmq/common/{protocol/route => }/LogicQueueMappingItem.java (95%) create mode 100644 common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java rename common/src/main/java/org/apache/rocketmq/common/{protocol/route => }/TopicQueueMappingInfo.java (82%) create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 5eb91692..27cba022 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -80,6 +80,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; +import org.apache.rocketmq.broker.topic.TopicQueueMappingManager; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; @@ -179,6 +180,7 @@ public class BrokerController { private RemotingServer remotingServer; private RemotingServer fastRemotingServer; private TopicConfigManager topicConfigManager; + private TopicQueueMappingManager topicQueueMappingManager; private ExecutorService sendMessageExecutor; private ExecutorService pullMessageExecutor; private ExecutorService ackMessageExecutor; @@ -215,6 +217,7 @@ public class BrokerController { this.messageStoreConfig = messageStoreConfig; this.consumerOffsetManager = new ConsumerOffsetManager(this); this.topicConfigManager = new TopicConfigManager(this); + this.topicQueueMappingManager = new TopicQueueMappingManager(this); this.pullMessageProcessor = new PullMessageProcessor(this); this.pullRequestHoldService = new PullRequestHoldService(this); this.popMessageProcessor = new PopMessageProcessor(this); @@ -287,6 +290,8 @@ public class BrokerController { public boolean initialize() throws CloneNotSupportedException { boolean result = this.topicConfigManager.load(); + result = result && this.topicQueueMappingManager.load(); + result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); result = result && this.consumerFilterManager.load(); @@ -1184,6 +1189,10 @@ public class BrokerController { this.topicConfigManager = topicConfigManager; } + public TopicQueueMappingManager getTopicQueueMappingManager() { + return topicQueueMappingManager; + } + public String getHAServerAddr() { return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index e7b79495..99c70312 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -54,13 +54,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; -import org.apache.rocketmq.common.AclConfig; -import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.PlainAccessConfig; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.TopicQueueId; -import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.*; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; @@ -74,28 +68,7 @@ import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.BrokerStatsData; -import org.apache.rocketmq.common.protocol.body.BrokerStatsItem; -import org.apache.rocketmq.common.protocol.body.Connection; -import org.apache.rocketmq.common.protocol.body.ConsumeQueueData; -import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; -import org.apache.rocketmq.common.protocol.body.ConsumerConnection; -import org.apache.rocketmq.common.protocol.body.CreateMessageQueueForLogicalQueueRequestBody; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; -import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody; -import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody; -import org.apache.rocketmq.common.protocol.body.ProducerConnection; -import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; -import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; -import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; -import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; -import org.apache.rocketmq.common.protocol.body.ReuseTopicLogicalQueueRequestBody; -import org.apache.rocketmq.common.protocol.body.SealTopicLogicalQueueRequestBody; -import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import org.apache.rocketmq.common.protocol.body.UpdateTopicLogicalQueueMappingRequestBody; +import org.apache.rocketmq.common.protocol.body.*; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; @@ -282,6 +255,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return migrateTopicLogicalQueueCommit(ctx, request); case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY: return migrateTopicLogicalQueueNotify(ctx, request); + case RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC: + return this.updateAndCreateStaticTopic(ctx, request); default: return getUnknownCmdResponse(ctx, request); } @@ -323,6 +298,42 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } + private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final CreateTopicRequestHeader requestHeader = + (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); + log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + final TopicQueueMappingBody topicQueueMappingBody = RemotingSerializable.decode(request.getBody(), TopicQueueMappingBody.class); + + String topic = requestHeader.getTopic(); + + if (!TopicValidator.validateTopic(topic, response)) { + return response; + } + if (TopicValidator.isSystemTopic(topic, response)) { + return response; + } + + TopicConfig topicConfig = new TopicConfig(topic); + topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); + topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); + topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); + topicConfig.setPerm(requestHeader.getPerm()); + topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); + + this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); + + this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody); + + this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); + + response.setCode(ResponseCode.SUCCESS); + return response; + } + + private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -1715,7 +1726,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic()); return response; } - String content = JSONObject.toJSONString(topicConfig); + TopicQueueMappingInfo topicQueueMappingInfo = null; + if (Boolean.TRUE.equals(requestHeader.getWithMapping())) { + topicQueueMappingInfo = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); + } + String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingInfo)); try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 9ee0f514..c885b31d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; -import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo; +import org.apache.rocketmq.common.TopicQueueMappingInfo; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -45,7 +45,14 @@ public class TopicQueueMappingManager extends ConfigManager { public TopicQueueMappingManager(BrokerController brokerController) { this.brokerController = brokerController; + } + + public void updateTopicQueueMapping(TopicQueueMappingInfo topicQueueMappingInfo) { + topicQueueMappingTable.put(topicQueueMappingInfo.getTopic(), topicQueueMappingInfo); + } + public TopicQueueMappingInfo getTopicQueueMapping(String topic) { + return topicQueueMappingTable.get(topic); } @Override diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java similarity index 95% rename from common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java rename to common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java index fc5cbe62..50d88aeb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.common.protocol.route; +package org.apache.rocketmq.common; public class LogicQueueMappingItem { diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java new file mode 100644 index 00000000..f9a6ab4b --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +public class TopicConfigAndQueueMapping extends TopicConfig { + private TopicConfig topicConfig; + private TopicQueueMappingInfo topicQueueMappingInfo; + + public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingInfo topicQueueMappingInfo) { + this.topicConfig = topicConfig; + this.topicQueueMappingInfo = topicQueueMappingInfo; + } + + public TopicQueueMappingInfo getTopicQueueMappingInfo() { + return topicQueueMappingInfo; + } + + public TopicConfig getTopicConfig() { + return topicConfig; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java similarity index 82% rename from common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java rename to common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java index 03769650..0956a99c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java @@ -14,21 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.common.protocol.route; +package org.apache.rocketmq.common; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import java.util.HashMap; import java.util.List; import java.util.Map; -public class TopicQueueMappingInfo { +public class TopicQueueMappingInfo extends RemotingSerializable { + private String topic; // redundant field private int totalQueues; private String bname; //identify the host name //the newest mapping is in current broker private Map> hostedQueues = new HashMap>(); - public TopicQueueMappingInfo(int totalQueues, String bname) { + public TopicQueueMappingInfo(String topic, int totalQueues, String bname) { + this.topic = topic; this.totalQueues = totalQueues; this.bname = bname; } @@ -57,5 +61,7 @@ public class TopicQueueMappingInfo { return bname; } - + public String getTopic() { + return topic; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 04f126b2..f724695d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -209,4 +209,8 @@ public class RequestCode { public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417; public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418; public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419; + + + public static final int UPDATE_AND_CREATE_STATIC_TOPIC = 513; + } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java new file mode 100644 index 00000000..4caba894 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.TopicQueueMappingInfo; + +public class TopicQueueMappingBody extends TopicQueueMappingInfo { + + public TopicQueueMappingBody(String topic, int totalQueues, String bname) { + super(topic, totalQueues, bname); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java index ef3f7581..1d3d6c52 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.common.protocol.body; import org.apache.rocketmq.common.DataVersion; -import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo; +import org.apache.rocketmq.common.TopicQueueMappingInfo; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import java.util.Map; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java index ea9d17c3..2b5d0409 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java @@ -29,6 +29,8 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader { @CFNotNull private String topic; + private Boolean withMapping; + /** * @return the topic */ @@ -42,4 +44,12 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader { public void setTopic(String topic) { this.topic = topic; } + + public Boolean getWithMapping() { + return withMapping; + } + + public void setWithMapping(Boolean withMapping) { + this.withMapping = withMapping; + } } \ No newline at end of file -- GitLab