提交 215c0e4d 编写于 作者: D dongeforever

Add the adminExt

上级 86c9f100
......@@ -329,7 +329,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody);
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail());
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
......
......@@ -28,6 +28,7 @@ import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -79,6 +80,24 @@ public class MQAdminImpl {
this.timeoutMillis = timeoutMillis;
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
MQClientException exception = null;
for (int i = 0; i < 3; i++) {
try {
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, timeoutMillis);
break;
} catch (Exception e) {
if (2 == i) {
exception = new MQClientException("create topic to broker exception", e);
}
}
}
if (exception != null) {
throw exception;
}
}
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
......
......@@ -57,6 +57,8 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
......@@ -2704,22 +2706,53 @@ public class MQClientAPIImpl {
migrateTopicLogicalQueue(RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY, brokerAddr, fromQueueRouteData, toQueueRouteData, timeoutMillis);
}
public TopicConfig getTopicConfig(final String brokerAddr, String topic,
public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setWithMapping(true);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header);
RemotingCommand response = this.remotingClient
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return RemotingSerializable.decode(response.getBody(), TopicConfig.class);
return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class);
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC, requestHeader);
request.setBody(topicQueueMappingDetail.encode());
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
}
......@@ -16,20 +16,19 @@
*/
package org.apache.rocketmq.common;
public class TopicConfigAndQueueMapping {
private TopicConfig topicConfig;
public class TopicConfigAndQueueMapping extends TopicConfig {
private TopicQueueMappingDetail topicQueueMappingDetail;
public TopicConfigAndQueueMapping() {
}
public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail topicQueueMappingDetail) {
this.topicConfig = topicConfig;
super(topicConfig);
this.topicQueueMappingDetail = topicQueueMappingDetail;
}
public TopicQueueMappingDetail getTopicQueueMappingInfo() {
return topicQueueMappingDetail;
}
public TopicConfig getTopicConfig() {
return topicConfig;
}
}
......@@ -29,8 +29,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int gen) {
super(topic, totalQueues, bname, gen);
buildIdMap();
}
......@@ -118,7 +118,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.gen);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
......
......@@ -28,15 +28,17 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
String topic; // redundant field
int totalQueues;
String bname; //identify the hosted broker name
int gen; //important to fence the old dirty data
//register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int gen) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
this.gen = gen;
}
......@@ -56,6 +58,10 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
return topic;
}
public int getGen() {
return gen;
}
public ConcurrentMap<Integer, Integer> getCurrIdMap() {
return currIdMap;
}
......
......@@ -17,10 +17,35 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicQueueMappingBody extends TopicQueueMappingDetail {
public class TopicQueueMappingBody extends RemotingSerializable {
public TopicQueueMappingBody(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
private boolean force;
private int prevGen;
private TopicQueueMappingDetail mappingDetail;
public int getPrevGen() {
return prevGen;
}
public void setPrevGen(int prevGen) {
this.prevGen = prevGen;
}
public TopicQueueMappingDetail getMappingDetail() {
return mappingDetail;
}
public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
this.mappingDetail = mappingDetail;
}
public boolean isForce() {
return force;
}
public void setForce(boolean force) {
this.force = force;
}
}
......@@ -28,6 +28,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
......@@ -653,6 +654,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return this.defaultMQAdminExtImpl.migrateTopicLogicalQueueCommit(fromQueueRouteData, toQueueRouteData);
}
@Override
public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
......
......@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
......@@ -1087,6 +1088,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
}
@Override
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
}
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
......
......@@ -27,6 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
......@@ -332,7 +333,10 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
import java.util.Set;
public class UpdateStaticTopicSubCommand implements SubCommand {
@Override
public String commandName() {
return "updateStaticTopic";
}
@Override
public String commandDesc() {
return "Update or create static topic, which has fixed number of queues";
}
@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = null;
opt = new Option("c", "clusterName", true, "create topic to which cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("qn", "totalQueueNum", true, "total queue num");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
// readQueueNums
if (commandLine.hasOption('r')) {
topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
}
// writeQueueNums
if (commandLine.hasOption('w')) {
topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
}
// perm
if (commandLine.hasOption('p')) {
topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
}
boolean isUnit = false;
if (commandLine.hasOption('u')) {
isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
}
boolean isCenterSync = false;
if (commandLine.hasOption('s')) {
isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
}
int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
topicConfig.setTopicSysFlag(topicCenterSync);
boolean isOrder = false;
if (commandLine.hasOption('o')) {
isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
}
topicConfig.setOrder(isOrder);
boolean useLogicalQueue = false;
if (commandLine.hasOption("lq")) {
useLogicalQueue = Boolean.parseBoolean(commandLine.getOptionValue("lq").trim());
}
if (commandLine.hasOption('b')) {
if (useLogicalQueue) {
System.out.printf("-lq and -b can not be used together.%n");
return;
}
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
if (isOrder) {
String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
isOrder, orderConf.toString()));
}
System.out.printf("create topic to %s success.%n", addr);
System.out.printf("%s", topicConfig);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("create topic to %s success.%n", addr);
}
if (isOrder) {
Set<String> brokerNameSet =
CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
StringBuilder orderConf = new StringBuilder();
String splitor = "";
for (String s : brokerNameSet) {
orderConf.append(splitor).append(s).append(":")
.append(topicConfig.getWriteQueueNums());
splitor = ";";
}
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
orderConf.toString(), true);
System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
}
System.out.printf("%s", topicConfig);
if (useLogicalQueue) {
new UpdateTopicLogicalQueueMappingCommand().execute(defaultMQAdminExt, topicConfig.getTopicName(), masterSet);
}
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册