提交 13641c00 编写于 作者: Z ZQKC 提交者: EricZeng

[Bugfix]修复Broker元信息解析方法未调用导致接入集群失败的问题(#986)

上级 769c2c0f
......@@ -7,6 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.topic.TopicTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
......@@ -78,7 +79,7 @@ public class KafkaZKDAOImpl implements KafkaZKDAO {
try {
BrokerMetadata metadata = this.getData(kafkaZkClient.currentZooKeeper(), BrokerIdZNode.path(brokerId), false, BrokerMetadata.class);
BrokerMetadata.parseAndUpdateBrokerMetadata(metadata);
return this.convert2Broker(clusterPhyId, brokerId, metadata);
} catch (KeeperException ke) {
logger.error("method=getBrokerMetadata||clusterPhyId={}||brokerId={}||errMsg=exception", clusterPhyId, brokerId, ke);
......@@ -279,7 +280,7 @@ public class KafkaZKDAOImpl implements KafkaZKDAO {
metadata.setJmxPort(brokerMetadata.getJmxPort());
metadata.setStartTimestamp(brokerMetadata.getTimestamp());
metadata.setRack(brokerMetadata.getRack());
metadata.setStatus(1);
metadata.setStatus(Constant.ALIVE);
metadata.setEndpointMap(brokerMetadata.getEndpointMap());
return metadata;
}
......
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
......@@ -51,7 +50,6 @@ import java.util.Map;
* }
*
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerMetadata implements Serializable {
private static final long serialVersionUID = 3918113492423375809L;
......@@ -74,34 +72,92 @@ public class BrokerMetadata implements Serializable {
private String rack;
@JsonIgnore
public String getExternalHost() {
if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) {
// external如果不存在,就返回host
return host;
public List<String> getEndpoints() {
return endpoints;
}
public void setEndpoints(List<String> endpoints) {
this.endpoints = endpoints;
}
public Map<String, IpPortData> getEndpointMap() {
if (endpointMap == null) {
this.parseBrokerMetadata();
}
return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp();
return endpointMap;
}
public String getHost() {
if (endpointMap == null) {
this.parseBrokerMetadata();
}
return host;
}
public void setHost(String host) {
this.host = host;
}
@JsonIgnore
public String getInternalHost() {
if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) {
// internal如果不存在,就返回host
return host;
public Integer getPort() {
if (endpointMap == null) {
this.parseBrokerMetadata();
}
return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp();
return port;
}
public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) {
brokerMetadata.setEndpointMap(new HashMap<>());
public void setPort(Integer port) {
this.port = port;
}
public Integer getJmxPort() {
return jmxPort;
}
public void setJmxPort(Integer jmxPort) {
this.jmxPort = jmxPort;
}
public Integer getVersion() {
return version;
}
if (brokerMetadata.getEndpoints().isEmpty()) {
public void setVersion(Integer version) {
this.version = version;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public String getRack() {
return rack;
}
public void setRack(String rack) {
this.rack = rack;
}
private synchronized void parseBrokerMetadata() {
if (this.endpointMap != null) {
return;
}
if (this.endpoints == null || this.endpoints.isEmpty()) {
this.endpointMap = new HashMap<>(0);
return;
}
Map<String, IpPortData> tempEndpointMap = new HashMap<>();
// example EXTERNAL://10.179.162.202:7092
for (String endpoint: brokerMetadata.getEndpoints()) {
for (String endpoint: this.endpoints) {
int idx1 = endpoint.indexOf("://");
int idx2 = endpoint.lastIndexOf(":");
if (idx1 == -1 || idx2 == -1 || idx1 == idx2) {
......@@ -111,19 +167,37 @@ public class BrokerMetadata implements Serializable {
String brokerHost = endpoint.substring(idx1 + "://".length(), idx2);
String brokerPort = endpoint.substring(idx2 + 1);
brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort));
tempEndpointMap.put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort));
if (KafkaConstant.INTERNAL_KEY.equals(endpoint.substring(0, idx1))) {
// 优先使用internal的地址进行展示
brokerMetadata.setHost(brokerHost);
brokerMetadata.setPort(ConvertUtil.string2Integer(brokerPort));
this.host = brokerHost;
this.port = ConvertUtil.string2Integer(brokerPort);
}
if (null == brokerMetadata.getHost()) {
brokerMetadata.setHost(brokerHost);
brokerMetadata.setPort(ConvertUtil.string2Integer(brokerPort));
if (null == this.host) {
this.host = brokerHost;
this.port = ConvertUtil.string2Integer(brokerPort);
}
}
this.endpointMap = tempEndpointMap;
}
public static void main(String[] args) {
String str = "{\t\n" +
"\t\"listener_security_protocol_map\":{\"EXTERNAL\":\"SASL_PLAINTEXT\",\"INTERNAL\":\"SASL_PLAINTEXT\"},\n" +
"\t\"endpoints\":[\"EXTERNAL://10.179.162.202:7092\",\"INTERNAL://10.179.162.202:7093\"],\n" +
"\t\"jmx_port\":8099,\n" +
"\t\"host\":null,\n" +
"\t\"timestamp\":\"1627289710439\",\n" +
"\t\"port\":-1,\n" +
"\t\"version\":4\n" +
"}";
BrokerMetadata bm = JSON.parseObject(str, BrokerMetadata.class);
System.out.println(bm.getHost());
System.out.println(JSON.toJSON(bm));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册