Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
6c610427
K
kafka-manager
项目概览
DiDi
/
kafka-manager
9 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
6c610427
编写于
10月 08, 2022
作者:
Z
zengqiao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
ZK-增加ZK信息查询接口
上级
b4cc31c4
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
555 addition
and
0 deletion
+555
-0
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterZookeepersManager.java
...ow/streaming/km/biz/cluster/ClusterZookeepersManager.java
+19
-0
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java
...ing/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java
+137
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterZookeepersOverviewDTO.java
...common/bean/dto/cluster/ClusterZookeepersOverviewDTO.java
+13
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/Znode.java
...know/streaming/km/common/bean/entity/zookeeper/Znode.java
+19
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java
...common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java
+26
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersStateVO.java
...km/common/bean/vo/zookeeper/ClusterZookeepersStateVO.java
+47
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeStatVO.java
...ow/streaming/km/common/bean/vo/zookeeper/ZnodeStatVO.java
+44
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeVO.java
...i/know/streaming/km/common/bean/vo/zookeeper/ZnodeVO.java
+22
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ZnodeConverter.java
...ji/know/streaming/km/common/converter/ZnodeConverter.java
+19
-0
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZnodeService.java
...now/streaming/km/core/service/zookeeper/ZnodeService.java
+13
-0
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZnodeServiceImpl.java
...ming/km/core/service/zookeeper/impl/ZnodeServiceImpl.java
+81
-0
km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterZookeepersController.java
...g/km/rest/api/v3/cluster/ClusterZookeepersController.java
+63
-0
km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/zk/ZookeeperMetricsController.java
...reaming/km/rest/api/v3/zk/ZookeeperMetricsController.java
+52
-0
未找到文件。
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterZookeepersManager.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.biz.cluster
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO
;
/**
* 多集群总体状态
*/
public
interface
ClusterZookeepersManager
{
Result
<
ClusterZookeepersStateVO
>
getClusterPhyZookeepersState
(
Long
clusterPhyId
);
PaginationResult
<
ClusterZookeepersOverviewVO
>
getClusterPhyZookeepersOverview
(
Long
clusterPhyId
,
ClusterZookeepersOverviewDTO
dto
);
Result
<
ZnodeVO
>
getZnodeVO
(
Long
clusterPhyId
,
String
path
);
}
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.biz.cluster.impl
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO
;
import
com.xiaojukeji.know.streaming.km.common.constant.MsgConstant
;
import
com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil
;
import
com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil
;
import
com.xiaojukeji.know.streaming.km.common.utils.Tuple
;
import
com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService
;
import
com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems
;
import
com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService
;
import
com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService
;
import
com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.stream.Collectors
;
@Service
public
class
ClusterZookeepersManagerImpl
implements
ClusterZookeepersManager
{
private
static
final
ILog
LOGGER
=
LogFactory
.
getLog
(
ClusterZookeepersManagerImpl
.
class
);
@Autowired
private
ClusterPhyService
clusterPhyService
;
@Autowired
private
ZookeeperService
zookeeperService
;
@Autowired
private
ZookeeperMetricService
zookeeperMetricService
;
@Autowired
private
ZnodeService
znodeService
;
@Override
public
Result
<
ClusterZookeepersStateVO
>
getClusterPhyZookeepersState
(
Long
clusterPhyId
)
{
ClusterPhy
clusterPhy
=
clusterPhyService
.
getClusterByCluster
(
clusterPhyId
);
if
(
clusterPhy
==
null
)
{
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
CLUSTER_NOT_EXIST
,
MsgConstant
.
getClusterPhyNotExist
(
clusterPhyId
));
}
// // TODO
// private Integer healthState;
// private Integer healthCheckPassed;
// private Integer healthCheckTotal;
List
<
ZookeeperInfo
>
infoList
=
zookeeperService
.
listFromDBByCluster
(
clusterPhyId
);
ClusterZookeepersStateVO
vo
=
new
ClusterZookeepersStateVO
();
vo
.
setTotalServerCount
(
infoList
.
size
());
vo
.
setAliveFollowerCount
(
0
);
vo
.
setTotalFollowerCount
(
0
);
vo
.
setAliveObserverCount
(
0
);
vo
.
setTotalObserverCount
(
0
);
vo
.
setAliveServerCount
(
0
);
for
(
ZookeeperInfo
info:
infoList
)
{
if
(
info
.
getRole
().
equals
(
ZKRoleEnum
.
LEADER
.
getRole
()))
{
vo
.
setLeaderNode
(
info
.
getHost
());
}
if
(
info
.
getRole
().
equals
(
ZKRoleEnum
.
FOLLOWER
.
getRole
()))
{
vo
.
setTotalFollowerCount
(
vo
.
getTotalFollowerCount
()
+
1
);
vo
.
setAliveFollowerCount
(
info
.
alive
()?
vo
.
getAliveFollowerCount
()
+
1
:
vo
.
getAliveFollowerCount
());
}
if
(
info
.
getRole
().
equals
(
ZKRoleEnum
.
OBSERVER
.
getRole
()))
{
vo
.
setTotalObserverCount
(
vo
.
getTotalObserverCount
()
+
1
);
vo
.
setAliveObserverCount
(
info
.
alive
()?
vo
.
getAliveObserverCount
()
+
1
:
vo
.
getAliveObserverCount
());
}
if
(
info
.
alive
())
{
vo
.
setAliveServerCount
(
vo
.
getAliveServerCount
()
+
1
);
}
}
Result
<
ZookeeperMetrics
>
metricsResult
=
zookeeperMetricService
.
collectMetricsFromZookeeper
(
new
ZookeeperMetricParam
(
clusterPhyId
,
infoList
.
stream
().
filter
(
elem
->
elem
.
alive
()).
map
(
item
->
new
Tuple
<
String
,
Integer
>(
item
.
getHost
(),
item
.
getPort
())).
collect
(
Collectors
.
toList
()),
ConvertUtil
.
str2ObjByJson
(
clusterPhy
.
getZkProperties
(),
ZKConfig
.
class
),
ZookeeperMetricVersionItems
.
ZOOKEEPER_METRIC_WATCH_COUNT
));
if
(
metricsResult
.
failed
())
{
LOGGER
.
error
(
"class=ClusterZookeepersManagerImpl||method=getClusterPhyZookeepersState||clusterPhyId={}||errMsg={}"
,
clusterPhyId
,
metricsResult
.
getMessage
()
);
return
Result
.
buildSuc
(
vo
);
}
Float
watchCount
=
metricsResult
.
getData
().
getMetric
(
ZookeeperMetricVersionItems
.
ZOOKEEPER_METRIC_WATCH_COUNT
);
vo
.
setWatchCount
(
watchCount
!=
null
?
watchCount
.
intValue
():
null
);
return
Result
.
buildSuc
(
vo
);
}
@Override
public
PaginationResult
<
ClusterZookeepersOverviewVO
>
getClusterPhyZookeepersOverview
(
Long
clusterPhyId
,
ClusterZookeepersOverviewDTO
dto
)
{
//获取集群zookeeper列表
List
<
ClusterZookeepersOverviewVO
>
clusterZookeepersOverviewVOList
=
ConvertUtil
.
list2List
(
zookeeperService
.
listFromDBByCluster
(
clusterPhyId
),
ClusterZookeepersOverviewVO
.
class
);
//搜索
clusterZookeepersOverviewVOList
=
PaginationUtil
.
pageByFuzzyFilter
(
clusterZookeepersOverviewVOList
,
dto
.
getSearchKeywords
(),
Arrays
.
asList
(
"host"
));
//分页
PaginationResult
<
ClusterZookeepersOverviewVO
>
paginationResult
=
PaginationUtil
.
pageBySubData
(
clusterZookeepersOverviewVOList
,
dto
);
return
paginationResult
;
}
@Override
public
Result
<
ZnodeVO
>
getZnodeVO
(
Long
clusterPhyId
,
String
path
)
{
Result
<
Znode
>
result
=
znodeService
.
getZnode
(
clusterPhyId
,
path
);
if
(
result
.
failed
())
{
return
Result
.
buildFromIgnoreData
(
result
);
}
return
Result
.
buildSuc
(
ConvertUtil
.
obj2ObjByJSON
(
result
.
getData
(),
ZnodeVO
.
class
));
}
/**************************************************** private method ****************************************************/
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterZookeepersOverviewDTO.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.common.bean.dto.cluster
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO
;
import
lombok.Data
;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
public
class
ClusterZookeepersOverviewDTO
extends
PaginationBaseDTO
{
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/Znode.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper
;
import
com.xiaojukeji.know.streaming.km.common.utils.Tuple
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
import
org.apache.zookeeper.data.Stat
;
@Data
public
class
Znode
{
@ApiModelProperty
(
value
=
"节点名称"
,
example
=
"broker"
)
private
String
name
;
@ApiModelProperty
(
value
=
"节点数据"
,
example
=
"saassad"
)
private
String
data
;
@ApiModelProperty
(
value
=
"节点属性"
,
example
=
""
)
private
Stat
stat
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
@ApiModel
(
description
=
"Zookeeper信息概览"
)
public
class
ClusterZookeepersOverviewVO
{
@ApiModelProperty
(
value
=
"主机ip"
,
example
=
"121.0.0.1"
)
private
String
host
;
@ApiModelProperty
(
value
=
"端口号"
,
example
=
"2416"
)
private
Integer
port
;
@ApiModelProperty
(
value
=
"版本"
,
example
=
"1.1.2"
)
private
String
version
;
@ApiModelProperty
(
value
=
"角色"
,
example
=
"Leader"
)
private
String
role
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersStateVO.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
@ApiModel
(
description
=
"ZK状态信息"
)
public
class
ClusterZookeepersStateVO
{
@ApiModelProperty
(
value
=
"健康检查状态"
,
example
=
"1"
)
private
Integer
healthState
;
@ApiModelProperty
(
value
=
"健康检查通过数"
,
example
=
"1"
)
private
Integer
healthCheckPassed
;
@ApiModelProperty
(
value
=
"健康检查总数"
,
example
=
"1"
)
private
Integer
healthCheckTotal
;
@ApiModelProperty
(
value
=
"ZK的Leader机器"
,
example
=
"127.0.0.1"
)
private
String
leaderNode
;
@ApiModelProperty
(
value
=
"Watch数"
,
example
=
"123456"
)
private
Integer
watchCount
;
@ApiModelProperty
(
value
=
"节点存活数"
,
example
=
"8"
)
private
Integer
aliveServerCount
;
@ApiModelProperty
(
value
=
"总节点数"
,
example
=
"10"
)
private
Integer
totalServerCount
;
@ApiModelProperty
(
value
=
"Follower角色存活数"
,
example
=
"8"
)
private
Integer
aliveFollowerCount
;
@ApiModelProperty
(
value
=
"Follower角色总数"
,
example
=
"10"
)
private
Integer
totalFollowerCount
;
@ApiModelProperty
(
value
=
"Observer角色存活数"
,
example
=
"3"
)
private
Integer
aliveObserverCount
;
@ApiModelProperty
(
value
=
"Observer角色总数"
,
example
=
"3"
)
private
Integer
totalObserverCount
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeStatVO.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
public
class
ZnodeStatVO
{
@ApiModelProperty
(
value
=
"节点被创建时的事物的ID"
,
example
=
"0x1f09"
)
private
Long
czxid
;
@ApiModelProperty
(
value
=
"创建时间"
,
example
=
"Sat Mar 16 15:38:34 CST 2019"
)
private
Long
ctime
;
@ApiModelProperty
(
value
=
"节点最后一次被修改时的事物的ID"
,
example
=
"0x1f09"
)
private
Long
mzxid
;
@ApiModelProperty
(
value
=
"最后一次修改时间"
,
example
=
"Sat Mar 16 15:38:34 CST 2019"
)
private
Long
mtime
;
@ApiModelProperty
(
value
=
"子节点列表最近一次呗修改的事物ID"
,
example
=
"0x31"
)
private
Long
pzxid
;
@ApiModelProperty
(
value
=
"子节点版本号"
,
example
=
"0"
)
private
Integer
cversion
;
@ApiModelProperty
(
value
=
"数据版本号"
,
example
=
"0"
)
private
Integer
version
;
@ApiModelProperty
(
value
=
"ACL版本号"
,
example
=
"0"
)
private
Integer
aversion
;
@ApiModelProperty
(
value
=
"创建临时节点的事物ID,持久节点事物为0"
,
example
=
"0"
)
private
Long
ephemeralOwner
;
@ApiModelProperty
(
value
=
"数据长度,每个节点都可保存数据"
,
example
=
"22"
)
private
Integer
dataLength
;
@ApiModelProperty
(
value
=
"子节点的个数"
,
example
=
"6"
)
private
Integer
numChildren
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeVO.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
public
class
ZnodeVO
{
@ApiModelProperty
(
value
=
"节点名称"
,
example
=
"broker"
)
private
String
name
;
@ApiModelProperty
(
value
=
"节点数据"
,
example
=
"saassad"
)
private
String
data
;
@ApiModelProperty
(
value
=
"节点属性"
,
example
=
""
)
private
ZnodeStatVO
stat
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ZnodeConverter.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.common.converter
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode
;
import
com.xiaojukeji.know.streaming.km.common.utils.Tuple
;
import
org.apache.zookeeper.data.Stat
;
public
class
ZnodeConverter
{
ZnodeConverter
(){
}
public
static
Znode
convert2Znode
(
Tuple
<
byte
[],
Stat
>
dataAndStat
,
String
path
)
{
Znode
znode
=
new
Znode
();
znode
.
setStat
(
dataAndStat
.
getV2
());
znode
.
setData
(
dataAndStat
.
getV1
()
==
null
?
null
:
new
String
(
dataAndStat
.
getV1
()));
znode
.
setName
(
path
.
substring
(
path
.
lastIndexOf
(
'/'
)
+
1
));
return
znode
;
}
}
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZnodeService.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.core.service.zookeeper
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode
;
import
java.util.List
;
public
interface
ZnodeService
{
Result
<
List
<
String
>>
listZnodeChildren
(
Long
clusterPhyId
,
String
path
,
String
keyword
);
Result
<
Znode
>
getZnode
(
Long
clusterPhyId
,
String
path
);
}
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZnodeServiceImpl.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode
;
import
com.xiaojukeji.know.streaming.km.common.constant.MsgConstant
;
import
com.xiaojukeji.know.streaming.km.common.converter.ZnodeConverter
;
import
com.xiaojukeji.know.streaming.km.common.exception.NotExistException
;
import
com.xiaojukeji.know.streaming.km.common.utils.Tuple
;
import
com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService
;
import
com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO
;
import
org.apache.zookeeper.data.Stat
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
java.util.List
;
import
java.util.stream.Collectors
;
@Service
public
class
ZnodeServiceImpl
implements
ZnodeService
{
private
static
final
ILog
LOGGER
=
LogFactory
.
getLog
(
ZnodeServiceImpl
.
class
);
@Autowired
private
KafkaZKDAO
kafkaZKDAO
;
@Autowired
private
ClusterPhyService
clusterPhyService
;
@Override
public
Result
<
List
<
String
>>
listZnodeChildren
(
Long
clusterPhyId
,
String
path
,
String
keyword
)
{
ClusterPhy
clusterPhy
=
clusterPhyService
.
getClusterByCluster
(
clusterPhyId
);
if
(
clusterPhy
==
null
)
{
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
NOT_EXIST
,
MsgConstant
.
getClusterPhyNotExist
(
clusterPhyId
));
}
List
<
String
>
children
;
try
{
children
=
kafkaZKDAO
.
getChildren
(
clusterPhyId
,
path
,
false
);
}
catch
(
NotExistException
e
)
{
LOGGER
.
error
(
"class=ZnodeServiceImpl||method=listZnodeChildren||clusterPhyId={}||errMsg={}"
,
clusterPhyId
,
"create ZK client create failed"
);
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
NOT_EXIST
,
"ZK客户端创建失败"
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"class=ZnodeServiceImpl||method=listZnodeChildren||clusterPhyId={}||errMsg={}"
,
clusterPhyId
,
"ZK operate failed"
);
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
ZK_OPERATE_FAILED
,
"ZK操作失败"
);
}
//关键字搜索
if
(
keyword
!=
null
)
{
children
=
children
.
stream
().
filter
(
elem
->
elem
.
contains
(
keyword
)).
collect
(
Collectors
.
toList
());
}
return
Result
.
buildSuc
(
children
);
}
@Override
public
Result
<
Znode
>
getZnode
(
Long
clusterPhyId
,
String
path
)
{
ClusterPhy
clusterPhy
=
clusterPhyService
.
getClusterByCluster
(
clusterPhyId
);
if
(
clusterPhy
==
null
)
{
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
NOT_EXIST
,
MsgConstant
.
getClusterPhyNotExist
(
clusterPhyId
));
}
//获取zookeeper上的原始数据
Tuple
<
byte
[],
Stat
>
dataAndStat
;
try
{
dataAndStat
=
kafkaZKDAO
.
getDataAndStat
(
clusterPhyId
,
path
);
}
catch
(
NotExistException
e
)
{
LOGGER
.
error
(
"class=ZnodeServiceImpl||method=getZnode||clusterPhyId={}||errMsg={}"
,
clusterPhyId
,
"create ZK client create failed"
);
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
NOT_EXIST
,
"ZK客户端创建失败"
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"class=ZnodeServiceImpl||method=getZnode||clusterPhyId={}||errMsg={}"
,
clusterPhyId
,
"ZK operate failed"
);
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
ZK_OPERATE_FAILED
,
"ZK操作失败"
);
}
return
Result
.
buildSuc
(
ZnodeConverter
.
convert2Znode
(
dataAndStat
,
path
));
}
}
km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterZookeepersController.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.rest.api.v3.cluster
;
import
com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO
;
import
com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO
;
import
com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiOperation
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.*
;
import
java.util.List
;
/**
* @author zengqiao
* @date 22/09/19
*/
@Api
(
tags
=
Constant
.
SWAGGER_API_TAG_PREFIX
+
"集群ZK-相关接口(REST)"
)
@RestController
@RequestMapping
(
ApiPrefix
.
API_V3_PREFIX
)
public
class
ClusterZookeepersController
{
@Autowired
private
ClusterZookeepersManager
clusterZookeepersManager
;
@Autowired
private
ZnodeService
znodeService
;
@ApiOperation
(
"集群Zookeeper状态信息"
)
@GetMapping
(
value
=
"clusters/{clusterPhyId}/zookeepers-state"
)
public
Result
<
ClusterZookeepersStateVO
>
getClusterZookeepersState
(
@PathVariable
Long
clusterPhyId
)
{
return
clusterZookeepersManager
.
getClusterPhyZookeepersState
(
clusterPhyId
);
}
@ApiOperation
(
"集群Zookeeper信息列表"
)
@PostMapping
(
value
=
"clusters/{clusterPhyId}/zookeepers-overview"
)
public
PaginationResult
<
ClusterZookeepersOverviewVO
>
getClusterZookeepersOverview
(
@PathVariable
Long
clusterPhyId
,
@RequestBody
ClusterZookeepersOverviewDTO
dto
)
{
return
clusterZookeepersManager
.
getClusterPhyZookeepersOverview
(
clusterPhyId
,
dto
);
}
@ApiOperation
(
"Zookeeper节点数据"
)
@GetMapping
(
value
=
"clusters/{clusterPhyId}/znode-data"
)
public
Result
<
ZnodeVO
>
getClusterZookeeperData
(
@PathVariable
Long
clusterPhyId
,
@RequestParam
String
path
)
{
return
clusterZookeepersManager
.
getZnodeVO
(
clusterPhyId
,
path
);
}
@ApiOperation
(
"Zookeeper节点列表"
)
@GetMapping
(
value
=
"clusters/{clusterPhyId}/znode-children"
)
public
Result
<
List
<
String
>>
getClusterZookeeperChild
(
@PathVariable
Long
clusterPhyId
,
@RequestParam
String
path
,
@RequestParam
(
required
=
false
)
String
keyword
)
{
return
znodeService
.
listZnodeChildren
(
clusterPhyId
,
path
,
keyword
);
}
}
km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/zk/ZookeeperMetricsController.java
0 → 100644
浏览文件 @
6c610427
package
com.xiaojukeji.know.streaming.km.rest.api.v3.zk
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO
;
import
com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics
;
import
com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiOperation
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.*
;
import
java.util.List
;
/**
* @author zengqiao
* @date 22/09/19
*/
@Api
(
tags
=
Constant
.
SWAGGER_API_TAG_PREFIX
+
"ZKMetrics-相关接口(REST)"
)
@RestController
@RequestMapping
(
ApiPrefix
.
API_V3_PREFIX
)
public
class
ZookeeperMetricsController
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
ZookeeperMetricsController
.
class
);
@Autowired
private
ZookeeperMetricService
zookeeperMetricService
;
@ApiOperation
(
value
=
"ZK-最近指标"
,
notes
=
""
)
@PostMapping
(
value
=
"clusters/{clusterPhyId}/zookeeper-latest-metrics"
)
@ResponseBody
public
Result
<
BaseMetrics
>
getLatestMetrics
(
@PathVariable
Long
clusterPhyId
,
@RequestBody
List
<
String
>
metricsNames
)
{
Result
<
ZookeeperMetrics
>
metricsResult
=
zookeeperMetricService
.
batchCollectMetricsFromZookeeper
(
clusterPhyId
,
metricsNames
);
if
(
metricsResult
.
failed
())
{
return
Result
.
buildFromIgnoreData
(
metricsResult
);
}
return
Result
.
buildSuc
(
metricsResult
.
getData
());
}
@ApiOperation
(
value
=
"ZK-多指标历史信息"
,
notes
=
"多条指标线"
)
@PostMapping
(
value
=
"clusters/{clusterPhyId}/zookeeper-metrics"
)
@ResponseBody
public
Result
<
List
<
MetricLineVO
>>
getMetricsLine
(
@PathVariable
Long
clusterPhyId
,
@RequestBody
MetricDTO
dto
)
{
return
zookeeperMetricService
.
listMetricsFromES
(
clusterPhyId
,
dto
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录