Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
73d7a0ec
K
kafka-manager
项目概览
DiDi
/
kafka-manager
10 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
73d7a0ec
编写于
1月 21, 2022
作者:
E
EricZeng
提交者:
GitHub
1月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #450 from didi/dev
调整Task模块日志及Api请求统计日志的输出
上级
053d4dcb
08943593
变更
37
隐藏空白更改
内联
并排
Showing
37 changed file
with
135 addition
and
131 deletion
+135
-131
docs/dev_guide/周期任务说明文档.md
docs/dev_guide/周期任务说明文档.md
+39
-0
docs/user_guide/faq.md
docs/user_guide/faq.md
+4
-0
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LogConstant.java
...xiaojukeji/kafka/manager/common/constant/LogConstant.java
+0
-16
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java
.../manager/service/cache/LogicalClusterMetadataManager.java
+3
-0
kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/AccountServiceImpl.java
...jukeji/kafka/manager/account/impl/AccountServiceImpl.java
+3
-0
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java
...i/kafka/manager/task/component/AbstractScheduledTask.java
+5
-5
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/CustomScheduled.java
...ojukeji/kafka/manager/task/component/CustomScheduled.java
+2
-0
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/Heartbeat.java
...om/xiaojukeji/kafka/manager/task/component/Heartbeat.java
+4
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/CalKafkaTopicBill.java
...ji/kafka/manager/task/dispatch/biz/CalKafkaTopicBill.java
+2
-5
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/CalRegionCapacity.java
...ji/kafka/manager/task/dispatch/biz/CalRegionCapacity.java
+1
-1
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/CalTopicStatistics.java
...i/kafka/manager/task/dispatch/biz/CalTopicStatistics.java
+2
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/FlushBrokerTable.java
...eji/kafka/manager/task/dispatch/biz/FlushBrokerTable.java
+2
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/FlushExpiredTopic.java
...ji/kafka/manager/task/dispatch/biz/FlushExpiredTopic.java
+2
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/SyncClusterTaskState.java
...kafka/manager/task/dispatch/biz/SyncClusterTaskState.java
+3
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java
...ask/dispatch/metrics/collect/CollectAndPublishCGData.java
+3
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java
...trics/collect/CollectAndPublishCommunityTopicMetrics.java
+1
-1
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishTopicThrottledMetrics.java
...trics/collect/CollectAndPublishTopicThrottledMetrics.java
+2
-1
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java
...a/manager/task/dispatch/metrics/delete/DeleteMetrics.java
+2
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java
...task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java
+3
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java
...patch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java
+3
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/AutoHandleTopicOrder.java
.../kafka/manager/task/dispatch/op/AutoHandleTopicOrder.java
+3
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/AutomatedHandleOrder.java
.../kafka/manager/task/dispatch/op/AutomatedHandleOrder.java
+2
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/FlushReassignment.java
...eji/kafka/manager/task/dispatch/op/FlushReassignment.java
+2
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/SyncTopic2DB.java
...aojukeji/kafka/manager/task/dispatch/op/SyncTopic2DB.java
+2
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreCommunityTopicMetrics2DB.java
.../task/listener/sink/db/StoreCommunityTopicMetrics2DB.java
+1
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreTopicThrottledMetrics2DB.java
.../task/listener/sink/db/StoreTopicThrottledMetrics2DB.java
+1
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java
.../listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java
+1
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java
...r/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java
+1
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java
...tener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java
+3
-3
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkConsumerMetrics2Monitor.java
...sk/listener/sink/monitor/SinkConsumerMetrics2Monitor.java
+1
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/FlushTopicMetrics.java
...jukeji/kafka/manager/task/schedule/FlushTopicMetrics.java
+4
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushBKConsumerGroupMetadata.java
.../task/schedule/metadata/FlushBKConsumerGroupMetadata.java
+4
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java
.../manager/task/schedule/metadata/FlushClusterMetadata.java
+3
-0
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java
.../manager/task/schedule/metadata/FlushTopicProperties.java
+4
-2
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java
.../task/schedule/metadata/FlushZKConsumerGroupMetadata.java
+4
-2
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/metrics/MetricsRegistry.java
...xiaojukeji/kafka/manager/web/metrics/MetricsRegistry.java
+1
-2
kafka-manager-web/src/main/resources/logback-spring.xml
kafka-manager-web/src/main/resources/logback-spring.xml
+12
-41
未找到文件。
docs/dev_guide/周期任务说明文档.md
0 → 100644
浏览文件 @
73d7a0ec
---
![kafka-manager-logo](../assets/images/common/logo_name.png)
*
*一站式
`
Apache Kafka`集群指标监控与运维管控平台**
---
| 定时任务名称或方法名 | 所在类 | 详细说明 | cron | cron说明 | 线程数量 |
| -------------------------------------- | -------------------------------------- | ------------------------------------------ | --------------- | --------------------------------------- | -------- |
| calKafkaBill | CalKafkaTopicBill | 计算Kafka使用账单 | 0 0 1
* *
? | 每天凌晨1点执行一次 | 1 |
| calRegionCapacity | CalRegionCapacity | 计算Region容量 | 0 0 0/12
* *
? | 每隔12小时执行一次,在0分钟0秒时触发 | 1 |
| calTopicStatistics | CalTopicStatistics | 定时计算Topic统计数据 | 0 0 0/4
* *
? | 每隔4小时执行一次,在0分钟0秒时触发 | 5 |
| flushBrokerTable | FlushBrokerTable | 定时刷新BrokerTable数据 | 0 0 0/1
* *
? | 每隔1小时执行一次,在0分钟0秒时触发 | 1 |
| flushExpiredTopic | FlushExpiredTopic | 定期更新过期Topic | 0 0 0/5
* *
? | 每隔5小时执行一次,在0分钟0秒时触发 | 1 |
| syncClusterTaskState | SyncClusterTaskState | 同步更新集群任务状态 | 0 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的0秒时触发 | 1 |
| newCollectAndPublishCGData | CollectAndPublishCGData | 收集并发布消费者指标数据 | 30 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的30秒时触发 | 10 |
| collectAndPublishCommunityTopicMetrics | CollectAndPublishCommunityTopicMetrics | Topic社区指标收集 | 31 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的30秒时触发 | 5 |
| collectAndPublishTopicThrottledMetrics | CollectAndPublishTopicThrottledMetrics | 收集和发布Topic限流信息 | 11 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的11秒时触发 | 5 |
| deleteMetrics | DeleteMetrics | 定期删除Metrics信息 | 0 0/2
* *
*
? | 每隔2分钟执行一次,在每分钟的0秒时触发 | 1 |
| storeDiDiAppTopicMetrics | StoreDiDiAppTopicMetrics | JMX中获取appId维度的流量信息存DB | 41 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的41秒时触发 | 5 |
| storeDiDiTopicRequestTimeMetrics | StoreDiDiTopicRequestTimeMetrics | JMX中获取的TopicRequestTimeMetrics信息存DB | 51 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的51秒时触发 | 5 |
| autoHandleTopicOrder | AutoHandleTopicOrder | 定时自动处理Topic相关工单 | 0 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的0秒时触发 | 1 |
| automatedHandleOrder | AutomatedHandleOrder | 工单自动化审批 | 0 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的0秒时触发 | 1 |
| flushReassignment | FlushReassignment | 定时处理分区迁移任务 | 0 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的0秒时触发 | 1 |
| syncTopic2DB | SyncTopic2DB | 定期将未落盘的Topic刷新到DB中 | 0 0/2
* *
*
? | 每隔2分钟执行一次,在每分钟的0秒时触发 | 1 |
| sinkCommunityTopicMetrics2Monitor | SinkCommunityTopicMetrics2Monitor | 定时上报Topic监控指标 | 1 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的1秒时触发 | 5 |
| flush方法 | LogicalClusterMetadataManager | 定时刷新逻辑集群元数据到缓存中 | 0/30
* *
* *
? | 每隔30秒执行一次 | 1 |
| flush方法 | AccountServiceImpl | 定时刷新account信息到缓存中 | 0/5
* *
* *
? | 每隔5秒执行一次 | 1 |
| ipFlush方法 | HeartBeat | 定时获取管控平台所在机器IP等信息到DB | 0/10
* *
* *
? | 每隔10秒执行一次 | 1 |
| flushTopicMetrics方法 | FlushTopicMetrics | 定时刷新topic指标到缓存中 | 5 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的5秒时触发 | 1 |
| schedule方法 | FlushBKConsumerGroupMetadata | 定时刷新broker上消费组信息到缓存中 | 15 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的15秒时触发 | 1 |
| flush方法 | FlushClusterMetadata | 定时刷新物理集群元信息到缓存中 | 0/30
* *
* *
? | 每隔30秒执行一次 | 1 |
| flush方法 | FlushTopicProperties | 定时刷新物理集群配置到缓存中 | 25 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的25秒时触发 | 1 |
| schedule方法 | FlushZKConsumerGroupMetadata | 定时刷新zk上的消费组信息到缓存中 | 35 0/1
* *
*
? | 每隔1分钟执行一次,在每分钟的35秒时触发 | 1 |
docs/user_guide/faq.md
浏览文件 @
73d7a0ec
...
...
@@ -30,6 +30,7 @@
-
18、如何在不登录的情况下,调用一些需要登录的接口?
-
19、为什么无法看到连接信息、耗时信息等指标?
-
20、AppID鉴权、生产消费配额不起作用
-
21、如何查看周期任务说明文档
---
...
...
@@ -213,3 +214,6 @@ AppID鉴权、生产消费配额依赖于滴滴kafka-gateway,通过gateway进
具体见:
[
滴滴Logi-KafkaManager开源版和商业版特性对比
](
../开源版与商业版特性对比.md
)
### 20、如何查看周期任务说明文档
具体见:
[
周期任务说明文档
](
../dev_guide/周期任务说明文档.md
)
\ No newline at end of file
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LogConstant.java
已删除
100644 → 0
浏览文件 @
053d4dcb
package
com.xiaojukeji.kafka.manager.common.constant
;
/**
* @author zengqiao
* @date 20/8/10
*/
public
class
LogConstant
{
public
static
final
String
COLLECTOR_METRICS_LOGGER
=
"COLLECTOR_METRICS_LOGGER"
;
public
static
final
String
API_METRICS_LOGGER
=
"API_METRICS_LOGGER"
;
public
static
final
String
SCHEDULED_TASK_LOGGER
=
"SCHEDULED_TASK_LOGGER"
;
private
LogConstant
()
{
}
}
\ No newline at end of file
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java
浏览文件 @
73d7a0ec
...
...
@@ -156,6 +156,9 @@ public class LogicalClusterMetadataManager {
return
logicalClusterDO
.
getClusterId
();
}
/**
* 定时刷新逻辑集群元数据到缓存中
*/
@Scheduled
(
cron
=
"0/30 * * * * ?"
)
public
void
flush
()
{
List
<
LogicalClusterDO
>
logicalClusterDOList
=
logicalClusterService
.
listAll
();
...
...
kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/AccountServiceImpl.java
浏览文件 @
73d7a0ec
...
...
@@ -275,6 +275,9 @@ public class AccountServiceImpl implements AccountService {
return
enterpriseStaffService
.
searchEnterpriseStaffByKeyWord
(
prefix
);
}
/**
* 定时刷新account信息到缓存中
*/
@Scheduled
(
cron
=
"0/5 * * * * ?"
)
public
void
flush
()
{
try
{
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java
浏览文件 @
73d7a0ec
...
...
@@ -72,16 +72,15 @@ public abstract class AbstractScheduledTask<E extends Comparable> implements Sch
LOGGER
.
info
(
"init custom scheduled finished, scheduledName:{} scheduledCron:{}."
,
scheduledName
,
scheduledCron
);
}
private
boolean
checkAndModifyCron
(
String
scheduledName
,
String
scheduledCron
,
boolean
existIfIllegal
)
{
private
boolean
checkAndModifyCron
(
String
scheduledName
,
String
scheduledCron
,
boolean
isInit
)
{
if
(
scheduledCron
.
matches
(
ScheduledTaskConstant
.
CRON_REG_EX
))
{
this
.
scheduledCron
=
scheduledCron
;
LOGGER
.
info
(
"modify scheduledCron success, scheduledName:{} scheduledCron:{}."
,
scheduledName
,
scheduledCron
);
LOGGER
.
info
(
"{} scheduledCron success, scheduledName:{} scheduledCron:{}."
,
isInit
?
"init"
:
"modify"
,
scheduledName
,
scheduledCron
);
return
true
;
}
LOGGER
.
error
(
"modify scheduledCron failed, format invalid, scheduledName:{} scheduledCron:{}."
,
scheduledName
,
scheduledCron
);
if
(
existIfIllegal
)
{
if
(
isInit
)
{
throw
new
UnsupportedOperationException
(
String
.
format
(
"scheduledName:%s scheduledCron:%s format invalid"
,
scheduledName
,
scheduledCron
));
}
return
false
;
...
...
@@ -128,7 +127,8 @@ public abstract class AbstractScheduledTask<E extends Comparable> implements Sch
LOGGER
.
info
(
"customScheduled task finished, empty selected task, scheduledName:{}."
,
scheduledName
);
return
;
}
LOGGER
.
info
(
"customScheduled task running, selected tasks, IP:{} selectedTasks:{}."
,
LOGGER
.
debug
(
"customScheduled task running, selected tasks, IP:{} selectedTasks:{}."
,
NetUtils
.
localIp
(),
JsonUtils
.
toJSONString
(
selectTasks
)
);
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/CustomScheduled.java
浏览文件 @
73d7a0ec
...
...
@@ -18,4 +18,6 @@ public @interface CustomScheduled {
String
cron
();
int
threadNum
()
default
1
;
String
description
()
default
""
;
}
\ No newline at end of file
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/Heartbeat.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.component
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.utils.NetUtils
;
import
com.xiaojukeji.kafka.manager.dao.HeartbeatDao
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.HeartbeatDO
;
...
...
@@ -18,11 +17,14 @@ import java.util.Date;
*/
@Component
public
class
Heartbeat
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
Heartbeat
.
class
);
@Autowired
private
HeartbeatDao
heartbeatDao
;
/**
* 定时获取管控平台所在机器IP等信息到DB
*/
@Scheduled
(
cron
=
ScheduledTaskConstant
.
HEARTBEAT_CRON
)
public
void
ipFlush
()
{
try
{
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/CalKafkaTopicBill.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.dispatch.biz
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.task.config.TopicBillConfig
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO
;
import
com.xiaojukeji.kafka.manager.common.utils.DateUtils
;
...
...
@@ -24,13 +22,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import
java.util.*
;
/**
* 计算账单
* @author zengqiao
* @date 20/5/11
*/
@CustomScheduled
(
name
=
"calKafkaBill"
,
cron
=
"0 0 1 * *
*"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"calKafkaBill"
,
cron
=
"0 0 1 * *
?"
,
threadNum
=
1
,
description
=
"计算账单"
)
public
class
CalKafkaTopicBill
extends
AbstractScheduledTask
<
ClusterDO
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
CalKafkaTopicBill
.
class
);
@Autowired
private
AppService
appService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/CalRegionCapacity.java
浏览文件 @
73d7a0ec
...
...
@@ -19,7 +19,7 @@ import java.util.*;
* @author zengqiao
* @date 20/6/30
*/
@CustomScheduled
(
name
=
"calRegionCapacity"
,
cron
=
"0 0 0/12 * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"calRegionCapacity"
,
cron
=
"0 0 0/12 * * ?"
,
threadNum
=
1
,
description
=
"计算Region容量"
)
public
class
CalRegionCapacity
extends
AbstractScheduledTask
<
RegionDO
>
{
@Autowired
private
RegionService
regionService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/CalTopicStatistics.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.dispatch.biz
;
import
com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO
;
import
com.xiaojukeji.kafka.manager.common.utils.DateUtils
;
...
...
@@ -28,9 +27,9 @@ import java.util.Map;
* @author zengqiao
* @date 20/3/29
*/
@CustomScheduled
(
name
=
"calTopicStatistics"
,
cron
=
"0 0 0/4 * * ?"
,
threadNum
=
5
)
@CustomScheduled
(
name
=
"calTopicStatistics"
,
cron
=
"0 0 0/4 * * ?"
,
threadNum
=
5
,
description
=
"定时计算Topic统计数据"
)
public
class
CalTopicStatistics
extends
AbstractScheduledTask
<
ClusterDO
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
CalTopicStatistics
.
class
);
@Autowired
private
ClusterService
clusterService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/FlushBrokerTable.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.dispatch.biz
;
import
com.xiaojukeji.kafka.manager.common.bizenum.DBStatusEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerDO
;
...
...
@@ -25,9 +24,9 @@ import java.util.*;
* @author zengqiao
* @date 20/6/2
*/
@CustomScheduled
(
name
=
"flushBrokerTable"
,
cron
=
"0 0 0/1 * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"flushBrokerTable"
,
cron
=
"0 0 0/1 * * ?"
,
threadNum
=
1
,
description
=
"定时刷新BrokerTable数据"
)
public
class
FlushBrokerTable
extends
AbstractScheduledTask
<
ClusterDO
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
FlushBrokerTable
.
class
);
@Autowired
private
BrokerService
brokerService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/FlushExpiredTopic.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.dispatch.biz
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO
;
...
...
@@ -30,9 +29,9 @@ import java.util.Map;
* @author zengqiao
* @date 20/4/1
*/
@CustomScheduled
(
name
=
"flushExpiredTopic"
,
cron
=
"0 0 0/5 * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"flushExpiredTopic"
,
cron
=
"0 0 0/5 * * ?"
,
threadNum
=
1
,
description
=
"定期更新过期Topic"
)
public
class
FlushExpiredTopic
extends
AbstractScheduledTask
<
ClusterDO
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
FlushExpiredTopic
.
class
);
@Autowired
private
TopicExpiredDao
topicExpiredDao
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/SyncClusterTaskState.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.dispatch.biz
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterTaskDO
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.kcm.ClusterTaskService
;
...
...
@@ -17,13 +16,14 @@ import java.util.Arrays;
import
java.util.List
;
/**
* 同步更新集群任务状态
* @author zengqiao
* @date 20/9/7
*/
@CustomScheduled
(
name
=
"syncClusterTaskState"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"syncClusterTaskState"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
,
description
=
"同步更新集群任务状态"
)
@ConditionalOnProperty
(
prefix
=
"kcm"
,
name
=
"enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
true
)
public
class
SyncClusterTaskState
extends
AbstractScheduledTask
<
EmptyEntry
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
SyncClusterTaskState
.
class
);
@Autowired
private
ClusterTaskService
clusterTaskService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect
;
import
com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroup
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.ConsumerMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO
;
...
...
@@ -28,12 +27,13 @@ import java.util.concurrent.Callable;
import
java.util.concurrent.FutureTask
;
/**
* 收集并发布消费者指标数据
* @author zengqiao
* @date 20/9/14
*/
@CustomScheduled
(
name
=
"newCollectAndPublishCGData"
,
cron
=
"30 0/1 * * *
*"
,
threadNum
=
10
)
@CustomScheduled
(
name
=
"newCollectAndPublishCGData"
,
cron
=
"30 0/1 * * *
?"
,
threadNum
=
10
,
description
=
"收集并发布消费者指标数据"
)
public
class
CollectAndPublishCGData
extends
AbstractScheduledTask
<
ClusterDO
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
CollectAndPublishCGData
.
class
);
@Autowired
private
TopicService
topicService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java
浏览文件 @
73d7a0ec
...
...
@@ -20,7 +20,7 @@ import java.util.*;
* @author zengqiao
* @date 20/7/21
*/
@CustomScheduled
(
name
=
"collectAndPublishCommunityTopicMetrics"
,
cron
=
"31 0/1 * * * ?"
,
threadNum
=
5
)
@CustomScheduled
(
name
=
"collectAndPublishCommunityTopicMetrics"
,
cron
=
"31 0/1 * * * ?"
,
threadNum
=
5
,
description
=
"Topic社区指标收集"
)
public
class
CollectAndPublishCommunityTopicMetrics
extends
AbstractScheduledTask
<
ClusterDO
>
{
@Autowired
private
JmxService
jmxService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishTopicThrottledMetrics.java
浏览文件 @
73d7a0ec
...
...
@@ -16,10 +16,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import
java.util.*
;
/**
* 收集和发布Topic限流信息
* @author zengqiao
* @date 2019-05-10
*/
@CustomScheduled
(
name
=
"collectAndPublishTopicThrottledMetrics"
,
cron
=
"11 0/1 * * * ?"
,
threadNum
=
5
)
@CustomScheduled
(
name
=
"collectAndPublishTopicThrottledMetrics"
,
cron
=
"11 0/1 * * * ?"
,
threadNum
=
5
,
description
=
"收集和发布Topic限流信息"
)
public
class
CollectAndPublishTopicThrottledMetrics
extends
AbstractScheduledTask
<
ClusterDO
>
{
@Autowired
private
ClusterService
clusterService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.utils.BackoffUtils
;
import
com.xiaojukeji.kafka.manager.dao.*
;
import
com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask
;
...
...
@@ -20,9 +19,9 @@ import java.util.List;
* @author zengqiao
* @date 20/1/8
*/
@CustomScheduled
(
name
=
"deleteMetrics"
,
cron
=
"0 0/2 * * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"deleteMetrics"
,
cron
=
"0 0/2 * * * ?"
,
threadNum
=
1
,
description
=
"定期删除Metrics信息"
)
public
class
DeleteMetrics
extends
AbstractScheduledTask
<
EmptyEntry
>
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
DeleteMetrics
.
class
);
@Autowired
private
TopicMetricsDao
topicMetricsDao
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java
浏览文件 @
73d7a0ec
...
...
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao
;
...
...
@@ -21,13 +20,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import
java.util.*
;
/**
* JMX中获取appId维度的流量信息存DB
* @author zengqiao
* @date 20/7/21
*/
@CustomScheduled
(
name
=
"storeDiDiAppTopicMetrics"
,
cron
=
"41 0/1 * * * ?"
,
threadNum
=
5
)
@CustomScheduled
(
name
=
"storeDiDiAppTopicMetrics"
,
cron
=
"41 0/1 * * * ?"
,
threadNum
=
5
,
description
=
"JMX中获取appId维度的流量信息存DB"
)
@ConditionalOnProperty
(
prefix
=
"custom.store-metrics-task.didi"
,
name
=
"app-topic-metrics-enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
true
)
public
class
StoreDiDiAppTopicMetrics
extends
AbstractScheduledTask
<
ClusterDO
>
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
StoreDiDiAppTopicMetrics
.
class
);
@Autowired
private
JmxService
jmxService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java
浏览文件 @
73d7a0ec
...
...
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao
;
...
...
@@ -21,13 +20,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import
java.util.*
;
/**
* JMX中获取的TopicRequestTimeMetrics信息存DB
* @author zengqiao
* @date 20/7/21
*/
@CustomScheduled
(
name
=
"storeDiDiTopicRequestTimeMetrics"
,
cron
=
"51 0/1 * * * ?"
,
threadNum
=
5
)
@CustomScheduled
(
name
=
"storeDiDiTopicRequestTimeMetrics"
,
cron
=
"51 0/1 * * * ?"
,
threadNum
=
5
,
description
=
"JMX中获取的TopicRequestTimeMetrics信息存DB"
)
@ConditionalOnProperty
(
prefix
=
"custom.store-metrics-task.didi"
,
name
=
"topic-request-time-metrics-enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
true
)
public
class
StoreDiDiTopicRequestTimeMetrics
extends
AbstractScheduledTask
<
ClusterDO
>
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
StoreDiDiTopicRequestTimeMetrics
.
class
);
@Autowired
private
JmxService
jmxService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/AutoHandleTopicOrder.java
浏览文件 @
73d7a0ec
...
...
@@ -4,7 +4,6 @@ import com.xiaojukeji.kafka.manager.bpm.OrderService;
import
com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum
;
import
com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.ResultStatus
;
...
...
@@ -31,14 +30,15 @@ import java.util.List;
import
java.util.Properties
;
/**
* 定时自动处理Topic相关工单
* @author zengqiao
* @date 20/7/28
*/
@Component
@CustomScheduled
(
name
=
"autoHandleTopicOrder"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"autoHandleTopicOrder"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
,
description
=
"定时自动处理Topic相关工单"
)
@ConditionalOnProperty
(
prefix
=
"task.op.order-auto-exec"
,
name
=
"topic-enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
false
)
public
class
AutoHandleTopicOrder
extends
AbstractScheduledTask
<
EmptyEntry
>
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
AutoHandleTopicOrder
.
class
);
@Autowired
private
ConfigService
configService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/AutomatedHandleOrder.java
浏览文件 @
73d7a0ec
...
...
@@ -4,7 +4,6 @@ import com.xiaojukeji.kafka.manager.bpm.OrderService;
import
com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum
;
import
com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.ResultStatus
;
import
com.xiaojukeji.kafka.manager.bpm.common.handle.OrderHandleBaseDTO
;
import
com.xiaojukeji.kafka.manager.common.utils.DateUtils
;
...
...
@@ -31,10 +30,10 @@ import java.util.*;
* @date 2020/6/12
*/
@Component
@CustomScheduled
(
name
=
"automatedHandleOrder"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"automatedHandleOrder"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
,
description
=
"工单自动化审批"
)
@ConditionalOnProperty
(
prefix
=
"task.op.order-auto-exec"
,
name
=
"app-enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
false
)
public
class
AutomatedHandleOrder
extends
AbstractScheduledTask
<
EmptyEntry
>
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
AutomatedHandleOrder
.
class
);
@Autowired
private
OrderService
orderService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/FlushReassignment.java
浏览文件 @
73d7a0ec
...
...
@@ -3,7 +3,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.op;
import
com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum
;
import
com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusReassignEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.ResultStatus
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
...
...
@@ -34,9 +33,9 @@ import java.util.*;
* @date 19/12/29
*/
@Component
@CustomScheduled
(
name
=
"flushReassignment"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"flushReassignment"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
,
description
=
"定时处理分区迁移任务"
)
public
class
FlushReassignment
extends
AbstractScheduledTask
<
EmptyEntry
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
FlushReassignment
.
class
);
@Autowired
private
ClusterService
clusterService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/SyncTopic2DB.java
浏览文件 @
73d7a0ec
...
...
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.op;
import
com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO
;
...
...
@@ -36,10 +35,10 @@ import java.util.stream.Collectors;
* @date 19/12/29
*/
@Component
@CustomScheduled
(
name
=
"syncTopic2DB"
,
cron
=
"0 0/2 * * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"syncTopic2DB"
,
cron
=
"0 0/2 * * * ?"
,
threadNum
=
1
,
description
=
"定期将未落盘的Topic刷新到DB中"
)
@ConditionalOnProperty
(
prefix
=
"task.op"
,
name
=
"sync-topic-enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
false
)
public
class
SyncTopic2DB
extends
AbstractScheduledTask
<
EmptyEntry
>
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
SyncTopic2DB
.
class
);
private
static
final
String
SYNC_TOPIC_2_DB_CONFIG_KEY
=
"SYNC_TOPIC_2_DB_CONFIG_KEY"
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreCommunityTopicMetrics2DB.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.listener.sink.db
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO
;
import
com.xiaojukeji.kafka.manager.common.events.TopicMetricsCollectedEvent
;
...
...
@@ -25,7 +24,7 @@ import java.util.List;
@Component
(
"storeCommunityTopicMetrics2DB"
)
@ConditionalOnProperty
(
prefix
=
"custom.store-metrics-task.community"
,
name
=
"topic-metrics-enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
true
)
public
class
StoreCommunityTopicMetrics2DB
implements
ApplicationListener
<
TopicMetricsCollectedEvent
>
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
StoreCommunityTopicMetrics2DB
.
class
);
@Autowired
private
TopicMetricsDao
topicMetricsDao
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreTopicThrottledMetrics2DB.java
浏览文件 @
73d7a0ec
...
...
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.task.listener.sink.db;
import
com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.TopicThrottledMetricsDO
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
...
...
@@ -24,7 +23,7 @@ import java.util.*;
@Component
(
"storeTopicThrottledMetrics2DB"
)
@ConditionalOnProperty
(
prefix
=
"custom.store-metrics-task.didi"
,
name
=
"topic-throttled-metrics-enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
true
)
public
class
StoreTopicThrottledMetrics2DB
implements
ApplicationListener
<
TopicThrottledMetricsCollectedEvent
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
StoreTopicThrottledMetrics2DB
.
class
);
@Autowired
private
ThrottleService
throttleService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.listener.sink.kafka
;
import
com.xiaojukeji.kafka.manager.common.constant.ConfigConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.config.TopicNameConfig
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaTopicMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics
;
...
...
@@ -27,7 +26,7 @@ import java.util.List;
*/
@Component
(
"sinkCommunityTopicMetrics2Kafka"
)
public
class
SinkCommunityTopicMetrics2Kafka
implements
ApplicationListener
<
TopicMetricsCollectedEvent
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
SinkCommunityTopicMetrics2Kafka
.
class
);
@Autowired
private
ConfigService
configService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.listener.sink.kafka
;
import
com.xiaojukeji.kafka.manager.common.constant.ConfigConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.config.TopicNameConfig
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaConsumerMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.remote.KafkaConsumerMetricsElem
;
...
...
@@ -27,7 +26,7 @@ import java.util.Map;
*/
@Component
(
"produceConsumerMetrics"
)
public
class
SinkConsumerMetrics2Kafka
implements
ApplicationListener
<
ConsumerMetricsCollectedEvent
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
SinkConsumerMetrics2Kafka
.
class
);
@Autowired
private
ConfigService
configService
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.listener.sink.monitor
;
import
com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics
;
import
com.xiaojukeji.kafka.manager.monitor.common.entry.sink.MonitorTopicSinkTag
;
...
...
@@ -26,13 +25,14 @@ import java.util.Arrays;
import
java.util.List
;
/**
* 定时上报Topic监控指标
* @author zengqiao
* @date 20/8/10
*/
@ConditionalOnProperty
(
prefix
=
"monitor"
,
name
=
"enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
true
)
@CustomScheduled
(
name
=
"sinkCommunityTopicMetrics2Monitor"
,
cron
=
"1 0/1 * * * ?"
,
threadNum
=
5
)
@CustomScheduled
(
name
=
"sinkCommunityTopicMetrics2Monitor"
,
cron
=
"1 0/1 * * * ?"
,
threadNum
=
5
,
description
=
"定时上报Topic监控指标"
)
public
class
SinkCommunityTopicMetrics2Monitor
extends
AbstractScheduledTask
<
ClusterDO
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
SinkCommunityTopicMetrics2Monitor
.
class
);
@Autowired
private
AbstractMonitorService
abstractMonitor
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkConsumerMetrics2Monitor.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.listener.sink.monitor
;
import
com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.ConsumerMetrics
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics
;
...
...
@@ -32,7 +31,7 @@ import java.util.*;
@Component
(
"sinkConsumerMetrics2Monitor"
)
@ConditionalOnProperty
(
prefix
=
"monitor"
,
name
=
"enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
true
)
public
class
SinkConsumerMetrics2Monitor
implements
ApplicationListener
<
ConsumerMetricsCollectedEvent
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
SinkConsumerMetrics2Monitor
.
class
);
@Autowired
private
LogicalClusterMetadataManager
logicalClusterMetadataManager
;
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/FlushTopicMetrics.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.schedule
;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO
;
...
...
@@ -22,7 +21,7 @@ import java.util.*;
*/
@Component
public
class
FlushTopicMetrics
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
FlushTopicMetrics
.
class
);
@Autowired
private
JmxService
jmxService
;
...
...
@@ -30,6 +29,9 @@ public class FlushTopicMetrics {
@Autowired
private
ClusterService
clusterService
;
/**
* 定时刷新topic指标到缓存中
*/
@Scheduled
(
cron
=
"5 0/1 * * * ?"
)
public
void
flushTopicMetrics
()
{
long
startTime
=
System
.
currentTimeMillis
();
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushBKConsumerGroupMetadata.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.schedule.metadata
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.ConsumerMetadata
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO
;
...
...
@@ -25,11 +24,14 @@ import java.util.*;
*/
@Component
public
class
FlushBKConsumerGroupMetadata
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
FlushBKConsumerGroupMetadata
.
class
);
@Autowired
private
ClusterService
clusterService
;
/**
* 定时刷新broker上消费组信息到缓存中
*/
@Scheduled
(
cron
=
"15 0/1 * * * ?"
)
public
void
schedule
()
{
List
<
ClusterDO
>
doList
=
clusterService
.
list
();
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java
浏览文件 @
73d7a0ec
...
...
@@ -25,6 +25,9 @@ public class FlushClusterMetadata {
@Autowired
private
PhysicalClusterMetadataManager
physicalClusterMetadataManager
;
/**
* 定时刷新物理集群元信息到缓存中
*/
@Scheduled
(
cron
=
"0/30 * * * * ?"
)
public
void
flush
()
{
Map
<
Long
,
ClusterDO
>
dbClusterMap
=
clusterService
.
list
().
stream
().
collect
(
Collectors
.
toMap
(
ClusterDO:
:
getId
,
Function
.
identity
(),
(
key1
,
key2
)
->
key2
));
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.schedule.metadata
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl
;
import
com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils
;
...
...
@@ -22,11 +21,14 @@ import java.util.Properties;
*/
@Component
public
class
FlushTopicProperties
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
FlushTopicProperties
.
class
);
@Autowired
private
ClusterService
clusterService
;
/**
* 定时刷新物理集群配置到缓存中
*/
@Scheduled
(
cron
=
"25 0/1 * * * ?"
)
public
void
flush
()
{
List
<
ClusterDO
>
doList
=
clusterService
.
list
();
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.task.schedule.metadata
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.ConsumerMetadata
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl
;
...
...
@@ -27,7 +26,7 @@ import java.util.stream.Collectors;
*/
@Component
public
class
FlushZKConsumerGroupMetadata
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
FlushZKConsumerGroupMetadata
.
class
);
@Autowired
private
ClusterService
clusterService
;
...
...
@@ -35,6 +34,9 @@ public class FlushZKConsumerGroupMetadata {
@Autowired
private
ThreadPool
threadPool
;
/**
* 定时刷新zk上的消费组信息到缓存中
*/
@Scheduled
(
cron
=
"35 0/1 * * * ?"
)
public
void
schedule
()
{
List
<
ClusterDO
>
doList
=
clusterService
.
list
();
...
...
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/metrics/MetricsRegistry.java
浏览文件 @
73d7a0ec
package
com.xiaojukeji.kafka.manager.web.metrics
;
import
com.codahale.metrics.*
;
import
com.xiaojukeji.kafka.manager.common.constant.LogConstant
;
import
com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -21,7 +20,7 @@ import java.util.concurrent.TimeUnit;
*/
@Component
public
class
MetricsRegistry
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
API_METRICS_LOGGER
);
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
MetricsRegistry
.
class
);
private
static
final
DecimalFormat
DECIMAL_FORMAT
=
new
DecimalFormat
(
"#"
);
...
...
kafka-manager-web/src/main/resources/logback-spring.xml
浏览文件 @
73d7a0ec
...
...
@@ -131,15 +131,15 @@
</filter>
</appender>
<!--
Metrics信息收集
日志 -->
<appender
name=
"
COLLECTOR_METRICS
_LOGGER"
class=
"ch.qos.logback.core.rolling.RollingFileAppender"
>
<file>
${log.path}/
metrics/collector_metrics
.log
</file>
<!--
Task模块相关
日志 -->
<appender
name=
"
TASK
_LOGGER"
class=
"ch.qos.logback.core.rolling.RollingFileAppender"
>
<file>
${log.path}/
log_task
.log
</file>
<encoder>
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
<charset>
UTF-8
</charset>
</encoder>
<rollingPolicy
class=
"ch.qos.logback.core.rolling.TimeBasedRollingPolicy"
>
<fileNamePattern>
${log.path}/
metrics/collector_metrics
_%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<fileNamePattern>
${log.path}/
log_task
_%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class=
"ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"
>
<maxFileSize>
100MB
</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
...
...
@@ -147,15 +147,15 @@
</rollingPolicy>
</appender>
<!--
Metrics信息收集
日志 -->
<!--
Api-Metrics信息相关
日志 -->
<appender
name=
"API_METRICS_LOGGER"
class=
"ch.qos.logback.core.rolling.RollingFileAppender"
>
<file>
${log.path}/
metrics/
api_metrics.log
</file>
<file>
${log.path}/api_metrics.log
</file>
<encoder>
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
<charset>
UTF-8
</charset>
</encoder>
<rollingPolicy
class=
"ch.qos.logback.core.rolling.TimeBasedRollingPolicy"
>
<fileNamePattern>
${log.path}/
metrics/
api_metrics_%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<fileNamePattern>
${log.path}/api_metrics_%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class=
"ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"
>
<maxFileSize>
100MB
</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
...
...
@@ -163,31 +163,13 @@
</rollingPolicy>
</appender>
<!-- Metrics信息收集日志 -->
<appender
name=
"SCHEDULED_TASK_LOGGER"
class=
"ch.qos.logback.core.rolling.RollingFileAppender"
>
<file>
${log.path}/metrics/scheduled_tasks.log
</file>
<encoder>
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
<charset>
UTF-8
</charset>
</encoder>
<rollingPolicy
class=
"ch.qos.logback.core.rolling.TimeBasedRollingPolicy"
>
<fileNamePattern>
${log.path}/metrics/scheduled_tasks_%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class=
"ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"
>
<maxFileSize>
100MB
</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<maxHistory>
5
</maxHistory>
</rollingPolicy>
</appender>
<logger
name=
"COLLECTOR_METRICS_LOGGER"
level=
"DEBUG"
additivity=
"false"
>
<appender-ref
ref=
"COLLECTOR_METRICS_LOGGER"
/>
<logger
name=
"com.xiaojukeji.kafka.manager.task"
level=
"INFO"
additivity=
"false"
>
<appender-ref
ref=
"TASK_LOGGER"
/>
</logger>
<logger
name=
"API_METRICS_LOGGER"
level=
"DEBUG"
additivity=
"false"
>
<logger
name=
"com.xiaojukeji.kafka.manager.web.metrics"
level=
"INFO"
additivity=
"false"
>
<appender-ref
ref=
"API_METRICS_LOGGER"
/>
</logger>
<logger
name=
"SCHEDULED_TASK_LOGGER"
level=
"DEBUG"
additivity=
"false"
>
<appender-ref
ref=
"SCHEDULED_TASK_LOGGER"
/>
</logger>
<logger
name=
"org.apache.ibatis"
level=
"INFO"
additivity=
"false"
/>
<logger
name=
"org.mybatis.spring"
level=
"INFO"
additivity=
"false"
/>
...
...
@@ -199,17 +181,6 @@
<appender-ref
ref=
"INFO_FILE"
/>
<appender-ref
ref=
"WARN_FILE"
/>
<appender-ref
ref=
"ERROR_FILE"
/>
<!--<appender-ref ref="METRICS_LOG
" />-->
<!-- <appender-ref ref="TASK_LOGGER
" />-->
</root>
<!--生产环境:输出到文件-->
<!--<springProfile name="pro">-->
<!--<root level="info">-->
<!--<appender-ref ref="CONSOLE" />-->
<!--<appender-ref ref="DEBUG_FILE" />-->
<!--<appender-ref ref="INFO_FILE" />-->
<!--<appender-ref ref="ERROR_FILE" />-->
<!--<appender-ref ref="WARN_FILE" />-->
<!--</root>-->
<!--</springProfile>-->
</configuration>
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录