Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
e8f98218
K
kafka-manager
项目概览
DiDi
/
kafka-manager
10 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
e8f98218
编写于
10月 13, 2022
作者:
S
shirenchuang
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/master'
上级
bb167b9f
28fbb5e1
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
161 addition
and
433 deletion
+161
-433
docs/user_guide/faq.md
docs/user_guide/faq.md
+1
-1
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java
...eaming/km/biz/version/impl/VersionControlManagerImpl.java
+1
-1
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java
...common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java
+3
-0
km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java
...ming/km/monitor/component/AbstractMonitorSinkService.java
+27
-8
km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/version/VersionController.java
...w/streaming/km/rest/api/v3/version/VersionController.java
+0
-1
km-rest/src/main/resources/application.yml
km-rest/src/main/resources/application.yml
+2
-1
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java
...now/streaming/km/task/health/AbstractHealthCheckTask.java
+115
-0
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java
.../know/streaming/km/task/health/BrokerHealthCheckTask.java
+3
-106
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java
...know/streaming/km/task/health/ClusterHealthCheckTask.java
+3
-106
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java
...i/know/streaming/km/task/health/GroupHealthCheckTask.java
+3
-104
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java
...i/know/streaming/km/task/health/TopicHealthCheckTask.java
+3
-105
未找到文件。
docs/user_guide/faq.md
浏览文件 @
e8f98218
...
...
@@ -37,7 +37,7 @@
## 8.4、`Jmx`连接失败如何解决?
-
参看
[
Jmx 连接配置&问题解决
](
./9-attachment#jmx-连接失败问题解决
)
说明。
-
参看
[
Jmx 连接配置&问题解决
](
https://doc.knowstreaming.com/product/9-attachment#91jmx-%E8%BF%9E%E6%8E%A5%E5%A4%B1%E8%B4%A5%E9%97%AE%E9%A2%98%E8%A7%A3%E5%86%B3
)
说明。
...
...
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java
浏览文件 @
e8f98218
...
...
@@ -14,7 +14,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.config.metric.UserMetricConfigVO
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.version.VersionItemVO
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil
;
import
com.xiaojukeji.know.streaming.km.common.utils.VersionUtil
;
...
...
@@ -108,6 +107,7 @@ public class VersionControlManagerImpl implements VersionControlManager {
allVersionItemVO
.
addAll
(
ConvertUtil
.
list2List
(
versionControlService
.
listVersionControlItem
(
METRIC_BROKER
.
getCode
()),
VersionItemVO
.
class
));
allVersionItemVO
.
addAll
(
ConvertUtil
.
list2List
(
versionControlService
.
listVersionControlItem
(
METRIC_PARTITION
.
getCode
()),
VersionItemVO
.
class
));
allVersionItemVO
.
addAll
(
ConvertUtil
.
list2List
(
versionControlService
.
listVersionControlItem
(
METRIC_REPLICATION
.
getCode
()),
VersionItemVO
.
class
));
allVersionItemVO
.
addAll
(
ConvertUtil
.
list2List
(
versionControlService
.
listVersionControlItem
(
METRIC_ZOOKEEPER
.
getCode
()),
VersionItemVO
.
class
));
allVersionItemVO
.
addAll
(
ConvertUtil
.
list2List
(
versionControlService
.
listVersionControlItem
(
WEB_OP
.
getCode
()),
VersionItemVO
.
class
));
Map
<
String
,
VersionItemVO
>
map
=
allVersionItemVO
.
stream
().
collect
(
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java
浏览文件 @
e8f98218
...
...
@@ -14,6 +14,9 @@ public class ClusterZookeepersOverviewVO {
@ApiModelProperty
(
value
=
"主机ip"
,
example
=
"121.0.0.1"
)
private
String
host
;
@ApiModelProperty
(
value
=
"主机存活状态,1:Live,0:Down"
,
example
=
"1"
)
private
Integer
status
;
@ApiModelProperty
(
value
=
"端口号"
,
example
=
"2416"
)
private
Integer
port
;
...
...
km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java
浏览文件 @
e8f98218
...
...
@@ -37,29 +37,32 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener<
@Override
public
void
onApplicationEvent
(
BaseMetricEvent
event
)
{
executor
.
execute
(
()
->
{
if
(
event
instanceof
BrokerMetricEvent
)
{
if
(
event
instanceof
BrokerMetricEvent
)
{
BrokerMetricEvent
brokerMetricEvent
=
(
BrokerMetricEvent
)
event
;
sinkMetrics
(
brokerMetric2SinkPoint
(
brokerMetricEvent
.
getBrokerMetrics
()));
}
else
if
(
event
instanceof
ClusterMetricEvent
)
{
}
else
if
(
event
instanceof
ClusterMetricEvent
)
{
ClusterMetricEvent
clusterMetricEvent
=
(
ClusterMetricEvent
)
event
;
sinkMetrics
(
clusterMetric2SinkPoint
(
clusterMetricEvent
.
getClusterMetrics
()));
}
else
if
(
event
instanceof
TopicMetricEvent
)
{
}
else
if
(
event
instanceof
TopicMetricEvent
)
{
TopicMetricEvent
topicMetricEvent
=
(
TopicMetricEvent
)
event
;
sinkMetrics
(
topicMetric2SinkPoint
(
topicMetricEvent
.
getTopicMetrics
()));
}
else
if
(
event
instanceof
PartitionMetricEvent
)
{
}
else
if
(
event
instanceof
PartitionMetricEvent
)
{
PartitionMetricEvent
partitionMetricEvent
=
(
PartitionMetricEvent
)
event
;
sinkMetrics
(
partitionMetric2SinkPoint
(
partitionMetricEvent
.
getPartitionMetrics
()));
}
else
if
(
event
instanceof
GroupMetricEvent
)
{
}
else
if
(
event
instanceof
GroupMetricEvent
)
{
GroupMetricEvent
groupMetricEvent
=
(
GroupMetricEvent
)
event
;
sinkMetrics
(
groupMetric2SinkPoint
(
groupMetricEvent
.
getGroupMetrics
()));
}
else
if
(
event
instanceof
ReplicaMetricEvent
)
{
}
else
if
(
event
instanceof
ReplicaMetricEvent
)
{
ReplicaMetricEvent
replicaMetricEvent
=
(
ReplicaMetricEvent
)
event
;
sinkMetrics
(
replicationMetric2SinkPoint
(
replicaMetricEvent
.
getReplicationMetrics
()));
}
else
if
(
event
instanceof
ZookeeperMetricEvent
)
{
ZookeeperMetricEvent
zookeeperMetricEvent
=
(
ZookeeperMetricEvent
)
event
;
sinkMetrics
(
zookeeperMetric2SinkPoint
(
zookeeperMetricEvent
.
getZookeeperMetrics
()));
}
}
);
}
...
...
@@ -72,6 +75,7 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener<
public
abstract
Boolean
sinkMetrics
(
List
<
MetricSinkPoint
>
pointList
);
/**************************************************** private method ****************************************************/
private
List
<
MetricSinkPoint
>
brokerMetric2SinkPoint
(
List
<
BrokerMetrics
>
brokerMetrics
){
List
<
MetricSinkPoint
>
pointList
=
new
ArrayList
<>();
...
...
@@ -161,8 +165,23 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener<
return
pointList
;
}
private
List
<
MetricSinkPoint
>
genSinkPoint
(
String
metricPre
,
Map
<
String
,
Float
>
metrics
,
long
timeStamp
,
Map
<
String
,
Object
>
tagsMap
){
private
List
<
MetricSinkPoint
>
zookeeperMetric2SinkPoint
(
List
<
ZookeeperMetrics
>
zookeeperMetricsList
){
List
<
MetricSinkPoint
>
pointList
=
new
ArrayList
<>();
for
(
ZookeeperMetrics
z
:
zookeeperMetricsList
){
Map
<
String
,
Object
>
tagsMap
=
new
HashMap
<>();
tagsMap
.
put
(
CLUSTER_ID
.
getName
(),
z
.
getClusterPhyId
());
pointList
.
addAll
(
genSinkPoint
(
"Zookeeper"
,
z
.
getMetrics
(),
z
.
getTimestamp
(),
tagsMap
));
}
return
pointList
;
}
private
List
<
MetricSinkPoint
>
genSinkPoint
(
String
metricPre
,
Map
<
String
,
Float
>
metrics
,
long
timeStamp
,
Map
<
String
,
Object
>
tagsMap
)
{
List
<
MetricSinkPoint
>
pointList
=
new
ArrayList
<>();
for
(
String
metricName
:
metrics
.
keySet
()){
...
...
km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/version/VersionController.java
浏览文件 @
e8f98218
...
...
@@ -15,7 +15,6 @@ import org.springframework.validation.annotation.Validated;
import
org.springframework.web.bind.annotation.*
;
import
javax.servlet.http.HttpServletRequest
;
import
javax.validation.Valid
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.SortedMap
;
...
...
km-rest/src/main/resources/application.yml
浏览文件 @
e8f98218
...
...
@@ -84,7 +84,8 @@ client-pool:
es
:
client
:
address
:
127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061
client-cnt
:
10
pass
:
# ES账号密码,如果有账号密码,按照 username:password 的格式填写,没有则不需要填写
client-cnt
:
10
# 创建的ES客户端数
io-thread-cnt
:
2
max-retry-cnt
:
5
...
...
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java
0 → 100644
浏览文件 @
e8f98218
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
public
abstract
class
AbstractHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
AbstractHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
abstract
AbstractHealthCheckService
getCheckService
();
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
this
.
getCheckService
().
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
this
.
getCheckService
(),
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
this
.
getCheckService
().
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
}
}
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java
浏览文件 @
e8f98218
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.annotation.Task
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.job.core.consensual.ConsensualEnum
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.broker.HealthCheckBrokerService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
@NoArgsConstructor
@AllArgsConstructor
@Task
(
name
=
"BrokerHealthCheckTask"
,
...
...
@@ -33,98 +16,12 @@ import java.util.Map;
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
BrokerHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
BrokerHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
class
BrokerHealthCheckTask
extends
AbstractHealthCheckTask
{
@Autowired
private
HealthCheckBrokerService
healthCheckBrokerService
;
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
healthCheckBrokerService
.
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
healthCheckBrokerService
,
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
healthCheckBrokerService
,
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
AbstractHealthCheckService
healthCheckService
,
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
healthCheckService
.
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
public
AbstractHealthCheckService
getCheckService
()
{
return
healthCheckBrokerService
;
}
}
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java
浏览文件 @
e8f98218
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.annotation.Task
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.job.core.consensual.ConsensualEnum
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.cluster.HealthCheckClusterService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
@NoArgsConstructor
@AllArgsConstructor
@Task
(
name
=
"ClusterHealthCheckTask"
,
...
...
@@ -33,98 +16,12 @@ import java.util.Map;
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
ClusterHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
ClusterHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
class
ClusterHealthCheckTask
extends
AbstractHealthCheckTask
{
@Autowired
private
HealthCheckClusterService
healthCheckClusterService
;
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
healthCheckClusterService
.
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
healthCheckClusterService
,
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
healthCheckClusterService
,
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
AbstractHealthCheckService
healthCheckService
,
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
healthCheckService
.
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
public
AbstractHealthCheckService
getCheckService
()
{
return
healthCheckClusterService
;
}
}
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java
浏览文件 @
e8f98218
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.annotation.Task
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.job.core.consensual.ConsensualEnum
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.group.HealthCheckGroupService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
@NoArgsConstructor
@AllArgsConstructor
...
...
@@ -33,98 +17,13 @@ import java.util.Map;
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
GroupHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
GroupHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
class
GroupHealthCheckTask
extends
AbstractHealthCheckTask
{
@Autowired
private
HealthCheckGroupService
healthCheckGroupService
;
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
healthCheckGroupService
.
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
healthCheckGroupService
,
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
healthCheckGroupService
,
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
AbstractHealthCheckService
healthCheckService
,
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
healthCheckService
.
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
public
AbstractHealthCheckService
getCheckService
()
{
return
healthCheckGroupService
;
}
}
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java
浏览文件 @
e8f98218
package
com.xiaojukeji.know.streaming.km.task.health
;
import
com.didiglobal.logi.job.annotation.Task
;
import
com.didiglobal.logi.job.common.TaskResult
;
import
com.didiglobal.logi.job.core.consensual.ConsensualEnum
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.topic.HealthCheckTopicService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
@NoArgsConstructor
@AllArgsConstructor
@Task
(
name
=
"TopicHealthCheckTask"
,
...
...
@@ -33,98 +16,13 @@ import java.util.Map;
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
TopicHealthCheckTask
extends
AbstractAsyncMetricsDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
TopicHealthCheckTask
.
class
);
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
public
class
TopicHealthCheckTask
extends
AbstractHealthCheckTask
{
@Autowired
private
HealthCheckTopicService
healthCheckTopicService
;
@Override
public
TaskResult
processClusterTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
return
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
);
}
private
TaskResult
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
// 检查结果
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 遍历Check-Service
List
<
ClusterPhyParam
>
paramList
=
healthCheckTopicService
.
getResList
(
clusterPhy
.
getId
());
if
(
ValidateUtils
.
isEmptyList
(
paramList
))
{
// 当前无该维度的资源,则直接设置为
resultList
.
addAll
(
this
.
getNoResResult
(
clusterPhy
.
getId
(),
healthCheckTopicService
,
healthConfigMap
));
}
// 遍历资源
for
(
ClusterPhyParam
clusterPhyParam:
paramList
)
{
resultList
.
addAll
(
this
.
checkAndGetResult
(
healthCheckTopicService
,
clusterPhyParam
,
healthConfigMap
));
}
for
(
HealthCheckResult
checkResult:
resultList
)
{
try
{
healthCheckResultService
.
replace
(
checkResult
);
}
catch
(
Exception
e
)
{
log
.
error
(
"class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
checkResult
,
e
);
}
}
// 删除10分钟之前的检查结果
try
{
healthCheckResultService
.
deleteByUpdateTimeBeforeInDB
(
clusterPhy
.
getId
(),
new
Date
(
triggerTimeUnitMs
-
10
*
60
*
1000
));
}
catch
(
Exception
e
)
{
log
.
error
(
"class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckDimensionEnum
dimensionEnum
=
healthCheckService
.
getHealthCheckDimensionEnum
();
if
(!
clusterHealthConfig
.
getCheckNameEnum
().
getDimensionEnum
().
equals
(
dimensionEnum
))
{
// 类型不匹配
continue
;
}
// 记录
HealthCheckResult
checkResult
=
new
HealthCheckResult
(
dimensionEnum
.
getDimension
(),
clusterHealthConfig
.
getCheckNameEnum
().
getConfigName
(),
clusterPhyId
,
"-1"
);
checkResult
.
setPassed
(
Constant
.
YES
);
resultList
.
add
(
checkResult
);
}
return
resultList
;
}
private
List
<
HealthCheckResult
>
checkAndGetResult
(
AbstractHealthCheckService
healthCheckService
,
ClusterPhyParam
clusterPhyParam
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
List
<
HealthCheckResult
>
resultList
=
new
ArrayList
<>();
// 进行检查
for
(
BaseClusterHealthConfig
clusterHealthConfig:
healthConfigMap
.
values
())
{
HealthCheckResult
healthCheckResult
=
healthCheckService
.
checkAndGetResult
(
clusterPhyParam
,
clusterHealthConfig
);
if
(
healthCheckResult
==
null
)
{
continue
;
}
// 记录
resultList
.
add
(
healthCheckResult
);
}
return
resultList
;
public
AbstractHealthCheckService
getCheckService
()
{
return
healthCheckTopicService
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录