Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
0f746917
K
kafka-manager
项目概览
DiDi
/
kafka-manager
10 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
0f746917
编写于
7月 02, 2021
作者:
Z
zengqiao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Topic基本信息中增加retention.bytes信息
上级
eb3b8c4b
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
111 addition
and
65 deletion
+111
-65
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java
.../kafka/manager/common/constant/TopicCreationConstant.java
+2
-0
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java
...i/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java
+12
-1
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java
...a/manager/common/entity/vo/normal/topic/TopicBasicVO.java
+52
-40
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
...manager/service/cache/PhysicalClusterMetadataManager.java
+33
-10
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
.../kafka/manager/service/service/impl/TopicServiceImpl.java
+1
-0
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java
.../manager/task/schedule/metadata/FlushTopicProperties.java
+9
-10
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java
...ager/web/api/versionone/normal/NormalTopicController.java
+1
-4
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java
...eji/kafka/manager/web/converters/TopicModelConverter.java
+1
-0
未找到文件。
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java
浏览文件 @
0f746917
...
...
@@ -25,6 +25,8 @@ public class TopicCreationConstant {
public
static
final
String
TOPIC_RETENTION_TIME_KEY_NAME
=
"retention.ms"
;
public
static
final
String
TOPIC_RETENTION_BYTES_KEY_NAME
=
"retention.bytes"
;
public
static
final
Long
DEFAULT_QUOTA
=
3
*
1024
*
1024L
;
public
static
Properties
createNewProperties
(
Long
retentionTime
)
{
...
...
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java
浏览文件 @
0f746917
...
...
@@ -37,6 +37,8 @@ public class TopicBasicDTO {
private
Long
retentionTime
;
private
Long
retentionBytes
;
public
Long
getClusterId
()
{
return
clusterId
;
}
...
...
@@ -157,6 +159,14 @@ public class TopicBasicDTO {
this
.
retentionTime
=
retentionTime
;
}
public
Long
getRetentionBytes
()
{
return
retentionBytes
;
}
public
void
setRetentionBytes
(
Long
retentionBytes
)
{
this
.
retentionBytes
=
retentionBytes
;
}
@Override
public
String
toString
()
{
return
"TopicBasicDTO{"
+
...
...
@@ -166,7 +176,7 @@ public class TopicBasicDTO {
", principals='"
+
principals
+
'\''
+
", topicName='"
+
topicName
+
'\''
+
", description='"
+
description
+
'\''
+
", regionNameList=
'"
+
regionNameList
+
'\''
+
", regionNameList=
"
+
regionNameList
+
", score="
+
score
+
", topicCodeC='"
+
topicCodeC
+
'\''
+
", partitionNum="
+
partitionNum
+
...
...
@@ -175,6 +185,7 @@ public class TopicBasicDTO {
", modifyTime="
+
modifyTime
+
", createTime="
+
createTime
+
", retentionTime="
+
retentionTime
+
", retentionBytes="
+
retentionBytes
+
'}'
;
}
}
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java
浏览文件 @
0f746917
...
...
@@ -33,6 +33,9 @@ public class TopicBasicVO {
@ApiModelProperty
(
value
=
"存储时间(ms)"
)
private
Long
retentionTime
;
@ApiModelProperty
(
value
=
"单分区数据保存大小(Byte)"
)
private
Long
retentionBytes
;
@ApiModelProperty
(
value
=
"创建时间"
)
private
Long
createTime
;
...
...
@@ -62,12 +65,20 @@ public class TopicBasicVO {
this
.
clusterId
=
clusterId
;
}
public
String
get
TopicCodeC
()
{
return
topicCodeC
;
public
String
get
AppId
()
{
return
appId
;
}
public
void
setTopicCodeC
(
String
topicCodeC
)
{
this
.
topicCodeC
=
topicCodeC
;
public
void
setAppId
(
String
appId
)
{
this
.
appId
=
appId
;
}
public
String
getAppName
()
{
return
appName
;
}
public
void
setAppName
(
String
appName
)
{
this
.
appName
=
appName
;
}
public
Integer
getPartitionNum
()
{
...
...
@@ -86,76 +97,76 @@ public class TopicBasicVO {
this
.
replicaNum
=
replicaNum
;
}
public
Long
getModifyTime
()
{
return
modifyTime
;
public
String
getPrincipals
()
{
return
principals
;
}
public
void
set
ModifyTime
(
Long
modifyTime
)
{
this
.
modifyTime
=
modifyTime
;
public
void
set
Principals
(
String
principals
)
{
this
.
principals
=
principals
;
}
public
Long
get
Create
Time
()
{
return
create
Time
;
public
Long
get
Retention
Time
()
{
return
retention
Time
;
}
public
void
set
CreateTime
(
Long
create
Time
)
{
this
.
createTime
=
create
Time
;
public
void
set
RetentionTime
(
Long
retention
Time
)
{
this
.
retentionTime
=
retention
Time
;
}
public
String
getPrincipal
s
()
{
return
principal
s
;
public
Long
getRetentionByte
s
()
{
return
retentionByte
s
;
}
public
void
set
Principals
(
String
principal
s
)
{
this
.
principals
=
principal
s
;
public
void
set
RetentionBytes
(
Long
retentionByte
s
)
{
this
.
retentionBytes
=
retentionByte
s
;
}
public
String
getDescription
()
{
return
description
;
public
Long
getCreateTime
()
{
return
createTime
;
}
public
void
set
Description
(
String
description
)
{
this
.
description
=
description
;
public
void
set
CreateTime
(
Long
createTime
)
{
this
.
createTime
=
createTime
;
}
public
void
setAppId
(
String
appId
)
{
this
.
appId
=
appId
;
public
Long
getModifyTime
(
)
{
return
modifyTime
;
}
public
void
set
BootstrapServers
(
String
bootstrapServers
)
{
this
.
bootstrapServers
=
bootstrapServers
;
public
void
set
ModifyTime
(
Long
modifyTime
)
{
this
.
modifyTime
=
modifyTime
;
}
public
String
getAppId
()
{
return
appId
;
public
Integer
getScore
()
{
return
score
;
}
public
String
getBootstrapServers
(
)
{
return
bootstrapServers
;
public
void
setScore
(
Integer
score
)
{
this
.
score
=
score
;
}
public
Long
getRetentionTime
()
{
return
retentionTime
;
public
String
getTopicCodeC
()
{
return
topicCodeC
;
}
public
void
set
RetentionTime
(
Long
retentionTime
)
{
this
.
retentionTime
=
retentionTime
;
public
void
set
TopicCodeC
(
String
topicCodeC
)
{
this
.
topicCodeC
=
topicCodeC
;
}
public
String
get
AppName
()
{
return
appName
;
public
String
get
Description
()
{
return
description
;
}
public
void
set
AppName
(
String
appName
)
{
this
.
appName
=
appName
;
public
void
set
Description
(
String
description
)
{
this
.
description
=
description
;
}
public
Integer
getScore
()
{
return
score
;
public
String
getBootstrapServers
()
{
return
bootstrapServers
;
}
public
void
set
Score
(
Integer
score
)
{
this
.
score
=
score
;
public
void
set
BootstrapServers
(
String
bootstrapServers
)
{
this
.
bootstrapServers
=
bootstrapServers
;
}
public
List
<
String
>
getRegionNameList
()
{
...
...
@@ -176,6 +187,7 @@ public class TopicBasicVO {
", replicaNum="
+
replicaNum
+
", principals='"
+
principals
+
'\''
+
", retentionTime="
+
retentionTime
+
", retentionBytes="
+
retentionBytes
+
", createTime="
+
createTime
+
", modifyTime="
+
modifyTime
+
", score="
+
score
+
...
...
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
浏览文件 @
0f746917
...
...
@@ -3,10 +3,12 @@ package com.xiaojukeji.kafka.manager.service.cache;
import
com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum
;
import
com.xiaojukeji.kafka.manager.common.constant.Constant
;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant
;
import
com.xiaojukeji.kafka.manager.common.entity.KafkaVersion
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO
;
import
com.xiaojukeji.kafka.manager.common.utils.JsonUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.ListUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.NumberUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig
;
import
com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap
;
...
...
@@ -56,7 +58,7 @@ public class PhysicalClusterMetadataManager {
private
final
static
Map
<
Long
,
Map
<
String
,
TopicMetadata
>>
TOPIC_METADATA_MAP
=
new
ConcurrentHashMap
<>();
private
final
static
Map
<
Long
,
Map
<
String
,
Long
>>
TOPIC_RETENTION_TIME
_MAP
=
new
ConcurrentHashMap
<>();
private
final
static
Map
<
Long
,
Map
<
String
,
Properties
>>
TOPIC_PROPERTIES
_MAP
=
new
ConcurrentHashMap
<>();
private
final
static
Map
<
Long
,
Map
<
Integer
,
BrokerMetadata
>>
BROKER_METADATA_MAP
=
new
ConcurrentHashMap
<>();
...
...
@@ -95,7 +97,7 @@ public class PhysicalClusterMetadataManager {
// 初始化topic-map
TOPIC_METADATA_MAP
.
put
(
clusterDO
.
getId
(),
new
ConcurrentHashMap
<>());
TOPIC_
RETENTION_TIME
_MAP
.
put
(
clusterDO
.
getId
(),
new
ConcurrentHashMap
<>());
TOPIC_
PROPERTIES
_MAP
.
put
(
clusterDO
.
getId
(),
new
ConcurrentHashMap
<>());
// 初始化cluster-map
CLUSTER_MAP
.
put
(
clusterDO
.
getId
(),
clusterDO
);
...
...
@@ -158,7 +160,7 @@ public class PhysicalClusterMetadataManager {
KAFKA_VERSION_MAP
.
remove
(
clusterId
);
TOPIC_METADATA_MAP
.
remove
(
clusterId
);
TOPIC_
RETENTION_TIME
_MAP
.
remove
(
clusterId
);
TOPIC_
PROPERTIES
_MAP
.
remove
(
clusterId
);
CLUSTER_MAP
.
remove
(
clusterId
);
}
...
...
@@ -262,24 +264,45 @@ public class PhysicalClusterMetadataManager {
//---------------------------配置相关元信息--------------
public
static
void
putTopicRetentionTime
(
Long
clusterId
,
String
topicName
,
Long
retentionTime
)
{
Map
<
String
,
Long
>
timeMap
=
TOPIC_RETENTION_TIME_MAP
.
get
(
clusterId
);
if
(
timeMap
==
null
)
{
public
static
void
putTopicProperties
(
Long
clusterId
,
String
topicName
,
Properties
properties
)
{
if
(
ValidateUtils
.
isNull
(
clusterId
)
||
ValidateUtils
.
isBlank
(
topicName
)
||
ValidateUtils
.
isNull
(
properties
))
{
return
;
}
timeMap
.
put
(
topicName
,
retentionTime
);
Map
<
String
,
Properties
>
propertiesMap
=
TOPIC_PROPERTIES_MAP
.
get
(
clusterId
);
if
(
ValidateUtils
.
isNull
(
propertiesMap
))
{
return
;
}
propertiesMap
.
put
(
topicName
,
properties
);
}
public
static
Long
getTopicRetentionTime
(
Long
clusterId
,
String
topicName
)
{
Map
<
String
,
Long
>
timeMap
=
TOPIC_RETENTION_TIME_MAP
.
get
(
clusterId
);
if
(
timeMap
==
null
)
{
Map
<
String
,
Properties
>
propertiesMap
=
TOPIC_PROPERTIES_MAP
.
get
(
clusterId
);
if
(
ValidateUtils
.
isNull
(
propertiesMap
))
{
return
null
;
}
Properties
properties
=
propertiesMap
.
get
(
topicName
);
if
(
ValidateUtils
.
isNull
(
properties
))
{
return
null
;
}
return
timeMap
.
get
(
topicName
);
return
NumberUtils
.
string2Long
(
properties
.
getProperty
(
TopicCreationConstant
.
TOPIC_RETENTION_TIME_KEY_NAME
));
}
public
static
Long
getTopicRetentionBytes
(
Long
clusterId
,
String
topicName
)
{
Map
<
String
,
Properties
>
propertiesMap
=
TOPIC_PROPERTIES_MAP
.
get
(
clusterId
);
if
(
ValidateUtils
.
isNull
(
propertiesMap
))
{
return
null
;
}
Properties
properties
=
propertiesMap
.
get
(
topicName
);
if
(
ValidateUtils
.
isNull
(
properties
))
{
return
null
;
}
return
NumberUtils
.
string2Long
(
properties
.
getProperty
(
TopicCreationConstant
.
TOPIC_RETENTION_BYTES_KEY_NAME
));
}
//---------------------------Broker元信息相关--------------
...
...
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
浏览文件 @
0f746917
...
...
@@ -223,6 +223,7 @@ public class TopicServiceImpl implements TopicService {
basicDTO
.
setCreateTime
(
topicMetadata
.
getCreateTime
());
basicDTO
.
setModifyTime
(
topicMetadata
.
getModifyTime
());
basicDTO
.
setRetentionTime
(
PhysicalClusterMetadataManager
.
getTopicRetentionTime
(
clusterId
,
topicName
));
basicDTO
.
setRetentionBytes
(
PhysicalClusterMetadataManager
.
getTopicRetentionBytes
(
clusterId
,
topicName
));
TopicDO
topicDO
=
topicManagerService
.
getByTopicName
(
clusterId
,
topicName
);
if
(!
ValidateUtils
.
isNull
(
topicDO
))
{
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopic
RetentionTime
.java
→
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopic
Properties
.java
浏览文件 @
0f746917
...
...
@@ -14,13 +14,14 @@ import org.springframework.scheduling.annotation.Scheduled;
import
org.springframework.stereotype.Component
;
import
java.util.List
;
import
java.util.Properties
;
/**
* @author zengqiao
* @date 20/7/23
*/
@Component
public
class
FlushTopic
RetentionTime
{
public
class
FlushTopic
Properties
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
@Autowired
...
...
@@ -33,7 +34,7 @@ public class FlushTopicRetentionTime {
try
{
flush
(
clusterDO
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"flush topic
retention time
failed, clusterId:{}."
,
clusterDO
.
getId
(),
e
);
LOGGER
.
error
(
"flush topic
properties
failed, clusterId:{}."
,
clusterDO
.
getId
(),
e
);
}
}
}
...
...
@@ -41,22 +42,20 @@ public class FlushTopicRetentionTime {
private
void
flush
(
ClusterDO
clusterDO
)
{
ZkConfigImpl
zkConfig
=
PhysicalClusterMetadataManager
.
getZKConfig
(
clusterDO
.
getId
());
if
(
ValidateUtils
.
isNull
(
zkConfig
))
{
LOGGER
.
error
(
"flush topic
retention time
, get zk config failed, clusterId:{}."
,
clusterDO
.
getId
());
LOGGER
.
error
(
"flush topic
properties
, get zk config failed, clusterId:{}."
,
clusterDO
.
getId
());
return
;
}
for
(
String
topicName:
PhysicalClusterMetadataManager
.
getTopicNameList
(
clusterDO
.
getId
()))
{
try
{
Long
retentionTime
=
KafkaZookeeperUtils
.
getTopicRetentionTime
(
zkConfig
,
topicName
);
if
(
retentionTime
==
null
)
{
LOGGER
.
warn
(
"get topic retentionTime failed, clusterId:{} topicName:{}."
,
clusterDO
.
getId
(),
topicName
);
Properties
properties
=
KafkaZookeeperUtils
.
getTopicProperties
(
zkConfig
,
topicName
);
if
(
ValidateUtils
.
isNull
(
properties
))
{
LOGGER
.
warn
(
"get topic properties failed, clusterId:{} topicName:{}."
,
clusterDO
.
getId
(),
topicName
);
continue
;
}
PhysicalClusterMetadataManager
.
putTopic
RetentionTime
(
clusterDO
.
getId
(),
topicName
,
retentionTime
);
PhysicalClusterMetadataManager
.
putTopic
Properties
(
clusterDO
.
getId
(),
topicName
,
properties
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"get topic retentionTime failed, clusterId:{} topicName:{}."
,
clusterDO
.
getId
(),
topicName
,
e
);
LOGGER
.
error
(
"get topic properties failed, clusterId:{} topicName:{}."
,
clusterDO
.
getId
(),
topicName
,
e
);
}
}
}
...
...
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java
浏览文件 @
0f746917
...
...
@@ -61,10 +61,7 @@ public class NormalTopicController {
@ApiOperation
(
value
=
"Topic基本信息"
,
notes
=
""
)
@RequestMapping
(
value
=
"{clusterId}/topics/{topicName}/basic-info"
,
method
=
RequestMethod
.
GET
)
@ResponseBody
public
Result
<
TopicBasicVO
>
getTopicBasic
(
@PathVariable
Long
clusterId
,
@PathVariable
String
topicName
,
@RequestParam
(
value
=
"isPhysicalClusterId"
,
required
=
false
)
Boolean
isPhysicalClusterId
)
{
public
Result
<
TopicBasicVO
>
getTopicBasic
(
@PathVariable
Long
clusterId
,
@PathVariable
String
topicName
,
@RequestParam
(
value
=
"isPhysicalClusterId"
,
required
=
false
)
Boolean
isPhysicalClusterId
)
{
Long
physicalClusterId
=
logicalClusterMetadataManager
.
getPhysicalClusterId
(
clusterId
,
isPhysicalClusterId
);
if
(
ValidateUtils
.
isNull
(
physicalClusterId
))
{
return
Result
.
buildFrom
(
ResultStatus
.
CLUSTER_NOT_EXIST
);
...
...
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java
浏览文件 @
0f746917
...
...
@@ -31,6 +31,7 @@ public class TopicModelConverter {
vo
.
setReplicaNum
(
dto
.
getReplicaNum
());
vo
.
setPrincipals
(
dto
.
getPrincipals
());
vo
.
setRetentionTime
(
dto
.
getRetentionTime
());
vo
.
setRetentionBytes
(
dto
.
getRetentionBytes
());
vo
.
setCreateTime
(
dto
.
getCreateTime
());
vo
.
setModifyTime
(
dto
.
getModifyTime
());
vo
.
setScore
(
dto
.
getScore
());
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录