From 2b600e96eba64a95cd136afc24fcaa3d7cb09a8d Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Oct 2022 16:41:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=81=A5=E5=BA=B7=E6=A3=80=E6=9F=A5=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task/health/AbstractHealthCheckTask.java | 115 ++++++++++++++++++ .../km/task/health/BrokerHealthCheckTask.java | 109 +---------------- .../task/health/ClusterHealthCheckTask.java | 109 +---------------- .../km/task/health/GroupHealthCheckTask.java | 107 +--------------- .../km/task/health/TopicHealthCheckTask.java | 108 +--------------- 5 files changed, 127 insertions(+), 421 deletions(-) create mode 100644 km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java new file mode 100644 index 00000000..1f6c83ff --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java @@ -0,0 +1,115 @@ +package com.xiaojukeji.know.streaming.km.task.health; + +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; +import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; +import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispatchTask { + private static final ILog log = LogFactory.getLog(AbstractHealthCheckTask.class); + + @Autowired + private HealthCheckResultService healthCheckResultService; + + public abstract AbstractHealthCheckService getCheckService(); + + @Override + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); + } + + private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + // 获取配置,<配置名,配置信息> + Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); + + // 检查结果 + List resultList = new ArrayList<>(); + + // 遍历Check-Service + List paramList = this.getCheckService().getResList(clusterPhy.getId()); + if (ValidateUtils.isEmptyList(paramList)) { + // 当前无该维度的资源,则直接设置为 + resultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap)); + } + + // 遍历资源 + for (ClusterPhyParam clusterPhyParam: paramList) { + resultList.addAll(this.checkAndGetResult(clusterPhyParam, healthConfigMap)); + } + + for (HealthCheckResult checkResult: resultList) { + try { + healthCheckResultService.replace(checkResult); + } catch (Exception e) { + log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); + } + } + + // 删除10分钟之前的检查结果 + try { + healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); + } catch (Exception e) { + log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + } + + return TaskResult.SUCCESS; + } + + private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { + List resultList = new ArrayList<>(); + + // 进行检查 + for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { + HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); + if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { + // 类型不匹配 + continue; + } + + // 记录 + HealthCheckResult checkResult = new HealthCheckResult( + dimensionEnum.getDimension(), + clusterHealthConfig.getCheckNameEnum().getConfigName(), + clusterPhyId, + "-1" + ); + checkResult.setPassed(Constant.YES); + resultList.add(checkResult); + } + + return resultList; + } + + private List checkAndGetResult(ClusterPhyParam clusterPhyParam, + Map healthConfigMap) { + List resultList = new ArrayList<>(); + + // 进行检查 + for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { + HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterPhyParam, clusterHealthConfig); + if (healthCheckResult == null) { + continue; + } + + // 记录 + resultList.add(healthCheckResult); + } + + return resultList; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java index 7b611823..ef02be8e 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java @@ -1,30 +1,13 @@ package com.xiaojukeji.know.streaming.km.task.health; import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.broker.HealthCheckBrokerService; -import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; - @NoArgsConstructor @AllArgsConstructor @Task(name = "BrokerHealthCheckTask", @@ -33,98 +16,12 @@ import java.util.Map; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class BrokerHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(BrokerHealthCheckTask.class); - - @Autowired - private HealthCheckResultService healthCheckResultService; - +public class BrokerHealthCheckTask extends AbstractHealthCheckTask { @Autowired private HealthCheckBrokerService healthCheckBrokerService; @Override - public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); - } - - private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - // 获取配置,<配置名,配置信息> - Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service - List paramList = healthCheckBrokerService.getResList(clusterPhy.getId()); - if (ValidateUtils.isEmptyList(paramList)) { - // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckBrokerService, healthConfigMap)); - } - - // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(healthCheckBrokerService, clusterPhyParam, healthConfigMap)); - } - - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } - } - - // 删除10分钟之前的检查结果 - try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); - } catch (Exception e) { - log.error("class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } - - return TaskResult.SUCCESS; - } - - private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); - if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { - // 类型不匹配 - continue; - } - - // 记录 - HealthCheckResult checkResult = new HealthCheckResult( - dimensionEnum.getDimension(), - clusterHealthConfig.getCheckNameEnum().getConfigName(), - clusterPhyId, - "-1" - ); - checkResult.setPassed(Constant.YES); - resultList.add(checkResult); - } - - return resultList; - } - - private List checkAndGetResult(AbstractHealthCheckService healthCheckService, - ClusterPhyParam clusterPhyParam, - Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig); - if (healthCheckResult == null) { - continue; - } - - // 记录 - resultList.add(healthCheckResult); - } - - return resultList; + public AbstractHealthCheckService getCheckService() { + return healthCheckBrokerService; } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java index cb7f78b2..43c16cb8 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java @@ -1,30 +1,13 @@ package com.xiaojukeji.know.streaming.km.task.health; import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.cluster.HealthCheckClusterService; -import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; - @NoArgsConstructor @AllArgsConstructor @Task(name = "ClusterHealthCheckTask", @@ -33,98 +16,12 @@ import java.util.Map; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class ClusterHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(ClusterHealthCheckTask.class); - - @Autowired - private HealthCheckResultService healthCheckResultService; - +public class ClusterHealthCheckTask extends AbstractHealthCheckTask { @Autowired private HealthCheckClusterService healthCheckClusterService; @Override - public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); - } - - private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - // 获取配置,<配置名,配置信息> - Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service - List paramList = healthCheckClusterService.getResList(clusterPhy.getId()); - if (ValidateUtils.isEmptyList(paramList)) { - // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckClusterService, healthConfigMap)); - } - - // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(healthCheckClusterService, clusterPhyParam, healthConfigMap)); - } - - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } - } - - // 删除10分钟之前的检查结果 - try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); - } catch (Exception e) { - log.error("class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } - - return TaskResult.SUCCESS; - } - - private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); - if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { - // 类型不匹配 - continue; - } - - // 记录 - HealthCheckResult checkResult = new HealthCheckResult( - dimensionEnum.getDimension(), - clusterHealthConfig.getCheckNameEnum().getConfigName(), - clusterPhyId, - "-1" - ); - checkResult.setPassed(Constant.YES); - resultList.add(checkResult); - } - - return resultList; - } - - private List checkAndGetResult(AbstractHealthCheckService healthCheckService, - ClusterPhyParam clusterPhyParam, - Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig); - if (healthCheckResult == null) { - continue; - } - - // 记录 - resultList.add(healthCheckResult); - } - - return resultList; + public AbstractHealthCheckService getCheckService() { + return healthCheckClusterService; } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java index 581a679a..d24f981d 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java @@ -1,29 +1,13 @@ package com.xiaojukeji.know.streaming.km.task.health; import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.group.HealthCheckGroupService; -import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; @NoArgsConstructor @AllArgsConstructor @@ -33,98 +17,13 @@ import java.util.Map; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class GroupHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(GroupHealthCheckTask.class); - - @Autowired - private HealthCheckResultService healthCheckResultService; +public class GroupHealthCheckTask extends AbstractHealthCheckTask { @Autowired private HealthCheckGroupService healthCheckGroupService; @Override - public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); - } - - private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - // 获取配置,<配置名,配置信息> - Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service - List paramList = healthCheckGroupService.getResList(clusterPhy.getId()); - if (ValidateUtils.isEmptyList(paramList)) { - // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckGroupService, healthConfigMap)); - } - - // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(healthCheckGroupService, clusterPhyParam, healthConfigMap)); - } - - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } - } - - // 删除10分钟之前的检查结果 - try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); - } catch (Exception e) { - log.error("class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } - - return TaskResult.SUCCESS; - } - - private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); - if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { - // 类型不匹配 - continue; - } - - // 记录 - HealthCheckResult checkResult = new HealthCheckResult( - dimensionEnum.getDimension(), - clusterHealthConfig.getCheckNameEnum().getConfigName(), - clusterPhyId, - "-1" - ); - checkResult.setPassed(Constant.YES); - resultList.add(checkResult); - } - - return resultList; - } - - private List checkAndGetResult(AbstractHealthCheckService healthCheckService, - ClusterPhyParam clusterPhyParam, - Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig); - if (healthCheckResult == null) { - continue; - } - - // 记录 - resultList.add(healthCheckResult); - } - - return resultList; + public AbstractHealthCheckService getCheckService() { + return healthCheckGroupService; } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java index 8badae99..25a1e531 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java @@ -1,30 +1,13 @@ package com.xiaojukeji.know.streaming.km.task.health; import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.topic.HealthCheckTopicService; -import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; - @NoArgsConstructor @AllArgsConstructor @Task(name = "TopicHealthCheckTask", @@ -33,98 +16,13 @@ import java.util.Map; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class TopicHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(TopicHealthCheckTask.class); - - @Autowired - private HealthCheckResultService healthCheckResultService; +public class TopicHealthCheckTask extends AbstractHealthCheckTask { @Autowired private HealthCheckTopicService healthCheckTopicService; @Override - public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); - } - - private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - // 获取配置,<配置名,配置信息> - Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service - List paramList = healthCheckTopicService.getResList(clusterPhy.getId()); - if (ValidateUtils.isEmptyList(paramList)) { - // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckTopicService, healthConfigMap)); - } - - // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(healthCheckTopicService, clusterPhyParam, healthConfigMap)); - } - - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } - } - - // 删除10分钟之前的检查结果 - try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); - } catch (Exception e) { - log.error("class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } - - return TaskResult.SUCCESS; - } - - private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); - if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { - // 类型不匹配 - continue; - } - - // 记录 - HealthCheckResult checkResult = new HealthCheckResult( - dimensionEnum.getDimension(), - clusterHealthConfig.getCheckNameEnum().getConfigName(), - clusterPhyId, - "-1" - ); - checkResult.setPassed(Constant.YES); - resultList.add(checkResult); - } - - return resultList; - } - - private List checkAndGetResult(AbstractHealthCheckService healthCheckService, - ClusterPhyParam clusterPhyParam, - Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig); - if (healthCheckResult == null) { - continue; - } - - // 记录 - resultList.add(healthCheckResult); - } - - return resultList; + public AbstractHealthCheckService getCheckService() { + return healthCheckTopicService; } } -- GitLab