diff --git "a/docs/install_guide/\345\215\225\346\234\272\351\203\250\347\275\262\346\211\213\345\206\214.md" "b/docs/install_guide/\345\215\225\346\234\272\351\203\250\347\275\262\346\211\213\345\206\214.md" index f9f5ad1a41266b2332c25cd1039be63b4dfa67ad..b429b9d5f3b5d56dbceea2a89a6f64f6f5d2c92d 100644 --- "a/docs/install_guide/\345\215\225\346\234\272\351\203\250\347\275\262\346\211\213\345\206\214.md" +++ "b/docs/install_guide/\345\215\225\346\234\272\351\203\250\347\275\262\346\211\213\345\206\214.md" @@ -59,6 +59,8 @@ sh deploy_KnowStreaming-offline.sh ### 2.1.3、容器部署 +#### 2.1.3.1、Helm + **环境依赖** - Kubernetes >= 1.14 ,Helm >= 2.17.0 @@ -87,6 +89,103 @@ helm pull knowstreaming/knowstreaming-manager   +#### 2.1.3.2、Docker Compose +```yml +version: "3" + +services: + + knowstreaming-manager: + image: knowstreaming/knowstreaming-manager:0.2.0-test + container_name: knowstreaming-manager + privileged: true + restart: always + depends_on: + - elasticsearch-single + - knowstreaming-mysql + expose: + - 80 + command: + - /bin/sh + - /ks-start.sh + environment: + TZ: Asia/Shanghai + + SERVER_MYSQL_ADDRESS: knowstreaming-mysql:3306 + SERVER_MYSQL_DB: know_streaming + SERVER_MYSQL_USER: root + SERVER_MYSQL_PASSWORD: admin2022_ + + SERVER_ES_ADDRESS: elasticsearch-single:9200 + + JAVA_OPTS: -Xmx1g -Xms1g + +# extra_hosts: +# - "hostname:x.x.x.x" +# volumes: +# - /ks/manage/log:/logs + knowstreaming-ui: + image: knowstreaming/knowstreaming-ui:0.2.0-test1 + container_name: knowstreaming-ui + restart: always + ports: + - '18092:80' + environment: + TZ: Asia/Shanghai + depends_on: + - knowstreaming-manager +# extra_hosts: +# - "hostname:x.x.x.x" + + elasticsearch-single: + image: docker.io/library/elasticsearch:7.6.2 + container_name: elasticsearch-single + restart: always + expose: + - 9200 + - 9300 +# ports: +# - '9200:9200' +# - '9300:9300' + environment: + TZ: Asia/Shanghai + ES_JAVA_OPTS: -Xms512m -Xmx512m + discovery.type: single-node +# volumes: +# - /ks/es/data:/usr/share/elasticsearch/data + + knowstreaming-init: + image: knowstreaming/knowstreaming-manager:0.2.0-test + container_name: knowstreaming_init + depends_on: + - elasticsearch-single + command: + - /bin/bash + - /es_template_create.sh + environment: + TZ: Asia/Shanghai + SERVER_ES_ADDRESS: elasticsearch-single:9200 + + + knowstreaming-mysql: + image: knowstreaming/knowstreaming-mysql:0.2.0-test + container_name: knowstreaming-mysql + restart: always + environment: + TZ: Asia/Shanghai + MYSQL_ROOT_PASSWORD: admin2022_ + MYSQL_DATABASE: know_streaming + MYSQL_ROOT_HOST: '%' + expose: + - 3306 +# ports: +# - '3306:3306' +# volumes: +# - /ks/mysql/data:/data/mysql +``` + +  + ### 2.1.4、手动部署 **部署流程** diff --git a/docs/user_guide/faq.md b/docs/user_guide/faq.md index 764c58b97b1df1fe76dc109d5b55beff82de5904..98dfbf8329feaf62ced9f4cc9b4926e92f1fc097 100644 --- a/docs/user_guide/faq.md +++ b/docs/user_guide/faq.md @@ -166,3 +166,19 @@ Node 版本: v12.22.12 需要到具体的应用中执行 `npm run start`,例如 `cd packages/layout-clusters-fe` 后,执行 `npm run start`。 应用启动后需要到基座应用中查看(需要启动基座应用,即 layout-clusters-fe)。 + + +## 8.12、权限识别失败问题 +1、使用admin账号登陆KnowStreaming时,点击系统管理-用户管理-角色管理-新增角色,查看页面是否正常。 + + + +2、查看'/logi-security/api/v1/permission/tree'接口返回值,出现如下图所示乱码现象。 +![接口返回值](http://img-ys011.didistatic.com/static/dc2img/do1_jTxBkwNGU9vZuYQQbdNw) + +3、查看logi_security_permission表,看看是否出现了中文乱码现象。 + +根据以上几点,我们可以确定是由于数据库乱码造成的权限识别失败问题。 + ++ 原因:由于数据库编码和我们提供的脚本不一致,数据库里的数据发生了乱码,因此出现权限识别失败问题。 ++ 解决方案:清空数据库数据,将数据库字符集调整为utf8,最后重新执行[dml-logi.sql](https://github.com/didi/KnowStreaming/blob/master/km-dist/init/sql/dml-logi.sql)脚本导入数据即可。 diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java index 994b2c8a5891bd93e8d2c6abdbe292893c2946f1..52a915200a78e8f4971d014ada13dfbfce9f4757 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java @@ -7,12 +7,14 @@ import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.security.common.dto.config.ConfigDTO; import com.didiglobal.logi.security.service.ConfigService; import com.xiaojukeji.know.streaming.km.biz.version.VersionControlManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDetailDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.UserMetricConfigDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric.UserMetricConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; import com.xiaojukeji.know.streaming.km.common.bean.vo.config.metric.UserMetricConfigVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.version.VersionItemVO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.VersionUtil; @@ -47,29 +49,29 @@ public class VersionControlManagerImpl implements VersionControlManager { @PostConstruct public void init(){ defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_HEALTH_SCORE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_TOTAL_PRODUCE_REQUESTS, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_FAILED_FETCH_REQ, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_FAILED_PRODUCE_REQ, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_MESSAGE_IN, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_UNDER_REPLICA_PARTITIONS, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_TOTAL_PRODUCE_REQUESTS, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_BYTES_IN, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_BYTES_OUT, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_BYTES_REJECTED, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_MESSAGE_IN, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_HEALTH_SCORE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_ACTIVE_CONTROLLER_COUNT, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_TOTAL_PRODUCE_REQ, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_TOTAL_LOG_SIZE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_CONNECTIONS, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_MESSAGES_IN, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_BYTES_IN, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_BYTES_OUT, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_GROUP_REBALANCES, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_JOB_RUNNING, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_CONNECTIONS, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_MESSAGES_IN, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_PARTITIONS_NO_LEADER, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_PARTITION_URP, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_TOTAL_LOG_SIZE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_TOTAL_PRODUCE_REQ, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_GROUP_REBALANCES, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_JOB_RUNNING, true)); defaultMetrics.add(new UserMetricConfig(METRIC_GROUP.getCode(), GROUP_METRIC_OFFSET_CONSUMED, true)); defaultMetrics.add(new UserMetricConfig(METRIC_GROUP.getCode(), GROUP_METRIC_LAG, true)); @@ -77,18 +79,18 @@ public class VersionControlManagerImpl implements VersionControlManager { defaultMetrics.add(new UserMetricConfig(METRIC_GROUP.getCode(), GROUP_METRIC_HEALTH_SCORE, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_HEALTH_SCORE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_TOTAL_REQ_QUEUE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_TOTAL_RES_QUEUE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_CONNECTION_COUNT, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_MESSAGE_IN, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_TOTAL_PRODUCE_REQ, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_NETWORK_RPO_AVG_IDLE, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_REQ_AVG_IDLE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_CONNECTION_COUNT, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_BYTES_IN, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_BYTES_OUT, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_PARTITIONS_SKEW, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_TOTAL_PRODUCE_REQ, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_TOTAL_REQ_QUEUE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_TOTAL_RES_QUEUE, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_LEADERS_SKEW, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_UNDER_REPLICATE_PARTITION, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_PARTITIONS_SKEW, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_BYTES_IN, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_BYTES_OUT, true)); } @Autowired @@ -159,6 +161,9 @@ public class VersionControlManagerImpl implements VersionControlManager { UserMetricConfig umc = userMetricConfigMap.get(itemType + "@" + metric); userMetricConfigVO.setSet(null != umc && umc.isSet()); + if (umc != null) { + userMetricConfigVO.setRank(umc.getRank()); + } userMetricConfigVO.setName(itemVO.getName()); userMetricConfigVO.setType(itemVO.getType()); userMetricConfigVO.setDesc(itemVO.getDesc()); @@ -178,13 +183,29 @@ public class VersionControlManagerImpl implements VersionControlManager { @Override public Result updateUserMetricItem(Long clusterId, Integer type, UserMetricConfigDTO dto, String operator) { Map metricsSetMap = dto.getMetricsSet(); - if(null == metricsSetMap || metricsSetMap.isEmpty()){ + + //转换metricDetailDTOList + List metricDetailDTOList = dto.getMetricDetailDTOList(); + Map metricDetailMap = new HashMap<>(); + if (metricDetailDTOList != null && !metricDetailDTOList.isEmpty()) { + metricDetailMap = metricDetailDTOList.stream().collect(Collectors.toMap(MetricDetailDTO::getMetric, Function.identity())); + } + + //转换metricsSetMap + if (metricsSetMap != null && !metricsSetMap.isEmpty()) { + for (Map.Entry metricAndShowEntry : metricsSetMap.entrySet()) { + if (metricDetailMap.containsKey(metricAndShowEntry.getKey())) continue; + metricDetailMap.put(metricAndShowEntry.getKey(), new MetricDetailDTO(metricAndShowEntry.getKey(), metricAndShowEntry.getValue(), null)); + } + } + + if (metricDetailMap.isEmpty()) { return Result.buildSuc(); } Set userMetricConfigs = getUserMetricConfig(operator); - for(Map.Entry metricAndShowEntry : metricsSetMap.entrySet()){ - UserMetricConfig userMetricConfig = new UserMetricConfig(type, metricAndShowEntry.getKey(), metricAndShowEntry.getValue()); + for (MetricDetailDTO metricDetailDTO : metricDetailMap.values()) { + UserMetricConfig userMetricConfig = new UserMetricConfig(type, metricDetailDTO.getMetric(), metricDetailDTO.getSet(), metricDetailDTO.getRank()); userMetricConfigs.remove(userMetricConfig); userMetricConfigs.add(userMetricConfig); } @@ -228,7 +249,7 @@ public class VersionControlManagerImpl implements VersionControlManager { return defaultMetrics; } - return JSON.parseObject(value, new TypeReference>(){}); + return JSON.parseObject(value, new TypeReference>() {}); } public static void main(String[] args){ diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/MetricDetailDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/MetricDetailDTO.java new file mode 100644 index 0000000000000000000000000000000000000000..0d51d2c668d3a76131912132addb61b62f3011f0 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/MetricDetailDTO.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.metrices; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author didi + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@ApiModel(description = "指标详细属性信息") +public class MetricDetailDTO extends BaseDTO { + + @ApiModelProperty("指标名称") + private String metric; + + @ApiModelProperty("指标是否显示") + private Boolean set; + + @ApiModelProperty("指标优先级") + private Integer rank; + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/UserMetricConfigDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/UserMetricConfigDTO.java index 02bb1d2a230d1f37f81b981791e52f537130395d..bf71b66375a2ea41087cee05c707025cbd741e23 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/UserMetricConfigDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/UserMetricConfigDTO.java @@ -7,6 +7,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import java.util.List; import java.util.Map; @@ -17,4 +18,7 @@ import java.util.Map; public class UserMetricConfigDTO extends BaseDTO { @ApiModelProperty("指标展示设置项,key:指标名;value:是否展现(true展现/false不展现)") private Map metricsSet; + + @ApiModelProperty("指标自定义属性列表") + private List metricDetailDTOList; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java index fa67cac528c98265c04b2ba2c3415f6f3ab1d238..752aade037990a364e7f93dd3ddca6d70d124b7c 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java @@ -5,7 +5,6 @@ import com.alibaba.fastjson.TypeReference; import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData; import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -79,20 +78,6 @@ public class Broker implements Serializable { return metadata; } - public static Broker buildFrom(Long clusterPhyId, Integer brokerId, BrokerMetadata brokerMetadata) { - Broker metadata = new Broker(); - metadata.setClusterPhyId(clusterPhyId); - metadata.setBrokerId(brokerId); - metadata.setHost(brokerMetadata.getHost()); - metadata.setPort(brokerMetadata.getPort()); - metadata.setJmxPort(brokerMetadata.getJmxPort()); - metadata.setStartTimestamp(brokerMetadata.getTimestamp()); - metadata.setRack(brokerMetadata.getRack()); - metadata.setStatus(1); - metadata.setEndpointMap(brokerMetadata.getEndpointMap()); - return metadata; - } - public static Broker buildFrom(BrokerPO brokerPO) { Broker broker = ConvertUtil.obj2Obj(brokerPO, Broker.class); String endpointMapStr = brokerPO.getEndpointMap(); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/metric/UserMetricConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/metric/UserMetricConfig.java index 6895fb403f1ec4f0a33d2be4f1ad003a2c0da3b7..e244181abf952269a4b8ecf992f6e584379818b0 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/metric/UserMetricConfig.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/metric/UserMetricConfig.java @@ -1,12 +1,12 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor -@AllArgsConstructor public class UserMetricConfig { private int type; @@ -15,6 +15,22 @@ public class UserMetricConfig { private boolean set; + private Integer rank; + + public UserMetricConfig(int type, String metric, boolean set, Integer rank) { + this.type = type; + this.metric = metric; + this.set = set; + this.rank = rank; + } + + public UserMetricConfig(int type, String metric, boolean set) { + this.type = type; + this.metric = metric; + this.set = set; + this.rank = null; + } + @Override public int hashCode(){ return metric.hashCode() << 1 + type; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/config/metric/UserMetricConfigVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/config/metric/UserMetricConfigVO.java index e50fc6e7dee486dd2caaa2942ed93bfccc0fc39d..2b4e76b3fffa1efbe855c98d32175eee6d769ecd 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/config/metric/UserMetricConfigVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/config/metric/UserMetricConfigVO.java @@ -14,4 +14,7 @@ import lombok.NoArgsConstructor; public class UserMetricConfigVO extends VersionItemVO { @ApiModelProperty(value = "该指标用户是否设置展现", example = "true") private Boolean set; + + @ApiModelProperty(value = "该指标展示优先级", example = "1") + private Integer rank; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index 36575938b9e3151e4c0d3749283883041a2627c9..edd897ff826b321ff0f23fef01ceb5c5857ae734 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -42,6 +42,7 @@ public class Constant { */ public static final Integer DEFAULT_CLUSTER_HEALTH_SCORE = 90; + public static final String DEFAULT_USER_NAME = "know-streaming-app"; public static final int INVALID_CODE = -1; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java index 3d0b6a5c2cc19d13bc1da976aed8042569f77a1c..1be8dadfafde777fb8bd7af555c50fcb4b6acbfc 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java @@ -52,6 +52,10 @@ public class MsgConstant { /**************************************************** Partition ****************************************************/ + public static String getPartitionNoLeader(Long clusterPhyId, String topicName) { + return String.format("集群ID:[%d] Topic名称:[%s] 所有分区NoLeader", clusterPhyId, topicName); + } + public static String getPartitionNotExist(Long clusterPhyId, String topicName) { return String.format("集群ID:[%d] Topic名称:[%s] 存在非法的分区ID", clusterPhyId, topicName); } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java index 0fb65589d52a24e4ff8bd4dff60956613d077f81..ca7c01c4f5aec4c10e0f1ccc298f722d77d86090 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java @@ -90,6 +90,8 @@ public class JmxConnectorWrap { } try { jmxConnector.close(); + + jmxConnector = null; } catch (IOException e) { LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); } @@ -105,6 +107,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.getAttribute(name, attribute); + } catch (IOException ioe) { + // 如果是因为连接断开,则进行重新连接,并抛出异常 + reInitDueIOException(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -120,6 +127,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.getAttributes(name, attributes); + } catch (IOException ioe) { + // 如果是因为连接断开,则进行重新连接,并抛出异常 + reInitDueIOException(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -131,6 +143,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.queryNames(name, query); + } catch (IOException ioe) { + // 如果是因为连接断开,则进行重新连接,并抛出异常 + reInitDueIOException(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -186,4 +203,26 @@ public class JmxConnectorWrap { } } } + + private synchronized void reInitDueIOException() { + try { + if (jmxConnector == null) { + return; + } + + // 检查是否正常 + jmxConnector.getConnectionId(); + + // 如果正常则直接返回 + return; + } catch (Exception e) { + // ignore + } + + // 关闭旧的 + this.close(); + + // 重新创建 + this.checkJmxConnectionAndInitIfNeed(); + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java index 261aff0af95333d1f4b072815adfa81b1650177b..e43f1b40f0ee174e3d3afa1c3daf65d2be1a593b 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java @@ -5,7 +5,7 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.KafkaZkClient; import org.springframework.beans.factory.annotation.Autowired; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java index e2ed09d15a5e800fe75d6f4689f02a96293a4ea4..04a14e877ac9e9f057995a1b5f0a5718b703a172 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java @@ -7,7 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; public abstract class AbstractZKHandler { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java index b7c93c2fb80db68de87323a14bad5af0ccf2fdeb..314195af2a1442d401d2e5b1a0f3eaf228dc0782 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java @@ -9,7 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.BrokerIdsZNode; import kafka.zookeeper.ZNodeChildChangeHandler; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java index 91d91571756b8f7c15f16450749f45155fbab91d..1e6266327c218b319155d2b0e761d7bfd06ddfa4 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java @@ -8,11 +8,11 @@ import com.xiaojukeji.know.streaming.km.common.enums.KafkaConfigTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationBaseData; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationDataV1; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationDataV2; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationBaseData; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationDataV1; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationDataV2; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.ConfigEntityChangeNotificationZNode; import kafka.zookeeper.ZNodeChildChangeHandler; import org.apache.zookeeper.data.Stat; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java index 904b7d72e066dac93c2eda73c92bde2169af6eb0..b671c4a3ce2f3f91871d7f9861a8d5b7492fc8d9 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java @@ -11,7 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.ControllerZNode; import kafka.zookeeper.ZNodeChangeHandler; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java index 316026326bfc61ff560984a2b590f4e707678044..88c012814c78170cb0932c10330d6aa17d2c39b9 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java @@ -9,7 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.TopicsZNode; import kafka.zookeeper.ZNodeChildChangeHandler; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index dc702388ce6014a78ca5b7b396fb7d3cf19f887d..fbede23c684204bf7f875b4626fcfcd2be703dbf 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -24,7 +24,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; @@ -32,8 +31,7 @@ import com.xiaojukeji.know.streaming.km.persistence.jmx.JmxDAO; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.broker.BrokerDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; -import kafka.zk.BrokerIdZNode; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.BrokerIdsZNode; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.Node; @@ -310,9 +308,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok List brokerIdList = kafkaZKDAO.getChildren(clusterPhy.getId(), BrokerIdsZNode.path(), false); for (String brokerId: brokerIdList) { - BrokerMetadata metadata = kafkaZKDAO.getData(clusterPhy.getId(), BrokerIdZNode.path(Integer.valueOf(brokerId)), BrokerMetadata.class); - BrokerMetadata.parseAndUpdateBrokerMetadata(metadata); - brokerList.add(Broker.buildFrom(clusterPhy.getId(), Integer.valueOf(brokerId), metadata)); + brokerList.add(kafkaZKDAO.getBrokerMetadata(clusterPhy.getId(), Integer.valueOf(brokerId))); } return Result.buildSuc(brokerList); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java index 6dcd858e1e93b4717f77e4b1f4d4c0ae08f12194..ba72d2fea3273c0156514c7ab8ccfb88e371fee5 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java @@ -13,8 +13,8 @@ import com.xiaojukeji.know.streaming.km.common.enums.valid.ValidateKafkaAddressE import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterValidateService; import com.xiaojukeji.know.streaming.km.persistence.jmx.JmxDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.impl.KafkaZKDAOImpl; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.impl.KafkaZKDAOImpl; import kafka.server.KafkaConfig; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.*; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java index 1fb3f488e299ee94e78ea15e44346097b91121cf..8048eabe3b13625697dd8812ae2f3f24474d547d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java @@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.kafkacontroller.KafkaControllerDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.Node; import org.springframework.beans.factory.annotation.Autowired; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java index 13eedb41517f7e7b8879329860d9cd9a411cbad2..1795e4d43656c870bf8cb5b6eff4a5198441b076 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java @@ -21,14 +21,14 @@ import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.PartitionMap; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.PartitionState; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionState; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaConsumerClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.partition.PartitionDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.TopicPartitionStateZNode; import kafka.zk.TopicPartitionsZNode; import kafka.zk.TopicZNode; @@ -202,10 +202,22 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P @Override public Result> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, OffsetSpec offsetSpec, Long timestamp) { Map topicPartitionOffsets = new HashMap<>(); - this.listPartitionByTopic(clusterPhyId, topicName) - .stream() + + List partitionList = this.listPartitionByTopic(clusterPhyId, topicName); + if (partitionList == null || partitionList.isEmpty()) { + // Topic不存在 + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(clusterPhyId, topicName)); + } + + partitionList.stream() + .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)) .forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec)); + if (topicPartitionOffsets.isEmpty()) { + // 所有分区no-leader + return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FAILED, MsgConstant.getPartitionNoLeader(clusterPhyId, topicName)); + } + try { return (Result>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp)); } catch (VCHandlerNotExistException e) { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java index 7f289c8884c097695f5ea4f9201755e51e433168..7cd017f441399025178e62e3163badd25624ae03 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java @@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.controller.ReplicaAssignment; import kafka.server.ConfigType; import kafka.zk.AdminZkClient; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java index 9aaadee5251fba3b1f3dfe2defa92382248976d7..09be0d43fd1010c6135291745cc1cfe0bfb40bc1 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java @@ -30,7 +30,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.server.ConfigType; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java index bffabec81123ebf5e58ee2dd63eedea66a965c86..e2870d9d0cce415b1860e35b0f84bf467b156096 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java @@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.topic.TopicDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.TopicsZNode; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartitionInfo; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..88139db3e69ef7d5ce7256adc2d622d2e47012ed --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java @@ -0,0 +1,4 @@ +/** + * 读取Kafka在ZK中存储的数据的包 + */ +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper; \ No newline at end of file diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/KafkaZKDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/KafkaZKDAO.java similarity index 97% rename from km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/KafkaZKDAO.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/KafkaZKDAO.java index 3e00e558a708042c8de3a7f283ed443b42a14a77..7a7d4b761a7bbbda0b8eec3c22a8ab20465c9e50 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/KafkaZKDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/KafkaZKDAO.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.persistence.zk; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/impl/KafkaZKDAOImpl.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java similarity index 90% rename from km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/impl/KafkaZKDAOImpl.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java index 61a7bad0911aba19b58b4b0770c7893fcc5a0d38..82cb8130bbb0810c2916cc3360599c458d2676fb 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/impl/KafkaZKDAOImpl.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.persistence.zk.impl; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.impl; import com.alibaba.fastjson.JSON; import com.didiglobal.logi.log.ILog; @@ -11,11 +11,11 @@ import com.xiaojukeji.know.streaming.km.common.enums.topic.TopicTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.ControllerData; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.PartitionMap; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.ControllerData; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.BrokerMetadata; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.utils.Json; import kafka.zk.*; import kafka.zookeeper.AsyncResponse; @@ -46,14 +46,14 @@ public class KafkaZKDAOImpl implements KafkaZKDAO { public Broker getBrokerMetadata(String zkAddress) throws KeeperException.NoNodeException, AdminOperateException { ZooKeeper zooKeeper = null; try { - zooKeeper = new ZooKeeper(zkAddress, 1000, watchedEvent -> logger.info(" receive event : " + watchedEvent.getType().name())); + zooKeeper = new ZooKeeper(zkAddress, 3000, watchedEvent -> logger.info(" receive event : " + watchedEvent.getType().name())); List brokerIdList = this.getChildren(zooKeeper, BrokerIdsZNode.path()); if (brokerIdList == null || brokerIdList.isEmpty()) { return null; } BrokerMetadata brokerMetadata = this.getData(zooKeeper, BrokerIdZNode.path(Integer.parseInt(brokerIdList.get(0))), false, BrokerMetadata.class); - return Broker.buildFrom(null, Integer.valueOf(brokerIdList.get(0)), brokerMetadata); + return this.convert2Broker(null, Integer.valueOf(brokerIdList.get(0)), brokerMetadata); } catch (KeeperException.NoNodeException nne) { logger.warn("method=getBrokerMetadata||zkAddress={}||errMsg=exception", zkAddress, nne); throw nne; @@ -79,7 +79,7 @@ public class KafkaZKDAOImpl implements KafkaZKDAO { try { BrokerMetadata metadata = this.getData(kafkaZkClient.currentZooKeeper(), BrokerIdZNode.path(brokerId), false, BrokerMetadata.class); BrokerMetadata.parseAndUpdateBrokerMetadata(metadata); - return Broker.buildFrom(clusterPhyId, brokerId, metadata); + return this.convert2Broker(clusterPhyId, brokerId, metadata); } catch (KeeperException ke) { logger.error("method=getBrokerMetadata||clusterPhyId={}||brokerId={}||errMsg=exception", clusterPhyId, brokerId, ke); throw ke; @@ -269,4 +269,18 @@ public class KafkaZKDAOImpl implements KafkaZKDAO { byte[] bytes = zooKeeper.getData(path, addWatch, null); return JSON.parseObject(bytes, clazz); } + + private Broker convert2Broker(Long clusterPhyId, Integer brokerId, BrokerMetadata brokerMetadata) { + Broker metadata = new Broker(); + metadata.setClusterPhyId(clusterPhyId); + metadata.setBrokerId(brokerId); + metadata.setHost(brokerMetadata.getHost()); + metadata.setPort(brokerMetadata.getPort()); + metadata.setJmxPort(brokerMetadata.getJmxPort()); + metadata.setStartTimestamp(brokerMetadata.getTimestamp()); + metadata.setRack(brokerMetadata.getRack()); + metadata.setStatus(1); + metadata.setEndpointMap(brokerMetadata.getEndpointMap()); + return metadata; + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/ControllerData.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/ControllerData.java similarity index 81% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/ControllerData.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/ControllerData.java index f69c6862091fdb1572802ce2faee1dd668ee5fa4..afc7f55b2ff77ccf382a2a31d9cf4d7bbcd20eb8 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/ControllerData.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/ControllerData.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/BrokerMetadata.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java similarity index 97% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/BrokerMetadata.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java index 480867afa0a8d9eefdce8cddc3badc5e21cf3395..3b252c5f45dc5f7aa4c797a0ea8c8c93d3de8a9a 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/BrokerMetadata.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionMap.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionMap.java similarity index 91% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionMap.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionMap.java index bf1fbd1a904bda99bdcc50ecf44440c08117492c..4bc36cac8bcdc0c396c30ca291d17226b3d44ab8 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionMap.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionMap.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionState.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionState.java similarity index 93% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionState.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionState.java index 60ae43074d54579b300dcaa5c7d7533936856fc0..47be5cb97eabb54b9bc02222288f3580b8dbac45 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionState.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionState.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/TopicMetadata.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/TopicMetadata.java similarity index 91% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/TopicMetadata.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/TopicMetadata.java index 803a5e299b3a214323b625e97a4c915538197a46..f84c8fcf90236310f6d6cfe8788e3e1a845fb79a 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/TopicMetadata.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/TopicMetadata.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationBaseData.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationBaseData.java similarity index 77% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationBaseData.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationBaseData.java index 86a3abe90964772b2af27286a4776d6e0c6b9188..09ffee10485acbfc4860015f4cb4ea6f60cc47a2 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationBaseData.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationBaseData.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV1.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV1.java similarity index 86% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV1.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV1.java index 75598e651ba13bd984c3e08ba685674a8af24db9..1853b94061524501bb6e651c686f63662e4e953e 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV1.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV1.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV2.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV2.java similarity index 90% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV2.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV2.java index 6b0d8806b059114536ade1ca4743de56c7c739df..5e6024fafa6eed9a9903b83e002e8b738e9f2f4a 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV2.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV2.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigNodeData.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigNodeData.java similarity index 80% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigNodeData.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigNodeData.java index 13132b4f66357b090125f6ce68fba4ac562f8d0f..287912dc1670e9eb24356258966c076b256c7011 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigNodeData.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigNodeData.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-task/pom.xml b/km-task/pom.xml index 502d806dc6c15e0bfcf7de5d0214d3518addd830..d07b37b3e4b3af497019a81b304d9b8e97e2a59f 100644 --- a/km-task/pom.xml +++ b/km-task/pom.xml @@ -43,7 +43,21 @@ io.github.zqrferrari logi-job-spring-boot-starter + + + oshi-core + com.github.oshi + + + + + + com.github.oshi + oshi-core + 5.6.1 + + io.github.zqrferrari logi-security-spring-boot-starter diff --git a/pom.xml b/pom.xml index e30add73879a2fc3a821f156dd98443c9e7d88c1..619f24c75214bcd29ff769ccf4373e198766162e 100644 --- a/pom.xml +++ b/pom.xml @@ -230,6 +230,19 @@ io.github.zqrferrari logi-job-spring-boot-starter 1.0.23 + + + oshi-core + com.github.oshi + + + + + + + com.github.oshi + oshi-core + 5.6.1