提交 3e2c9202 编写于 作者: D dongeforever

Add update_static_topic code

上级 e862ac88
......@@ -80,6 +80,7 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
......@@ -179,6 +180,7 @@ public class BrokerController {
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
private TopicConfigManager topicConfigManager;
private TopicQueueMappingManager topicQueueMappingManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService ackMessageExecutor;
......@@ -215,6 +217,7 @@ public class BrokerController {
this.messageStoreConfig = messageStoreConfig;
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
this.popMessageProcessor = new PopMessageProcessor(this);
......@@ -287,6 +290,8 @@ public class BrokerController {
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();
result = result && this.topicQueueMappingManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
......@@ -1184,6 +1189,10 @@ public class BrokerController {
this.topicConfigManager = topicConfigManager;
}
public TopicQueueMappingManager getTopicQueueMappingManager() {
return topicQueueMappingManager;
}
public String getHAServerAddr() {
return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
}
......
......@@ -54,13 +54,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueId;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.*;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
......@@ -74,28 +68,7 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.CreateMessageQueueForLogicalQueueRequestBody;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.ReuseTopicLogicalQueueRequestBody;
import org.apache.rocketmq.common.protocol.body.SealTopicLogicalQueueRequestBody;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UpdateTopicLogicalQueueMappingRequestBody;
import org.apache.rocketmq.common.protocol.body.*;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
......@@ -282,6 +255,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return migrateTopicLogicalQueueCommit(ctx, request);
case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY:
return migrateTopicLogicalQueueNotify(ctx, request);
case RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC:
return this.updateAndCreateStaticTopic(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
......@@ -323,6 +298,42 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
final TopicQueueMappingBody topicQueueMappingBody = RemotingSerializable.decode(request.getBody(), TopicQueueMappingBody.class);
String topic = requestHeader.getTopic();
if (!TopicValidator.validateTopic(topic, response)) {
return response;
}
if (TopicValidator.isSystemTopic(topic, response)) {
return response;
}
TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody);
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS);
return response;
}
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
......@@ -1715,7 +1726,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setRemark("No topic in this broker. topic: " + requestHeader.getTopic());
return response;
}
String content = JSONObject.toJSONString(topicConfig);
TopicQueueMappingInfo topicQueueMappingInfo = null;
if (Boolean.TRUE.equals(requestHeader.getWithMapping())) {
topicQueueMappingInfo = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
}
String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingInfo));
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
......
......@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -45,7 +45,14 @@ public class TopicQueueMappingManager extends ConfigManager {
public TopicQueueMappingManager(BrokerController brokerController) {
this.brokerController = brokerController;
}
public void updateTopicQueueMapping(TopicQueueMappingInfo topicQueueMappingInfo) {
topicQueueMappingTable.put(topicQueueMappingInfo.getTopic(), topicQueueMappingInfo);
}
public TopicQueueMappingInfo getTopicQueueMapping(String topic) {
return topicQueueMappingTable.get(topic);
}
@Override
......
package org.apache.rocketmq.common.protocol.route;
package org.apache.rocketmq.common;
public class LogicQueueMappingItem {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
public class TopicConfigAndQueueMapping extends TopicConfig {
private TopicConfig topicConfig;
private TopicQueueMappingInfo topicQueueMappingInfo;
public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingInfo topicQueueMappingInfo) {
this.topicConfig = topicConfig;
this.topicQueueMappingInfo = topicQueueMappingInfo;
}
public TopicQueueMappingInfo getTopicQueueMappingInfo() {
return topicQueueMappingInfo;
}
public TopicConfig getTopicConfig() {
return topicConfig;
}
}
......@@ -14,21 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.route;
package org.apache.rocketmq.common;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TopicQueueMappingInfo {
public class TopicQueueMappingInfo extends RemotingSerializable {
private String topic; // redundant field
private int totalQueues;
private String bname; //identify the host name
//the newest mapping is in current broker
private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
public TopicQueueMappingInfo(int totalQueues, String bname) {
public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
}
......@@ -57,5 +61,7 @@ public class TopicQueueMappingInfo {
return bname;
}
public String getTopic() {
return topic;
}
}
......@@ -209,4 +209,8 @@ public class RequestCode {
public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417;
public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418;
public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419;
public static final int UPDATE_AND_CREATE_STATIC_TOPIC = 513;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
public class TopicQueueMappingBody extends TopicQueueMappingInfo {
public TopicQueueMappingBody(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
}
}
......@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.Map;
......
......@@ -29,6 +29,8 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader {
@CFNotNull
private String topic;
private Boolean withMapping;
/**
* @return the topic
*/
......@@ -42,4 +44,12 @@ public class GetTopicConfigRequestHeader implements CommandCustomHeader {
public void setTopic(String topic) {
this.topic = topic;
}
public Boolean getWithMapping() {
return withMapping;
}
public void setWithMapping(Boolean withMapping) {
this.withMapping = withMapping;
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册