Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
af8f7acb
M
milvus
项目概览
milvus
/
milvus
11 个月 前同步成功
通知
261
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
af8f7acb
编写于
2月 14, 2023
作者:
C
cai.zhang
提交者:
GitHub
2月 14, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Remove contraint that compaction based on indexed segment (#22145)
Signed-off-by:
N
cai.zhang
<
cai.zhang@zilliz.com
>
上级
9c306b0d
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
90 addition
and
18 deletion
+90
-18
internal/datacoord/compaction_trigger.go
internal/datacoord/compaction_trigger.go
+0
-2
internal/datacoord/compaction_trigger_test.go
internal/datacoord/compaction_trigger_test.go
+3
-8
internal/datacoord/handler.go
internal/datacoord/handler.go
+18
-8
internal/datacoord/server_test.go
internal/datacoord/server_test.go
+69
-0
未找到文件。
internal/datacoord/compaction_trigger.go
浏览文件 @
af8f7acb
...
@@ -365,8 +365,6 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
...
@@ -365,8 +365,6 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
break
break
}
}
group
.
segments
=
FilterInIndexedSegments
(
t
.
handler
,
t
.
indexCoord
,
group
.
segments
...
)
isDiskIndex
,
err
:=
t
.
updateSegmentMaxSize
(
group
.
segments
)
isDiskIndex
,
err
:=
t
.
updateSegmentMaxSize
(
group
.
segments
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Warn
(
"failed to update segment max size"
,
zap
.
Error
(
err
))
log
.
Warn
(
"failed to update segment max size"
,
zap
.
Error
(
err
))
...
...
internal/datacoord/compaction_trigger_test.go
浏览文件 @
af8f7acb
...
@@ -438,7 +438,7 @@ func Test_compactionTrigger_force(t *testing.T) {
...
@@ -438,7 +438,7 @@ func Test_compactionTrigger_force(t *testing.T) {
case
<-
time
.
After
(
2
*
time
.
Second
)
:
case
<-
time
.
After
(
2
*
time
.
Second
)
:
hasPlan
=
false
hasPlan
=
false
}
}
assert
.
Equal
(
t
,
fals
e
,
hasPlan
)
assert
.
Equal
(
t
,
tru
e
,
hasPlan
)
})
})
t
.
Run
(
tt
.
name
+
" with meta error"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
tt
.
name
+
" with meta error"
,
func
(
t
*
testing
.
T
)
{
...
@@ -1033,13 +1033,8 @@ func Test_compactionTrigger_noplan(t *testing.T) {
...
@@ -1033,13 +1033,8 @@ func Test_compactionTrigger_noplan(t *testing.T) {
err
:=
tr
.
triggerCompaction
()
err
:=
tr
.
triggerCompaction
()
assert
.
Equal
(
t
,
tt
.
wantErr
,
err
!=
nil
)
assert
.
Equal
(
t
,
tt
.
wantErr
,
err
!=
nil
)
spy
:=
(
tt
.
fields
.
compactionHandler
)
.
(
*
spyCompactionHandler
)
spy
:=
(
tt
.
fields
.
compactionHandler
)
.
(
*
spyCompactionHandler
)
select
{
plan
:=
<-
spy
.
spyChan
case
val
:=
<-
spy
.
spyChan
:
assert
.
Equal
(
t
,
len
(
plan
.
SegmentBinlogs
),
4
)
assert
.
Fail
(
t
,
"we expect no compaction generated"
,
val
)
return
case
<-
time
.
After
(
3
*
time
.
Second
)
:
return
}
})
})
}
}
}
}
...
...
internal/datacoord/handler.go
浏览文件 @
af8f7acb
...
@@ -137,16 +137,26 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
...
@@ -137,16 +137,26 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
unIndexedIDs
.
Insert
(
s
.
GetID
())
unIndexedIDs
.
Insert
(
s
.
GetID
())
}
}
}
}
hasUnIndexed
:=
true
for
hasUnIndexed
{
hasUnIndexed
=
false
for
id
:=
range
unIndexedIDs
{
for
id
:=
range
unIndexedIDs
{
// Indexed segments are compacted to a raw segment,
// Indexed segments are compacted to a raw segment,
// replace it with the indexed ones
// replace it with the indexed ones
if
len
(
segmentInfos
[
id
]
.
GetCompactionFrom
())
>
0
&&
if
len
(
segmentInfos
[
id
]
.
GetCompactionFrom
())
>
0
{
indexed
.
Contain
(
segmentInfos
[
id
]
.
GetCompactionFrom
()
...
)
{
unIndexedIDs
.
Remove
(
id
)
unIndexedIDs
.
Remove
(
id
)
indexedIDs
.
Insert
(
segmentInfos
[
id
]
.
GetCompactionFrom
()
...
)
for
_
,
segID
:=
range
segmentInfos
[
id
]
.
GetCompactionFrom
()
{
if
indexed
.
Contain
(
segID
)
{
indexedIDs
.
Insert
(
segID
)
}
else
{
unIndexedIDs
.
Insert
(
segID
)
hasUnIndexed
=
true
}
}
droppedIDs
.
Remove
(
segmentInfos
[
id
]
.
GetCompactionFrom
()
...
)
droppedIDs
.
Remove
(
segmentInfos
[
id
]
.
GetCompactionFrom
()
...
)
}
}
}
}
}
return
&
datapb
.
VchannelInfo
{
return
&
datapb
.
VchannelInfo
{
CollectionID
:
channel
.
CollectionID
,
CollectionID
:
channel
.
CollectionID
,
...
...
internal/datacoord/server_test.go
浏览文件 @
af8f7acb
...
@@ -2644,6 +2644,75 @@ func TestGetRecoveryInfo(t *testing.T) {
...
@@ -2644,6 +2644,75 @@ func TestGetRecoveryInfo(t *testing.T) {
assert
.
Equal
(
t
,
UniqueID
(
8
),
resp
.
GetChannels
()[
0
]
.
GetDroppedSegmentIds
()[
0
])
assert
.
Equal
(
t
,
UniqueID
(
8
),
resp
.
GetChannels
()[
0
]
.
GetDroppedSegmentIds
()[
0
])
})
})
t
.
Run
(
"with continuous compaction"
,
func
(
t
*
testing
.
T
)
{
svr
:=
newTestServer
(
t
,
nil
)
defer
closeTestServer
(
t
,
svr
)
svr
.
rootCoordClientCreator
=
func
(
ctx
context
.
Context
,
metaRootPath
string
,
etcdCli
*
clientv3
.
Client
)
(
types
.
RootCoord
,
error
)
{
return
newMockRootCoordService
(),
nil
}
svr
.
meta
.
AddCollection
(
&
collectionInfo
{
ID
:
0
,
Schema
:
newTestSchema
(),
})
err
:=
svr
.
meta
.
UpdateChannelCheckpoint
(
"vchan1"
,
&
internalpb
.
MsgPosition
{
ChannelName
:
"vchan1"
,
Timestamp
:
0
,
})
assert
.
NoError
(
t
,
err
)
seg1
:=
createSegment
(
9
,
0
,
0
,
100
,
30
,
"vchan1"
,
commonpb
.
SegmentState_Dropped
)
seg2
:=
createSegment
(
10
,
0
,
0
,
100
,
40
,
"vchan1"
,
commonpb
.
SegmentState_Dropped
)
seg3
:=
createSegment
(
11
,
0
,
0
,
100
,
40
,
"vchan1"
,
commonpb
.
SegmentState_Dropped
)
seg3
.
CompactionFrom
=
[]
int64
{
9
,
10
}
seg4
:=
createSegment
(
12
,
0
,
0
,
100
,
40
,
"vchan1"
,
commonpb
.
SegmentState_Dropped
)
seg5
:=
createSegment
(
13
,
0
,
0
,
100
,
40
,
"vchan1"
,
commonpb
.
SegmentState_Flushed
)
seg5
.
CompactionFrom
=
[]
int64
{
11
,
12
}
err
=
svr
.
meta
.
AddSegment
(
NewSegmentInfo
(
seg1
))
assert
.
Nil
(
t
,
err
)
err
=
svr
.
meta
.
AddSegment
(
NewSegmentInfo
(
seg2
))
assert
.
Nil
(
t
,
err
)
err
=
svr
.
meta
.
AddSegment
(
NewSegmentInfo
(
seg3
))
assert
.
Nil
(
t
,
err
)
err
=
svr
.
meta
.
AddSegment
(
NewSegmentInfo
(
seg4
))
assert
.
Nil
(
t
,
err
)
err
=
svr
.
meta
.
AddSegment
(
NewSegmentInfo
(
seg5
))
assert
.
Nil
(
t
,
err
)
mockResp
:=
&
indexpb
.
GetIndexInfoResponse
{
Status
:
&
commonpb
.
Status
{},
SegmentInfo
:
map
[
int64
]
*
indexpb
.
SegmentInfo
{
seg4
.
ID
:
{
CollectionID
:
seg4
.
CollectionID
,
SegmentID
:
seg4
.
ID
,
EnableIndex
:
true
,
IndexInfos
:
[]
*
indexpb
.
IndexFilePathInfo
{
{
SegmentID
:
seg4
.
ID
,
FieldID
:
2
,
},
},
},
},
}
svr
.
indexCoord
=
mocks
.
NewMockIndexCoord
(
t
)
svr
.
indexCoord
.
(
*
mocks
.
MockIndexCoord
)
.
EXPECT
()
.
GetIndexInfos
(
mock
.
Anything
,
mock
.
Anything
)
.
Return
(
mockResp
,
nil
)
req
:=
&
datapb
.
GetRecoveryInfoRequest
{
CollectionID
:
0
,
PartitionID
:
0
,
}
resp
,
err
:=
svr
.
GetRecoveryInfo
(
context
.
TODO
(),
req
)
assert
.
Nil
(
t
,
err
)
assert
.
EqualValues
(
t
,
commonpb
.
ErrorCode_Success
,
resp
.
Status
.
ErrorCode
)
assert
.
NotNil
(
t
,
resp
.
GetChannels
()[
0
]
.
SeekPosition
)
assert
.
NotEqual
(
t
,
0
,
resp
.
GetChannels
()[
0
]
.
GetSeekPosition
()
.
GetTimestamp
())
assert
.
Len
(
t
,
resp
.
GetChannels
()[
0
]
.
GetDroppedSegmentIds
(),
0
)
assert
.
ElementsMatch
(
t
,
[]
UniqueID
{
9
,
10
},
resp
.
GetChannels
()[
0
]
.
GetUnflushedSegmentIds
())
assert
.
ElementsMatch
(
t
,
[]
UniqueID
{
12
},
resp
.
GetChannels
()[
0
]
.
GetFlushedSegmentIds
())
})
t
.
Run
(
"with closed server"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"with closed server"
,
func
(
t
*
testing
.
T
)
{
svr
:=
newTestServer
(
t
,
nil
)
svr
:=
newTestServer
(
t
,
nil
)
closeTestServer
(
t
,
svr
)
closeTestServer
(
t
,
svr
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录