Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
375c6f56
K
kafka-manager
项目概览
DiDi
/
kafka-manager
9 个月 前同步成功
通知
58
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
375c6f56
编写于
9月 19, 2022
作者:
Z
zengqiao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
修改GroupOffsetResetEnum类名为OffsetTypeEnum
上级
0bf85c97
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
24 addition
and
24 deletion
+24
-24
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java
...ji/know/streaming/km/biz/group/impl/GroupManagerImpl.java
+6
-6
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
...ow/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
+5
-5
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java
...reaming/km/common/bean/dto/group/GroupOffsetResetDTO.java
+2
-1
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java
...ow/streaming/km/common/bean/dto/topic/TopicRecordDTO.java
+4
-5
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/OffsetTypeEnum.java
...jukeji/know/streaming/km/common/enums/OffsetTypeEnum.java
+7
-7
未找到文件。
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java
浏览文件 @
375c6f56
...
...
@@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedD
import
com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO
;
import
com.xiaojukeji.know.streaming.km.common.constant.MsgConstant
;
import
com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum
;
import
com.xiaojukeji.know.streaming.km.common.enums.
GroupOffsetReset
Enum
;
import
com.xiaojukeji.know.streaming.km.common.enums.
OffsetType
Enum
;
import
com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum
;
import
com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException
;
import
com.xiaojukeji.know.streaming.km.common.exception.NotExistException
;
...
...
@@ -199,12 +199,12 @@ public class GroupManagerImpl implements GroupManager {
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
NOT_EXIST
,
MsgConstant
.
getTopicNotExist
(
dto
.
getClusterId
(),
dto
.
getTopicName
()));
}
if
(
GroupOffsetReset
Enum
.
PRECISE_OFFSET
.
getResetType
()
==
dto
.
getResetType
()
if
(
OffsetType
Enum
.
PRECISE_OFFSET
.
getResetType
()
==
dto
.
getResetType
()
&&
ValidateUtils
.
isEmptyList
(
dto
.
getOffsetList
()))
{
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
PARAM_ILLEGAL
,
"参数错误,指定offset重置需传offset信息"
);
}
if
(
GroupOffsetReset
Enum
.
PRECISE_TIMESTAMP
.
getResetType
()
==
dto
.
getResetType
()
if
(
OffsetType
Enum
.
PRECISE_TIMESTAMP
.
getResetType
()
==
dto
.
getResetType
()
&&
ValidateUtils
.
isNull
(
dto
.
getTimestamp
()))
{
return
Result
.
buildFromRSAndMsg
(
ResultStatus
.
PARAM_ILLEGAL
,
"参数错误,指定时间重置需传时间信息"
);
}
...
...
@@ -213,7 +213,7 @@ public class GroupManagerImpl implements GroupManager {
}
private
Result
<
Map
<
TopicPartition
,
Long
>>
getPartitionOffset
(
GroupOffsetResetDTO
dto
)
{
if
(
GroupOffsetReset
Enum
.
PRECISE_OFFSET
.
getResetType
()
==
dto
.
getResetType
())
{
if
(
OffsetType
Enum
.
PRECISE_OFFSET
.
getResetType
()
==
dto
.
getResetType
())
{
return
Result
.
buildSuc
(
dto
.
getOffsetList
().
stream
().
collect
(
Collectors
.
toMap
(
elem
->
new
TopicPartition
(
dto
.
getTopicName
(),
elem
.
getPartitionId
()),
PartitionOffsetDTO:
:
getOffset
,
...
...
@@ -222,9 +222,9 @@ public class GroupManagerImpl implements GroupManager {
}
OffsetSpec
offsetSpec
=
null
;
if
(
GroupOffsetReset
Enum
.
PRECISE_TIMESTAMP
.
getResetType
()
==
dto
.
getResetType
())
{
if
(
OffsetType
Enum
.
PRECISE_TIMESTAMP
.
getResetType
()
==
dto
.
getResetType
())
{
offsetSpec
=
OffsetSpec
.
forTimestamp
(
dto
.
getTimestamp
());
}
else
if
(
GroupOffsetReset
Enum
.
EARLIEST
.
getResetType
()
==
dto
.
getResetType
())
{
}
else
if
(
OffsetType
Enum
.
EARLIEST
.
getResetType
()
==
dto
.
getResetType
())
{
offsetSpec
=
OffsetSpec
.
earliest
();
}
else
{
offsetSpec
=
OffsetSpec
.
latest
();
...
...
km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
浏览文件 @
375c6f56
...
...
@@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import
com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant
;
import
com.xiaojukeji.know.streaming.km.common.constant.MsgConstant
;
import
com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter
;
import
com.xiaojukeji.know.streaming.km.common.enums.
GroupOffsetReset
Enum
;
import
com.xiaojukeji.know.streaming.km.common.enums.
OffsetType
Enum
;
import
com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum
;
import
com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException
;
import
com.xiaojukeji.know.streaming.km.common.exception.NotExistException
;
...
...
@@ -164,7 +164,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
Map
<
TopicPartition
,
OffsetAndTimestamp
>
partitionOffsetAndTimestampMap
=
new
HashMap
<>();
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
if
(
GroupOffsetReset
Enum
.
PRECISE_TIMESTAMP
.
getResetType
()
==
dto
.
getFilterOffsetReset
())
{
if
(
OffsetType
Enum
.
PRECISE_TIMESTAMP
.
getResetType
()
==
dto
.
getFilterOffsetReset
())
{
Map
<
TopicPartition
,
Long
>
timestampsToSearch
=
new
HashMap
<>();
partitionList
.
forEach
(
topicPartition
->
{
timestampsToSearch
.
put
(
topicPartition
,
dto
.
getStartTimestampUnitMs
());
...
...
@@ -173,13 +173,13 @@ public class TopicStateManagerImpl implements TopicStateManager {
}
for
(
TopicPartition
partition
:
partitionList
)
{
if
(
GroupOffsetReset
Enum
.
EARLIEST
.
getResetType
()
==
dto
.
getFilterOffsetReset
())
{
if
(
OffsetType
Enum
.
EARLIEST
.
getResetType
()
==
dto
.
getFilterOffsetReset
())
{
// 重置到最旧
kafkaConsumer
.
seek
(
partition
,
beginOffsetsMapResult
.
getData
().
get
(
partition
));
}
else
if
(
GroupOffsetReset
Enum
.
PRECISE_TIMESTAMP
.
getResetType
()
==
dto
.
getFilterOffsetReset
())
{
}
else
if
(
OffsetType
Enum
.
PRECISE_TIMESTAMP
.
getResetType
()
==
dto
.
getFilterOffsetReset
())
{
// 重置到指定时间
kafkaConsumer
.
seek
(
partition
,
partitionOffsetAndTimestampMap
.
get
(
partition
).
offset
());
}
else
if
(
GroupOffsetReset
Enum
.
PRECISE_OFFSET
.
getResetType
()
==
dto
.
getFilterOffsetReset
())
{
}
else
if
(
OffsetType
Enum
.
PRECISE_OFFSET
.
getResetType
()
==
dto
.
getFilterOffsetReset
())
{
// 重置到指定位置
}
else
{
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java
浏览文件 @
375c6f56
...
...
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.group;
import
com.fasterxml.jackson.annotation.JsonIgnoreProperties
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.topic.ClusterTopicDTO
;
import
com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
...
...
@@ -23,7 +24,7 @@ public class GroupOffsetResetDTO extends ClusterTopicDTO {
private
String
groupName
;
/**
* @see
com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetReset
Enum
* @see
OffsetType
Enum
*/
@NotNull
(
message
=
"resetType不允许为空"
)
@ApiModelProperty
(
value
=
"重置方式"
,
example
=
"1"
)
...
...
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java
浏览文件 @
375c6f56
package
com.xiaojukeji.know.streaming.km.common.bean.dto.topic
;
import
com.fasterxml.jackson.annotation.JsonIgnoreProperties
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO
;
import
com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO
;
import
com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
...
...
@@ -36,13 +36,12 @@ public class TopicRecordDTO extends PaginationSortDTO {
@ApiModelProperty
(
value
=
"预览超时时间"
,
example
=
"10000"
)
private
Long
pullTimeoutUnitMs
=
8000L
;
/**
* @see OffsetTypeEnum
*/
@ApiModelProperty
(
value
=
"offset"
,
example
=
""
)
private
Integer
filterOffsetReset
=
0
;
@ApiModelProperty
(
value
=
"开始日期时间戳"
,
example
=
""
)
private
Long
startTimestampUnitMs
;
@ApiModelProperty
(
value
=
"结束日期时间戳"
,
example
=
""
)
private
Long
utilTimestampUnitMs
;
}
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/
GroupOffsetReset
Enum.java
→
km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/
OffsetType
Enum.java
浏览文件 @
375c6f56
...
...
@@ -3,19 +3,19 @@ package com.xiaojukeji.know.streaming.km.common.enums;
import
lombok.Getter
;
/**
*
重置offset
*
offset类型
* @author zengqiao
* @date 19/4/8
*/
@Getter
public
enum
GroupOffsetReset
Enum
{
LATEST
(
0
,
"
重置到
最新"
),
public
enum
OffsetType
Enum
{
LATEST
(
0
,
"最新"
),
EARLIEST
(
1
,
"
重置到
最旧"
),
EARLIEST
(
1
,
"最旧"
),
PRECISE_TIMESTAMP
(
2
,
"
按时间进行重置
"
),
PRECISE_TIMESTAMP
(
2
,
"
指定时间
"
),
PRECISE_OFFSET
(
3
,
"
重置到
指定位置"
),
PRECISE_OFFSET
(
3
,
"指定位置"
),
;
...
...
@@ -23,7 +23,7 @@ public enum GroupOffsetResetEnum {
private
final
String
message
;
GroupOffsetReset
Enum
(
int
resetType
,
String
message
)
{
OffsetType
Enum
(
int
resetType
,
String
message
)
{
this
.
resetType
=
resetType
;
this
.
message
=
message
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录