提交 256912a5 编写于 作者: D duhenglucky

Add snode route info obtain logic

上级 032ea2ee
......@@ -174,4 +174,10 @@ public class RequestCode {
public static final int SNODE_PUSH_MESSAGE = 352;
public static final int GET_SNODE_CLUSTER_INFO = 353;
public static final int GET_SNODE_INFO = 354;
}
......@@ -24,11 +24,14 @@ import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ClusterInfo extends RemotingSerializable {
private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private HashMap<String/* snodeName*/, SnodeData> snodeTable;
private HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
public HashMap<String, BrokerData> getBrokerAddrTable() {
return brokerAddrTable;
......@@ -61,6 +64,23 @@ public class ClusterInfo extends RemotingSerializable {
return addrs.toArray(new String[] {});
}
public HashMap<String, SnodeData> getSnodeTable() {
return snodeTable;
}
public void setSnodeTable(
HashMap<String, SnodeData> snodeTable) {
this.snodeTable = snodeTable;
}
public HashMap<String, Set<String>> getSnodeCluster() {
return snodeCluster;
}
public void setSnodeCluster(HashMap<String, Set<String>> snodeCluster) {
this.snodeCluster = snodeCluster;
}
public String[] retrieveAllMasterAddrByCluster(String cluster) {
List<String> addrs = new ArrayList<String>();
if (clusterAddrTable.containsKey(cluster)) {
......
package org.apache.rocketmq.common.protocol.body;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.HashMap;
import java.util.Set;
import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class SnodeClusterInfo extends RemotingSerializable {
private HashMap<String/* snodeName*/, SnodeData> snodeTable;
private HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
public HashMap<String, SnodeData> getSnodeTable() {
return snodeTable;
}
public void setSnodeTable(
HashMap<String, SnodeData> snodeTable) {
this.snodeTable = snodeTable;
}
public HashMap<String, Set<String>> getSnodeCluster() {
return snodeCluster;
}
public void setSnodeCluster(HashMap<String, Set<String>> snodeCluster) {
this.snodeCluster = snodeCluster;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.namesrv;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetSnodeInfoHeader implements CommandCustomHeader {
private String snodeClusterName;
@Override
public void checkFields() throws RemotingCommandException {
}
public String getSnodeClusterName() {
return snodeClusterName;
}
public void setSnodeClusterName(String snodeClusterName) {
this.snodeClusterName = snodeClusterName;
}
}
......@@ -27,9 +27,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -43,20 +40,24 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHead
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetSnodeInfoHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -128,12 +129,38 @@ public class DefaultRequestProcessor implements RequestProcessor {
return this.getConfig(ctx, request);
case RequestCode.REGISTER_SNODE:
return this.registerSnode(ctx, request);
case RequestCode.GET_SNODE_CLUSTER_INFO:
return this.getSnodeClusterInfo(ctx, request);
case RequestCode.GET_SNODE_INFO:
return getSnodeInfo(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand getSnodeClusterInfo(ChannelHandlerContext ctx,
RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
byte[] content = this.namesrvController.getRouteInfoManager().getAllSnodeData();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
public RemotingCommand getSnodeInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetSnodeInfoHeader requestHeader =
(GetSnodeInfoHeader) request.decodeCommandCustomHeader(GetSnodeInfoHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
byte[] content = this.namesrvController.getRouteInfoManager().getSnodeDatabyClusterName(requestHeader.getSnodeClusterName());
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
public RemotingCommand registerSnode(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
......
......@@ -33,17 +33,18 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class RouteInfoManager {
......@@ -72,9 +73,30 @@ public class RouteInfoManager {
ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
clusterInfoSerializeWrapper.setSnodeCluster(this.snodeCluster);
clusterInfoSerializeWrapper.setSnodeTable(this.snodeTable);
return clusterInfoSerializeWrapper.encode();
}
public byte[] getSnodeDatabyClusterName(String clusterName) {
SnodeClusterInfo snodeClusterInfo = new SnodeClusterInfo();
Set<String> snodeNameSet = this.snodeCluster.get(clusterName);
HashMap<String, SnodeData> snodeDatas = new HashMap<>();
for (String snodeName : snodeNameSet) {
SnodeData snodeData = this.snodeTable.get(snodeName);
snodeDatas.putIfAbsent(clusterName, snodeData);
}
snodeClusterInfo.setSnodeTable(snodeDatas);
return snodeClusterInfo.encode();
}
public byte[] getAllSnodeData() {
SnodeClusterInfo snodeClusterInfo = new SnodeClusterInfo();
snodeClusterInfo.setSnodeCluster(this.snodeCluster);
snodeClusterInfo.setSnodeTable(snodeTable);
return snodeClusterInfo.encode();
}
public void deleteTopic(final String topic) {
try {
try {
......
......@@ -32,9 +32,9 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.config.SnodeConfig;
......@@ -58,7 +58,7 @@ public class SnodeStartup {
controller.start();
String tip = "The snode[" + controller.getSnodeConfig().getSnodeName() + ", "
+ controller.getSnodeConfig().getSnodeAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+ controller.getSnodeConfig().getSnodeIP1() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getSnodeConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getSnodeConfig().getNamesrvAddr();
......
......@@ -16,11 +16,14 @@
*/
package org.apache.rocketmq.snode.config;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
......@@ -33,9 +36,16 @@ public class SnodeConfig {
@ImportantField
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
private String snodeName = "defaultNode";
@ImportantField
private String snodeIP1 = RemotingUtil.getLocalAddress();
private String snodeIP2 = RemotingUtil.getLocalAddress();
@ImportantField
private String snodeName = localHostName();
private String snodeAddr = "127.0.0.1:11911";
@ImportantField
private long snodeId = MixAll.MASTER_ID;
private String clusterName = "defaultCluster";
......@@ -135,6 +145,16 @@ public class SnodeConfig {
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_SNODE";
}
public boolean isFetchNamesrvAddrByAddressServer() {
return fetchNamesrvAddrByAddressServer;
}
......@@ -207,12 +227,28 @@ public class SnodeConfig {
this.snodeSendMessageMaxPoolSize = snodeSendMessageMaxPoolSize;
}
public String getSnodeAddr() {
return snodeAddr;
public String getSnodeIP1() {
return snodeIP1;
}
public void setSnodeIP1(String snodeIP1) {
this.snodeIP1 = snodeIP1;
}
public String getSnodeIP2() {
return snodeIP2;
}
public void setSnodeIP2(String snodeIP2) {
this.snodeIP2 = snodeIP2;
}
public long getSnodeId() {
return snodeId;
}
public void setSnodeAddr(String snodeAddr) {
this.snodeAddr = snodeAddr;
public void setSnodeId(long snodeId) {
this.snodeId = snodeId;
}
public String getSnodeName() {
......
......@@ -53,12 +53,16 @@ public class NnodeServiceImpl implements NnodeService {
this.snodeController = snodeController;
}
public String getSnodeAddress() {
return this.snodeController.getSnodeConfig().getSnodeIP1() + ":" + this.snodeController.getSnodeConfig().getListenPort();
}
@Override
public void registerSnode(SnodeConfig snodeConfig) {
List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
RemotingCommand remotingCommand = new RemotingCommand();
RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
requestHeader.setSnodeAddr(getSnodeAddress());
requestHeader.setSnodeName(snodeConfig.getSnodeName());
requestHeader.setClusterName(snodeConfig.getClusterName());
remotingCommand.setCustomHeader(requestHeader);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册