Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
714e9a56
K
kafka-manager
项目概览
DiDi
/
kafka-manager
大约 1 年 前同步成功
通知
60
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
714e9a56
编写于
10月 20, 2022
作者:
Z
zengqiao
提交者:
EricZeng
10月 21, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[Optimize] 优化ZK指标的获取,减少重复采集的出现 (#709)
1、避免不同集群,相同的ZK地址时,指标重复获取的情况; 2、避免集群某个ZK地址获取指标失败时,下一个周期还会继续尝试从该地址获取指标;
上级
88d0a601
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
102 addition
and
32 deletion
+102
-32
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/ZookeeperLocalCache.java
...eji/know/streaming/km/core/cache/ZookeeperLocalCache.java
+46
-0
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java
...re/service/zookeeper/impl/ZookeeperMetricServiceImpl.java
+56
-32
未找到文件。
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/ZookeeperLocalCache.java
0 → 100644
浏览文件 @
714e9a56
package
com.xiaojukeji.know.streaming.km.core.cache
;
import
com.github.benmanes.caffeine.cache.Cache
;
import
com.github.benmanes.caffeine.cache.Caffeine
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.BaseFourLetterWordCmdData
;
import
java.util.concurrent.TimeUnit
;
public
class
ZookeeperLocalCache
{
private
static
final
Cache
<
String
,
String
>
fourLetterCmdFailedServerCache
=
Caffeine
.
newBuilder
()
.
expireAfterWrite
(
10
,
TimeUnit
.
MINUTES
)
.
maximumSize
(
10000
)
.
build
();
private
static
final
Cache
<
String
,
BaseFourLetterWordCmdData
>
fourLetterCmdDataCache
=
Caffeine
.
newBuilder
()
.
expireAfterWrite
(
60
,
TimeUnit
.
SECONDS
)
.
maximumSize
(
10000
)
.
build
();
public
static
boolean
canUse
(
String
host
,
int
port
,
String
cmd
)
{
String
data
=
fourLetterCmdFailedServerCache
.
getIfPresent
(
gen4lwFailedKey
(
host
,
port
,
cmd
));
return
data
==
null
;
}
public
static
void
setFailed
(
String
host
,
int
port
,
String
cmd
)
{
fourLetterCmdFailedServerCache
.
put
(
gen4lwFailedKey
(
host
,
port
,
cmd
),
""
);
}
public
static
BaseFourLetterWordCmdData
getData
(
String
host
,
int
port
,
String
cmd
)
{
return
fourLetterCmdDataCache
.
getIfPresent
(
gen4lwFailedKey
(
host
,
port
,
cmd
));
}
public
static
void
putData
(
String
host
,
int
port
,
String
cmd
,
BaseFourLetterWordCmdData
cmdData
)
{
fourLetterCmdDataCache
.
put
(
gen4lwFailedKey
(
host
,
port
,
cmd
),
cmdData
);
}
/**************************************************** private method ****************************************************/
private
static
String
gen4lwFailedKey
(
String
host
,
int
port
,
String
cmd
)
{
return
host
+
"@"
+
port
+
"@"
+
cmd
;
}
}
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java
浏览文件 @
714e9a56
...
...
@@ -10,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.Zookeepe
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.BaseFourLetterWordCmdData
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.MonitorCmdDataParser
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser
;
...
...
@@ -26,8 +27,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterw
import
com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics
;
import
com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO
;
import
com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil
;
import
com.xiaojukeji.know.streaming.km.core.cache.ZookeeperLocalCache
;
import
com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService
;
import
com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService
;
import
com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService
;
import
com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService
;
import
com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService
;
...
...
@@ -53,6 +54,7 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
public
static
final
String
ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD
=
"getMetricFromMonitorCmd"
;
public
static
final
String
ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD
=
"getMetricFromServerCmd"
;
public
static
final
String
ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX
=
"getMetricFromKafkaByJMX"
;
public
static
final
String
ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE
=
"getMetricFromHealthService"
;
@Autowired
private
ClusterPhyService
clusterPhyService
;
...
...
@@ -66,9 +68,6 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
@Autowired
private
KafkaJMXClient
kafkaJMXClient
;
@Autowired
private
KafkaControllerService
kafkaControllerService
;
@Override
protected
VersionItemTypeEnum
getVersionItemType
()
{
return
VersionItemTypeEnum
.
METRIC_ZOOKEEPER
;
...
...
@@ -171,24 +170,37 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
Result
<
ZookeeperMetrics
>
rz
=
null
;
for
(
Tuple
<
String
,
Integer
>
hostPort:
param
.
getZkAddressList
())
{
Result
<
ServerCmdData
>
cmdDataResult
=
FourLetterWordUtil
.
executeFourLetterCmd
(
param
.
getClusterPhyId
(),
hostPort
.
getV1
(),
hostPort
.
getV2
(),
param
.
getZkConfig
()
!=
null
?
param
.
getZkConfig
().
getOpenSecure
():
false
,
param
.
getZkConfig
()
!=
null
?
param
.
getZkConfig
().
getRequestTimeoutUnitMs
():
Constant
.
DEFAULT_REQUEST_TIMEOUT_UNIT_MS
,
new
ServerCmdDataParser
()
);
if
(
cmdDataResult
.
failed
())
{
rz
=
Result
.
buildFromIgnoreData
(
cmdDataResult
);
ServerCmdData
cmdData
=
null
;
BaseFourLetterWordCmdData
baseCmdData
=
ZookeeperLocalCache
.
getData
(
hostPort
.
getV1
(),
hostPort
.
getV2
(),
FourLetterWordUtil
.
ServerCmd
);
if
(
baseCmdData
!=
null
)
{
cmdData
=
(
ServerCmdData
)
baseCmdData
;
}
else
if
(
ZookeeperLocalCache
.
canUse
(
hostPort
.
getV1
(),
hostPort
.
getV2
(),
FourLetterWordUtil
.
ServerCmd
))
{
Result
<
ServerCmdData
>
cmdDataResult
=
FourLetterWordUtil
.
executeFourLetterCmd
(
param
.
getClusterPhyId
(),
hostPort
.
getV1
(),
hostPort
.
getV2
(),
param
.
getZkConfig
()
!=
null
?
param
.
getZkConfig
().
getOpenSecure
():
false
,
param
.
getZkConfig
()
!=
null
?
param
.
getZkConfig
().
getRequestTimeoutUnitMs
():
Constant
.
DEFAULT_REQUEST_TIMEOUT_UNIT_MS
,
new
ServerCmdDataParser
()
);
if
(
cmdDataResult
.
failed
())
{
ZookeeperLocalCache
.
setFailed
(
hostPort
.
getV1
(),
hostPort
.
getV2
(),
FourLetterWordUtil
.
ServerCmd
);
rz
=
Result
.
buildFromIgnoreData
(
cmdDataResult
);
continue
;
}
cmdData
=
cmdDataResult
.
getData
();
ZookeeperLocalCache
.
putData
(
hostPort
.
getV1
(),
hostPort
.
getV2
(),
FourLetterWordUtil
.
ServerCmd
,
cmdData
);
}
else
{
// baseCmdData为空 且 当前地址不可使用
continue
;
}
ServerCmdData
cmdData
=
cmdDataResult
.
getData
();
ZookeeperMetrics
metrics
=
new
ZookeeperMetrics
(
param
.
getClusterPhyId
());
metrics
.
putMetric
(
ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY
,
cmdData
.
getZkAvgLatency
()
.
floatValue
()
);
metrics
.
putMetric
(
ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY
,
cmdData
.
getZkAvgLatency
());
metrics
.
putMetric
(
ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY
,
cmdData
.
getZkMinLatency
().
floatValue
());
metrics
.
putMetric
(
ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY
,
cmdData
.
getZkMaxLatency
().
floatValue
());
metrics
.
putMetric
(
ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS
,
cmdData
.
getZkOutstandingRequests
().
floatValue
());
...
...
@@ -208,24 +220,36 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
Result
<
ZookeeperMetrics
>
rz
=
null
;
for
(
Tuple
<
String
,
Integer
>
hostPort:
param
.
getZkAddressList
())
{
Result
<
MonitorCmdData
>
cmdDataResult
=
FourLetterWordUtil
.
executeFourLetterCmd
(
param
.
getClusterPhyId
(),
hostPort
.
getV1
(),
hostPort
.
getV2
(),
param
.
getZkConfig
()
!=
null
?
param
.
getZkConfig
().
getOpenSecure
():
false
,
param
.
getZkConfig
()
!=
null
?
param
.
getZkConfig
().
getRequestTimeoutUnitMs
():
Constant
.
DEFAULT_REQUEST_TIMEOUT_UNIT_MS
,
new
MonitorCmdDataParser
()
);
if
(
cmdDataResult
.
failed
())
{
rz
=
Result
.
buildFromIgnoreData
(
cmdDataResult
);
MonitorCmdData
cmdData
=
null
;
BaseFourLetterWordCmdData
baseCmdData
=
ZookeeperLocalCache
.
getData
(
hostPort
.
getV1
(),
hostPort
.
getV2
(),
FourLetterWordUtil
.
MonitorCmd
);
if
(
baseCmdData
!=
null
)
{
cmdData
=
(
MonitorCmdData
)
baseCmdData
;
}
else
if
(
ZookeeperLocalCache
.
canUse
(
hostPort
.
getV1
(),
hostPort
.
getV2
(),
FourLetterWordUtil
.
MonitorCmd
))
{
Result
<
MonitorCmdData
>
cmdDataResult
=
FourLetterWordUtil
.
executeFourLetterCmd
(
param
.
getClusterPhyId
(),
hostPort
.
getV1
(),
hostPort
.
getV2
(),
param
.
getZkConfig
()
!=
null
?
param
.
getZkConfig
().
getOpenSecure
():
false
,
param
.
getZkConfig
()
!=
null
?
param
.
getZkConfig
().
getRequestTimeoutUnitMs
():
Constant
.
DEFAULT_REQUEST_TIMEOUT_UNIT_MS
,
new
MonitorCmdDataParser
()
);
if
(
cmdDataResult
.
failed
())
{
ZookeeperLocalCache
.
setFailed
(
hostPort
.
getV1
(),
hostPort
.
getV2
(),
FourLetterWordUtil
.
MonitorCmd
);
rz
=
Result
.
buildFromIgnoreData
(
cmdDataResult
);
continue
;
}
cmdData
=
cmdDataResult
.
getData
();
ZookeeperLocalCache
.
putData
(
hostPort
.
getV1
(),
hostPort
.
getV2
(),
FourLetterWordUtil
.
MonitorCmd
,
cmdData
);
}
else
{
continue
;
}
MonitorCmdData
cmdData
=
cmdDataResult
.
getData
();
ZookeeperMetrics
metrics
=
new
ZookeeperMetrics
(
param
.
getClusterPhyId
());
metrics
.
putMetric
(
ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY
,
cmdData
.
getZkAvgLatency
()
.
floatValue
()
);
metrics
.
putMetric
(
ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY
,
cmdData
.
getZkAvgLatency
());
metrics
.
putMetric
(
ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY
,
cmdData
.
getZkMinLatency
().
floatValue
());
metrics
.
putMetric
(
ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY
,
cmdData
.
getZkMaxLatency
().
floatValue
());
metrics
.
putMetric
(
ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS
,
cmdData
.
getZkOutstandingRequests
().
floatValue
());
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录