From 13641c00ba5d0b65ecad407a119564752aaa0ae8 Mon Sep 17 00:00:00 2001 From: ZQKC Date: Mon, 3 Apr 2023 11:45:56 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E4=BF=AE=E5=A4=8DBroker=E5=85=83?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E8=A7=A3=E6=9E=90=E6=96=B9=E6=B3=95=E6=9C=AA?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E5=AF=BC=E8=87=B4=E6=8E=A5=E5=85=A5=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=E5=A4=B1=E8=B4=A5=E7=9A=84=E9=97=AE=E9=A2=98(#986)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/KafkaZKDAOImpl.java | 5 +- .../znode/brokers/BrokerMetadata.java | 124 ++++++++++++++---- 2 files changed, 102 insertions(+), 27 deletions(-) diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java index 82cb8130..b56c53d6 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java @@ -7,6 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.topic.TopicTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; @@ -78,7 +79,7 @@ public class KafkaZKDAOImpl implements KafkaZKDAO { try { BrokerMetadata metadata = this.getData(kafkaZkClient.currentZooKeeper(), BrokerIdZNode.path(brokerId), false, BrokerMetadata.class); - BrokerMetadata.parseAndUpdateBrokerMetadata(metadata); + return this.convert2Broker(clusterPhyId, brokerId, metadata); } catch (KeeperException ke) { logger.error("method=getBrokerMetadata||clusterPhyId={}||brokerId={}||errMsg=exception", clusterPhyId, brokerId, ke); @@ -279,7 +280,7 @@ public class KafkaZKDAOImpl implements KafkaZKDAO { metadata.setJmxPort(brokerMetadata.getJmxPort()); metadata.setStartTimestamp(brokerMetadata.getTimestamp()); metadata.setRack(brokerMetadata.getRack()); - metadata.setStatus(1); + metadata.setStatus(Constant.ALIVE); metadata.setEndpointMap(brokerMetadata.getEndpointMap()); return metadata; } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java index 3b252c5f..a944d4b9 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java @@ -1,12 +1,11 @@ package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; -import com.fasterxml.jackson.annotation.JsonIgnore; +import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; -import lombok.Data; import java.io.Serializable; import java.util.HashMap; @@ -51,7 +50,6 @@ import java.util.Map; * } * */ -@Data @JsonIgnoreProperties(ignoreUnknown = true) public class BrokerMetadata implements Serializable { private static final long serialVersionUID = 3918113492423375809L; @@ -74,34 +72,92 @@ public class BrokerMetadata implements Serializable { private String rack; - @JsonIgnore - public String getExternalHost() { - if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) { - // external如果不存在,就返回host - return host; + public List getEndpoints() { + return endpoints; + } + + public void setEndpoints(List endpoints) { + this.endpoints = endpoints; + } + + public Map getEndpointMap() { + if (endpointMap == null) { + this.parseBrokerMetadata(); } - return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp(); + return endpointMap; + } + + public String getHost() { + if (endpointMap == null) { + this.parseBrokerMetadata(); + } + + return host; + } + + public void setHost(String host) { + this.host = host; } - @JsonIgnore - public String getInternalHost() { - if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) { - // internal如果不存在,就返回host - return host; + public Integer getPort() { + if (endpointMap == null) { + this.parseBrokerMetadata(); } - return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp(); + + return port; } - public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) { - brokerMetadata.setEndpointMap(new HashMap<>()); + public void setPort(Integer port) { + this.port = port; + } + + public Integer getJmxPort() { + return jmxPort; + } + + public void setJmxPort(Integer jmxPort) { + this.jmxPort = jmxPort; + } + + public Integer getVersion() { + return version; + } - if (brokerMetadata.getEndpoints().isEmpty()) { + public void setVersion(Integer version) { + this.version = version; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + + public String getRack() { + return rack; + } + + public void setRack(String rack) { + this.rack = rack; + } + + private synchronized void parseBrokerMetadata() { + if (this.endpointMap != null) { + return; + } + + if (this.endpoints == null || this.endpoints.isEmpty()) { + this.endpointMap = new HashMap<>(0); return; } + Map tempEndpointMap = new HashMap<>(); + // example EXTERNAL://10.179.162.202:7092 - for (String endpoint: brokerMetadata.getEndpoints()) { + for (String endpoint: this.endpoints) { int idx1 = endpoint.indexOf("://"); int idx2 = endpoint.lastIndexOf(":"); if (idx1 == -1 || idx2 == -1 || idx1 == idx2) { @@ -111,19 +167,37 @@ public class BrokerMetadata implements Serializable { String brokerHost = endpoint.substring(idx1 + "://".length(), idx2); String brokerPort = endpoint.substring(idx2 + 1); - brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort)); + tempEndpointMap.put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort)); if (KafkaConstant.INTERNAL_KEY.equals(endpoint.substring(0, idx1))) { // 优先使用internal的地址进行展示 - brokerMetadata.setHost(brokerHost); - brokerMetadata.setPort(ConvertUtil.string2Integer(brokerPort)); + this.host = brokerHost; + this.port = ConvertUtil.string2Integer(brokerPort); } - if (null == brokerMetadata.getHost()) { - brokerMetadata.setHost(brokerHost); - brokerMetadata.setPort(ConvertUtil.string2Integer(brokerPort)); + if (null == this.host) { + this.host = brokerHost; + this.port = ConvertUtil.string2Integer(brokerPort); } } + + this.endpointMap = tempEndpointMap; + } + + public static void main(String[] args) { + String str = "{\t\n" + + "\t\"listener_security_protocol_map\":{\"EXTERNAL\":\"SASL_PLAINTEXT\",\"INTERNAL\":\"SASL_PLAINTEXT\"},\n" + + "\t\"endpoints\":[\"EXTERNAL://10.179.162.202:7092\",\"INTERNAL://10.179.162.202:7093\"],\n" + + "\t\"jmx_port\":8099,\n" + + "\t\"host\":null,\n" + + "\t\"timestamp\":\"1627289710439\",\n" + + "\t\"port\":-1,\n" + + "\t\"version\":4\n" + + "}"; + + BrokerMetadata bm = JSON.parseObject(str, BrokerMetadata.class); + System.out.println(bm.getHost()); + System.out.println(JSON.toJSON(bm)); } } -- GitLab