Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
d10a7bcc
K
kafka-manager
项目概览
DiDi
/
kafka-manager
9 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
d10a7bcc
编写于
9月 03, 2022
作者:
E
EricZeng
提交者:
GitHub
9月 03, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #549 from didi/dev
合并开发分支
上级
639b1f83
afe44a25
变更
32
展开全部
隐藏空白更改
内联
并排
Showing
32 changed file
with
1001 addition
and
127 deletion
+1001
-127
docs/install_guide/版本升级手册.md
docs/install_guide/版本升级手册.md
+25
-1
docs/user_guide/faq.md
docs/user_guide/faq.md
+18
-0
km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java
...ji/know/streaming/km/collector/metric/MetricESSender.java
+12
-11
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java
...i/know/streaming/km/common/bean/entity/broker/Broker.java
+33
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java
...now/streaming/km/common/bean/entity/config/JmxConfig.java
+3
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java
...mon/bean/entity/param/partition/PartitionOffsetParam.java
+4
-4
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicParam.java
...reaming/km/common/bean/entity/param/topic/TopicParam.java
+8
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java
...eji/know/streaming/km/common/bean/po/broker/BrokerPO.java
+5
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java
...eaming/km/common/bean/vo/metrics/point/MetricPointVO.java
+4
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java
...ji/know/streaming/km/common/constant/ESIndexConstant.java
+647
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java
...keji/know/streaming/km/common/constant/KafkaConstant.java
+1
-1
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java
...treaming/km/common/enums/metric/KafkaMetricIndexEnum.java
+0
-54
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java
...eaming/km/core/service/broker/impl/BrokerServiceImpl.java
+5
-3
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java
...treaming/km/core/service/group/impl/GroupServiceImpl.java
+4
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java
.../km/core/service/partition/impl/PartitionServiceImpl.java
+10
-4
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java
...re/service/version/metrics/ClusterMetricVersionItems.java
+2
-0
km-dist/init/sql/ddl-ks-km.sql
km-dist/init/sql/ddl-ks-km.sql
+1
-0
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java
...iaojukeji/know/streaming/km/persistence/es/BaseESDAO.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java
...aojukeji/know/streaming/km/persistence/es/ESOpClient.java
+91
-0
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java
...know/streaming/km/persistence/es/dao/BaseMetricESDAO.java
+66
-7
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java
...ow/streaming/km/persistence/es/dao/BrokerMetricESDAO.java
+6
-4
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java
...w/streaming/km/persistence/es/dao/ClusterMetricESDAO.java
+6
-4
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java
...now/streaming/km/persistence/es/dao/GroupMetricESDAO.java
+6
-5
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java
...streaming/km/persistence/es/dao/PartitionMetricESDAO.java
+5
-3
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java
...reaming/km/persistence/es/dao/ReplicationMetricESDAO.java
+5
-3
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java
...now/streaming/km/persistence/es/dao/TopicMetricESDAO.java
+6
-4
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java
...i/know/streaming/km/persistence/kafka/KafkaJMXClient.java
+3
-3
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java
...know/streaming/km/persistence/mysql/broker/BrokerDAO.java
+0
-1
km-persistence/src/main/resources/mybatis/BrokerMapper.xml
km-persistence/src/main/resources/mybatis/BrokerMapper.xml
+1
-6
km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java
...w/streaming/km/persistence/es/ClusterMetricESDAOTest.java
+2
-2
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java
...jukeji/know/streaming/km/task/health/HealthCheckTask.java
+20
-4
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java
...know/streaming/km/task/service/TaskThreadPoolService.java
+1
-1
未找到文件。
docs/install_guide/版本升级手册.md
浏览文件 @
d10a7bcc
## 6.2、版本升级手册
**`2.x`版本 升级至 `3.0.0`版本**
注意:如果想升级至具体版本,需要将你当前版本至你期望使用版本的变更统统执行一遍,然后才能正常使用。
`master`
版本,需要
### 6.2.0、升级至 `master` 版本
**SQL变更**
1、在
`ks_km_broker`
表增加了一个监听信息字段。
2、为
`logi_security_oplog`
表operation_methods字段设置默认值''。
因此需要执行下面的sql对数据库表进行更新。
```
sql
ALTER
TABLE
`ks_km_broker`
ADD
COLUMN
`endpoint_map`
VARCHAR
(
1024
)
NOT
NULL
DEFAULT
''
COMMENT
'监听信息'
AFTER
`update_time`
;
ALTER
TABLE
`logi_security_oplog`
ALTER
COLUMN
`operation_methods`
set
default
''
;
```
---
### 6.2.1、`2.x`版本 升级至 `3.0.0`版本
**升级步骤:**
...
...
docs/user_guide/faq.md
浏览文件 @
d10a7bcc
...
...
@@ -109,3 +109,21 @@ SECURITY.TRICK_USERS
设置完成上面两步之后,就可以直接调用需要登录的接口了。
但是还有一点需要注意,绕过的用户仅能调用他有权限的接口,比如一个普通用户,那么他就只能调用普通的接口,不能去调用运维人员的接口。
## 8.8、Specified key was too long; max key length is 767 bytes
**原因:**
不同版本的InoDB引擎,参数‘innodb_large_prefix’默认值不同,即在5.6默认值为OFF,5.7默认值为ON。
对于引擎为InnoDB,innodb_large_prefix=OFF,且行格式为Antelope即支持REDUNDANT或COMPACT时,索引键前缀长度最大为 767 字节。innodb_large_prefix=ON,且行格式为Barracuda即支持DYNAMIC或COMPRESSED时,索引键前缀长度最大为3072字节。
**解决方案:**
-
减少varchar字符大小低于767/4=191。
-
将字符集改为latin1(一个字符=一个字节)。
-
开启‘innodb_large_prefix’,修改默认行格式‘innodb_file_format’为Barracuda,并设置row_format=dynamic。
## 8.9、出现ESIndexNotFoundEXception报错
**原因 :**
没有创建ES索引模版
**解决方案:**
执行init_es_template.sh脚本,创建ES索引模版即可。
km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java
浏览文件 @
d10a7bcc
...
...
@@ -5,7 +5,6 @@ import com.didiglobal.logi.log.LogFactory;
import
com.xiaojukeji.know.streaming.km.common.bean.event.metric.*
;
import
com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO
;
import
com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*
;
import
com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil
;
import
com.xiaojukeji.know.streaming.km.common.utils.EnvUtil
;
import
com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory
;
...
...
@@ -21,6 +20,8 @@ import java.util.concurrent.LinkedBlockingDeque;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESIndexConstant
.*;
@Component
public
class
MetricESSender
implements
ApplicationListener
<
BaseMetricEvent
>
{
protected
static
final
ILog
LOGGER
=
LogFactory
.
getLog
(
"METRIC_LOGGER"
);
...
...
@@ -41,37 +42,37 @@ public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
public
void
onApplicationEvent
(
BaseMetricEvent
event
)
{
if
(
event
instanceof
BrokerMetricEvent
)
{
BrokerMetricEvent
brokerMetricEvent
=
(
BrokerMetricEvent
)
event
;
send2es
(
KafkaMetricIndexEnum
.
BROKER_INFO
,
send2es
(
BROKER_INDEX
,
ConvertUtil
.
list2List
(
brokerMetricEvent
.
getBrokerMetrics
(),
BrokerMetricPO
.
class
)
);
}
else
if
(
event
instanceof
ClusterMetricEvent
)
{
ClusterMetricEvent
clusterMetricEvent
=
(
ClusterMetricEvent
)
event
;
send2es
(
KafkaMetricIndexEnum
.
CLUSTER_INFO
,
send2es
(
CLUSTER_INDEX
,
ConvertUtil
.
list2List
(
clusterMetricEvent
.
getClusterMetrics
(),
ClusterMetricPO
.
class
)
);
}
else
if
(
event
instanceof
TopicMetricEvent
)
{
TopicMetricEvent
topicMetricEvent
=
(
TopicMetricEvent
)
event
;
send2es
(
KafkaMetricIndexEnum
.
TOPIC_INFO
,
send2es
(
TOPIC_INDEX
,
ConvertUtil
.
list2List
(
topicMetricEvent
.
getTopicMetrics
(),
TopicMetricPO
.
class
)
);
}
else
if
(
event
instanceof
PartitionMetricEvent
)
{
PartitionMetricEvent
partitionMetricEvent
=
(
PartitionMetricEvent
)
event
;
send2es
(
KafkaMetricIndexEnum
.
PARTITION_INFO
,
send2es
(
PARTITION_INDEX
,
ConvertUtil
.
list2List
(
partitionMetricEvent
.
getPartitionMetrics
(),
PartitionMetricPO
.
class
)
);
}
else
if
(
event
instanceof
GroupMetricEvent
)
{
GroupMetricEvent
groupMetricEvent
=
(
GroupMetricEvent
)
event
;
send2es
(
KafkaMetricIndexEnum
.
GROUP_INFO
,
send2es
(
GROUP_INDEX
,
ConvertUtil
.
list2List
(
groupMetricEvent
.
getGroupMetrics
(),
GroupMetricPO
.
class
)
);
}
else
if
(
event
instanceof
ReplicaMetricEvent
)
{
ReplicaMetricEvent
replicaMetricEvent
=
(
ReplicaMetricEvent
)
event
;
send2es
(
KafkaMetricIndexEnum
.
REPLICATION_INFO
,
send2es
(
REPLICATION_INDEX
,
ConvertUtil
.
list2List
(
replicaMetricEvent
.
getReplicationMetrics
(),
ReplicationMetricPO
.
class
)
);
}
...
...
@@ -80,19 +81,19 @@ public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
/**
* 根据不同监控维度来发送
*/
private
boolean
send2es
(
KafkaMetricIndexEnum
stats
,
List
<?
extends
BaseESPO
>
statsList
){
private
boolean
send2es
(
String
index
,
List
<?
extends
BaseESPO
>
statsList
){
if
(
CollectionUtils
.
isEmpty
(
statsList
))
{
return
true
;
}
if
(!
EnvUtil
.
isOnline
())
{
LOGGER
.
info
(
"class=MetricESSender||method=send2es||ariusStats={}||size={}"
,
stats
.
getIndex
()
,
statsList
.
size
());
index
,
statsList
.
size
());
}
BaseMetricESDAO
baseMetricESDao
=
BaseMetricESDAO
.
getByStatsType
(
stats
);
BaseMetricESDAO
baseMetricESDao
=
BaseMetricESDAO
.
getByStatsType
(
index
);
if
(
Objects
.
isNull
(
baseMetricESDao
))
{
LOGGER
.
error
(
"class=MetricESSender||method=send2es||errMsg=fail to find {}"
,
stats
.
getIndex
()
);
LOGGER
.
error
(
"class=MetricESSender||method=send2es||errMsg=fail to find {}"
,
index
);
return
false
;
}
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java
浏览文件 @
d10a7bcc
package
com.xiaojukeji.know.streaming.km.common.bean.entity.broker
;
import
com.alibaba.fastjson.TypeReference
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData
;
import
com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO
;
import
com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil
;
import
com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
...
...
@@ -7,6 +12,7 @@ import lombok.NoArgsConstructor;
import
org.apache.kafka.common.Node
;
import
java.io.Serializable
;
import
java.util.Map
;
/**
* @author didi
...
...
@@ -55,6 +61,11 @@ public class Broker implements Serializable {
*/
private
Integer
status
;
/**
* 监听信息
*/
private
Map
<
String
,
IpPortData
>
endpointMap
;
public
static
Broker
buildFrom
(
Long
clusterPhyId
,
Node
node
,
Long
startTimestamp
)
{
Broker
metadata
=
new
Broker
();
metadata
.
setClusterPhyId
(
clusterPhyId
);
...
...
@@ -78,9 +89,31 @@ public class Broker implements Serializable {
metadata
.
setStartTimestamp
(
brokerMetadata
.
getTimestamp
());
metadata
.
setRack
(
brokerMetadata
.
getRack
());
metadata
.
setStatus
(
1
);
metadata
.
setEndpointMap
(
brokerMetadata
.
getEndpointMap
());
return
metadata
;
}
public
static
Broker
buildFrom
(
BrokerPO
brokerPO
)
{
Broker
broker
=
ConvertUtil
.
obj2Obj
(
brokerPO
,
Broker
.
class
);
String
endpointMapStr
=
brokerPO
.
getEndpointMap
();
if
(
broker
==
null
||
endpointMapStr
==
null
||
endpointMapStr
.
equals
(
""
))
{
return
broker
;
}
// 填充endpoint信息
Map
<
String
,
IpPortData
>
endpointMap
=
ConvertUtil
.
str2ObjByJson
(
endpointMapStr
,
new
TypeReference
<
Map
<
String
,
IpPortData
>>(){});
broker
.
setEndpointMap
(
endpointMap
);
return
broker
;
}
public
String
getJmxHost
(
String
endPoint
)
{
if
(
endPoint
==
null
||
endpointMap
==
null
)
{
return
host
;
}
IpPortData
ip
=
endpointMap
.
get
(
endPoint
);
return
ip
==
null
?
ip
.
getIp
()
:
host
;
}
public
boolean
alive
()
{
return
status
!=
null
&&
status
>
0
;
}
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java
浏览文件 @
d10a7bcc
...
...
@@ -27,6 +27,9 @@ public class JmxConfig implements Serializable {
@ApiModelProperty
(
value
=
"SSL情况下的token"
,
example
=
"KsKmCCY19"
)
private
String
token
;
@ApiModelProperty
(
value
=
"使用哪个endpoint网络"
,
example
=
"EXTERNAL"
)
private
String
useWhichEndpoint
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java
浏览文件 @
d10a7bcc
package
com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.
cluster.ClusterPhy
Param
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.param.
topic.Topic
Param
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
org.apache.kafka.clients.admin.OffsetSpec
;
...
...
@@ -10,13 +10,13 @@ import java.util.Map;
@Data
@NoArgsConstructor
public
class
PartitionOffsetParam
extends
ClusterPhy
Param
{
public
class
PartitionOffsetParam
extends
Topic
Param
{
private
Map
<
TopicPartition
,
OffsetSpec
>
topicPartitionOffsets
;
private
Long
timestamp
;
public
PartitionOffsetParam
(
Long
clusterPhyId
,
Map
<
TopicPartition
,
OffsetSpec
>
topicPartitionOffsets
,
Long
timestamp
)
{
super
(
clusterPhyId
);
public
PartitionOffsetParam
(
Long
clusterPhyId
,
String
topicName
,
Map
<
TopicPartition
,
OffsetSpec
>
topicPartitionOffsets
,
Long
timestamp
)
{
super
(
clusterPhyId
,
topicName
);
this
.
topicPartitionOffsets
=
topicPartitionOffsets
;
this
.
timestamp
=
timestamp
;
}
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicParam.java
浏览文件 @
d10a7bcc
...
...
@@ -15,4 +15,12 @@ public class TopicParam extends ClusterPhyParam {
super
(
clusterPhyId
);
this
.
topicName
=
topicName
;
}
@Override
public
String
toString
()
{
return
"TopicParam{"
+
"clusterPhyId="
+
clusterPhyId
+
", topicName='"
+
topicName
+
'\''
+
'}'
;
}
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java
浏览文件 @
d10a7bcc
...
...
@@ -42,4 +42,9 @@ public class BrokerPO extends BasePO {
* Broker状态
*/
private
Integer
status
;
/**
* 监听信息
*/
private
String
endpointMap
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java
浏览文件 @
d10a7bcc
...
...
@@ -29,6 +29,10 @@ public class MetricPointVO implements Comparable<MetricPointVO> {
@Override
public
int
compareTo
(
MetricPointVO
o
)
{
if
(
null
==
o
){
return
0
;}
if
(
null
==
this
.
getTimeStamp
()
||
null
==
o
.
getTimeStamp
()){
return
0
;
}
return
this
.
getTimeStamp
().
intValue
()
-
o
.
getTimeStamp
().
intValue
();
}
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java
0 → 100644
浏览文件 @
d10a7bcc
此差异已折叠。
点击以展开。
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java
浏览文件 @
d10a7bcc
...
...
@@ -33,7 +33,7 @@ public class KafkaConstant {
public
static
final
Integer
DATA_VERSION_ONE
=
1
;
public
static
final
Integer
ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS
=
3
000
;
public
static
final
Integer
ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS
=
5
000
;
public
static
final
Integer
KAFKA_SASL_SCRAM_ITERATIONS
=
8192
;
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java
已删除
100644 → 0
浏览文件 @
639b1f83
package
com.xiaojukeji.know.streaming.km.common.enums.metric
;
/**
* @author: D10865
* @description:
* @date: Create on 2019/3/11 下午2:19
* @modified By D10865
*
* 不同维度的es监控数据
*/
public
enum
KafkaMetricIndexEnum
{
/**
* topic 维度
*/
TOPIC_INFO
(
"ks_kafka_topic_metric"
),
/**
* 集群 维度
*/
CLUSTER_INFO
(
"ks_kafka_cluster_metric"
),
/**
* broker 维度
*/
BROKER_INFO
(
"ks_kafka_broker_metric"
),
/**
* partition 维度
*/
PARTITION_INFO
(
"ks_kafka_partition_metric"
),
/**
* group 维度
*/
GROUP_INFO
(
"ks_kafka_group_metric"
),
/**
* replication 维度
*/
REPLICATION_INFO
(
"ks_kafka_replication_metric"
),
;
private
String
index
;
KafkaMetricIndexEnum
(
String
index
)
{
this
.
index
=
index
;
}
public
String
getIndex
()
{
return
index
;
}
}
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java
浏览文件 @
d10a7bcc
...
...
@@ -130,6 +130,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
// 如果当前Broker还存活,则更新DB信息
BrokerPO
newBrokerPO
=
ConvertUtil
.
obj2Obj
(
presentAliveBroker
,
BrokerPO
.
class
);
if
(
presentAliveBroker
.
getEndpointMap
()
!=
null
)
{
newBrokerPO
.
setEndpointMap
(
ConvertUtil
.
obj2Json
(
presentAliveBroker
.
getEndpointMap
()));
}
newBrokerPO
.
setId
(
inDBBrokerPO
.
getId
());
newBrokerPO
.
setStatus
(
Constant
.
ALIVE
);
newBrokerPO
.
setCreateTime
(
inDBBrokerPO
.
getCreateTime
());
...
...
@@ -203,7 +206,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
lambdaQueryWrapper
.
eq
(
BrokerPO:
:
getClusterPhyId
,
clusterPhyId
);
lambdaQueryWrapper
.
eq
(
BrokerPO:
:
getBrokerId
,
brokerId
);
return
ConvertUtil
.
obj2Obj
(
brokerDAO
.
selectOne
(
lambdaQueryWrapper
),
Broker
.
class
);
return
Broker
.
buildFrom
(
brokerDAO
.
selectOne
(
lambdaQueryWrapper
)
);
}
@Override
...
...
@@ -272,9 +275,8 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
/**************************************************** private method ****************************************************/
private
List
<
Broker
>
listAllBrokersAndUpdateCache
(
Long
clusterPhyId
)
{
List
<
Broker
>
allBrokerList
=
ConvertUtil
.
list2List
(
this
.
getAllBrokerPOsFromDB
(
clusterPhyId
),
Broker
.
class
);
List
<
Broker
>
allBrokerList
=
getAllBrokerPOsFromDB
(
clusterPhyId
).
stream
().
map
(
elem
->
Broker
.
buildFrom
(
elem
)).
collect
(
Collectors
.
toList
()
);
brokersCache
.
put
(
clusterPhyId
,
allBrokerList
);
return
allBrokerList
;
}
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java
浏览文件 @
d10a7bcc
...
...
@@ -102,7 +102,10 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
AdminClient
adminClient
=
kafkaAdminClient
.
getClient
(
clusterPhyId
);
try
{
DescribeConsumerGroupsResult
describeConsumerGroupsResult
=
adminClient
.
describeConsumerGroups
(
Arrays
.
asList
(
groupName
),
new
DescribeConsumerGroupsOptions
().
timeoutMs
(
KafkaConstant
.
ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS
).
includeAuthorizedOperations
(
true
));
DescribeConsumerGroupsResult
describeConsumerGroupsResult
=
adminClient
.
describeConsumerGroups
(
Arrays
.
asList
(
groupName
),
new
DescribeConsumerGroupsOptions
().
timeoutMs
(
KafkaConstant
.
ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS
).
includeAuthorizedOperations
(
false
)
);
return
describeConsumerGroupsResult
.
all
().
get
().
get
(
groupName
);
}
catch
(
Exception
e
){
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java
浏览文件 @
d10a7bcc
...
...
@@ -207,7 +207,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
.
forEach
(
elem
->
topicPartitionOffsets
.
put
(
new
TopicPartition
(
topicName
,
elem
.
getPartitionId
()),
offsetSpec
));
try
{
return
(
Result
<
Map
<
TopicPartition
,
Long
>>)
doVCHandler
(
clusterPhyId
,
PARTITION_OFFSET_GET
,
new
PartitionOffsetParam
(
clusterPhyId
,
topicPartitionOffsets
,
timestamp
));
return
(
Result
<
Map
<
TopicPartition
,
Long
>>)
doVCHandler
(
clusterPhyId
,
PARTITION_OFFSET_GET
,
new
PartitionOffsetParam
(
clusterPhyId
,
topic
Name
,
topic
PartitionOffsets
,
timestamp
));
}
catch
(
VCHandlerNotExistException
e
)
{
return
Result
.
buildFailure
(
VC_HANDLE_NOT_EXIST
);
}
...
...
@@ -226,7 +226,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
.
forEach
(
elem
->
topicPartitionOffsets
.
put
(
new
TopicPartition
(
topicName
,
elem
.
getPartitionId
()),
offsetSpec
));
try
{
return
(
Result
<
Map
<
TopicPartition
,
Long
>>)
doVCHandler
(
clusterPhyId
,
PARTITION_OFFSET_GET
,
new
PartitionOffsetParam
(
clusterPhyId
,
topicPartitionOffsets
,
timestamp
));
return
(
Result
<
Map
<
TopicPartition
,
Long
>>)
doVCHandler
(
clusterPhyId
,
PARTITION_OFFSET_GET
,
new
PartitionOffsetParam
(
clusterPhyId
,
topic
Name
,
topic
PartitionOffsets
,
timestamp
));
}
catch
(
VCHandlerNotExistException
e
)
{
return
Result
.
buildFailure
(
VC_HANDLE_NOT_EXIST
);
}
...
...
@@ -300,7 +300,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
}
catch
(
NotExistException
nee
)
{
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
NOT_EXIST
,
MsgConstant
.
getClusterPhyNotExist
(
offsetParam
.
getClusterPhyId
()));
}
catch
(
Exception
e
)
{
log
.
error
(
"method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||errMsg=exception!"
,
offsetParam
.
getClusterPhyId
(),
e
);
log
.
error
(
"class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!"
,
offsetParam
.
getClusterPhyId
(),
offsetParam
.
getTopicName
(),
e
);
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
KAFKA_OPERATE_FAILED
,
e
.
getMessage
());
}
...
...
@@ -355,7 +358,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
}
catch
(
NotExistException
nee
)
{
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
NOT_EXIST
,
MsgConstant
.
getClusterPhyNotExist
(
offsetParam
.
getClusterPhyId
()));
}
catch
(
Exception
e
)
{
log
.
error
(
"method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||errMsg=exception!"
,
offsetParam
.
getClusterPhyId
(),
e
);
log
.
error
(
"class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!"
,
offsetParam
.
getClusterPhyId
(),
offsetParam
.
getTopicName
(),
e
);
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
KAFKA_OPERATE_FAILED
,
e
.
getMessage
());
}
finally
{
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java
浏览文件 @
d10a7bcc
...
...
@@ -64,11 +64,13 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
public
static
final
String
CLUSTER_METRIC_BYTES_OUT
=
"BytesOut"
;
public
static
final
String
CLUSTER_METRIC_BYTES_OUT_5_MIN
=
"BytesOut_min_5"
;
public
static
final
String
CLUSTER_METRIC_BYTES_OUT_15_MIN
=
"BytesOut_min_15"
;
public
static
final
String
CLUSTER_METRIC_GROUP
=
"Groups"
;
public
static
final
String
CLUSTER_METRIC_GROUP_ACTIVES
=
"GroupActives"
;
public
static
final
String
CLUSTER_METRIC_GROUP_EMPTYS
=
"GroupEmptys"
;
public
static
final
String
CLUSTER_METRIC_GROUP_REBALANCES
=
"GroupRebalances"
;
public
static
final
String
CLUSTER_METRIC_GROUP_DEADS
=
"GroupDeads"
;
public
static
final
String
CLUSTER_METRIC_ALIVE
=
"Alive"
;
public
static
final
String
CLUSTER_METRIC_ACL_ENABLE
=
"AclEnable"
;
...
...
km-dist/init/sql/ddl-ks-km.sql
浏览文件 @
d10a7bcc
...
...
@@ -13,6 +13,7 @@ CREATE TABLE `ks_km_broker` (
`status`
int
(
16
)
NOT
NULL
DEFAULT
'0'
COMMENT
'状态: 1存活,0未存活'
,
`create_time`
timestamp
NOT
NULL
DEFAULT
CURRENT_TIMESTAMP
COMMENT
'创建时间'
,
`update_time`
timestamp
NOT
NULL
DEFAULT
CURRENT_TIMESTAMP
ON
UPDATE
CURRENT_TIMESTAMP
COMMENT
'修改时间'
,
`endpoint_map`
varchar
(
1024
)
NOT
NULL
DEFAULT
''
COMMENT
'监听信息'
,
PRIMARY
KEY
(
`id`
),
UNIQUE
KEY
`uniq_cluster_phy_id_broker_id`
(
`cluster_phy_id`
,
`broker_id`
)
)
ENGINE
=
InnoDB
DEFAULT
CHARSET
=
utf8
COMMENT
=
'Broker信息表'
;
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -8,7 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired;
/**
* 直接操作es集群的dao
*/
public
class
BaseESDAO
{
public
abstract
class
BaseESDAO
{
protected
static
final
ILog
LOGGER
=
LogFactory
.
getLog
(
"ES_LOGGER"
);
/**
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java
浏览文件 @
d10a7bcc
...
...
@@ -11,7 +11,11 @@ import com.didiglobal.logi.elasticsearch.client.request.batch.ESBatchRequest;
import
com.didiglobal.logi.elasticsearch.client.request.query.query.ESQueryRequest
;
import
com.didiglobal.logi.elasticsearch.client.response.batch.ESBatchResponse
;
import
com.didiglobal.logi.elasticsearch.client.response.batch.IndexResultItemNode
;
import
com.didiglobal.logi.elasticsearch.client.response.indices.gettemplate.ESIndicesGetTemplateResponse
;
import
com.didiglobal.logi.elasticsearch.client.response.indices.putindex.ESIndicesPutIndexResponse
;
import
com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESIndicesPutTemplateResponse
;
import
com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse
;
import
com.didiglobal.logi.elasticsearch.client.response.setting.template.TemplateConfig
;
import
com.didiglobal.logi.log.ILog
;
import
com.didiglobal.logi.log.LogFactory
;
import
com.google.common.collect.Lists
;
...
...
@@ -340,7 +344,94 @@ public class ESOpClient {
return
false
;
}
/**
* 根据表达式判断索引是否已存在
*/
public
boolean
indexExist
(
String
indexName
)
{
ESClient
esClient
=
null
;
try
{
esClient
=
this
.
getESClientFromPool
();
if
(
esClient
==
null
)
{
return
false
;
}
// 检查索引是否存在
return
esClient
.
admin
().
indices
().
prepareExists
(
indexName
).
execute
().
actionGet
(
30
,
TimeUnit
.
SECONDS
).
isExists
();
}
catch
(
Exception
e
){
LOGGER
.
warn
(
"class=ESOpClient||method=indexExist||indexName={}||msg=exception!"
,
indexName
,
e
);
}
finally
{
if
(
esClient
!=
null
)
{
returnESClientToPool
(
esClient
);
}
}
return
false
;
}
/**
* 创建索引
*/
public
boolean
createIndex
(
String
indexName
)
{
if
(
indexExist
(
indexName
))
{
return
true
;
}
ESClient
client
=
getESClientFromPool
();
if
(
client
!=
null
)
{
try
{
ESIndicesPutIndexResponse
response
=
client
.
admin
().
indices
().
preparePutIndex
(
indexName
).
execute
()
.
actionGet
(
30
,
TimeUnit
.
SECONDS
);
return
response
.
getAcknowledged
();
}
catch
(
Exception
e
){
LOGGER
.
warn
(
"msg=create index fail||indexName={}"
,
indexName
,
e
);
}
finally
{
returnESClientToPool
(
client
);
}
}
return
false
;
}
/**
* 创建索引模板
*/
public
boolean
createIndexTemplateIfNotExist
(
String
indexTemplateName
,
String
config
)
{
ESClient
esClient
=
null
;
try
{
esClient
=
this
.
getESClientFromPool
();
// 获取es中原来index template的配置
ESIndicesGetTemplateResponse
getTemplateResponse
=
esClient
.
admin
().
indices
().
prepareGetTemplate
(
indexTemplateName
).
execute
().
actionGet
(
30
,
TimeUnit
.
SECONDS
);
TemplateConfig
templateConfig
=
getTemplateResponse
.
getMultiTemplatesConfig
().
getSingleConfig
();
if
(
null
!=
templateConfig
)
{
return
true
;
}
// 创建新的模板
ESIndicesPutTemplateResponse
response
=
esClient
.
admin
().
indices
().
preparePutTemplate
(
indexTemplateName
)
.
setTemplateConfig
(
config
).
execute
().
actionGet
(
30
,
TimeUnit
.
SECONDS
);
return
response
.
getAcknowledged
();
}
catch
(
Exception
e
)
{
LOGGER
.
warn
(
"class=ESOpClient||method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!"
,
indexTemplateName
,
config
,
e
);
}
finally
{
if
(
esClient
!=
null
)
{
this
.
returnESClientToPool
(
esClient
);
}
}
return
false
;
}
/**************************************************** private method ****************************************************/
/**
* 执行查询
* @param request
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -8,11 +8,12 @@ import com.google.common.collect.Maps;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.search.*
;
import
com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO
;
import
com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO
;
import
com.xiaojukeji.know.streaming.km.common.
enums.metric.KafkaMetricIndexEnum
;
import
com.xiaojukeji.know.streaming.km.common.
bean.vo.metrics.point.MetricPointVO
;
import
com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils
;
import
com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant
;
import
lombok.NoArgsConstructor
;
import
org.springframework.scheduling.annotation.Scheduled
;
import
org.springframework.util.CollectionUtils
;
import
java.util.*
;
...
...
@@ -25,7 +26,8 @@ public class BaseMetricESDAO extends BaseESDAO {
/**
* 操作的索引名称
*/
protected
String
indexName
;
protected
String
indexName
;
protected
String
indexTemplate
;
protected
static
final
Long
ONE_MIN
=
60
*
1000L
;
protected
static
final
Long
FIVE_MIN
=
5
*
ONE_MIN
;
...
...
@@ -35,10 +37,24 @@ public class BaseMetricESDAO extends BaseESDAO {
/**
* 不同维度 kafka 监控数据
*/
private
static
Map
<
KafkaMetricIndexEnum
,
BaseMetricESDAO
>
ariusStatsEsDaoMap
=
Maps
private
static
Map
<
String
,
BaseMetricESDAO
>
ariusStatsEsDaoMap
=
Maps
.
newConcurrentMap
();
public
static
BaseMetricESDAO
getByStatsType
(
KafkaMetricIndexEnum
statsType
)
{
/**
* 检查 es 索引是否存在,不存在则创建索引
*/
@Scheduled
(
cron
=
"0 3/5 * * * ?"
)
public
void
checkCurrentDayIndexExist
(){
String
realIndex
=
IndexNameUtils
.
genCurrentDailyIndexName
(
indexName
);
if
(
esOpClient
.
indexExist
(
realIndex
)){
return
;}
if
(
esOpClient
.
createIndexTemplateIfNotExist
(
indexName
,
indexTemplate
)){
esOpClient
.
createIndex
(
realIndex
);
}
}
public
static
BaseMetricESDAO
getByStatsType
(
String
statsType
)
{
return
ariusStatsEsDaoMap
.
get
(
statsType
);
}
...
...
@@ -48,7 +64,7 @@ public class BaseMetricESDAO extends BaseESDAO {
* @param statsType
* @param baseAriusStatsEsDao
*/
public
static
void
register
(
KafkaMetricIndexEnum
statsType
,
BaseMetricESDAO
baseAriusStatsEsDao
)
{
public
static
void
register
(
String
statsType
,
BaseMetricESDAO
baseAriusStatsEsDao
)
{
ariusStatsEsDaoMap
.
put
(
statsType
,
baseAriusStatsEsDao
);
}
...
...
@@ -358,7 +374,50 @@ public class BaseMetricESDAO extends BaseESDAO {
String
dsl
=
dslLoaderUtil
.
getFormatDslByFileName
(
DslsConstant
.
GET_LATEST_METRIC_TIME
,
startTime
,
endTime
,
appendQueryDsl
);
String
realIndexName
=
IndexNameUtils
.
genDailyIndexName
(
indexName
,
startTime
,
endTime
);
return
esOpClient
.
performRequest
(
realIndexName
,
dsl
,
s
->
s
.
getHits
().
getHits
().
isEmpty
()
?
System
.
currentTimeMillis
()
:
((
Map
<
String
,
Long
>)
s
.
getHits
().
getHits
().
get
(
0
).
getSource
()).
get
(
TIME_STAMP
),
3
);
return
esOpClient
.
performRequest
(
realIndexName
,
dsl
,
s
->
s
==
null
||
s
.
getHits
().
getHits
().
isEmpty
()
?
System
.
currentTimeMillis
()
:
((
Map
<
String
,
Long
>)
s
.
getHits
().
getHits
().
get
(
0
).
getSource
()).
get
(
TIME_STAMP
),
3
);
}
/**
* 对 metricPointVOS 进行缺点优化
*/
protected
List
<
MetricPointVO
>
optimizeMetricPoints
(
List
<
MetricPointVO
>
metricPointVOS
){
if
(
CollectionUtils
.
isEmpty
(
metricPointVOS
)){
return
metricPointVOS
;}
int
size
=
metricPointVOS
.
size
();
if
(
size
<
2
){
return
metricPointVOS
;}
Collections
.
sort
(
metricPointVOS
);
List
<
MetricPointVO
>
rets
=
new
ArrayList
<>();
for
(
int
first
=
0
,
second
=
first
+
1
;
second
<
size
;
first
++,
second
++){
MetricPointVO
firstPoint
=
metricPointVOS
.
get
(
first
);
MetricPointVO
secondPoint
=
metricPointVOS
.
get
(
second
);
if
(
null
!=
firstPoint
&&
null
!=
secondPoint
){
rets
.
add
(
firstPoint
);
//说明有空点,那就增加一个点
if
(
secondPoint
.
getTimeStamp
()
-
firstPoint
.
getTimeStamp
()
>
ONE_MIN
){
MetricPointVO
addPoint
=
new
MetricPointVO
();
addPoint
.
setName
(
firstPoint
.
getName
());
addPoint
.
setAggType
(
firstPoint
.
getAggType
());
addPoint
.
setValue
(
firstPoint
.
getValue
());
addPoint
.
setTimeStamp
(
firstPoint
.
getTimeStamp
()
+
ONE_MIN
);
rets
.
add
(
addPoint
);
}
if
(
second
==
size
-
1
){
rets
.
add
(
secondPoint
);
}
}
}
return
rets
;
}
}
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -18,14 +18,16 @@ import java.util.*;
import
java.util.stream.Collectors
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESConstant
.*;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
enums
.
metric
.
KafkaMetricIndexEnum
.
BROKER_INFO
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESIndexConstant
.*
;
@Component
public
class
BrokerMetricESDAO
extends
BaseMetricESDAO
{
@PostConstruct
public
void
init
()
{
super
.
indexName
=
BROKER_INFO
.
getIndex
();
BaseMetricESDAO
.
register
(
BROKER_INFO
,
this
);
super
.
indexName
=
BROKER_INDEX
;
super
.
indexTemplate
=
BROKER_TEMPLATE
;
checkCurrentDayIndexExist
();
BaseMetricESDAO
.
register
(
indexName
,
this
);
}
protected
FutureWaitUtil
<
Void
>
queryFuture
=
FutureWaitUtil
.
init
(
"BrokerMetricESDAO"
,
4
,
8
,
500
);
...
...
@@ -258,7 +260,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
}
}
);
metricMap
.
put
(
metric
,
metricPoints
);
metricMap
.
put
(
metric
,
optimizeMetricPoints
(
metricPoints
)
);
}
return
metricMap
;
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -23,15 +23,17 @@ import java.util.List;
import
java.util.Map
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESConstant
.*;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
enums
.
metric
.
KafkaMetricIndexEnum
.
CLUSTER_INFO
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESIndexConstant
.*
;
@Component
public
class
ClusterMetricESDAO
extends
BaseMetricESDAO
{
@PostConstruct
public
void
init
()
{
super
.
indexName
=
CLUSTER_INFO
.
getIndex
();
BaseMetricESDAO
.
register
(
CLUSTER_INFO
,
this
);
super
.
indexName
=
CLUSTER_INDEX
;
super
.
indexTemplate
=
CLUSTER_TEMPLATE
;
checkCurrentDayIndexExist
();
BaseMetricESDAO
.
register
(
indexName
,
this
);
}
protected
FutureWaitUtil
<
Void
>
queryFuture
=
FutureWaitUtil
.
init
(
"ClusterMetricESDAO"
,
4
,
8
,
500
);
...
...
@@ -207,7 +209,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
}
}
);
metricMap
.
put
(
metric
,
metricPoints
);
metricMap
.
put
(
metric
,
optimizeMetricPoints
(
metricPoints
)
);
}
return
metricMap
;
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -23,16 +23,17 @@ import java.util.stream.Collectors;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
Constant
.
ZERO
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESConstant
.*;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESConstant
.
KEY
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
enums
.
metric
.
KafkaMetricIndexEnum
.
GROUP_INFO
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESIndexConstant
.*;
@Component
public
class
GroupMetricESDAO
extends
BaseMetricESDAO
{
@PostConstruct
public
void
init
()
{
super
.
indexName
=
GROUP_INFO
.
getIndex
();
BaseMetricESDAO
.
register
(
GROUP_INFO
,
this
);
super
.
indexName
=
GROUP_INDEX
;
super
.
indexTemplate
=
GROUP_TEMPLATE
;
checkCurrentDayIndexExist
();
BaseMetricESDAO
.
register
(
indexName
,
this
);
}
protected
FutureWaitUtil
<
Void
>
queryFuture
=
FutureWaitUtil
.
init
(
"GroupMetricESDAO"
,
4
,
8
,
500
);
...
...
@@ -206,7 +207,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
}
}
);
metricMap
.
put
(
metric
,
metricPoints
);
metricMap
.
put
(
metric
,
optimizeMetricPoints
(
metricPoints
)
);
}
return
metricMap
;
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -8,7 +8,7 @@ import javax.annotation.PostConstruct;
import
java.util.List
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
enums
.
metric
.
KafkaMetricIndexEnum
.
PARTITION_INFO
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESIndexConstant
.*
;
/**
* @author didi
...
...
@@ -18,8 +18,10 @@ public class PartitionMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public
void
init
()
{
super
.
indexName
=
PARTITION_INFO
.
getIndex
();
BaseMetricESDAO
.
register
(
PARTITION_INFO
,
this
);
super
.
indexName
=
PARTITION_INDEX
;
super
.
indexTemplate
=
PARTITION_TEMPLATE
;
checkCurrentDayIndexExist
();
BaseMetricESDAO
.
register
(
indexName
,
this
);
}
public
PartitionMetricPO
getPartitionLatestMetrics
(
Long
clusterPhyId
,
String
topic
,
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -14,7 +14,7 @@ import java.util.List;
import
java.util.Map
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESConstant
.
VALUE
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
enums
.
metric
.
KafkaMetricIndexEnum
.
REPLICATION_INFO
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESIndexConstant
.*
;
/**
* @author didi
...
...
@@ -24,8 +24,10 @@ public class ReplicationMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public
void
init
()
{
super
.
indexName
=
REPLICATION_INFO
.
getIndex
();
BaseMetricESDAO
.
register
(
REPLICATION_INFO
,
this
);
super
.
indexName
=
REPLICATION_INDEX
;
super
.
indexTemplate
=
REPLICATION_TEMPLATE
;
checkCurrentDayIndexExist
();
BaseMetricESDAO
.
register
(
indexName
,
this
);
}
/**
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -22,15 +22,17 @@ import java.util.*;
import
java.util.stream.Collectors
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESConstant
.*;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
enums
.
metric
.
KafkaMetricIndexEnum
.
TOPIC_INFO
;
import
static
com
.
xiaojukeji
.
know
.
streaming
.
km
.
common
.
constant
.
ESIndexConstant
.*
;
@Component
public
class
TopicMetricESDAO
extends
BaseMetricESDAO
{
@PostConstruct
public
void
init
()
{
super
.
indexName
=
TOPIC_INFO
.
getIndex
();
BaseMetricESDAO
.
register
(
TOPIC_INFO
,
this
);
super
.
indexName
=
TOPIC_INDEX
;
super
.
indexTemplate
=
TOPIC_TEMPLATE
;
checkCurrentDayIndexExist
();
BaseMetricESDAO
.
register
(
indexName
,
this
);
}
protected
FutureWaitUtil
<
Void
>
queryFuture
=
FutureWaitUtil
.
init
(
"TopicMetricESDAO"
,
4
,
8
,
500
);
...
...
@@ -352,7 +354,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
}
}
);
metricMap
.
put
(
metric
,
metricPoints
);
metricMap
.
put
(
metric
,
optimizeMetricPoints
(
metricPoints
)
);
}
return
metricMap
;
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java
浏览文件 @
d10a7bcc
...
...
@@ -165,8 +165,8 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
clusterPhy
.
getId
(),
brokerId
,
broker
.
getStartTimestamp
(),
broker
.
getHost
(),
broker
.
getJmxPort
()
!=
null
?
broker
.
getJmxPort
()
:
jmxConfig
.
getJmxPort
(),
jmxConfig
!=
null
?
broker
.
getJmxHost
(
jmxConfig
.
getUseWhichEndpoint
())
:
broker
.
getHost
(),
broker
.
getJmxPort
()
!=
null
?
broker
.
getJmxPort
()
:
jmxConfig
.
getJmxPort
(),
jmxConfig
);
...
...
@@ -191,6 +191,6 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
lambdaQueryWrapper
.
eq
(
BrokerPO:
:
getStatus
,
Constant
.
ALIVE
);
BrokerPO
brokerPO
=
brokerDAO
.
selectOne
(
lambdaQueryWrapper
);
return
ConvertUtil
.
obj2Obj
(
brokerPO
,
Broker
.
class
);
return
Broker
.
buildFrom
(
brokerPO
);
}
}
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java
浏览文件 @
d10a7bcc
...
...
@@ -6,5 +6,4 @@ import org.springframework.stereotype.Repository;
@Repository
public
interface
BrokerDAO
extends
BaseMapper
<
BrokerPO
>
{
int
replace
(
BrokerPO
brokerPO
);
}
km-persistence/src/main/resources/mybatis/BrokerMapper.xml
浏览文件 @
d10a7bcc
...
...
@@ -14,12 +14,7 @@
<result
column=
"jmx_port"
property=
"jmxPort"
/>
<result
column=
"start_timestamp"
property=
"startTimestamp"
/>
<result
column=
"status"
property=
"status"
/>
<result
column=
"endpoint_map"
property=
"endpointMap"
/>
</resultMap>
<insert
id=
"replace"
parameterType=
"com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO"
>
REPLACE ks_km_broker
(cluster_phy_id, broker_id, host, port, jmx_port, start_timestamp, status, update_time)
VALUES
(#{clusterPhyId}, #{brokerId}, #{host}, #{port}, #{jmxPort}, #{startTimestamp}, #{status}, #{updateTime})
</insert>
</mapper>
km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java
浏览文件 @
d10a7bcc
...
...
@@ -20,8 +20,8 @@ public class ClusterMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public
void
listClusterMetricsByClusterIdsTest
(){
List
<
String
>
metrics
=
Arrays
.
asList
(
"
BytesIn_min_1"
,
"BytesOut_min_1
"
);
List
<
Long
>
clusterIds
=
Arrays
.
asList
(
12
3L
);
List
<
String
>
metrics
=
Arrays
.
asList
(
"
MessagesIn
"
);
List
<
Long
>
clusterIds
=
Arrays
.
asList
(
29
3L
);
Long
endTime
=
System
.
currentTimeMillis
();
Long
startTime
=
endTime
-
4
*
60
*
60
*
1000
;
...
...
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java
浏览文件 @
d10a7bcc
...
...
@@ -16,6 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import
com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService
;
import
com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService
;
import
com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask
;
import
com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService
;
import
lombok.AllArgsConstructor
;
import
lombok.NoArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -24,11 +25,18 @@ import java.util.*;
@NoArgsConstructor
@AllArgsConstructor
@Task
(
name
=
"HealthCheckTask"
,
description
=
"健康检查"
,
cron
=
"0 0/1 * * * ? *"
,
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
@Task
(
name
=
"HealthCheckTask"
,
description
=
"健康检查"
,
cron
=
"0 0/1 * * * ? *"
,
autoRegister
=
true
,
consensual
=
ConsensualEnum
.
BROADCAST
,
timeout
=
2
*
60
)
public
class
HealthCheckTask
extends
AbstractClusterPhyDispatchTask
{
private
static
final
ILog
log
=
LogFactory
.
getLog
(
HealthCheckTask
.
class
);
@Autowired
private
TaskThreadPoolService
taskThreadPoolService
;
@Autowired
private
HealthCheckResultService
healthCheckResultService
;
...
...
@@ -38,6 +46,16 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
@Override
public
TaskResult
processSubTask
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
taskThreadPoolService
.
submitHeavenTask
(
String
.
format
(
"TaskName=%s clusterPhyId=%d"
,
this
.
taskName
,
clusterPhy
.
getId
()),
100000
,
()
->
this
.
calAndUpdateHealthCheckResult
(
clusterPhy
,
triggerTimeUnitMs
)
);
return
TaskResult
.
SUCCESS
;
}
private
void
calAndUpdateHealthCheckResult
(
ClusterPhy
clusterPhy
,
long
triggerTimeUnitMs
)
{
// 获取配置,<配置名,配置信息>
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
=
healthCheckResultService
.
getClusterHealthConfig
(
clusterPhy
.
getId
());
...
...
@@ -73,8 +91,6 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
}
catch
(
Exception
e
)
{
log
.
error
(
"method=processSubTask||clusterPhyId={}||errMsg=exception!"
,
clusterPhy
.
getId
(),
e
);
}
return
TaskResult
.
SUCCESS
;
}
private
List
<
HealthCheckResult
>
getNoResResult
(
Long
clusterPhyId
,
AbstractHealthCheckService
healthCheckService
,
Map
<
String
,
BaseClusterHealthConfig
>
healthConfigMap
)
{
...
...
km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java
浏览文件 @
d10a7bcc
...
...
@@ -10,7 +10,7 @@ import javax.annotation.PostConstruct;
/**
* 为了尽量避免大任务的执行,由LogIJob的线程执行,
* 因此,在Task模块,需要有自己的线程池来执行相关任务,
* 而
FutureUtils
Service 的职责就是负责任务的执行。
* 而
TaskThreadPool
Service 的职责就是负责任务的执行。
*/
@Service
@NoArgsConstructor
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录