Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
facae65f
K
kafka-manager
项目概览
DiDi
/
kafka-manager
大约 1 年 前同步成功
通知
60
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
facae65f
编写于
10月 12, 2022
作者:
Z
zengqiao
提交者:
EricZeng
10月 21, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
健康检查任务优化
上级
0c6475b0
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
127 addition
and
421 deletion
+127
-421
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java
...now/streaming/km/task/health/AbstractHealthCheckTask.java
+115
-0
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java
.../know/streaming/km/task/health/BrokerHealthCheckTask.java
+3
-106
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java
...know/streaming/km/task/health/ClusterHealthCheckTask.java
+3
-106
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java
...i/know/streaming/km/task/health/GroupHealthCheckTask.java
+3
-104
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java
...i/know/streaming/km/task/health/TopicHealthCheckTask.java
+3
-105
未找到文件。
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java
0 → 100644
浏览文件 @
facae65f
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
public
abstract
class
AbstractHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
AbstractHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
abstract
AbstractHealthCheckService
getCheckService
();
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
this
.
getCheckService
().
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
this
.
getCheckService
(),
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
this
.
getCheckService
().
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
}
}
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java
浏览文件 @
facae65f
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.annotation.Task
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.job.core.consensual.ConsensualEnum
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.broker.HealthCheckBrokerService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
@NoArgsConstructor
@AllArgsConstructor
@Task
(
name
=
"BrokerHealthCheckTask"
,
...
...
@@ -33,98 +16,12 @@ import java.util.Map;
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
BrokerHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
BrokerHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
class
BrokerHealthCheckTask
extends
AbstractHealthCheckTask
{
@Autowired
private
HealthCheckBrokerService
healthCheckBrokerService
;
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
healthCheckBrokerService
.
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
healthCheckBrokerService
,
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
healthCheckBrokerService
,
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
AbstractHealthCheckService
healthCheckService
,
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
healthCheckService
.
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
public
AbstractHealthCheckService
getCheckService
()
{
return
healthCheckBrokerService
;
}
}
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java
浏览文件 @
facae65f
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.annotation.Task
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.job.core.consensual.ConsensualEnum
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.cluster.HealthCheckClusterService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
@NoArgsConstructor
@AllArgsConstructor
@Task
(
name
=
"ClusterHealthCheckTask"
,
...
...
@@ -33,98 +16,12 @@ import java.util.Map;
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
ClusterHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
ClusterHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
class
ClusterHealthCheckTask
extends
AbstractHealthCheckTask
{
@Autowired
private
HealthCheckClusterService
healthCheckClusterService
;
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
healthCheckClusterService
.
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
healthCheckClusterService
,
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
healthCheckClusterService
,
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
AbstractHealthCheckService
healthCheckService
,
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
healthCheckService
.
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
public
AbstractHealthCheckService
getCheckService
()
{
return
healthCheckClusterService
;
}
}
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java
浏览文件 @
facae65f
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.annotation.Task
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.job.core.consensual.ConsensualEnum
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.group.HealthCheckGroupService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
@NoArgsConstructor
@AllArgsConstructor
...
...
@@ -33,98 +17,13 @@ import java.util.Map;
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
GroupHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
GroupHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
class
GroupHealthCheckTask
extends
AbstractHealthCheckTask
{
@Autowired
private
HealthCheckGroupService
healthCheckGroupService
;
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
healthCheckGroupService
.
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
healthCheckGroupService
,
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
healthCheckGroupService
,
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
AbstractHealthCheckService
healthCheckService
,
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
healthCheckService
.
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
public
AbstractHealthCheckService
getCheckService
()
{
return
healthCheckGroupService
;
}
}
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java
浏览文件 @
facae65f
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.annotation.Task
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.job.core.consensual.ConsensualEnum
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.topic.HealthCheckTopicService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
@NoArgsConstructor
@AllArgsConstructor
@Task
(
name
=
"TopicHealthCheckTask"
,
...
...
@@ -33,98 +16,13 @@ import java.util.Map;
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
TopicHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
TopicHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
class
TopicHealthCheckTask
extends
AbstractHealthCheckTask
{
@Autowired
private
HealthCheckTopicService
healthCheckTopicService
;
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
healthCheckTopicService
.
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
healthCheckTopicService
,
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
healthCheckTopicService
,
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
AbstractHealthCheckService
healthCheckService
,
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
healthCheckService
.
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
public
AbstractHealthCheckService
getCheckService
()
{
return
healthCheckTopicService
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录