Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
91332059
K
kafka-manager
项目概览
DiDi
/
kafka-manager
9 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
91332059
编写于
9月 22, 2022
作者:
E
EricZeng
提交者:
GitHub
9月 22, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #610 from didi/dev
合并开发分支
上级
f234f740
725ac10c
变更
38
显示空白变更内容
内联
并排
Showing
38 changed file
with
350 addition
and
81 deletion
+350
-81
docs/install_guide/单机部署手册.md
docs/install_guide/单机部署手册.md
+99
-0
docs/user_guide/faq.md
docs/user_guide/faq.md
+16
-0
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java
...eaming/km/biz/version/impl/VersionControlManagerImpl.java
+42
-21
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/MetricDetailDTO.java
...treaming/km/common/bean/dto/metrices/MetricDetailDTO.java
+28
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/UserMetricConfigDTO.java
...ming/km/common/bean/dto/metrices/UserMetricConfigDTO.java
+4
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java
...i/know/streaming/km/common/bean/entity/broker/Broker.java
+0
-15
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/metric/UserMetricConfig.java
...km/common/bean/entity/config/metric/UserMetricConfig.java
+17
-1
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/config/metric/UserMetricConfigVO.java
...g/km/common/bean/vo/config/metric/UserMetricConfigVO.java
+3
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java
...iaojukeji/know/streaming/km/common/constant/Constant.java
+1
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java
...jukeji/know/streaming/km/common/constant/MsgConstant.java
+4
-0
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java
...jukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java
+39
-0
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java
.../know/streaming/km/core/flusher/zk/AbstractZKWatcher.java
+1
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java
...reaming/km/core/flusher/zk/handler/AbstractZKHandler.java
+1
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java
.../km/core/flusher/zk/handler/BrokersNodeChangeHandler.java
+1
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java
...usher/zk/handler/ConfigNotificationNodeChangeHandler.java
+4
-4
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java
.../core/flusher/zk/handler/ControllerNodeChangeHandler.java
+1
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java
...g/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java
+1
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java
...eaming/km/core/service/broker/impl/BrokerServiceImpl.java
+2
-6
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java
...core/service/cluster/impl/ClusterValidateServiceImpl.java
+2
-2
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java
...vice/kafkacontroller/impl/KafkaControllerServiceImpl.java
+1
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java
.../km/core/service/partition/impl/PartitionServiceImpl.java
+17
-5
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java
...eaming/km/core/service/topic/impl/OpTopicServiceImpl.java
+1
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java
...ng/km/core/service/topic/impl/TopicConfigServiceImpl.java
+1
-1
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java
...treaming/km/core/service/topic/impl/TopicServiceImpl.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java
...treaming/km/persistence/kafka/zookeeper/package-info.java
+4
-0
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/KafkaZKDAO.java
...ng/km/persistence/kafka/zookeeper/service/KafkaZKDAO.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java
...sistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java
+22
-8
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/ControllerData.java
.../km/persistence/kafka/zookeeper/znode/ControllerData.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java
...istence/kafka/zookeeper/znode/brokers/BrokerMetadata.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionMap.java
...rsistence/kafka/zookeeper/znode/brokers/PartitionMap.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionState.java
...istence/kafka/zookeeper/znode/brokers/PartitionState.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/TopicMetadata.java
...sistence/kafka/zookeeper/znode/brokers/TopicMetadata.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationBaseData.java
...keeper/znode/config/ConfigChangeNotificationBaseData.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV1.java
...ookeeper/znode/config/ConfigChangeNotificationDataV1.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV2.java
...ookeeper/znode/config/ConfigChangeNotificationDataV2.java
+1
-1
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigNodeData.java
...sistence/kafka/zookeeper/znode/config/ConfigNodeData.java
+1
-1
km-task/pom.xml
km-task/pom.xml
+14
-0
pom.xml
pom.xml
+13
-0
未找到文件。
docs/install_guide/单机部署手册.md
浏览文件 @
91332059
...
...
@@ -59,6 +59,8 @@ sh deploy_KnowStreaming-offline.sh
### 2.1.3、容器部署
#### 2.1.3.1、Helm
**环境依赖**
-
Kubernetes >= 1.14 ,Helm >= 2.17.0
...
...
@@ -87,6 +89,103 @@ helm pull knowstreaming/knowstreaming-manager
#### 2.1.3.2、Docker Compose
```
yml
version
:
"
3"
services
:
knowstreaming-manager
:
image
:
knowstreaming/knowstreaming-manager:0.2.0-test
container_name
:
knowstreaming-manager
privileged
:
true
restart
:
always
depends_on
:
-
elasticsearch-single
-
knowstreaming-mysql
expose
:
-
80
command
:
-
/bin/sh
-
/ks-start.sh
environment
:
TZ
:
Asia/Shanghai
SERVER_MYSQL_ADDRESS
:
knowstreaming-mysql:3306
SERVER_MYSQL_DB
:
know_streaming
SERVER_MYSQL_USER
:
root
SERVER_MYSQL_PASSWORD
:
admin2022_
SERVER_ES_ADDRESS
:
elasticsearch-single:9200
JAVA_OPTS
:
-Xmx1g -Xms1g
# extra_hosts:
# - "hostname:x.x.x.x"
# volumes:
# - /ks/manage/log:/logs
knowstreaming-ui
:
image
:
knowstreaming/knowstreaming-ui:0.2.0-test1
container_name
:
knowstreaming-ui
restart
:
always
ports
:
-
'
18092:80'
environment
:
TZ
:
Asia/Shanghai
depends_on
:
-
knowstreaming-manager
# extra_hosts:
# - "hostname:x.x.x.x"
elasticsearch-single
:
image
:
docker.io/library/elasticsearch:7.6.2
container_name
:
elasticsearch-single
restart
:
always
expose
:
-
9200
-
9300
# ports:
# - '9200:9200'
# - '9300:9300'
environment
:
TZ
:
Asia/Shanghai
ES_JAVA_OPTS
:
-Xms512m -Xmx512m
discovery.type
:
single-node
# volumes:
# - /ks/es/data:/usr/share/elasticsearch/data
knowstreaming-init
:
image
:
knowstreaming/knowstreaming-manager:0.2.0-test
container_name
:
knowstreaming_init
depends_on
:
-
elasticsearch-single
command
:
-
/bin/bash
-
/es_template_create.sh
environment
:
TZ
:
Asia/Shanghai
SERVER_ES_ADDRESS
:
elasticsearch-single:9200
knowstreaming-mysql
:
image
:
knowstreaming/knowstreaming-mysql:0.2.0-test
container_name
:
knowstreaming-mysql
restart
:
always
environment
:
TZ
:
Asia/Shanghai
MYSQL_ROOT_PASSWORD
:
admin2022_
MYSQL_DATABASE
:
know_streaming
MYSQL_ROOT_HOST
:
'
%'
expose
:
-
3306
# ports:
# - '3306:3306'
# volumes:
# - /ks/mysql/data:/data/mysql
```
### 2.1.4、手动部署
**部署流程**
...
...
docs/user_guide/faq.md
浏览文件 @
91332059
...
...
@@ -166,3 +166,19 @@ Node 版本: v12.22.12
需要到具体的应用中执行
`npm run start`
,例如
`cd packages/layout-clusters-fe`
后,执行
`npm run start`
。
应用启动后需要到基座应用中查看(需要启动基座应用,即 layout-clusters-fe)。
## 8.12、权限识别失败问题
1、使用admin账号登陆KnowStreaming时,点击系统管理-用户管理-角色管理-新增角色,查看页面是否正常。
<img
src=
"http://img-ys011.didistatic.com/static/dc2img/do1_gwGfjN9N92UxzHU8dfzr"
width =
"400"
>
2、查看'/logi-security/api/v1/permission/tree'接口返回值,出现如下图所示乱码现象。
![
接口返回值
](
http://img-ys011.didistatic.com/static/dc2img/do1_jTxBkwNGU9vZuYQQbdNw
)
3、查看logi_security_permission表,看看是否出现了中文乱码现象。
根据以上几点,我们可以确定是由于数据库乱码造成的权限识别失败问题。
+
原因:由于数据库编码和我们提供的脚本不一致,数据库里的数据发生了乱码,因此出现权限识别失败问题。
+
解决方案:清空数据库数据,将数据库字符集调整为utf8,最后重新执行
[
dml-logi.sql
](
https://github.com/didi/KnowStreaming/blob/master/km-dist/init/sql/dml-logi.sql
)
脚本导入数据即可。
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java
浏览文件 @
91332059
...
...
@@ -7,12 +7,14 @@ import com.didiglobal.logi.log.LogFactory;
import
com.didiglobal.logi.security.common.dto.config.ConfigDTO
;
import
com.didiglobal.logi.security.service.ConfigService
;
import
com.xiaojukeji.know.streaming.km.biz.version.VersionControlManager
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDetailDTO
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.UserMetricConfigDTO
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric.UserMetricConfig
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.config.metric.UserMetricConfigVO
;
import
com.xiaojukeji.know.streaming.km.common.bean.vo.version.VersionItemVO
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil
;
import
com.xiaojukeji.know.streaming.km.common.utils.VersionUtil
;
...
...
@@ -47,29 +49,29 @@ public class VersionControlManagerImpl implements VersionControlManager {
@PostConstruct
public
void
init
(){
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_HEALTH_SCORE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_TOTAL_PRODUCE_REQUESTS
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_FAILED_FETCH_REQ
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_FAILED_PRODUCE_REQ
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_MESSAGE_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_UNDER_REPLICA_PARTITIONS
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_TOTAL_PRODUCE_REQUESTS
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_BYTES_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_BYTES_OUT
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_BYTES_REJECTED
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_TOPIC
.
getCode
(),
TOPIC_METRIC_MESSAGE_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_HEALTH_SCORE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_ACTIVE_CONTROLLER_COUNT
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_TOTAL_PRODUCE_REQ
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_TOTAL_LOG_SIZE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_CONNECTIONS
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_MESSAGES_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_BYTES_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_BYTES_OUT
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_
GROUP_REBALANCE
S
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_
JOB_RUNNING
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_
CONNECTION
S
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_
MESSAGES_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_PARTITIONS_NO_LEADER
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_PARTITION_URP
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_TOTAL_LOG_SIZE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_TOTAL_PRODUCE_REQ
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_GROUP_REBALANCES
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_CLUSTER
.
getCode
(),
CLUSTER_METRIC_JOB_RUNNING
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_GROUP
.
getCode
(),
GROUP_METRIC_OFFSET_CONSUMED
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_GROUP
.
getCode
(),
GROUP_METRIC_LAG
,
true
));
...
...
@@ -77,18 +79,18 @@ public class VersionControlManagerImpl implements VersionControlManager {
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_GROUP
.
getCode
(),
GROUP_METRIC_HEALTH_SCORE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_HEALTH_SCORE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_TOTAL_REQ_QUEUE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_TOTAL_RES_QUEUE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_CONNECTION_COUNT
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_MESSAGE_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_TOTAL_PRODUCE_REQ
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_NETWORK_RPO_AVG_IDLE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_REQ_AVG_IDLE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_CONNECTION_COUNT
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_BYTES_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_BYTES_OUT
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_PARTITIONS_SKEW
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_TOTAL_PRODUCE_REQ
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_TOTAL_REQ_QUEUE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_TOTAL_RES_QUEUE
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_LEADERS_SKEW
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_UNDER_REPLICATE_PARTITION
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_PARTITIONS_SKEW
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_BYTES_IN
,
true
));
defaultMetrics
.
add
(
new
UserMetricConfig
(
METRIC_BROKER
.
getCode
(),
BROKER_METRIC_BYTES_OUT
,
true
));
}
@Autowired
...
...
@@ -159,6 +161,9 @@ public class VersionControlManagerImpl implements VersionControlManager {
UserMetricConfig
umc
=
userMetricConfigMap
.
get
(
itemType
+
"@"
+
metric
);
userMetricConfigVO
.
setSet
(
null
!=
umc
&&
umc
.
isSet
());
if
(
umc
!=
null
)
{
userMetricConfigVO
.
setRank
(
umc
.
getRank
());
}
userMetricConfigVO
.
setName
(
itemVO
.
getName
());
userMetricConfigVO
.
setType
(
itemVO
.
getType
());
userMetricConfigVO
.
setDesc
(
itemVO
.
getDesc
());
...
...
@@ -178,13 +183,29 @@ public class VersionControlManagerImpl implements VersionControlManager {
@Override
public
Result
<
Void
>
updateUserMetricItem
(
Long
clusterId
,
Integer
type
,
UserMetricConfigDTO
dto
,
String
operator
)
{
Map
<
String
,
Boolean
>
metricsSetMap
=
dto
.
getMetricsSet
();
if
(
null
==
metricsSetMap
||
metricsSetMap
.
isEmpty
()){
//转换metricDetailDTOList
List
<
MetricDetailDTO
>
metricDetailDTOList
=
dto
.
getMetricDetailDTOList
();
Map
<
String
,
MetricDetailDTO
>
metricDetailMap
=
new
HashMap
<>();
if
(
metricDetailDTOList
!=
null
&&
!
metricDetailDTOList
.
isEmpty
())
{
metricDetailMap
=
metricDetailDTOList
.
stream
().
collect
(
Collectors
.
toMap
(
MetricDetailDTO:
:
getMetric
,
Function
.
identity
()));
}
//转换metricsSetMap
if
(
metricsSetMap
!=
null
&&
!
metricsSetMap
.
isEmpty
())
{
for
(
Map
.
Entry
<
String
,
Boolean
>
metricAndShowEntry
:
metricsSetMap
.
entrySet
())
{
if
(
metricDetailMap
.
containsKey
(
metricAndShowEntry
.
getKey
()))
continue
;
metricDetailMap
.
put
(
metricAndShowEntry
.
getKey
(),
new
MetricDetailDTO
(
metricAndShowEntry
.
getKey
(),
metricAndShowEntry
.
getValue
(),
null
));
}
}
if
(
metricDetailMap
.
isEmpty
())
{
return
Result
.
buildSuc
();
}
Set
<
UserMetricConfig
>
userMetricConfigs
=
getUserMetricConfig
(
operator
);
for
(
Map
.
Entry
<
String
,
Boolean
>
metricAndShowEntry
:
metricsSetMap
.
entrySet
())
{
UserMetricConfig
userMetricConfig
=
new
UserMetricConfig
(
type
,
metric
AndShowEntry
.
getKey
(),
metricAndShowEntry
.
getValue
());
for
(
MetricDetailDTO
metricDetailDTO
:
metricDetailMap
.
values
())
{
UserMetricConfig
userMetricConfig
=
new
UserMetricConfig
(
type
,
metric
DetailDTO
.
getMetric
(),
metricDetailDTO
.
getSet
(),
metricDetailDTO
.
getRank
());
userMetricConfigs
.
remove
(
userMetricConfig
);
userMetricConfigs
.
add
(
userMetricConfig
);
}
...
...
@@ -228,7 +249,7 @@ public class VersionControlManagerImpl implements VersionControlManager {
return
defaultMetrics
;
}
return
JSON
.
parseObject
(
value
,
new
TypeReference
<
Set
<
UserMetricConfig
>>(){});
return
JSON
.
parseObject
(
value
,
new
TypeReference
<
Set
<
UserMetricConfig
>>()
{});
}
public
static
void
main
(
String
[]
args
){
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/MetricDetailDTO.java
0 → 100644
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.common.bean.dto.metrices
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
/**
* @author didi
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ApiModel
(
description
=
"指标详细属性信息"
)
public
class
MetricDetailDTO
extends
BaseDTO
{
@ApiModelProperty
(
"指标名称"
)
private
String
metric
;
@ApiModelProperty
(
"指标是否显示"
)
private
Boolean
set
;
@ApiModelProperty
(
"指标优先级"
)
private
Integer
rank
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/UserMetricConfigDTO.java
浏览文件 @
91332059
...
...
@@ -7,6 +7,7 @@ import lombok.AllArgsConstructor;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
java.util.List
;
import
java.util.Map
;
...
...
@@ -17,4 +18,7 @@ import java.util.Map;
public
class
UserMetricConfigDTO
extends
BaseDTO
{
@ApiModelProperty
(
"指标展示设置项,key:指标名;value:是否展现(true展现/false不展现)"
)
private
Map
<
String
,
Boolean
>
metricsSet
;
@ApiModelProperty
(
"指标自定义属性列表"
)
private
List
<
MetricDetailDTO
>
metricDetailDTOList
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java
浏览文件 @
91332059
...
...
@@ -5,7 +5,6 @@ import com.alibaba.fastjson.TypeReference;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData
;
import
com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO
;
import
com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil
;
import
com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
...
...
@@ -79,20 +78,6 @@ public class Broker implements Serializable {
return
metadata
;
}
public
static
Broker
buildFrom
(
Long
clusterPhyId
,
Integer
brokerId
,
BrokerMetadata
brokerMetadata
)
{
Broker
metadata
=
new
Broker
();
metadata
.
setClusterPhyId
(
clusterPhyId
);
metadata
.
setBrokerId
(
brokerId
);
metadata
.
setHost
(
brokerMetadata
.
getHost
());
metadata
.
setPort
(
brokerMetadata
.
getPort
());
metadata
.
setJmxPort
(
brokerMetadata
.
getJmxPort
());
metadata
.
setStartTimestamp
(
brokerMetadata
.
getTimestamp
());
metadata
.
setRack
(
brokerMetadata
.
getRack
());
metadata
.
setStatus
(
1
);
metadata
.
setEndpointMap
(
brokerMetadata
.
getEndpointMap
());
return
metadata
;
}
public
static
Broker
buildFrom
(
BrokerPO
brokerPO
)
{
Broker
broker
=
ConvertUtil
.
obj2Obj
(
brokerPO
,
Broker
.
class
);
String
endpointMapStr
=
brokerPO
.
getEndpointMap
();
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/metric/UserMetricConfig.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric
;
import
com.xiaojukeji.know.streaming.km.common.constant.Constant
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
@Data
@NoArgsConstructor
@AllArgsConstructor
public
class
UserMetricConfig
{
private
int
type
;
...
...
@@ -15,6 +15,22 @@ public class UserMetricConfig {
private
boolean
set
;
private
Integer
rank
;
public
UserMetricConfig
(
int
type
,
String
metric
,
boolean
set
,
Integer
rank
)
{
this
.
type
=
type
;
this
.
metric
=
metric
;
this
.
set
=
set
;
this
.
rank
=
rank
;
}
public
UserMetricConfig
(
int
type
,
String
metric
,
boolean
set
)
{
this
.
type
=
type
;
this
.
metric
=
metric
;
this
.
set
=
set
;
this
.
rank
=
null
;
}
@Override
public
int
hashCode
(){
return
metric
.
hashCode
()
<<
1
+
type
;
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/config/metric/UserMetricConfigVO.java
浏览文件 @
91332059
...
...
@@ -14,4 +14,7 @@ import lombok.NoArgsConstructor;
public
class
UserMetricConfigVO
extends
VersionItemVO
{
@ApiModelProperty
(
value
=
"该指标用户是否设置展现"
,
example
=
"true"
)
private
Boolean
set
;
@ApiModelProperty
(
value
=
"该指标展示优先级"
,
example
=
"1"
)
private
Integer
rank
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java
浏览文件 @
91332059
...
...
@@ -42,6 +42,7 @@ public class Constant {
*/
public
static
final
Integer
DEFAULT_CLUSTER_HEALTH_SCORE
=
90
;
public
static
final
String
DEFAULT_USER_NAME
=
"know-streaming-app"
;
public
static
final
int
INVALID_CODE
=
-
1
;
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java
浏览文件 @
91332059
...
...
@@ -52,6 +52,10 @@ public class MsgConstant {
/**************************************************** Partition ****************************************************/
public
static
String
getPartitionNoLeader
(
Long
clusterPhyId
,
String
topicName
)
{
return
String
.
format
(
"集群ID:[%d] Topic名称:[%s] 所有分区NoLeader"
,
clusterPhyId
,
topicName
);
}
public
static
String
getPartitionNotExist
(
Long
clusterPhyId
,
String
topicName
)
{
return
String
.
format
(
"集群ID:[%d] Topic名称:[%s] 存在非法的分区ID"
,
clusterPhyId
,
topicName
);
}
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java
浏览文件 @
91332059
...
...
@@ -90,6 +90,8 @@ public class JmxConnectorWrap {
}
try
{
jmxConnector
.
close
();
jmxConnector
=
null
;
}
catch
(
IOException
e
)
{
LOGGER
.
warn
(
"close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}."
,
physicalClusterId
,
brokerId
,
host
,
port
,
e
);
}
...
...
@@ -105,6 +107,11 @@ public class JmxConnectorWrap {
acquire
();
MBeanServerConnection
mBeanServerConnection
=
jmxConnector
.
getMBeanServerConnection
();
return
mBeanServerConnection
.
getAttribute
(
name
,
attribute
);
}
catch
(
IOException
ioe
)
{
// 如果是因为连接断开,则进行重新连接,并抛出异常
reInitDueIOException
();
throw
ioe
;
}
finally
{
atomicInteger
.
incrementAndGet
();
}
...
...
@@ -120,6 +127,11 @@ public class JmxConnectorWrap {
acquire
();
MBeanServerConnection
mBeanServerConnection
=
jmxConnector
.
getMBeanServerConnection
();
return
mBeanServerConnection
.
getAttributes
(
name
,
attributes
);
}
catch
(
IOException
ioe
)
{
// 如果是因为连接断开,则进行重新连接,并抛出异常
reInitDueIOException
();
throw
ioe
;
}
finally
{
atomicInteger
.
incrementAndGet
();
}
...
...
@@ -131,6 +143,11 @@ public class JmxConnectorWrap {
acquire
();
MBeanServerConnection
mBeanServerConnection
=
jmxConnector
.
getMBeanServerConnection
();
return
mBeanServerConnection
.
queryNames
(
name
,
query
);
}
catch
(
IOException
ioe
)
{
// 如果是因为连接断开,则进行重新连接,并抛出异常
reInitDueIOException
();
throw
ioe
;
}
finally
{
atomicInteger
.
incrementAndGet
();
}
...
...
@@ -186,4 +203,26 @@ public class JmxConnectorWrap {
}
}
}
private
synchronized
void
reInitDueIOException
()
{
try
{
if
(
jmxConnector
==
null
)
{
return
;
}
// 检查是否正常
jmxConnector
.
getConnectionId
();
// 如果正常则直接返回
return
;
}
catch
(
Exception
e
)
{
// ignore
}
// 关闭旧的
this
.
close
();
// 重新创建
this
.
checkJmxConnectionAndInitIfNeed
();
}
}
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java
浏览文件 @
91332059
...
...
@@ -5,7 +5,7 @@ import com.didiglobal.logi.log.LogFactory;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy
;
import
com.xiaojukeji.know.streaming.km.common.exception.NotExistException
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.zk.KafkaZkClient
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java
浏览文件 @
91332059
...
...
@@ -7,7 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import
com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService
;
import
com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
public
abstract
class
AbstractZKHandler
{
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java
浏览文件 @
91332059
...
...
@@ -9,7 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum
import
com.xiaojukeji.know.streaming.km.common.utils.FutureUtil
;
import
com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService
;
import
com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.zk.BrokerIdsZNode
;
import
kafka.zookeeper.ZNodeChildChangeHandler
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java
浏览文件 @
91332059
...
...
@@ -8,11 +8,11 @@ import com.xiaojukeji.know.streaming.km.common.enums.KafkaConfigTypeEnum;
import
com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum
;
import
com.xiaojukeji.know.streaming.km.common.utils.FutureUtil
;
import
com.xiaojukeji.know.streaming.km.common.utils.Tuple
;
import
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.config.ConfigChangeNotificationBaseData
;
import
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.config.ConfigChangeNotificationDataV1
;
import
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.config.ConfigChangeNotificationDataV2
;
import
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.config.ConfigChangeNotificationBaseData
;
import
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.config.ConfigChangeNotificationDataV1
;
import
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.config.ConfigChangeNotificationDataV2
;
import
com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.zk.ConfigEntityChangeNotificationZNode
;
import
kafka.zookeeper.ZNodeChildChangeHandler
;
import
org.apache.zookeeper.data.Stat
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java
浏览文件 @
91332059
...
...
@@ -11,7 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import
com.xiaojukeji.know.streaming.km.common.utils.FutureUtil
;
import
com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService
;
import
com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.zk.ControllerZNode
;
import
kafka.zookeeper.ZNodeChangeHandler
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java
浏览文件 @
91332059
...
...
@@ -9,7 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum
import
com.xiaojukeji.know.streaming.km.common.utils.FutureUtil
;
import
com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService
;
import
com.xiaojukeji.know.streaming.km.core.service.topic.TopicService
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.zk.TopicsZNode
;
import
kafka.zookeeper.ZNodeChildChangeHandler
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java
浏览文件 @
91332059
...
...
@@ -24,7 +24,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept
import
com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap
;
import
com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata
;
import
com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService
;
import
com.xiaojukeji.know.streaming.km.core.service.topic.TopicService
;
import
com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService
;
...
...
@@ -32,8 +31,7 @@ import com.xiaojukeji.know.streaming.km.persistence.jmx.JmxDAO;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient
;
import
com.xiaojukeji.know.streaming.km.persistence.mysql.broker.BrokerDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO
;
import
kafka.zk.BrokerIdZNode
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO
;
import
kafka.zk.BrokerIdsZNode
;
import
org.apache.kafka.clients.admin.*
;
import
org.apache.kafka.common.Node
;
...
...
@@ -310,9 +308,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
List
<
String
>
brokerIdList
=
kafkaZKDAO
.
getChildren
(
clusterPhy
.
getId
(),
BrokerIdsZNode
.
path
(),
false
);
for
(
String
brokerId:
brokerIdList
)
{
BrokerMetadata
metadata
=
kafkaZKDAO
.
getData
(
clusterPhy
.
getId
(),
BrokerIdZNode
.
path
(
Integer
.
valueOf
(
brokerId
)),
BrokerMetadata
.
class
);
BrokerMetadata
.
parseAndUpdateBrokerMetadata
(
metadata
);
brokerList
.
add
(
Broker
.
buildFrom
(
clusterPhy
.
getId
(),
Integer
.
valueOf
(
brokerId
),
metadata
));
brokerList
.
add
(
kafkaZKDAO
.
getBrokerMetadata
(
clusterPhy
.
getId
(),
Integer
.
valueOf
(
brokerId
)));
}
return
Result
.
buildSuc
(
brokerList
);
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java
浏览文件 @
91332059
...
...
@@ -13,8 +13,8 @@ import com.xiaojukeji.know.streaming.km.common.enums.valid.ValidateKafkaAddressE
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterValidateService
;
import
com.xiaojukeji.know.streaming.km.persistence.jmx.JmxDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.impl.KafkaZKDAOImpl
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.impl.KafkaZKDAOImpl
;
import
kafka.server.KafkaConfig
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.admin.*
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java
浏览文件 @
91332059
...
...
@@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import
com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient
;
import
com.xiaojukeji.know.streaming.km.persistence.mysql.kafkacontroller.KafkaControllerDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
org.apache.kafka.clients.admin.*
;
import
org.apache.kafka.common.Node
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java
浏览文件 @
91332059
...
...
@@ -21,14 +21,14 @@ import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import
com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException
;
import
com.xiaojukeji.know.streaming.km.common.utils.CommonUtils
;
import
com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils
;
import
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.brokers.PartitionMap
;
import
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.brokers.PartitionState
;
import
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.brokers.PartitionMap
;
import
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.brokers.PartitionState
;
import
com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService
;
import
com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaConsumerClient
;
import
com.xiaojukeji.know.streaming.km.persistence.mysql.partition.PartitionDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.zk.TopicPartitionStateZNode
;
import
kafka.zk.TopicPartitionsZNode
;
import
kafka.zk.TopicZNode
;
...
...
@@ -202,10 +202,22 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
@Override
public
Result
<
Map
<
TopicPartition
,
Long
>>
getPartitionOffsetFromKafka
(
Long
clusterPhyId
,
String
topicName
,
OffsetSpec
offsetSpec
,
Long
timestamp
)
{
Map
<
TopicPartition
,
OffsetSpec
>
topicPartitionOffsets
=
new
HashMap
<>();
this
.
listPartitionByTopic
(
clusterPhyId
,
topicName
)
.
stream
()
List
<
Partition
>
partitionList
=
this
.
listPartitionByTopic
(
clusterPhyId
,
topicName
);
if
(
partitionList
==
null
||
partitionList
.
isEmpty
())
{
// Topic不存在
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
NOT_EXIST
,
MsgConstant
.
getTopicNotExist
(
clusterPhyId
,
topicName
));
}
partitionList
.
stream
()
.
filter
(
item
->
!
item
.
getLeaderBrokerId
().
equals
(
KafkaConstant
.
NO_LEADER
))
.
forEach
(
elem
->
topicPartitionOffsets
.
put
(
new
TopicPartition
(
topicName
,
elem
.
getPartitionId
()),
offsetSpec
));
if
(
topicPartitionOffsets
.
isEmpty
())
{
// 所有分区no-leader
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
OPERATION_FAILED
,
MsgConstant
.
getPartitionNoLeader
(
clusterPhyId
,
topicName
));
}
try
{
return
(
Result
<
Map
<
TopicPartition
,
Long
>>)
doVCHandler
(
clusterPhyId
,
PARTITION_OFFSET_GET
,
new
PartitionOffsetParam
(
clusterPhyId
,
topicName
,
topicPartitionOffsets
,
timestamp
));
}
catch
(
VCHandlerNotExistException
e
)
{
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java
浏览文件 @
91332059
...
...
@@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import
com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.controller.ReplicaAssignment
;
import
kafka.server.ConfigType
;
import
kafka.zk.AdminZkClient
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java
浏览文件 @
91332059
...
...
@@ -30,7 +30,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import
com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.server.ConfigType
;
import
kafka.zk.AdminZkClient
;
import
kafka.zk.KafkaZkClient
;
...
...
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java
浏览文件 @
91332059
...
...
@@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import
com.xiaojukeji.know.streaming.km.core.service.topic.TopicService
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient
;
import
com.xiaojukeji.know.streaming.km.persistence.mysql.topic.TopicDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.zk.TopicsZNode
;
import
org.apache.kafka.clients.admin.*
;
import
org.apache.kafka.common.TopicPartitionInfo
;
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java
0 → 100644
浏览文件 @
91332059
/**
* 读取Kafka在ZK中存储的数据的包
*/
package
com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper
;
\ No newline at end of file
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/
zk
/KafkaZKDAO.java
→
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/
kafka/zookeeper/service
/KafkaZKDAO.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.persistence.
zk
;
package
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker
;
import
com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController
;
...
...
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/
zk
/impl/KafkaZKDAOImpl.java
→
km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/
kafka/zookeeper/service
/impl/KafkaZKDAOImpl.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.persistence.
zk
.impl
;
package
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.impl
;
import
com.alibaba.fastjson.JSON
;
import
com.didiglobal.logi.log.ILog
;
...
...
@@ -11,11 +11,11 @@ import com.xiaojukeji.know.streaming.km.common.enums.topic.TopicTypeEnum;
import
com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException
;
import
com.xiaojukeji.know.streaming.km.common.exception.NotExistException
;
import
com.xiaojukeji.know.streaming.km.common.utils.Tuple
;
import
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.ControllerData
;
import
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.brokers.BrokerMetadata
;
import
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.brokers.PartitionMap
;
import
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.ControllerData
;
import
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.brokers.BrokerMetadata
;
import
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.brokers.PartitionMap
;
import
com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient
;
import
com.xiaojukeji.know.streaming.km.persistence.
zk
.KafkaZKDAO
;
import
com.xiaojukeji.know.streaming.km.persistence.
kafka.zookeeper.service
.KafkaZKDAO
;
import
kafka.utils.Json
;
import
kafka.zk.*
;
import
kafka.zookeeper.AsyncResponse
;
...
...
@@ -46,14 +46,14 @@ public class KafkaZKDAOImpl implements KafkaZKDAO {
public
Broker
getBrokerMetadata
(
String
zkAddress
)
throws
KeeperException
.
NoNodeException
,
AdminOperateException
{
ZooKeeper
zooKeeper
=
null
;
try
{
zooKeeper
=
new
ZooKeeper
(
zkAddress
,
1
000
,
watchedEvent
->
logger
.
info
(
" receive event : "
+
watchedEvent
.
getType
().
name
()));
zooKeeper
=
new
ZooKeeper
(
zkAddress
,
3
000
,
watchedEvent
->
logger
.
info
(
" receive event : "
+
watchedEvent
.
getType
().
name
()));
List
<
String
>
brokerIdList
=
this
.
getChildren
(
zooKeeper
,
BrokerIdsZNode
.
path
());
if
(
brokerIdList
==
null
||
brokerIdList
.
isEmpty
())
{
return
null
;
}
BrokerMetadata
brokerMetadata
=
this
.
getData
(
zooKeeper
,
BrokerIdZNode
.
path
(
Integer
.
parseInt
(
brokerIdList
.
get
(
0
))),
false
,
BrokerMetadata
.
class
);
return
Broker
.
buildFrom
(
null
,
Integer
.
valueOf
(
brokerIdList
.
get
(
0
)),
brokerMetadata
);
return
this
.
convert2Broker
(
null
,
Integer
.
valueOf
(
brokerIdList
.
get
(
0
)),
brokerMetadata
);
}
catch
(
KeeperException
.
NoNodeException
nne
)
{
logger
.
warn
(
"method=getBrokerMetadata||zkAddress={}||errMsg=exception"
,
zkAddress
,
nne
);
throw
nne
;
...
...
@@ -79,7 +79,7 @@ public class KafkaZKDAOImpl implements KafkaZKDAO {
try
{
BrokerMetadata
metadata
=
this
.
getData
(
kafkaZkClient
.
currentZooKeeper
(),
BrokerIdZNode
.
path
(
brokerId
),
false
,
BrokerMetadata
.
class
);
BrokerMetadata
.
parseAndUpdateBrokerMetadata
(
metadata
);
return
Broker
.
buildFrom
(
clusterPhyId
,
brokerId
,
metadata
);
return
this
.
convert2Broker
(
clusterPhyId
,
brokerId
,
metadata
);
}
catch
(
KeeperException
ke
)
{
logger
.
error
(
"method=getBrokerMetadata||clusterPhyId={}||brokerId={}||errMsg=exception"
,
clusterPhyId
,
brokerId
,
ke
);
throw
ke
;
...
...
@@ -269,4 +269,18 @@ public class KafkaZKDAOImpl implements KafkaZKDAO {
byte
[]
bytes
=
zooKeeper
.
getData
(
path
,
addWatch
,
null
);
return
JSON
.
parseObject
(
bytes
,
clazz
);
}
private
Broker
convert2Broker
(
Long
clusterPhyId
,
Integer
brokerId
,
BrokerMetadata
brokerMetadata
)
{
Broker
metadata
=
new
Broker
();
metadata
.
setClusterPhyId
(
clusterPhyId
);
metadata
.
setBrokerId
(
brokerId
);
metadata
.
setHost
(
brokerMetadata
.
getHost
());
metadata
.
setPort
(
brokerMetadata
.
getPort
());
metadata
.
setJmxPort
(
brokerMetadata
.
getJmxPort
());
metadata
.
setStartTimestamp
(
brokerMetadata
.
getTimestamp
());
metadata
.
setRack
(
brokerMetadata
.
getRack
());
metadata
.
setStatus
(
1
);
metadata
.
setEndpointMap
(
brokerMetadata
.
getEndpointMap
());
return
metadata
;
}
}
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/ControllerData.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/ControllerData.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
...
...
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/brokers/BrokerMetadata.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/brokers/BrokerMetadata.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.brokers
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.brokers
;
import
com.fasterxml.jackson.annotation.JsonIgnore
;
import
com.fasterxml.jackson.annotation.JsonIgnoreProperties
;
...
...
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/brokers/PartitionMap.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/brokers/PartitionMap.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.brokers
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.brokers
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
...
...
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/brokers/PartitionState.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/brokers/PartitionState.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.brokers
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.brokers
;
import
com.fasterxml.jackson.annotation.JsonProperty
;
import
lombok.AllArgsConstructor
;
...
...
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/brokers/TopicMetadata.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/brokers/TopicMetadata.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.brokers
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.brokers
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
...
...
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/config/ConfigChangeNotificationBaseData.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/config/ConfigChangeNotificationBaseData.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.config
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.config
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
...
...
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/config/ConfigChangeNotificationDataV1.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/config/ConfigChangeNotificationDataV1.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.config
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.config
;
import
com.fasterxml.jackson.annotation.JsonProperty
;
import
lombok.AllArgsConstructor
;
...
...
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/config/ConfigChangeNotificationDataV2.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/config/ConfigChangeNotificationDataV2.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.config
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.config
;
import
com.fasterxml.jackson.annotation.JsonProperty
;
import
lombok.AllArgsConstructor
;
...
...
km-
common/src/main/java/com/xiaojukeji/know/streaming/km/common
/zookeeper/znode/config/ConfigNodeData.java
→
km-
persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka
/zookeeper/znode/config/ConfigNodeData.java
浏览文件 @
91332059
package
com.xiaojukeji.know.streaming.km.
common
.zookeeper.znode.config
;
package
com.xiaojukeji.know.streaming.km.
persistence.kafka
.zookeeper.znode.config
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
...
...
km-task/pom.xml
浏览文件 @
91332059
...
...
@@ -43,7 +43,21 @@
<dependency>
<groupId>
io.github.zqrferrari
</groupId>
<artifactId>
logi-job-spring-boot-starter
</artifactId>
<exclusions>
<exclusion>
<artifactId>
oshi-core
</artifactId>
<groupId>
com.github.oshi
</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- 感谢 linG5821 的反馈,LogiJob中会依赖oshi-core,但是oshi-core使用的版本过低,因此windows系统上会存在问题,因此升级oshi-core版本 -->
<dependency>
<groupId>
com.github.oshi
</groupId>
<artifactId>
oshi-core
</artifactId>
<version>
5.6.1
</version>
</dependency>
<dependency>
<groupId>
io.github.zqrferrari
</groupId>
<artifactId>
logi-security-spring-boot-starter
</artifactId>
...
...
pom.xml
浏览文件 @
91332059
...
...
@@ -230,6 +230,19 @@
<groupId>
io.github.zqrferrari
</groupId>
<artifactId>
logi-job-spring-boot-starter
</artifactId>
<version>
1.0.23
</version>
<exclusions>
<exclusion>
<artifactId>
oshi-core
</artifactId>
<groupId>
com.github.oshi
</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- 感谢 linG5821 的反馈,LogiJob中会依赖oshi-core,但是oshi-core使用的版本过低,因此windows系统上会存在问题,因此升级oshi-core版本 -->
<dependency>
<groupId>
com.github.oshi
</groupId>
<artifactId>
oshi-core
</artifactId>
<version>
5.6.1
</version>
</dependency>
<dependency>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录