Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
d6f95f49
M
milvus
项目概览
milvus
/
milvus
11 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
d6f95f49
编写于
10月 18, 2021
作者:
Z
zhenshan.cao
提交者:
GitHub
10月 18, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix golint error in querycoord (#10127)
Signed-off-by:
N
zhenshan.cao
<
zhenshan.cao@zilliz.com
>
上级
32a029fb
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
269 addition
and
274 deletion
+269
-274
internal/querycoord/impl.go
internal/querycoord/impl.go
+8
-8
internal/querycoord/impl_test.go
internal/querycoord/impl_test.go
+2
-2
internal/querycoord/query_coord.go
internal/querycoord/query_coord.go
+4
-4
internal/querycoord/task.go
internal/querycoord/task.go
+185
-190
internal/querycoord/task_scheduler.go
internal/querycoord/task_scheduler.go
+18
-18
internal/querycoord/task_scheduler_test.go
internal/querycoord/task_scheduler_test.go
+27
-27
internal/querycoord/task_test.go
internal/querycoord/task_test.go
+25
-25
未找到文件。
internal/querycoord/impl.go
浏览文件 @
d6f95f49
...
...
@@ -144,8 +144,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
}
baseTask
:=
newBaseTask
(
qc
.
loopCtx
,
querypb
.
TriggerCondition_grpcRequest
)
loadCollectionTask
:=
&
L
oadCollectionTask
{
B
aseTask
:
baseTask
,
loadCollectionTask
:=
&
l
oadCollectionTask
{
b
aseTask
:
baseTask
,
LoadCollectionRequest
:
req
,
rootCoord
:
qc
.
rootCoordClient
,
dataCoord
:
qc
.
dataCoordClient
,
...
...
@@ -193,8 +193,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
}
baseTask
:=
newBaseTask
(
qc
.
loopCtx
,
querypb
.
TriggerCondition_grpcRequest
)
releaseCollectionTask
:=
&
R
eleaseCollectionTask
{
B
aseTask
:
baseTask
,
releaseCollectionTask
:=
&
r
eleaseCollectionTask
{
b
aseTask
:
baseTask
,
ReleaseCollectionRequest
:
req
,
cluster
:
qc
.
cluster
,
meta
:
qc
.
meta
,
...
...
@@ -336,8 +336,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
}
baseTask
:=
newBaseTask
(
qc
.
loopCtx
,
querypb
.
TriggerCondition_grpcRequest
)
loadPartitionTask
:=
&
L
oadPartitionTask
{
B
aseTask
:
baseTask
,
loadPartitionTask
:=
&
l
oadPartitionTask
{
b
aseTask
:
baseTask
,
LoadPartitionsRequest
:
req
,
dataCoord
:
qc
.
dataCoordClient
,
cluster
:
qc
.
cluster
,
...
...
@@ -407,8 +407,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
req
.
PartitionIDs
=
toReleasedPartitions
baseTask
:=
newBaseTask
(
qc
.
loopCtx
,
querypb
.
TriggerCondition_grpcRequest
)
releasePartitionTask
:=
&
R
eleasePartitionTask
{
B
aseTask
:
baseTask
,
releasePartitionTask
:=
&
r
eleasePartitionTask
{
b
aseTask
:
baseTask
,
ReleasePartitionsRequest
:
req
,
cluster
:
qc
.
cluster
,
}
...
...
internal/querycoord/impl_test.go
浏览文件 @
d6f95f49
...
...
@@ -452,8 +452,8 @@ func TestLoadBalanceTask(t *testing.T) {
BalanceReason
:
querypb
.
TriggerCondition_nodeDown
,
}
loadBalanceTask
:=
&
L
oadBalanceTask
{
BaseTask
:
&
B
aseTask
{
loadBalanceTask
:=
&
l
oadBalanceTask
{
baseTask
:
&
b
aseTask
{
ctx
:
baseCtx
,
Condition
:
NewTaskCondition
(
baseCtx
),
triggerCondition
:
querypb
.
TriggerCondition_nodeDown
,
...
...
internal/querycoord/query_coord.go
浏览文件 @
d6f95f49
...
...
@@ -238,8 +238,8 @@ func (qc *QueryCoord) watchNodeLoop() {
}
baseTask
:=
newBaseTask
(
qc
.
loopCtx
,
querypb
.
TriggerCondition_nodeDown
)
loadBalanceTask
:=
&
L
oadBalanceTask
{
B
aseTask
:
baseTask
,
loadBalanceTask
:=
&
l
oadBalanceTask
{
b
aseTask
:
baseTask
,
LoadBalanceRequest
:
loadBalanceSegment
,
rootCoord
:
qc
.
rootCoordClient
,
dataCoord
:
qc
.
dataCoordClient
,
...
...
@@ -289,8 +289,8 @@ func (qc *QueryCoord) watchNodeLoop() {
}
baseTask
:=
newBaseTask
(
qc
.
loopCtx
,
querypb
.
TriggerCondition_nodeDown
)
loadBalanceTask
:=
&
L
oadBalanceTask
{
B
aseTask
:
baseTask
,
loadBalanceTask
:=
&
l
oadBalanceTask
{
b
aseTask
:
baseTask
,
LoadBalanceRequest
:
loadBalanceSegment
,
rootCoord
:
qc
.
rootCoordClient
,
dataCoord
:
qc
.
dataCoordClient
,
...
...
internal/querycoord/task.go
浏览文件 @
d6f95f49
...
...
@@ -86,7 +86,7 @@ type task interface {
updateTaskProcess
()
}
type
B
aseTask
struct
{
type
b
aseTask
struct
{
Condition
ctx
context
.
Context
cancel
context
.
CancelFunc
...
...
@@ -104,11 +104,11 @@ type BaseTask struct {
childTasksMu
sync
.
RWMutex
}
func
newBaseTask
(
ctx
context
.
Context
,
triggerType
querypb
.
TriggerCondition
)
*
B
aseTask
{
func
newBaseTask
(
ctx
context
.
Context
,
triggerType
querypb
.
TriggerCondition
)
*
b
aseTask
{
childCtx
,
cancel
:=
context
.
WithCancel
(
ctx
)
condition
:=
NewTaskCondition
(
childCtx
)
baseTask
:=
&
B
aseTask
{
baseTask
:=
&
b
aseTask
{
ctx
:
childCtx
,
cancel
:
cancel
,
Condition
:
condition
,
...
...
@@ -122,52 +122,52 @@ func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *Bas
}
// getTaskID function returns the unique taskID of the trigger task
func
(
bt
*
B
aseTask
)
getTaskID
()
UniqueID
{
func
(
bt
*
b
aseTask
)
getTaskID
()
UniqueID
{
return
bt
.
taskID
}
// setTaskID function sets the trigger task with a unique id, which is allocated by tso
func
(
bt
*
B
aseTask
)
setTaskID
(
id
UniqueID
)
{
func
(
bt
*
b
aseTask
)
setTaskID
(
id
UniqueID
)
{
bt
.
taskID
=
id
}
func
(
bt
*
B
aseTask
)
traceCtx
()
context
.
Context
{
func
(
bt
*
b
aseTask
)
traceCtx
()
context
.
Context
{
return
bt
.
ctx
}
func
(
bt
*
B
aseTask
)
getTriggerCondition
()
querypb
.
TriggerCondition
{
func
(
bt
*
b
aseTask
)
getTriggerCondition
()
querypb
.
TriggerCondition
{
return
bt
.
triggerCondition
}
func
(
bt
*
B
aseTask
)
taskPriority
()
querypb
.
TriggerCondition
{
func
(
bt
*
b
aseTask
)
taskPriority
()
querypb
.
TriggerCondition
{
return
bt
.
triggerCondition
}
func
(
bt
*
B
aseTask
)
setParentTask
(
t
task
)
{
func
(
bt
*
b
aseTask
)
setParentTask
(
t
task
)
{
bt
.
parentTask
=
t
}
func
(
bt
*
B
aseTask
)
getParentTask
()
task
{
func
(
bt
*
b
aseTask
)
getParentTask
()
task
{
return
bt
.
parentTask
}
// GetChildTask function returns all the child tasks of the trigger task
// Child task may be loadSegmentTask, watchDmChannelTask or watchQueryChannelTask
func
(
bt
*
B
aseTask
)
getChildTask
()
[]
task
{
func
(
bt
*
b
aseTask
)
getChildTask
()
[]
task
{
bt
.
childTasksMu
.
RLock
()
defer
bt
.
childTasksMu
.
RUnlock
()
return
bt
.
childTasks
}
func
(
bt
*
B
aseTask
)
addChildTask
(
t
task
)
{
func
(
bt
*
b
aseTask
)
addChildTask
(
t
task
)
{
bt
.
childTasksMu
.
Lock
()
defer
bt
.
childTasksMu
.
Unlock
()
bt
.
childTasks
=
append
(
bt
.
childTasks
,
t
)
}
func
(
bt
*
B
aseTask
)
removeChildTaskByID
(
taskID
UniqueID
)
{
func
(
bt
*
b
aseTask
)
removeChildTaskByID
(
taskID
UniqueID
)
{
bt
.
childTasksMu
.
Lock
()
defer
bt
.
childTasksMu
.
Unlock
()
...
...
@@ -180,32 +180,32 @@ func (bt *BaseTask) removeChildTaskByID(taskID UniqueID) {
bt
.
childTasks
=
result
}
func
(
bt
*
B
aseTask
)
isValid
()
bool
{
func
(
bt
*
b
aseTask
)
isValid
()
bool
{
return
true
}
func
(
bt
*
B
aseTask
)
reschedule
(
ctx
context
.
Context
)
([]
task
,
error
)
{
func
(
bt
*
b
aseTask
)
reschedule
(
ctx
context
.
Context
)
([]
task
,
error
)
{
return
nil
,
nil
}
// State returns the state of task, such as taskUndo, taskDoing, taskDone, taskExpired, taskFailed
func
(
bt
*
B
aseTask
)
getState
()
taskState
{
func
(
bt
*
b
aseTask
)
getState
()
taskState
{
bt
.
stateMu
.
RLock
()
defer
bt
.
stateMu
.
RUnlock
()
return
bt
.
state
}
func
(
bt
*
B
aseTask
)
setState
(
state
taskState
)
{
func
(
bt
*
b
aseTask
)
setState
(
state
taskState
)
{
bt
.
stateMu
.
Lock
()
defer
bt
.
stateMu
.
Unlock
()
bt
.
state
=
state
}
func
(
bt
*
B
aseTask
)
isRetryable
()
bool
{
func
(
bt
*
b
aseTask
)
isRetryable
()
bool
{
return
bt
.
retryCount
>
0
}
func
(
bt
*
B
aseTask
)
setResultInfo
(
err
error
)
{
func
(
bt
*
b
aseTask
)
setResultInfo
(
err
error
)
{
bt
.
resultMu
.
Lock
()
defer
bt
.
resultMu
.
Unlock
()
...
...
@@ -222,25 +222,23 @@ func (bt *BaseTask) setResultInfo(err error) {
bt
.
result
.
Reason
=
bt
.
result
.
Reason
+
", "
+
err
.
Error
()
}
func
(
bt
*
B
aseTask
)
getResultInfo
()
*
commonpb
.
Status
{
func
(
bt
*
b
aseTask
)
getResultInfo
()
*
commonpb
.
Status
{
bt
.
resultMu
.
RLock
()
defer
bt
.
resultMu
.
RUnlock
()
return
proto
.
Clone
(
bt
.
result
)
.
(
*
commonpb
.
Status
)
}
func
(
bt
*
B
aseTask
)
updateTaskProcess
()
{
func
(
bt
*
b
aseTask
)
updateTaskProcess
()
{
// TODO::
}
func
(
bt
*
B
aseTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
func
(
bt
*
b
aseTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
//TODO::
return
nil
}
//************************grpcTask***************************//
// LoadCollectionTask will load all the data of this collection to query nodes
type
LoadCollectionTask
struct
{
*
BaseTask
type
loadCollectionTask
struct
{
*
baseTask
*
querypb
.
LoadCollectionRequest
rootCoord
types
.
RootCoord
dataCoord
types
.
DataCoord
...
...
@@ -248,23 +246,23 @@ type LoadCollectionTask struct {
meta
Meta
}
func
(
lct
*
L
oadCollectionTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
lct
*
l
oadCollectionTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
lct
.
Base
}
func
(
lct
*
L
oadCollectionTask
)
marshal
()
([]
byte
,
error
)
{
func
(
lct
*
l
oadCollectionTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
lct
.
LoadCollectionRequest
)
}
func
(
lct
*
L
oadCollectionTask
)
msgType
()
commonpb
.
MsgType
{
func
(
lct
*
l
oadCollectionTask
)
msgType
()
commonpb
.
MsgType
{
return
lct
.
Base
.
MsgType
}
func
(
lct
*
L
oadCollectionTask
)
timestamp
()
Timestamp
{
func
(
lct
*
l
oadCollectionTask
)
timestamp
()
Timestamp
{
return
lct
.
Base
.
Timestamp
}
func
(
lct
*
L
oadCollectionTask
)
updateTaskProcess
()
{
func
(
lct
*
l
oadCollectionTask
)
updateTaskProcess
()
{
collectionID
:=
lct
.
CollectionID
childTasks
:=
lct
.
getChildTask
()
allDone
:=
true
...
...
@@ -282,18 +280,18 @@ func (lct *LoadCollectionTask) updateTaskProcess() {
}
}
func
(
lct
*
L
oadCollectionTask
)
preExecute
(
ctx
context
.
Context
)
error
{
func
(
lct
*
l
oadCollectionTask
)
preExecute
(
ctx
context
.
Context
)
error
{
collectionID
:=
lct
.
CollectionID
schema
:=
lct
.
Schema
lct
.
setResultInfo
(
nil
)
log
.
Debug
(
"start do
L
oadCollectionTask"
,
log
.
Debug
(
"start do
l
oadCollectionTask"
,
zap
.
Int64
(
"msgID"
,
lct
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Stringer
(
"schema"
,
schema
))
return
nil
}
func
(
lct
*
L
oadCollectionTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
lct
*
l
oadCollectionTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
lct
.
retryCount
--
}()
...
...
@@ -439,23 +437,23 @@ func (lct *LoadCollectionTask) execute(ctx context.Context) error {
return
nil
}
func
(
lct
*
L
oadCollectionTask
)
postExecute
(
ctx
context
.
Context
)
error
{
func
(
lct
*
l
oadCollectionTask
)
postExecute
(
ctx
context
.
Context
)
error
{
collectionID
:=
lct
.
CollectionID
if
lct
.
result
.
ErrorCode
!=
commonpb
.
ErrorCode_Success
{
lct
.
childTasks
=
[]
task
{}
err
:=
lct
.
meta
.
releaseCollection
(
collectionID
)
if
err
!=
nil
{
log
.
Error
(
"
L
oadCollectionTask: occur error when release collection info from meta"
,
zap
.
Error
(
err
))
log
.
Error
(
"
l
oadCollectionTask: occur error when release collection info from meta"
,
zap
.
Error
(
err
))
}
}
log
.
Debug
(
"
L
oadCollectionTask postExecute done"
,
log
.
Debug
(
"
l
oadCollectionTask postExecute done"
,
zap
.
Int64
(
"msgID"
,
lct
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
))
return
nil
}
func
(
lct
*
L
oadCollectionTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
func
(
lct
*
l
oadCollectionTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
nodes
,
_
:=
lct
.
cluster
.
onlineNodes
()
resultTasks
:=
make
([]
task
,
0
)
//TODO::call rootCoord.ReleaseDQLMessageStream
...
...
@@ -474,8 +472,8 @@ func (lct *LoadCollectionTask) rollBack(ctx context.Context) []task {
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseTask
.
setParentTask
(
lct
)
releaseCollectionTask
:=
&
R
eleaseCollectionTask
{
B
aseTask
:
baseTask
,
releaseCollectionTask
:=
&
r
eleaseCollectionTask
{
b
aseTask
:
baseTask
,
ReleaseCollectionRequest
:
req
,
cluster
:
lct
.
cluster
,
}
...
...
@@ -485,41 +483,41 @@ func (lct *LoadCollectionTask) rollBack(ctx context.Context) []task {
return
resultTasks
}
//
R
eleaseCollectionTask will release all the data of this collection on query nodes
type
R
eleaseCollectionTask
struct
{
*
B
aseTask
//
r
eleaseCollectionTask will release all the data of this collection on query nodes
type
r
eleaseCollectionTask
struct
{
*
b
aseTask
*
querypb
.
ReleaseCollectionRequest
cluster
Cluster
meta
Meta
rootCoord
types
.
RootCoord
}
func
(
rct
*
R
eleaseCollectionTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
rct
*
r
eleaseCollectionTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
rct
.
Base
}
func
(
rct
*
R
eleaseCollectionTask
)
marshal
()
([]
byte
,
error
)
{
func
(
rct
*
r
eleaseCollectionTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
rct
.
ReleaseCollectionRequest
)
}
func
(
rct
*
R
eleaseCollectionTask
)
msgType
()
commonpb
.
MsgType
{
func
(
rct
*
r
eleaseCollectionTask
)
msgType
()
commonpb
.
MsgType
{
return
rct
.
Base
.
MsgType
}
func
(
rct
*
R
eleaseCollectionTask
)
timestamp
()
Timestamp
{
func
(
rct
*
r
eleaseCollectionTask
)
timestamp
()
Timestamp
{
return
rct
.
Base
.
Timestamp
}
func
(
rct
*
R
eleaseCollectionTask
)
preExecute
(
context
.
Context
)
error
{
func
(
rct
*
r
eleaseCollectionTask
)
preExecute
(
context
.
Context
)
error
{
collectionID
:=
rct
.
CollectionID
rct
.
setResultInfo
(
nil
)
log
.
Debug
(
"start do
R
eleaseCollectionTask"
,
log
.
Debug
(
"start do
r
eleaseCollectionTask"
,
zap
.
Int64
(
"msgID"
,
rct
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
))
return
nil
}
func
(
rct
*
R
eleaseCollectionTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
rct
*
r
eleaseCollectionTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
rct
.
retryCount
--
}()
...
...
@@ -540,7 +538,7 @@ func (rct *ReleaseCollectionTask) execute(ctx context.Context) error {
}
res
,
err
:=
rct
.
rootCoord
.
ReleaseDQLMessageStream
(
rct
.
ctx
,
releaseDQLMessageStreamReq
)
if
res
.
ErrorCode
!=
commonpb
.
ErrorCode_Success
||
err
!=
nil
{
log
.
Warn
(
"
R
eleaseCollectionTask: release collection end, releaseDQLMessageStream occur error"
,
zap
.
Int64
(
"collectionID"
,
rct
.
CollectionID
))
log
.
Warn
(
"
r
eleaseCollectionTask: release collection end, releaseDQLMessageStream occur error"
,
zap
.
Int64
(
"collectionID"
,
rct
.
CollectionID
))
err
=
errors
.
New
(
"rootCoord releaseDQLMessageStream failed"
)
rct
.
setResultInfo
(
err
)
return
err
...
...
@@ -555,54 +553,54 @@ func (rct *ReleaseCollectionTask) execute(ctx context.Context) error {
req
.
NodeID
=
nodeID
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseTask
.
setParentTask
(
rct
)
releaseCollectionTask
:=
&
R
eleaseCollectionTask
{
B
aseTask
:
baseTask
,
releaseCollectionTask
:=
&
r
eleaseCollectionTask
{
b
aseTask
:
baseTask
,
ReleaseCollectionRequest
:
req
,
cluster
:
rct
.
cluster
,
}
rct
.
addChildTask
(
releaseCollectionTask
)
log
.
Debug
(
"
R
eleaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask"
,
zap
.
Any
(
"task"
,
releaseCollectionTask
))
log
.
Debug
(
"
r
eleaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask"
,
zap
.
Any
(
"task"
,
releaseCollectionTask
))
}
}
else
{
err
:=
rct
.
cluster
.
releaseCollection
(
ctx
,
rct
.
NodeID
,
rct
.
ReleaseCollectionRequest
)
if
err
!=
nil
{
log
.
Warn
(
"
R
eleaseCollectionTask: release collection end, node occur error"
,
zap
.
Int64
(
"nodeID"
,
rct
.
NodeID
))
log
.
Warn
(
"
r
eleaseCollectionTask: release collection end, node occur error"
,
zap
.
Int64
(
"nodeID"
,
rct
.
NodeID
))
rct
.
setResultInfo
(
err
)
return
err
}
}
log
.
Debug
(
"
R
eleaseCollectionTask Execute done"
,
log
.
Debug
(
"
r
eleaseCollectionTask Execute done"
,
zap
.
Int64
(
"msgID"
,
rct
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64
(
"nodeID"
,
rct
.
NodeID
))
return
nil
}
func
(
rct
*
R
eleaseCollectionTask
)
postExecute
(
context
.
Context
)
error
{
func
(
rct
*
r
eleaseCollectionTask
)
postExecute
(
context
.
Context
)
error
{
collectionID
:=
rct
.
CollectionID
if
rct
.
result
.
ErrorCode
!=
commonpb
.
ErrorCode_Success
{
rct
.
childTasks
=
[]
task
{}
}
log
.
Debug
(
"
R
eleaseCollectionTask postExecute done"
,
log
.
Debug
(
"
r
eleaseCollectionTask postExecute done"
,
zap
.
Int64
(
"msgID"
,
rct
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64
(
"nodeID"
,
rct
.
NodeID
))
return
nil
}
func
(
rct
*
R
eleaseCollectionTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
func
(
rct
*
r
eleaseCollectionTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
//TODO::
//if taskID == 0, recovery meta
//if taskID != 0, recovery collection on queryNode
return
nil
}
//
L
oadPartitionTask will load all the data of this partition to query nodes
type
L
oadPartitionTask
struct
{
*
B
aseTask
//
l
oadPartitionTask will load all the data of this partition to query nodes
type
l
oadPartitionTask
struct
{
*
b
aseTask
*
querypb
.
LoadPartitionsRequest
dataCoord
types
.
DataCoord
cluster
Cluster
...
...
@@ -610,23 +608,23 @@ type LoadPartitionTask struct {
addCol
bool
}
func
(
lpt
*
L
oadPartitionTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
lpt
*
l
oadPartitionTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
lpt
.
Base
}
func
(
lpt
*
L
oadPartitionTask
)
marshal
()
([]
byte
,
error
)
{
func
(
lpt
*
l
oadPartitionTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
lpt
.
LoadPartitionsRequest
)
}
func
(
lpt
*
L
oadPartitionTask
)
msgType
()
commonpb
.
MsgType
{
func
(
lpt
*
l
oadPartitionTask
)
msgType
()
commonpb
.
MsgType
{
return
lpt
.
Base
.
MsgType
}
func
(
lpt
*
L
oadPartitionTask
)
timestamp
()
Timestamp
{
func
(
lpt
*
l
oadPartitionTask
)
timestamp
()
Timestamp
{
return
lpt
.
Base
.
Timestamp
}
func
(
lpt
*
L
oadPartitionTask
)
updateTaskProcess
()
{
func
(
lpt
*
l
oadPartitionTask
)
updateTaskProcess
()
{
collectionID
:=
lpt
.
CollectionID
partitionIDs
:=
lpt
.
PartitionIDs
childTasks
:=
lpt
.
getChildTask
()
...
...
@@ -647,16 +645,16 @@ func (lpt *LoadPartitionTask) updateTaskProcess() {
}
}
func
(
lpt
*
L
oadPartitionTask
)
preExecute
(
context
.
Context
)
error
{
func
(
lpt
*
l
oadPartitionTask
)
preExecute
(
context
.
Context
)
error
{
collectionID
:=
lpt
.
CollectionID
lpt
.
setResultInfo
(
nil
)
log
.
Debug
(
"start do
L
oadPartitionTask"
,
log
.
Debug
(
"start do
l
oadPartitionTask"
,
zap
.
Int64
(
"msgID"
,
lpt
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
))
return
nil
}
func
(
lpt
*
L
oadPartitionTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
lpt
*
l
oadPartitionTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
lpt
.
retryCount
--
}()
...
...
@@ -722,25 +720,25 @@ func (lpt *LoadPartitionTask) execute(ctx context.Context) error {
}
channelsToWatch
=
append
(
channelsToWatch
,
channel
)
watchDmReqs
=
append
(
watchDmReqs
,
watchDmRequest
)
log
.
Debug
(
"
L
oadPartitionTask: set watchDmChannelsRequests"
,
zap
.
Any
(
"request"
,
watchDmRequest
),
zap
.
Int64
(
"collectionID"
,
collectionID
))
log
.
Debug
(
"
l
oadPartitionTask: set watchDmChannelsRequests"
,
zap
.
Any
(
"request"
,
watchDmRequest
),
zap
.
Int64
(
"collectionID"
,
collectionID
))
}
}
err
:=
assignInternalTask
(
ctx
,
collectionID
,
lpt
,
lpt
.
meta
,
lpt
.
cluster
,
loadSegmentReqs
,
watchDmReqs
,
false
)
if
err
!=
nil
{
log
.
Warn
(
"
L
oadPartitionTask: assign child task failed"
,
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64s
(
"partitionIDs"
,
partitionIDs
))
log
.
Warn
(
"
l
oadPartitionTask: assign child task failed"
,
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64s
(
"partitionIDs"
,
partitionIDs
))
lpt
.
setResultInfo
(
err
)
return
err
}
log
.
Debug
(
"
L
oadPartitionTask: assign child task done"
,
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64s
(
"partitionIDs"
,
partitionIDs
))
log
.
Debug
(
"
l
oadPartitionTask: assign child task done"
,
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64s
(
"partitionIDs"
,
partitionIDs
))
log
.
Debug
(
"
L
oadPartitionTask Execute done"
,
log
.
Debug
(
"
l
oadPartitionTask Execute done"
,
zap
.
Int64
(
"msgID"
,
lpt
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64s
(
"partitionIDs"
,
partitionIDs
))
return
nil
}
func
(
lpt
*
L
oadPartitionTask
)
postExecute
(
ctx
context
.
Context
)
error
{
func
(
lpt
*
l
oadPartitionTask
)
postExecute
(
ctx
context
.
Context
)
error
{
collectionID
:=
lpt
.
CollectionID
partitionIDs
:=
lpt
.
PartitionIDs
if
lpt
.
result
.
ErrorCode
!=
commonpb
.
ErrorCode_Success
{
...
...
@@ -748,26 +746,26 @@ func (lpt *LoadPartitionTask) postExecute(ctx context.Context) error {
if
lpt
.
addCol
{
err
:=
lpt
.
meta
.
releaseCollection
(
collectionID
)
if
err
!=
nil
{
log
.
Error
(
"
L
oadPartitionTask: occur error when release collection info from meta"
,
zap
.
Error
(
err
))
log
.
Error
(
"
l
oadPartitionTask: occur error when release collection info from meta"
,
zap
.
Error
(
err
))
}
}
else
{
for
_
,
partitionID
:=
range
partitionIDs
{
err
:=
lpt
.
meta
.
releasePartition
(
collectionID
,
partitionID
)
if
err
!=
nil
{
log
.
Error
(
"
L
oadPartitionTask: occur error when release partition info from meta"
,
zap
.
Error
(
err
))
log
.
Error
(
"
l
oadPartitionTask: occur error when release partition info from meta"
,
zap
.
Error
(
err
))
}
}
}
}
log
.
Debug
(
"
L
oadPartitionTask postExecute done"
,
log
.
Debug
(
"
l
oadPartitionTask postExecute done"
,
zap
.
Int64
(
"msgID"
,
lpt
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64s
(
"partitionIDs"
,
partitionIDs
))
return
nil
}
func
(
lpt
*
L
oadPartitionTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
func
(
lpt
*
l
oadPartitionTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
partitionIDs
:=
lpt
.
PartitionIDs
resultTasks
:=
make
([]
task
,
0
)
//brute force rollBack, should optimize
...
...
@@ -787,8 +785,8 @@ func (lpt *LoadPartitionTask) rollBack(ctx context.Context) []task {
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseTask
.
setParentTask
(
lpt
)
releaseCollectionTask
:=
&
R
eleaseCollectionTask
{
B
aseTask
:
baseTask
,
releaseCollectionTask
:=
&
r
eleaseCollectionTask
{
b
aseTask
:
baseTask
,
ReleaseCollectionRequest
:
req
,
cluster
:
lpt
.
cluster
,
}
...
...
@@ -812,8 +810,8 @@ func (lpt *LoadPartitionTask) rollBack(ctx context.Context) []task {
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseTask
.
setParentTask
(
lpt
)
releasePartitionTask
:=
&
R
eleasePartitionTask
{
B
aseTask
:
baseTask
,
releasePartitionTask
:=
&
r
eleasePartitionTask
{
b
aseTask
:
baseTask
,
ReleasePartitionsRequest
:
req
,
cluster
:
lpt
.
cluster
,
}
...
...
@@ -824,30 +822,30 @@ func (lpt *LoadPartitionTask) rollBack(ctx context.Context) []task {
return
resultTasks
}
//
R
eleasePartitionTask will release all the data of this partition on query nodes
type
R
eleasePartitionTask
struct
{
*
B
aseTask
//
r
eleasePartitionTask will release all the data of this partition on query nodes
type
r
eleasePartitionTask
struct
{
*
b
aseTask
*
querypb
.
ReleasePartitionsRequest
cluster
Cluster
}
func
(
rpt
*
R
eleasePartitionTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
rpt
*
r
eleasePartitionTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
rpt
.
Base
}
func
(
rpt
*
R
eleasePartitionTask
)
marshal
()
([]
byte
,
error
)
{
func
(
rpt
*
r
eleasePartitionTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
rpt
.
ReleasePartitionsRequest
)
}
func
(
rpt
*
R
eleasePartitionTask
)
msgType
()
commonpb
.
MsgType
{
func
(
rpt
*
r
eleasePartitionTask
)
msgType
()
commonpb
.
MsgType
{
return
rpt
.
Base
.
MsgType
}
func
(
rpt
*
R
eleasePartitionTask
)
timestamp
()
Timestamp
{
func
(
rpt
*
r
eleasePartitionTask
)
timestamp
()
Timestamp
{
return
rpt
.
Base
.
Timestamp
}
func
(
rpt
*
R
eleasePartitionTask
)
preExecute
(
context
.
Context
)
error
{
func
(
rpt
*
r
eleasePartitionTask
)
preExecute
(
context
.
Context
)
error
{
collectionID
:=
rpt
.
CollectionID
rpt
.
setResultInfo
(
nil
)
log
.
Debug
(
"start do releasePartitionTask"
,
...
...
@@ -856,7 +854,7 @@ func (rpt *ReleasePartitionTask) preExecute(context.Context) error {
return
nil
}
func
(
rpt
*
R
eleasePartitionTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
rpt
*
r
eleasePartitionTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
rpt
.
retryCount
--
}()
...
...
@@ -874,13 +872,13 @@ func (rpt *ReleasePartitionTask) execute(ctx context.Context) error {
req
.
NodeID
=
nodeID
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseTask
.
setParentTask
(
rpt
)
releasePartitionTask
:=
&
R
eleasePartitionTask
{
B
aseTask
:
baseTask
,
releasePartitionTask
:=
&
r
eleasePartitionTask
{
b
aseTask
:
baseTask
,
ReleasePartitionsRequest
:
req
,
cluster
:
rpt
.
cluster
,
}
rpt
.
addChildTask
(
releasePartitionTask
)
log
.
Debug
(
"
R
eleasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask"
,
zap
.
Any
(
"task"
,
releasePartitionTask
))
log
.
Debug
(
"
r
eleasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask"
,
zap
.
Any
(
"task"
,
releasePartitionTask
))
}
}
else
{
err
:=
rpt
.
cluster
.
releasePartitions
(
ctx
,
rpt
.
NodeID
,
rpt
.
ReleasePartitionsRequest
)
...
...
@@ -891,7 +889,7 @@ func (rpt *ReleasePartitionTask) execute(ctx context.Context) error {
}
}
log
.
Debug
(
"
R
eleasePartitionTask Execute done"
,
log
.
Debug
(
"
r
eleasePartitionTask Execute done"
,
zap
.
Int64
(
"msgID"
,
rpt
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64s
(
"partitionIDs"
,
partitionIDs
),
...
...
@@ -899,14 +897,14 @@ func (rpt *ReleasePartitionTask) execute(ctx context.Context) error {
return
nil
}
func
(
rpt
*
R
eleasePartitionTask
)
postExecute
(
context
.
Context
)
error
{
func
(
rpt
*
r
eleasePartitionTask
)
postExecute
(
context
.
Context
)
error
{
collectionID
:=
rpt
.
CollectionID
partitionIDs
:=
rpt
.
PartitionIDs
if
rpt
.
result
.
ErrorCode
!=
commonpb
.
ErrorCode_Success
{
rpt
.
childTasks
=
[]
task
{}
}
log
.
Debug
(
"
R
eleasePartitionTask postExecute done"
,
log
.
Debug
(
"
r
eleasePartitionTask postExecute done"
,
zap
.
Int64
(
"msgID"
,
rpt
.
getTaskID
()),
zap
.
Int64
(
"collectionID"
,
collectionID
),
zap
.
Int64s
(
"partitionIDs"
,
partitionIDs
),
...
...
@@ -914,31 +912,30 @@ func (rpt *ReleasePartitionTask) postExecute(context.Context) error {
return
nil
}
func
(
rpt
*
R
eleasePartitionTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
func
(
rpt
*
r
eleasePartitionTask
)
rollBack
(
ctx
context
.
Context
)
[]
task
{
//TODO::
//if taskID == 0, recovery meta
//if taskID != 0, recovery partition on queryNode
return
nil
}
//****************************internal task*******************************//
type
LoadSegmentTask
struct
{
*
BaseTask
type
loadSegmentTask
struct
{
*
baseTask
*
querypb
.
LoadSegmentsRequest
meta
Meta
cluster
Cluster
excludeNodeIDs
[]
int64
}
func
(
lst
*
L
oadSegmentTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
lst
*
l
oadSegmentTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
lst
.
Base
}
func
(
lst
*
L
oadSegmentTask
)
marshal
()
([]
byte
,
error
)
{
func
(
lst
*
l
oadSegmentTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
lst
.
LoadSegmentsRequest
)
}
func
(
lst
*
L
oadSegmentTask
)
isValid
()
bool
{
func
(
lst
*
l
oadSegmentTask
)
isValid
()
bool
{
online
,
err
:=
lst
.
cluster
.
isOnline
(
lst
.
NodeID
)
if
err
!=
nil
{
return
false
...
...
@@ -947,24 +944,24 @@ func (lst *LoadSegmentTask) isValid() bool {
return
lst
.
ctx
!=
nil
&&
online
}
func
(
lst
*
L
oadSegmentTask
)
msgType
()
commonpb
.
MsgType
{
func
(
lst
*
l
oadSegmentTask
)
msgType
()
commonpb
.
MsgType
{
return
lst
.
Base
.
MsgType
}
func
(
lst
*
L
oadSegmentTask
)
timestamp
()
Timestamp
{
func
(
lst
*
l
oadSegmentTask
)
timestamp
()
Timestamp
{
return
lst
.
Base
.
Timestamp
}
func
(
lst
*
L
oadSegmentTask
)
updateTaskProcess
()
{
func
(
lst
*
l
oadSegmentTask
)
updateTaskProcess
()
{
parentTask
:=
lst
.
getParentTask
()
if
parentTask
==
nil
{
log
.
Warn
(
"
L
oadSegmentTask: parentTask should not be nil"
)
log
.
Warn
(
"
l
oadSegmentTask: parentTask should not be nil"
)
return
}
parentTask
.
updateTaskProcess
()
}
func
(
lst
*
L
oadSegmentTask
)
preExecute
(
context
.
Context
)
error
{
func
(
lst
*
l
oadSegmentTask
)
preExecute
(
context
.
Context
)
error
{
segmentIDs
:=
make
([]
UniqueID
,
0
)
for
_
,
info
:=
range
lst
.
Infos
{
segmentIDs
=
append
(
segmentIDs
,
info
.
SegmentID
)
...
...
@@ -977,14 +974,14 @@ func (lst *LoadSegmentTask) preExecute(context.Context) error {
return
nil
}
func
(
lst
*
L
oadSegmentTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
lst
*
l
oadSegmentTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
lst
.
retryCount
--
}()
err
:=
lst
.
cluster
.
loadSegments
(
ctx
,
lst
.
NodeID
,
lst
.
LoadSegmentsRequest
)
if
err
!=
nil
{
log
.
Warn
(
"
L
oadSegmentTask: loadSegment occur error"
,
zap
.
Int64
(
"taskID"
,
lst
.
getTaskID
()))
log
.
Warn
(
"
l
oadSegmentTask: loadSegment occur error"
,
zap
.
Int64
(
"taskID"
,
lst
.
getTaskID
()))
lst
.
setResultInfo
(
err
)
return
err
}
...
...
@@ -994,13 +991,13 @@ func (lst *LoadSegmentTask) execute(ctx context.Context) error {
return
nil
}
func
(
lst
*
L
oadSegmentTask
)
postExecute
(
context
.
Context
)
error
{
func
(
lst
*
l
oadSegmentTask
)
postExecute
(
context
.
Context
)
error
{
log
.
Debug
(
"loadSegmentTask postExecute done"
,
zap
.
Int64
(
"taskID"
,
lst
.
getTaskID
()))
return
nil
}
func
(
lst
*
L
oadSegmentTask
)
reschedule
(
ctx
context
.
Context
)
([]
task
,
error
)
{
func
(
lst
*
l
oadSegmentTask
)
reschedule
(
ctx
context
.
Context
)
([]
task
,
error
)
{
segmentIDs
:=
make
([]
UniqueID
,
0
)
collectionID
:=
lst
.
Infos
[
0
]
.
CollectionID
reScheduledTask
:=
make
([]
task
,
0
)
...
...
@@ -1025,8 +1022,8 @@ func (lst *LoadSegmentTask) reschedule(ctx context.Context) ([]task, error) {
for
nodeID
,
infos
:=
range
node2segmentInfos
{
loadSegmentBaseTask
:=
newBaseTask
(
ctx
,
lst
.
getTriggerCondition
())
loadSegmentBaseTask
.
setParentTask
(
lst
.
getParentTask
())
loadSegmentTask
:=
&
L
oadSegmentTask
{
B
aseTask
:
loadSegmentBaseTask
,
loadSegmentTask
:=
&
l
oadSegmentTask
{
b
aseTask
:
loadSegmentBaseTask
,
LoadSegmentsRequest
:
&
querypb
.
LoadSegmentsRequest
{
Base
:
lst
.
Base
,
NodeID
:
nodeID
,
...
...
@@ -1039,7 +1036,7 @@ func (lst *LoadSegmentTask) reschedule(ctx context.Context) ([]task, error) {
excludeNodeIDs
:
lst
.
excludeNodeIDs
,
}
reScheduledTask
=
append
(
reScheduledTask
,
loadSegmentTask
)
log
.
Debug
(
"
L
oadSegmentTask: add a loadSegmentTask to RescheduleTasks"
,
zap
.
Any
(
"task"
,
loadSegmentTask
))
log
.
Debug
(
"
l
oadSegmentTask: add a loadSegmentTask to RescheduleTasks"
,
zap
.
Any
(
"task"
,
loadSegmentTask
))
hasWatchQueryChannel
:=
lst
.
cluster
.
hasWatchedQueryChannel
(
lst
.
ctx
,
nodeID
,
collectionID
)
if
!
hasWatchQueryChannel
{
...
...
@@ -1059,34 +1056,34 @@ func (lst *LoadSegmentTask) reschedule(ctx context.Context) ([]task, error) {
}
watchQueryChannelBaseTask
:=
newBaseTask
(
ctx
,
lst
.
getTriggerCondition
())
watchQueryChannelBaseTask
.
setParentTask
(
lst
.
getParentTask
())
watchQueryChannelTask
:=
&
W
atchQueryChannelTask
{
B
aseTask
:
watchQueryChannelBaseTask
,
watchQueryChannelTask
:=
&
w
atchQueryChannelTask
{
b
aseTask
:
watchQueryChannelBaseTask
,
AddQueryChannelRequest
:
addQueryChannelRequest
,
cluster
:
lst
.
cluster
,
}
reScheduledTask
=
append
(
reScheduledTask
,
watchQueryChannelTask
)
log
.
Debug
(
"
L
oadSegmentTask: add a watchQueryChannelTask to RescheduleTasks"
,
zap
.
Any
(
"task"
,
watchQueryChannelTask
))
log
.
Debug
(
"
l
oadSegmentTask: add a watchQueryChannelTask to RescheduleTasks"
,
zap
.
Any
(
"task"
,
watchQueryChannelTask
))
}
}
return
reScheduledTask
,
nil
}
type
R
eleaseSegmentTask
struct
{
*
B
aseTask
type
r
eleaseSegmentTask
struct
{
*
b
aseTask
*
querypb
.
ReleaseSegmentsRequest
cluster
Cluster
}
func
(
rst
*
R
eleaseSegmentTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
rst
*
r
eleaseSegmentTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
rst
.
Base
}
func
(
rst
*
R
eleaseSegmentTask
)
marshal
()
([]
byte
,
error
)
{
func
(
rst
*
r
eleaseSegmentTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
rst
.
ReleaseSegmentsRequest
)
}
func
(
rst
*
R
eleaseSegmentTask
)
isValid
()
bool
{
func
(
rst
*
r
eleaseSegmentTask
)
isValid
()
bool
{
online
,
err
:=
rst
.
cluster
.
isOnline
(
rst
.
NodeID
)
if
err
!=
nil
{
return
false
...
...
@@ -1094,15 +1091,15 @@ func (rst *ReleaseSegmentTask) isValid() bool {
return
rst
.
ctx
!=
nil
&&
online
}
func
(
rst
*
R
eleaseSegmentTask
)
msgType
()
commonpb
.
MsgType
{
func
(
rst
*
r
eleaseSegmentTask
)
msgType
()
commonpb
.
MsgType
{
return
rst
.
Base
.
MsgType
}
func
(
rst
*
R
eleaseSegmentTask
)
timestamp
()
Timestamp
{
func
(
rst
*
r
eleaseSegmentTask
)
timestamp
()
Timestamp
{
return
rst
.
Base
.
Timestamp
}
func
(
rst
*
R
eleaseSegmentTask
)
preExecute
(
context
.
Context
)
error
{
func
(
rst
*
r
eleaseSegmentTask
)
preExecute
(
context
.
Context
)
error
{
segmentIDs
:=
rst
.
SegmentIDs
rst
.
setResultInfo
(
nil
)
log
.
Debug
(
"start do releaseSegmentTask"
,
...
...
@@ -1112,14 +1109,14 @@ func (rst *ReleaseSegmentTask) preExecute(context.Context) error {
return
nil
}
func
(
rst
*
R
eleaseSegmentTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
rst
*
r
eleaseSegmentTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
rst
.
retryCount
--
}()
err
:=
rst
.
cluster
.
releaseSegments
(
rst
.
ctx
,
rst
.
NodeID
,
rst
.
ReleaseSegmentsRequest
)
if
err
!=
nil
{
log
.
Warn
(
"
R
eleaseSegmentTask: releaseSegment occur error"
,
zap
.
Int64
(
"taskID"
,
rst
.
getTaskID
()))
log
.
Warn
(
"
r
eleaseSegmentTask: releaseSegment occur error"
,
zap
.
Int64
(
"taskID"
,
rst
.
getTaskID
()))
rst
.
setResultInfo
(
err
)
return
err
}
...
...
@@ -1130,7 +1127,7 @@ func (rst *ReleaseSegmentTask) execute(ctx context.Context) error {
return
nil
}
func
(
rst
*
R
eleaseSegmentTask
)
postExecute
(
context
.
Context
)
error
{
func
(
rst
*
r
eleaseSegmentTask
)
postExecute
(
context
.
Context
)
error
{
segmentIDs
:=
rst
.
SegmentIDs
log
.
Debug
(
"releaseSegmentTask postExecute done"
,
zap
.
Int64s
(
"segmentIDs"
,
segmentIDs
),
...
...
@@ -1138,23 +1135,23 @@ func (rst *ReleaseSegmentTask) postExecute(context.Context) error {
return
nil
}
type
W
atchDmChannelTask
struct
{
*
B
aseTask
type
w
atchDmChannelTask
struct
{
*
b
aseTask
*
querypb
.
WatchDmChannelsRequest
meta
Meta
cluster
Cluster
excludeNodeIDs
[]
int64
}
func
(
wdt
*
W
atchDmChannelTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
wdt
*
w
atchDmChannelTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
wdt
.
Base
}
func
(
wdt
*
W
atchDmChannelTask
)
marshal
()
([]
byte
,
error
)
{
func
(
wdt
*
w
atchDmChannelTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
wdt
.
WatchDmChannelsRequest
)
}
func
(
wdt
*
W
atchDmChannelTask
)
isValid
()
bool
{
func
(
wdt
*
w
atchDmChannelTask
)
isValid
()
bool
{
online
,
err
:=
wdt
.
cluster
.
isOnline
(
wdt
.
NodeID
)
if
err
!=
nil
{
return
false
...
...
@@ -1162,24 +1159,24 @@ func (wdt *WatchDmChannelTask) isValid() bool {
return
wdt
.
ctx
!=
nil
&&
online
}
func
(
wdt
*
W
atchDmChannelTask
)
msgType
()
commonpb
.
MsgType
{
func
(
wdt
*
w
atchDmChannelTask
)
msgType
()
commonpb
.
MsgType
{
return
wdt
.
Base
.
MsgType
}
func
(
wdt
*
W
atchDmChannelTask
)
timestamp
()
Timestamp
{
func
(
wdt
*
w
atchDmChannelTask
)
timestamp
()
Timestamp
{
return
wdt
.
Base
.
Timestamp
}
func
(
wdt
*
W
atchDmChannelTask
)
updateTaskProcess
()
{
func
(
wdt
*
w
atchDmChannelTask
)
updateTaskProcess
()
{
parentTask
:=
wdt
.
getParentTask
()
if
parentTask
==
nil
{
log
.
Warn
(
"
W
atchDmChannelTask: parentTask should not be nil"
)
log
.
Warn
(
"
w
atchDmChannelTask: parentTask should not be nil"
)
return
}
parentTask
.
updateTaskProcess
()
}
func
(
wdt
*
W
atchDmChannelTask
)
preExecute
(
context
.
Context
)
error
{
func
(
wdt
*
w
atchDmChannelTask
)
preExecute
(
context
.
Context
)
error
{
channelInfos
:=
wdt
.
Infos
channels
:=
make
([]
string
,
0
)
for
_
,
info
:=
range
channelInfos
{
...
...
@@ -1193,14 +1190,14 @@ func (wdt *WatchDmChannelTask) preExecute(context.Context) error {
return
nil
}
func
(
wdt
*
W
atchDmChannelTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
wdt
*
w
atchDmChannelTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
wdt
.
retryCount
--
}()
err
:=
wdt
.
cluster
.
watchDmChannels
(
wdt
.
ctx
,
wdt
.
NodeID
,
wdt
.
WatchDmChannelsRequest
)
if
err
!=
nil
{
log
.
Warn
(
"
W
atchDmChannelTask: watchDmChannel occur error"
,
zap
.
Int64
(
"taskID"
,
wdt
.
getTaskID
()))
log
.
Warn
(
"
w
atchDmChannelTask: watchDmChannel occur error"
,
zap
.
Int64
(
"taskID"
,
wdt
.
getTaskID
()))
wdt
.
setResultInfo
(
err
)
return
err
}
...
...
@@ -1210,13 +1207,13 @@ func (wdt *WatchDmChannelTask) execute(ctx context.Context) error {
return
nil
}
func
(
wdt
*
W
atchDmChannelTask
)
postExecute
(
context
.
Context
)
error
{
func
(
wdt
*
w
atchDmChannelTask
)
postExecute
(
context
.
Context
)
error
{
log
.
Debug
(
"watchDmChannelTask postExecute done"
,
zap
.
Int64
(
"taskID"
,
wdt
.
getTaskID
()))
return
nil
}
func
(
wdt
*
W
atchDmChannelTask
)
reschedule
(
ctx
context
.
Context
)
([]
task
,
error
)
{
func
(
wdt
*
w
atchDmChannelTask
)
reschedule
(
ctx
context
.
Context
)
([]
task
,
error
)
{
collectionID
:=
wdt
.
CollectionID
channelIDs
:=
make
([]
string
,
0
)
reScheduledTask
:=
make
([]
task
,
0
)
...
...
@@ -1242,8 +1239,8 @@ func (wdt *WatchDmChannelTask) reschedule(ctx context.Context) ([]task, error) {
for
nodeID
,
infos
:=
range
node2channelInfos
{
watchDmChannelBaseTask
:=
newBaseTask
(
ctx
,
wdt
.
getTriggerCondition
())
watchDmChannelBaseTask
.
setParentTask
(
wdt
.
getParentTask
())
watchDmChannelTask
:=
&
W
atchDmChannelTask
{
B
aseTask
:
watchDmChannelBaseTask
,
watchDmChannelTask
:=
&
w
atchDmChannelTask
{
b
aseTask
:
watchDmChannelBaseTask
,
WatchDmChannelsRequest
:
&
querypb
.
WatchDmChannelsRequest
{
Base
:
wdt
.
Base
,
NodeID
:
nodeID
,
...
...
@@ -1258,7 +1255,7 @@ func (wdt *WatchDmChannelTask) reschedule(ctx context.Context) ([]task, error) {
excludeNodeIDs
:
wdt
.
excludeNodeIDs
,
}
reScheduledTask
=
append
(
reScheduledTask
,
watchDmChannelTask
)
log
.
Debug
(
"
W
atchDmChannelTask: add a watchDmChannelTask to RescheduleTasks"
,
zap
.
Any
(
"task"
,
watchDmChannelTask
))
log
.
Debug
(
"
w
atchDmChannelTask: add a watchDmChannelTask to RescheduleTasks"
,
zap
.
Any
(
"task"
,
watchDmChannelTask
))
hasWatchQueryChannel
:=
wdt
.
cluster
.
hasWatchedQueryChannel
(
wdt
.
ctx
,
nodeID
,
collectionID
)
if
!
hasWatchQueryChannel
{
...
...
@@ -1278,34 +1275,34 @@ func (wdt *WatchDmChannelTask) reschedule(ctx context.Context) ([]task, error) {
}
watchQueryChannelBaseTask
:=
newBaseTask
(
ctx
,
wdt
.
getTriggerCondition
())
watchQueryChannelBaseTask
.
setParentTask
(
wdt
.
getParentTask
())
watchQueryChannelTask
:=
&
W
atchQueryChannelTask
{
B
aseTask
:
watchQueryChannelBaseTask
,
watchQueryChannelTask
:=
&
w
atchQueryChannelTask
{
b
aseTask
:
watchQueryChannelBaseTask
,
AddQueryChannelRequest
:
addQueryChannelRequest
,
cluster
:
wdt
.
cluster
,
}
reScheduledTask
=
append
(
reScheduledTask
,
watchQueryChannelTask
)
log
.
Debug
(
"
W
atchDmChannelTask: add a watchQueryChannelTask to RescheduleTasks"
,
zap
.
Any
(
"task"
,
watchQueryChannelTask
))
log
.
Debug
(
"
w
atchDmChannelTask: add a watchQueryChannelTask to RescheduleTasks"
,
zap
.
Any
(
"task"
,
watchQueryChannelTask
))
}
}
return
reScheduledTask
,
nil
}
type
W
atchQueryChannelTask
struct
{
*
B
aseTask
type
w
atchQueryChannelTask
struct
{
*
b
aseTask
*
querypb
.
AddQueryChannelRequest
cluster
Cluster
}
func
(
wqt
*
W
atchQueryChannelTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
wqt
*
w
atchQueryChannelTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
wqt
.
Base
}
func
(
wqt
*
W
atchQueryChannelTask
)
marshal
()
([]
byte
,
error
)
{
func
(
wqt
*
w
atchQueryChannelTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
wqt
.
AddQueryChannelRequest
)
}
func
(
wqt
*
W
atchQueryChannelTask
)
isValid
()
bool
{
func
(
wqt
*
w
atchQueryChannelTask
)
isValid
()
bool
{
online
,
err
:=
wqt
.
cluster
.
isOnline
(
wqt
.
NodeID
)
if
err
!=
nil
{
return
false
...
...
@@ -1314,26 +1311,26 @@ func (wqt *WatchQueryChannelTask) isValid() bool {
return
wqt
.
ctx
!=
nil
&&
online
}
func
(
wqt
*
W
atchQueryChannelTask
)
msgType
()
commonpb
.
MsgType
{
func
(
wqt
*
w
atchQueryChannelTask
)
msgType
()
commonpb
.
MsgType
{
return
wqt
.
Base
.
MsgType
}
func
(
wqt
*
W
atchQueryChannelTask
)
timestamp
()
Timestamp
{
func
(
wqt
*
w
atchQueryChannelTask
)
timestamp
()
Timestamp
{
return
wqt
.
Base
.
Timestamp
}
func
(
wqt
*
W
atchQueryChannelTask
)
updateTaskProcess
()
{
func
(
wqt
*
w
atchQueryChannelTask
)
updateTaskProcess
()
{
parentTask
:=
wqt
.
getParentTask
()
if
parentTask
==
nil
{
log
.
Warn
(
"
W
atchQueryChannelTask: parentTask should not be nil"
)
log
.
Warn
(
"
w
atchQueryChannelTask: parentTask should not be nil"
)
return
}
parentTask
.
updateTaskProcess
()
}
func
(
wqt
*
W
atchQueryChannelTask
)
preExecute
(
context
.
Context
)
error
{
func
(
wqt
*
w
atchQueryChannelTask
)
preExecute
(
context
.
Context
)
error
{
wqt
.
setResultInfo
(
nil
)
log
.
Debug
(
"start do
W
atchQueryChannelTask"
,
log
.
Debug
(
"start do
w
atchQueryChannelTask"
,
zap
.
Int64
(
"collectionID"
,
wqt
.
CollectionID
),
zap
.
String
(
"queryChannel"
,
wqt
.
RequestChannelID
),
zap
.
String
(
"queryResultChannel"
,
wqt
.
ResultChannelID
),
...
...
@@ -1342,14 +1339,14 @@ func (wqt *WatchQueryChannelTask) preExecute(context.Context) error {
return
nil
}
func
(
wqt
*
W
atchQueryChannelTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
wqt
*
w
atchQueryChannelTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
wqt
.
retryCount
--
}()
err
:=
wqt
.
cluster
.
addQueryChannel
(
wqt
.
ctx
,
wqt
.
NodeID
,
wqt
.
AddQueryChannelRequest
)
if
err
!=
nil
{
log
.
Warn
(
"
W
atchQueryChannelTask: watchQueryChannel occur error"
,
zap
.
Int64
(
"taskID"
,
wqt
.
getTaskID
()))
log
.
Warn
(
"
w
atchQueryChannelTask: watchQueryChannel occur error"
,
zap
.
Int64
(
"taskID"
,
wqt
.
getTaskID
()))
wqt
.
setResultInfo
(
err
)
return
err
}
...
...
@@ -1362,8 +1359,8 @@ func (wqt *WatchQueryChannelTask) execute(ctx context.Context) error {
return
nil
}
func
(
wqt
*
W
atchQueryChannelTask
)
postExecute
(
context
.
Context
)
error
{
log
.
Debug
(
"
W
atchQueryChannelTask postExecute done"
,
func
(
wqt
*
w
atchQueryChannelTask
)
postExecute
(
context
.
Context
)
error
{
log
.
Debug
(
"
w
atchQueryChannelTask postExecute done"
,
zap
.
Int64
(
"collectionID"
,
wqt
.
CollectionID
),
zap
.
String
(
"queryChannel"
,
wqt
.
RequestChannelID
),
zap
.
String
(
"queryResultChannel"
,
wqt
.
ResultChannelID
),
...
...
@@ -1371,13 +1368,11 @@ func (wqt *WatchQueryChannelTask) postExecute(context.Context) error {
return
nil
}
//****************************handoff task********************************//
type
HandoffTask
struct
{
type
handoffTask
struct
{
}
//*********************** ***load balance task*** ************************//
type
LoadBalanceTask
struct
{
*
BaseTask
type
loadBalanceTask
struct
{
*
baseTask
*
querypb
.
LoadBalanceRequest
rootCoord
types
.
RootCoord
dataCoord
types
.
DataCoord
...
...
@@ -1385,32 +1380,32 @@ type LoadBalanceTask struct {
meta
Meta
}
func
(
lbt
*
L
oadBalanceTask
)
msgBase
()
*
commonpb
.
MsgBase
{
func
(
lbt
*
l
oadBalanceTask
)
msgBase
()
*
commonpb
.
MsgBase
{
return
lbt
.
Base
}
func
(
lbt
*
L
oadBalanceTask
)
marshal
()
([]
byte
,
error
)
{
func
(
lbt
*
l
oadBalanceTask
)
marshal
()
([]
byte
,
error
)
{
return
proto
.
Marshal
(
lbt
.
LoadBalanceRequest
)
}
func
(
lbt
*
L
oadBalanceTask
)
msgType
()
commonpb
.
MsgType
{
func
(
lbt
*
l
oadBalanceTask
)
msgType
()
commonpb
.
MsgType
{
return
lbt
.
Base
.
MsgType
}
func
(
lbt
*
L
oadBalanceTask
)
timestamp
()
Timestamp
{
func
(
lbt
*
l
oadBalanceTask
)
timestamp
()
Timestamp
{
return
lbt
.
Base
.
Timestamp
}
func
(
lbt
*
L
oadBalanceTask
)
preExecute
(
context
.
Context
)
error
{
func
(
lbt
*
l
oadBalanceTask
)
preExecute
(
context
.
Context
)
error
{
lbt
.
setResultInfo
(
nil
)
log
.
Debug
(
"start do
L
oadBalanceTask"
,
log
.
Debug
(
"start do
l
oadBalanceTask"
,
zap
.
Int64s
(
"sourceNodeIDs"
,
lbt
.
SourceNodeIDs
),
zap
.
Any
(
"balanceReason"
,
lbt
.
BalanceReason
),
zap
.
Int64
(
"taskID"
,
lbt
.
getTaskID
()))
return
nil
}
func
(
lbt
*
L
oadBalanceTask
)
execute
(
ctx
context
.
Context
)
error
{
func
(
lbt
*
l
oadBalanceTask
)
execute
(
ctx
context
.
Context
)
error
{
defer
func
()
{
lbt
.
retryCount
--
}()
...
...
@@ -1422,7 +1417,7 @@ func (lbt *LoadBalanceTask) execute(ctx context.Context) error {
collectionID
:=
info
.
CollectionID
metaInfo
,
err
:=
lbt
.
meta
.
getCollectionInfoByID
(
collectionID
)
if
err
!=
nil
{
log
.
Warn
(
"
L
oadBalanceTask: getCollectionInfoByID occur error"
,
zap
.
String
(
"error"
,
err
.
Error
()))
log
.
Warn
(
"
l
oadBalanceTask: getCollectionInfoByID occur error"
,
zap
.
String
(
"error"
,
err
.
Error
()))
lbt
.
setResultInfo
(
err
)
return
err
}
...
...
@@ -1538,30 +1533,30 @@ func (lbt *LoadBalanceTask) execute(ctx context.Context) error {
// return nil
//}
log
.
Debug
(
"
L
oadBalanceTask Execute done"
,
log
.
Debug
(
"
l
oadBalanceTask Execute done"
,
zap
.
Int64s
(
"sourceNodeIDs"
,
lbt
.
SourceNodeIDs
),
zap
.
Any
(
"balanceReason"
,
lbt
.
BalanceReason
),
zap
.
Int64
(
"taskID"
,
lbt
.
getTaskID
()))
return
nil
}
func
(
lbt
*
L
oadBalanceTask
)
postExecute
(
context
.
Context
)
error
{
func
(
lbt
*
l
oadBalanceTask
)
postExecute
(
context
.
Context
)
error
{
if
lbt
.
result
.
ErrorCode
==
commonpb
.
ErrorCode_Success
{
for
_
,
id
:=
range
lbt
.
SourceNodeIDs
{
err
:=
lbt
.
cluster
.
removeNodeInfo
(
id
)
if
err
!=
nil
{
log
.
Error
(
"
L
oadBalanceTask: occur error when removing node info from cluster"
,
zap
.
Int64
(
"nodeID"
,
id
))
log
.
Error
(
"
l
oadBalanceTask: occur error when removing node info from cluster"
,
zap
.
Int64
(
"nodeID"
,
id
))
}
err
=
lbt
.
meta
.
deleteSegmentInfoByNodeID
(
id
)
if
err
!=
nil
{
log
.
Error
(
"
L
oadBalanceTask: occur error when removing node info from meta"
,
zap
.
Int64
(
"nodeID"
,
id
))
log
.
Error
(
"
l
oadBalanceTask: occur error when removing node info from meta"
,
zap
.
Int64
(
"nodeID"
,
id
))
}
}
}
else
{
lbt
.
childTasks
=
[]
task
{}
}
log
.
Debug
(
"
L
oadBalanceTask postExecute done"
,
log
.
Debug
(
"
l
oadBalanceTask postExecute done"
,
zap
.
Int64s
(
"sourceNodeIDs"
,
lbt
.
SourceNodeIDs
),
zap
.
Any
(
"balanceReason"
,
lbt
.
BalanceReason
),
zap
.
Int64
(
"taskID"
,
lbt
.
getTaskID
()))
...
...
@@ -1792,8 +1787,8 @@ func assignInternalTask(ctx context.Context,
loadSegmentsReq
.
NodeID
=
nodeID
baseTask
:=
newBaseTask
(
ctx
,
parentTask
.
getTriggerCondition
())
baseTask
.
setParentTask
(
parentTask
)
loadSegmentTask
:=
&
L
oadSegmentTask
{
B
aseTask
:
baseTask
,
loadSegmentTask
:=
&
l
oadSegmentTask
{
b
aseTask
:
baseTask
,
LoadSegmentsRequest
:
loadSegmentsReq
,
meta
:
meta
,
cluster
:
cluster
,
...
...
@@ -1809,8 +1804,8 @@ func assignInternalTask(ctx context.Context,
watchDmChannelReq
.
NodeID
=
nodeID
baseTask
:=
newBaseTask
(
ctx
,
parentTask
.
getTriggerCondition
())
baseTask
.
setParentTask
(
parentTask
)
watchDmChannelTask
:=
&
W
atchDmChannelTask
{
B
aseTask
:
baseTask
,
watchDmChannelTask
:=
&
w
atchDmChannelTask
{
b
aseTask
:
baseTask
,
WatchDmChannelsRequest
:
watchDmChannelReq
,
meta
:
meta
,
cluster
:
cluster
,
...
...
@@ -1839,8 +1834,8 @@ func assignInternalTask(ctx context.Context,
}
baseTask
:=
newBaseTask
(
ctx
,
parentTask
.
getTriggerCondition
())
baseTask
.
setParentTask
(
parentTask
)
watchQueryChannelTask
:=
&
W
atchQueryChannelTask
{
B
aseTask
:
baseTask
,
watchQueryChannelTask
:=
&
w
atchQueryChannelTask
{
b
aseTask
:
baseTask
,
AddQueryChannelRequest
:
addQueryChannelRequest
,
cluster
:
cluster
,
...
...
internal/querycoord/task_scheduler.go
浏览文件 @
d6f95f49
...
...
@@ -272,8 +272,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
loadCollectionTask
:=
&
L
oadCollectionTask
{
B
aseTask
:
baseTask
,
loadCollectionTask
:=
&
l
oadCollectionTask
{
b
aseTask
:
baseTask
,
LoadCollectionRequest
:
&
loadReq
,
rootCoord
:
scheduler
.
rootCoord
,
dataCoord
:
scheduler
.
dataCoord
,
...
...
@@ -287,8 +287,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
loadPartitionTask
:=
&
L
oadPartitionTask
{
B
aseTask
:
baseTask
,
loadPartitionTask
:=
&
l
oadPartitionTask
{
b
aseTask
:
baseTask
,
LoadPartitionsRequest
:
&
loadReq
,
dataCoord
:
scheduler
.
dataCoord
,
cluster
:
scheduler
.
cluster
,
...
...
@@ -301,8 +301,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
releaseCollectionTask
:=
&
R
eleaseCollectionTask
{
B
aseTask
:
baseTask
,
releaseCollectionTask
:=
&
r
eleaseCollectionTask
{
b
aseTask
:
baseTask
,
ReleaseCollectionRequest
:
&
loadReq
,
cluster
:
scheduler
.
cluster
,
meta
:
scheduler
.
meta
,
...
...
@@ -315,8 +315,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
releasePartitionTask
:=
&
R
eleasePartitionTask
{
B
aseTask
:
baseTask
,
releasePartitionTask
:=
&
r
eleasePartitionTask
{
b
aseTask
:
baseTask
,
ReleasePartitionsRequest
:
&
loadReq
,
cluster
:
scheduler
.
cluster
,
}
...
...
@@ -328,8 +328,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
loadSegmentTask
:=
&
L
oadSegmentTask
{
B
aseTask
:
baseTask
,
loadSegmentTask
:=
&
l
oadSegmentTask
{
b
aseTask
:
baseTask
,
LoadSegmentsRequest
:
&
loadReq
,
cluster
:
scheduler
.
cluster
,
meta
:
scheduler
.
meta
,
...
...
@@ -343,8 +343,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
releaseSegmentTask
:=
&
R
eleaseSegmentTask
{
B
aseTask
:
baseTask
,
releaseSegmentTask
:=
&
r
eleaseSegmentTask
{
b
aseTask
:
baseTask
,
ReleaseSegmentsRequest
:
&
loadReq
,
cluster
:
scheduler
.
cluster
,
}
...
...
@@ -356,8 +356,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
watchDmChannelTask
:=
&
W
atchDmChannelTask
{
B
aseTask
:
baseTask
,
watchDmChannelTask
:=
&
w
atchDmChannelTask
{
b
aseTask
:
baseTask
,
WatchDmChannelsRequest
:
&
loadReq
,
cluster
:
scheduler
.
cluster
,
meta
:
scheduler
.
meta
,
...
...
@@ -371,8 +371,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
watchQueryChannelTask
:=
&
W
atchQueryChannelTask
{
B
aseTask
:
baseTask
,
watchQueryChannelTask
:=
&
w
atchQueryChannelTask
{
b
aseTask
:
baseTask
,
AddQueryChannelRequest
:
&
loadReq
,
cluster
:
scheduler
.
cluster
,
}
...
...
@@ -384,8 +384,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if
err
!=
nil
{
return
nil
,
err
}
loadBalanceTask
:=
&
L
oadBalanceTask
{
B
aseTask
:
baseTask
,
loadBalanceTask
:=
&
l
oadBalanceTask
{
b
aseTask
:
baseTask
,
LoadBalanceRequest
:
&
loadReq
,
rootCoord
:
scheduler
.
rootCoord
,
dataCoord
:
scheduler
.
dataCoord
,
...
...
internal/querycoord/task_scheduler_test.go
浏览文件 @
d6f95f49
...
...
@@ -25,7 +25,7 @@ import (
)
type
testTask
struct
{
B
aseTask
b
aseTask
baseMsg
*
commonpb
.
MsgBase
cluster
Cluster
meta
Meta
...
...
@@ -59,8 +59,8 @@ func (tt *testTask) execute(ctx context.Context) error {
switch
tt
.
baseMsg
.
MsgType
{
case
commonpb
.
MsgType_LoadSegments
:
childTask
:=
&
L
oadSegmentTask
{
BaseTask
:
&
B
aseTask
{
childTask
:=
&
l
oadSegmentTask
{
baseTask
:
&
b
aseTask
{
ctx
:
tt
.
ctx
,
Condition
:
NewTaskCondition
(
tt
.
ctx
),
triggerCondition
:
tt
.
triggerCondition
,
...
...
@@ -77,8 +77,8 @@ func (tt *testTask) execute(ctx context.Context) error {
}
tt
.
addChildTask
(
childTask
)
case
commonpb
.
MsgType_WatchDmChannels
:
childTask
:=
&
W
atchDmChannelTask
{
BaseTask
:
&
B
aseTask
{
childTask
:=
&
w
atchDmChannelTask
{
baseTask
:
&
b
aseTask
{
ctx
:
tt
.
ctx
,
Condition
:
NewTaskCondition
(
tt
.
ctx
),
triggerCondition
:
tt
.
triggerCondition
,
...
...
@@ -95,8 +95,8 @@ func (tt *testTask) execute(ctx context.Context) error {
}
tt
.
addChildTask
(
childTask
)
case
commonpb
.
MsgType_WatchQueryChannels
:
childTask
:=
&
W
atchQueryChannelTask
{
BaseTask
:
&
B
aseTask
{
childTask
:=
&
w
atchQueryChannelTask
{
baseTask
:
&
b
aseTask
{
ctx
:
tt
.
ctx
,
Condition
:
NewTaskCondition
(
tt
.
ctx
),
triggerCondition
:
tt
.
triggerCondition
,
...
...
@@ -134,7 +134,7 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) {
nodeID
:=
queryNode
.
queryNodeID
waitQueryNodeOnline
(
queryCoord
.
cluster
,
nodeID
)
testTask
:=
&
testTask
{
BaseTask
:
B
aseTask
{
baseTask
:
b
aseTask
{
ctx
:
baseCtx
,
Condition
:
NewTaskCondition
(
baseCtx
),
triggerCondition
:
querypb
.
TriggerCondition_grpcRequest
,
...
...
@@ -173,8 +173,8 @@ func TestUnMarshalTask(t *testing.T) {
cancel
:
cancel
,
}
t
.
Run
(
"Test
L
oadCollectionTask"
,
func
(
t
*
testing
.
T
)
{
loadTask
:=
&
L
oadCollectionTask
{
t
.
Run
(
"Test
l
oadCollectionTask"
,
func
(
t
*
testing
.
T
)
{
loadTask
:=
&
l
oadCollectionTask
{
LoadCollectionRequest
:
&
querypb
.
LoadCollectionRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_LoadCollection
,
...
...
@@ -195,7 +195,7 @@ func TestUnMarshalTask(t *testing.T) {
})
t
.
Run
(
"Test LoadPartitionsTask"
,
func
(
t
*
testing
.
T
)
{
loadTask
:=
&
L
oadPartitionTask
{
loadTask
:=
&
l
oadPartitionTask
{
LoadPartitionsRequest
:
&
querypb
.
LoadPartitionsRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_LoadPartitions
,
...
...
@@ -215,8 +215,8 @@ func TestUnMarshalTask(t *testing.T) {
assert
.
Equal
(
t
,
task
.
msgType
(),
commonpb
.
MsgType_LoadPartitions
)
})
t
.
Run
(
"Test
R
eleaseCollectionTask"
,
func
(
t
*
testing
.
T
)
{
releaseTask
:=
&
R
eleaseCollectionTask
{
t
.
Run
(
"Test
r
eleaseCollectionTask"
,
func
(
t
*
testing
.
T
)
{
releaseTask
:=
&
r
eleaseCollectionTask
{
ReleaseCollectionRequest
:
&
querypb
.
ReleaseCollectionRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_ReleaseCollection
,
...
...
@@ -236,8 +236,8 @@ func TestUnMarshalTask(t *testing.T) {
assert
.
Equal
(
t
,
task
.
msgType
(),
commonpb
.
MsgType_ReleaseCollection
)
})
t
.
Run
(
"Test
R
eleasePartitionTask"
,
func
(
t
*
testing
.
T
)
{
releaseTask
:=
&
R
eleasePartitionTask
{
t
.
Run
(
"Test
r
eleasePartitionTask"
,
func
(
t
*
testing
.
T
)
{
releaseTask
:=
&
r
eleasePartitionTask
{
ReleasePartitionsRequest
:
&
querypb
.
ReleasePartitionsRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_ReleasePartitions
,
...
...
@@ -257,8 +257,8 @@ func TestUnMarshalTask(t *testing.T) {
assert
.
Equal
(
t
,
task
.
msgType
(),
commonpb
.
MsgType_ReleasePartitions
)
})
t
.
Run
(
"Test
L
oadSegmentTask"
,
func
(
t
*
testing
.
T
)
{
loadTask
:=
&
L
oadSegmentTask
{
t
.
Run
(
"Test
l
oadSegmentTask"
,
func
(
t
*
testing
.
T
)
{
loadTask
:=
&
l
oadSegmentTask
{
LoadSegmentsRequest
:
&
querypb
.
LoadSegmentsRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_LoadSegments
,
...
...
@@ -278,8 +278,8 @@ func TestUnMarshalTask(t *testing.T) {
assert
.
Equal
(
t
,
task
.
msgType
(),
commonpb
.
MsgType_LoadSegments
)
})
t
.
Run
(
"Test
R
eleaseSegmentTask"
,
func
(
t
*
testing
.
T
)
{
releaseTask
:=
&
R
eleaseSegmentTask
{
t
.
Run
(
"Test
r
eleaseSegmentTask"
,
func
(
t
*
testing
.
T
)
{
releaseTask
:=
&
r
eleaseSegmentTask
{
ReleaseSegmentsRequest
:
&
querypb
.
ReleaseSegmentsRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_ReleaseSegments
,
...
...
@@ -299,8 +299,8 @@ func TestUnMarshalTask(t *testing.T) {
assert
.
Equal
(
t
,
task
.
msgType
(),
commonpb
.
MsgType_ReleaseSegments
)
})
t
.
Run
(
"Test
W
atchDmChannelTask"
,
func
(
t
*
testing
.
T
)
{
watchTask
:=
&
W
atchDmChannelTask
{
t
.
Run
(
"Test
w
atchDmChannelTask"
,
func
(
t
*
testing
.
T
)
{
watchTask
:=
&
w
atchDmChannelTask
{
WatchDmChannelsRequest
:
&
querypb
.
WatchDmChannelsRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_WatchDmChannels
,
...
...
@@ -320,8 +320,8 @@ func TestUnMarshalTask(t *testing.T) {
assert
.
Equal
(
t
,
task
.
msgType
(),
commonpb
.
MsgType_WatchDmChannels
)
})
t
.
Run
(
"Test
W
atchQueryChannelTask"
,
func
(
t
*
testing
.
T
)
{
watchTask
:=
&
W
atchQueryChannelTask
{
t
.
Run
(
"Test
w
atchQueryChannelTask"
,
func
(
t
*
testing
.
T
)
{
watchTask
:=
&
w
atchQueryChannelTask
{
AddQueryChannelRequest
:
&
querypb
.
AddQueryChannelRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_WatchQueryChannels
,
...
...
@@ -341,8 +341,8 @@ func TestUnMarshalTask(t *testing.T) {
assert
.
Equal
(
t
,
task
.
msgType
(),
commonpb
.
MsgType_WatchQueryChannels
)
})
t
.
Run
(
"Test
L
oadBalanceTask"
,
func
(
t
*
testing
.
T
)
{
loadBalanceTask
:=
&
L
oadBalanceTask
{
t
.
Run
(
"Test
l
oadBalanceTask"
,
func
(
t
*
testing
.
T
)
{
loadBalanceTask
:=
&
l
oadBalanceTask
{
LoadBalanceRequest
:
&
querypb
.
LoadBalanceRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_LoadBalanceSegments
,
...
...
@@ -379,7 +379,7 @@ func TestReloadTaskFromKV(t *testing.T) {
}
kvs
:=
make
(
map
[
string
]
string
)
triggerTask
:=
&
L
oadCollectionTask
{
triggerTask
:=
&
l
oadCollectionTask
{
LoadCollectionRequest
:
&
querypb
.
LoadCollectionRequest
{
Base
:
&
commonpb
.
MsgBase
{
Timestamp
:
1
,
...
...
@@ -392,7 +392,7 @@ func TestReloadTaskFromKV(t *testing.T) {
triggerTaskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
triggerTaskPrefix
,
100
)
kvs
[
triggerTaskKey
]
=
string
(
triggerBlobs
)
activeTask
:=
&
L
oadSegmentTask
{
activeTask
:=
&
l
oadSegmentTask
{
LoadSegmentsRequest
:
&
querypb
.
LoadSegmentsRequest
{
Base
:
&
commonpb
.
MsgBase
{
Timestamp
:
2
,
...
...
internal/querycoord/task_test.go
浏览文件 @
d6f95f49
...
...
@@ -21,7 +21,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
)
func
genLoadCollectionTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
)
*
L
oadCollectionTask
{
func
genLoadCollectionTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
)
*
l
oadCollectionTask
{
req
:=
&
querypb
.
LoadCollectionRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_LoadCollection
,
...
...
@@ -30,8 +30,8 @@ func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *LoadCol
Schema
:
genCollectionSchema
(
defaultCollectionID
,
false
),
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
loadCollectionTask
:=
&
L
oadCollectionTask
{
B
aseTask
:
baseTask
,
loadCollectionTask
:=
&
l
oadCollectionTask
{
b
aseTask
:
baseTask
,
LoadCollectionRequest
:
req
,
rootCoord
:
queryCoord
.
rootCoordClient
,
dataCoord
:
queryCoord
.
dataCoordClient
,
...
...
@@ -41,7 +41,7 @@ func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *LoadCol
return
loadCollectionTask
}
func
genLoadPartitionTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
)
*
L
oadPartitionTask
{
func
genLoadPartitionTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
)
*
l
oadPartitionTask
{
req
:=
&
querypb
.
LoadPartitionsRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_LoadPartitions
,
...
...
@@ -50,8 +50,8 @@ func genLoadPartitionTask(ctx context.Context, queryCoord *QueryCoord) *LoadPart
PartitionIDs
:
[]
UniqueID
{
defaultPartitionID
},
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
loadPartitionTask
:=
&
L
oadPartitionTask
{
B
aseTask
:
baseTask
,
loadPartitionTask
:=
&
l
oadPartitionTask
{
b
aseTask
:
baseTask
,
LoadPartitionsRequest
:
req
,
dataCoord
:
queryCoord
.
dataCoordClient
,
cluster
:
queryCoord
.
cluster
,
...
...
@@ -60,7 +60,7 @@ func genLoadPartitionTask(ctx context.Context, queryCoord *QueryCoord) *LoadPart
return
loadPartitionTask
}
func
genReleaseCollectionTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
)
*
R
eleaseCollectionTask
{
func
genReleaseCollectionTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
)
*
r
eleaseCollectionTask
{
req
:=
&
querypb
.
ReleaseCollectionRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_ReleaseCollection
,
...
...
@@ -68,8 +68,8 @@ func genReleaseCollectionTask(ctx context.Context, queryCoord *QueryCoord) *Rele
CollectionID
:
defaultCollectionID
,
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
releaseCollectionTask
:=
&
R
eleaseCollectionTask
{
B
aseTask
:
baseTask
,
releaseCollectionTask
:=
&
r
eleaseCollectionTask
{
b
aseTask
:
baseTask
,
ReleaseCollectionRequest
:
req
,
rootCoord
:
queryCoord
.
rootCoordClient
,
cluster
:
queryCoord
.
cluster
,
...
...
@@ -79,7 +79,7 @@ func genReleaseCollectionTask(ctx context.Context, queryCoord *QueryCoord) *Rele
return
releaseCollectionTask
}
func
genReleasePartitionTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
)
*
R
eleasePartitionTask
{
func
genReleasePartitionTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
)
*
r
eleasePartitionTask
{
req
:=
&
querypb
.
ReleasePartitionsRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_ReleasePartitions
,
...
...
@@ -88,8 +88,8 @@ func genReleasePartitionTask(ctx context.Context, queryCoord *QueryCoord) *Relea
PartitionIDs
:
[]
UniqueID
{
defaultPartitionID
},
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
releasePartitionTask
:=
&
R
eleasePartitionTask
{
B
aseTask
:
baseTask
,
releasePartitionTask
:=
&
r
eleasePartitionTask
{
b
aseTask
:
baseTask
,
ReleasePartitionsRequest
:
req
,
cluster
:
queryCoord
.
cluster
,
}
...
...
@@ -97,7 +97,7 @@ func genReleasePartitionTask(ctx context.Context, queryCoord *QueryCoord) *Relea
return
releasePartitionTask
}
func
genReleaseSegmentTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
,
nodeID
int64
)
*
R
eleaseSegmentTask
{
func
genReleaseSegmentTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
,
nodeID
int64
)
*
r
eleaseSegmentTask
{
req
:=
&
querypb
.
ReleaseSegmentsRequest
{
Base
:
&
commonpb
.
MsgBase
{
MsgType
:
commonpb
.
MsgType_ReleaseSegments
,
...
...
@@ -108,15 +108,15 @@ func genReleaseSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
SegmentIDs
:
[]
UniqueID
{
defaultSegmentID
},
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
releaseSegmentTask
:=
&
R
eleaseSegmentTask
{
B
aseTask
:
baseTask
,
releaseSegmentTask
:=
&
r
eleaseSegmentTask
{
b
aseTask
:
baseTask
,
ReleaseSegmentsRequest
:
req
,
cluster
:
queryCoord
.
cluster
,
}
return
releaseSegmentTask
}
func
genWatchDmChannelTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
,
nodeID
int64
)
*
W
atchDmChannelTask
{
func
genWatchDmChannelTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
,
nodeID
int64
)
*
w
atchDmChannelTask
{
schema
:=
genCollectionSchema
(
defaultCollectionID
,
false
)
vChannelInfo
:=
&
datapb
.
VchannelInfo
{
CollectionID
:
defaultCollectionID
,
...
...
@@ -134,8 +134,8 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseTask
.
taskID
=
100
watchDmChannelTask
:=
&
W
atchDmChannelTask
{
B
aseTask
:
baseTask
,
watchDmChannelTask
:=
&
w
atchDmChannelTask
{
b
aseTask
:
baseTask
,
WatchDmChannelsRequest
:
req
,
cluster
:
queryCoord
.
cluster
,
meta
:
queryCoord
.
meta
,
...
...
@@ -151,8 +151,8 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
}
baseParentTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseParentTask
.
taskID
=
10
parentTask
:=
&
L
oadCollectionTask
{
B
aseTask
:
baseParentTask
,
parentTask
:=
&
l
oadCollectionTask
{
b
aseTask
:
baseParentTask
,
LoadCollectionRequest
:
parentReq
,
rootCoord
:
queryCoord
.
rootCoordClient
,
dataCoord
:
queryCoord
.
dataCoordClient
,
...
...
@@ -167,7 +167,7 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
queryCoord
.
meta
.
addCollection
(
defaultCollectionID
,
schema
)
return
watchDmChannelTask
}
func
genLoadSegmentTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
,
nodeID
int64
)
*
L
oadSegmentTask
{
func
genLoadSegmentTask
(
ctx
context
.
Context
,
queryCoord
*
QueryCoord
,
nodeID
int64
)
*
l
oadSegmentTask
{
schema
:=
genCollectionSchema
(
defaultCollectionID
,
false
)
segmentInfo
:=
&
querypb
.
SegmentLoadInfo
{
SegmentID
:
defaultSegmentID
,
...
...
@@ -184,8 +184,8 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6
}
baseTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseTask
.
taskID
=
100
loadSegmentTask
:=
&
L
oadSegmentTask
{
B
aseTask
:
baseTask
,
loadSegmentTask
:=
&
l
oadSegmentTask
{
b
aseTask
:
baseTask
,
LoadSegmentsRequest
:
req
,
cluster
:
queryCoord
.
cluster
,
meta
:
queryCoord
.
meta
,
...
...
@@ -201,8 +201,8 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6
}
baseParentTask
:=
newBaseTask
(
ctx
,
querypb
.
TriggerCondition_grpcRequest
)
baseParentTask
.
taskID
=
10
parentTask
:=
&
L
oadCollectionTask
{
B
aseTask
:
baseParentTask
,
parentTask
:=
&
l
oadCollectionTask
{
b
aseTask
:
baseParentTask
,
LoadCollectionRequest
:
parentReq
,
rootCoord
:
queryCoord
.
rootCoordClient
,
dataCoord
:
queryCoord
.
dataCoordClient
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录