提交 4df2dc09 编写于 作者: X xuguang

增加对BrokerMetadata中endpoints为INTERNAL|EXTERNAL方式的解析

上级 c344fd8c
......@@ -109,5 +109,10 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -17,6 +17,10 @@ public class KafkaConstant {
public static final String RETENTION_MS_KEY = "retention.ms";
public static final String EXTERNAL_KEY = "EXTERNAL";
public static final String INTERNAL_KEY = "INTERNAL";
private KafkaConstant() {
}
}
\ No newline at end of file
package com.xiaojukeji.kafka.manager.common.entity.ao.common;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Value holder for one (ip, port) pair parsed from a broker endpoint string
 * such as {@code EXTERNAL://10.179.162.202:7092}.
 *
 * <p>Getters/setters, equals/hashCode/toString and both constructors are
 * generated by Lombok ({@code @Data}, {@code @NoArgsConstructor},
 * {@code @AllArgsConstructor}).
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class IpPortData implements Serializable {
    private static final long serialVersionUID = -428897032994630685L;

    // Host/IP portion of the endpoint (text between "://" and the last ':').
    private String ip;

    // Port kept as a String — it is the raw substring after the last ':';
    // callers convert to int where needed (e.g. NumberUtils.string2Integer).
    private String port;
}
package com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.entity.ao.common.IpPortData;
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author zengqiao
......@@ -18,22 +29,48 @@ import java.util.List;
* "version":4,
* "rack": "CY"
* }
*
* {
* "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"},
* "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093","PLAINTEXT://10.179.162.202:9092"],
* "jmx_port":8099,
* "host":"10.179.162.202",
* "timestamp":"1628833925822",
* "port":9092,
* "version":4
* }
*
* {
* "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"},
* "endpoints":["EXTERNAL://10.179.162.202:7092","INTERNAL://10.179.162.202:7093"],
* "jmx_port":8099,
* "host":null,
* "timestamp":"1627289710439",
* "port":-1,
* "version":4
* }
*
*/
public class BrokerMetadata implements Cloneable {
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerMetadata implements Serializable {
private static final long serialVersionUID = 3918113492423375809L;
private long clusterId;
private int brokerId;
private List<String> endpoints;
// <EXTERNAL|INTERNAL, <ip, port>>
private Map<String, IpPortData> endpointMap;
private String host;
private int port;
/*
 * The field name stored on ZK is exactly this one; do not rename it.
 */
private int jmx_port;
@JsonProperty("jmx_port")
private int jmxPort;
private String version;
......@@ -41,91 +78,54 @@ public class BrokerMetadata implements Cloneable {
private String rack;
public long getClusterId() {
return clusterId;
}
public void setClusterId(long clusterId) {
this.clusterId = clusterId;
}
public int getBrokerId() {
return brokerId;
}
public void setBrokerId(int brokerId) {
this.brokerId = brokerId;
}
public List<String> getEndpoints() {
return endpoints;
}
public void setEndpoints(List<String> endpoints) {
this.endpoints = endpoints;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
/**
 * Returns the host/IP of the EXTERNAL listener, or {@code null} when no
 * EXTERNAL endpoint was parsed or the endpoint map has not been built yet
 * (i.e. {@code parseAndUpdateBrokerMetadata} has not run).
 * Marked {@code @JsonIgnore} so this derived accessor is excluded from
 * JSON (de)serialization.
 */
@JsonIgnore
public String getExternalHost() {
    if (endpointMap == null) {
        // Metadata not parsed yet — no endpoint information available.
        return null;
    }
    // Single lookup instead of containsKey + get.
    IpPortData external = endpointMap.get(KafkaConstant.EXTERNAL_KEY);
    return external == null ? null : external.getIp();
}
public int getJmxPort() {
return jmx_port;
/**
 * Returns the host/IP of the INTERNAL listener, or {@code null} when no
 * INTERNAL endpoint was parsed or the endpoint map has not been built yet
 * (i.e. {@code parseAndUpdateBrokerMetadata} has not run).
 * Marked {@code @JsonIgnore} so this derived accessor is excluded from
 * JSON (de)serialization.
 */
@JsonIgnore
public String getInternalHost() {
    if (endpointMap == null) {
        // Metadata not parsed yet — no endpoint information available.
        return null;
    }
    // Single lookup instead of containsKey + get.
    IpPortData internal = endpointMap.get(KafkaConstant.INTERNAL_KEY);
    return internal == null ? null : internal.getIp();
}
public void setJmxPort(int jmxPort) {
this.jmx_port = jmxPort;
}
public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) {
brokerMetadata.setEndpointMap(new HashMap<>());
public String getVersion() {
return version;
}
if (brokerMetadata.getEndpoints().isEmpty()) {
return;
}
public void setVersion(String version) {
this.version = version;
}
// example EXTERNAL://10.179.162.202:7092
for (String endpoint: brokerMetadata.getEndpoints()) {
int idx1 = endpoint.indexOf("://");
int idx2 = endpoint.lastIndexOf(":");
if (idx1 == -1 || idx2 == -1 || idx1 == idx2) {
continue;
}
public long getTimestamp() {
return timestamp;
}
String brokerHost = endpoint.substring(idx1 + "://".length(), idx2);
String brokerPort = endpoint.substring(idx2 + 1);
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort));
public String getRack() {
return rack;
}
public void setRack(String rack) {
this.rack = rack;
}
if (KafkaConstant.EXTERNAL_KEY.equals(endpoint.substring(0, idx1))) {
// Prefer the EXTERNAL address for display.
brokerMetadata.setHost(brokerHost);
brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort));
}
@Override
public String toString() {
return "BrokerMetadata{" +
"clusterId=" + clusterId +
", brokerId=" + brokerId +
", endpoints=" + endpoints +
", host='" + host + '\'' +
", port=" + port +
", jmxPort=" + jmx_port +
", version='" + version + '\'' +
", timestamp=" + timestamp +
", rack='" + rack + '\'' +
'}';
if (null == brokerMetadata.getHost()) {
brokerMetadata.setHost(brokerHost);
brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort));
}
}
}
}
......@@ -74,15 +74,10 @@ public class BrokerStateListener implements StateChangeListener {
BrokerMetadata brokerMetadata = null;
try {
brokerMetadata = zkConfig.get(ZkPathUtil.getBrokerIdNodePath(brokerId), BrokerMetadata.class);
if (!brokerMetadata.getEndpoints().isEmpty()) {
String endpoint = brokerMetadata.getEndpoints().get(0);
int idx = endpoint.indexOf("://");
endpoint = endpoint.substring(idx + "://".length());
idx = endpoint.indexOf(":");
brokerMetadata.setHost(endpoint.substring(0, idx));
brokerMetadata.setPort(Integer.parseInt(endpoint.substring(idx + 1)));
}
// Parse the endpoints and update the broker metadata stored this time.
BrokerMetadata.parseAndUpdateBrokerMetadata(brokerMetadata);
brokerMetadata.setClusterId(clusterId);
brokerMetadata.setBrokerId(brokerId);
PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册