Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
e80fbd1f
M
milvus
项目概览
milvus
/
milvus
11 个月 前同步成功
通知
261
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
e80fbd1f
编写于
10月 14, 2021
作者:
X
xige-16
提交者:
GitHub
10月 14, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix golint error in the task of querycoord (#9866)
Signed-off-by:
N
xige-16
<
xi.ge@zilliz.com
>
上级
d0f03b2e
变更
6
展开全部
隐藏空白更改
内联
并排
Showing
6 changed file
with
392 addition
and
392 deletion
+392
-392
internal/querycoord/condition.go
internal/querycoord/condition.go
+4
-4
internal/querycoord/impl.go
internal/querycoord/impl.go
+4
-4
internal/querycoord/task.go
internal/querycoord/task.go
+223
-223
internal/querycoord/task_scheduler.go
internal/querycoord/task_scheduler.go
+115
-115
internal/querycoord/task_scheduler_test.go
internal/querycoord/task_scheduler_test.go
+33
-33
internal/querycoord/task_test.go
internal/querycoord/task_test.go
+13
-13
未找到文件。
internal/querycoord/condition.go
浏览文件 @
e80fbd1f
...
...
@@ -17,8 +17,8 @@ import (
)
type
Condition
interface
{
W
aitToFinish
()
error
N
otify
(
err
error
)
w
aitToFinish
()
error
n
otify
(
err
error
)
Ctx
()
context
.
Context
}
...
...
@@ -27,7 +27,7 @@ type TaskCondition struct {
ctx
context
.
Context
}
func
(
tc
*
TaskCondition
)
W
aitToFinish
()
error
{
func
(
tc
*
TaskCondition
)
w
aitToFinish
()
error
{
for
{
select
{
case
<-
tc
.
ctx
.
Done
()
:
...
...
@@ -38,7 +38,7 @@ func (tc *TaskCondition) WaitToFinish() error {
}
}
func
(
tc
*
TaskCondition
)
N
otify
(
err
error
)
{
func
(
tc
*
TaskCondition
)
n
otify
(
err
error
)
{
tc
.
done
<-
err
}
...
...
internal/querycoord/impl.go
浏览文件 @
e80fbd1f
...
...
@@ -157,7 +157,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
return
status
,
err
}
err
=
loadCollectionTask
.
W
aitToFinish
()
err
=
loadCollectionTask
.
w
aitToFinish
()
if
err
!=
nil
{
status
.
ErrorCode
=
commonpb
.
ErrorCode_UnexpectedError
status
.
Reason
=
err
.
Error
()
...
...
@@ -205,7 +205,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
return
status
,
err
}
err
=
releaseCollectionTask
.
W
aitToFinish
()
err
=
releaseCollectionTask
.
w
aitToFinish
()
if
err
!=
nil
{
status
.
ErrorCode
=
commonpb
.
ErrorCode_UnexpectedError
status
.
Reason
=
err
.
Error
()
...
...
@@ -348,7 +348,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
return
status
,
err
}
err
=
loadPartitionTask
.
W
aitToFinish
()
err
=
loadPartitionTask
.
w
aitToFinish
()
if
err
!=
nil
{
status
.
ErrorCode
=
commonpb
.
ErrorCode_UnexpectedError
status
.
Reason
=
err
.
Error
()
...
...
@@ -417,7 +417,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
return
status
,
err
}
err
=
releasePartitionTask
.
W
aitToFinish
()
err
=
releasePartitionTask
.
w
aitToFinish
()
if
err
!=
nil
{
status
.
ErrorCode
=
commonpb
.
ErrorCode_UnexpectedError
status
.
Reason
=
err
.
Error
()
...
...
internal/querycoord/task.go
浏览文件 @
e80fbd1f
此差异已折叠。
点击以展开。
internal/querycoord/task_scheduler.go
浏览文件 @
e80fbd1f
...
...
@@ -71,7 +71,7 @@ func (queue *TaskQueue) addTask(t task) {
}
for
e
:=
queue
.
tasks
.
Back
();
e
!=
nil
;
e
=
e
.
Prev
()
{
if
t
.
TaskPriority
()
>
e
.
Value
.
(
task
)
.
T
askPriority
()
{
if
t
.
taskPriority
()
>
e
.
Value
.
(
task
)
.
t
askPriority
()
{
if
e
.
Prev
()
==
nil
{
queue
.
taskChan
<-
1
queue
.
tasks
.
InsertBefore
(
t
,
e
)
...
...
@@ -233,18 +233,18 @@ func (scheduler *TaskScheduler) reloadFromKV() error {
log
.
Error
(
"reloadFromKV: taskStateInfo and triggerTaskInfo are inconsistent"
)
continue
}
triggerTasks
[
taskID
]
.
S
etState
(
state
)
triggerTasks
[
taskID
]
.
s
etState
(
state
)
}
var
doneTriggerTask
task
=
nil
for
_
,
t
:=
range
triggerTasks
{
if
t
.
State
()
==
taskDone
{
if
t
.
get
State
()
==
taskDone
{
doneTriggerTask
=
t
for
_
,
childTask
:=
range
activeTasks
{
childTask
.
S
etParentTask
(
t
)
//replace child task after reScheduler
t
.
A
ddChildTask
(
childTask
)
childTask
.
s
etParentTask
(
t
)
//replace child task after reScheduler
t
.
a
ddChildTask
(
childTask
)
}
t
.
S
etResultInfo
(
nil
)
t
.
s
etResultInfo
(
nil
)
continue
}
scheduler
.
triggerTaskQueue
.
addTask
(
t
)
...
...
@@ -399,7 +399,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
return
nil
,
err
}
newTask
.
Set
ID
(
taskID
)
newTask
.
setTask
ID
(
taskID
)
return
newTask
,
nil
}
...
...
@@ -410,26 +410,26 @@ func (scheduler *TaskScheduler) Enqueue(t task) error {
log
.
Error
(
"allocator trigger taskID failed"
,
zap
.
Error
(
err
))
return
err
}
t
.
Set
ID
(
id
)
t
.
setTask
ID
(
id
)
kvs
:=
make
(
map
[
string
]
string
)
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
triggerTaskPrefix
,
t
.
ID
())
blobs
,
err
:=
t
.
M
arshal
()
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
triggerTaskPrefix
,
t
.
getTask
ID
())
blobs
,
err
:=
t
.
m
arshal
()
if
err
!=
nil
{
log
.
Error
(
"error when save marshal task"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Error
(
err
))
log
.
Error
(
"error when save marshal task"
,
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Error
(
err
))
return
err
}
kvs
[
taskKey
]
=
string
(
blobs
)
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
ID
())
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
getTask
ID
())
kvs
[
stateKey
]
=
strconv
.
Itoa
(
int
(
taskUndo
))
err
=
scheduler
.
client
.
MultiSave
(
kvs
)
if
err
!=
nil
{
//TODO::clean etcd meta
log
.
Error
(
"error when save trigger task to etcd"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Error
(
err
))
log
.
Error
(
"error when save trigger task to etcd"
,
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Error
(
err
))
return
err
}
t
.
S
etState
(
taskUndo
)
t
.
s
etState
(
taskUndo
)
scheduler
.
triggerTaskQueue
.
addTask
(
t
)
log
.
Debug
(
"EnQueue a triggerTask and save to etcd"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()))
log
.
Debug
(
"EnQueue a triggerTask and save to etcd"
,
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()))
return
nil
}
...
...
@@ -440,19 +440,19 @@ func (scheduler *TaskScheduler) processTask(t task) error {
updateKVFn
:=
func
(
parentTask
task
)
error
{
kvs
:=
make
(
map
[
string
]
string
)
kvs
[
taskInfoKey
]
=
strconv
.
Itoa
(
int
(
taskDone
))
for
_
,
childTask
:=
range
parentTask
.
G
etChildTask
()
{
for
_
,
childTask
:=
range
parentTask
.
g
etChildTask
()
{
id
,
err
:=
scheduler
.
taskIDAllocator
()
if
err
!=
nil
{
return
err
}
childTask
.
Set
ID
(
id
)
childTaskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
childTask
.
ID
())
blobs
,
err
:=
childTask
.
M
arshal
()
childTask
.
setTask
ID
(
id
)
childTaskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
childTask
.
getTask
ID
())
blobs
,
err
:=
childTask
.
m
arshal
()
if
err
!=
nil
{
return
err
}
kvs
[
childTaskKey
]
=
string
(
blobs
)
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
childTask
.
ID
())
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
childTask
.
getTask
ID
())
kvs
[
stateKey
]
=
strconv
.
Itoa
(
int
(
taskUndo
))
}
err
:=
scheduler
.
client
.
MultiSave
(
kvs
)
...
...
@@ -462,35 +462,35 @@ func (scheduler *TaskScheduler) processTask(t task) error {
return
nil
}
span
,
ctx
:=
trace
.
StartSpanFromContext
(
t
.
T
raceCtx
(),
span
,
ctx
:=
trace
.
StartSpanFromContext
(
t
.
t
raceCtx
(),
opentracing
.
Tags
{
"Type"
:
t
.
Type
(),
"ID"
:
t
.
ID
(),
"Type"
:
t
.
msg
Type
(),
"ID"
:
t
.
getTask
ID
(),
})
var
err
error
defer
span
.
Finish
()
defer
func
()
{
//task postExecute
span
.
LogFields
(
oplog
.
Int64
(
"processTask: scheduler process PostExecute"
,
t
.
ID
()))
t
.
P
ostExecute
(
ctx
)
span
.
LogFields
(
oplog
.
Int64
(
"processTask: scheduler process PostExecute"
,
t
.
getTask
ID
()))
t
.
p
ostExecute
(
ctx
)
}()
// task preExecute
span
.
LogFields
(
oplog
.
Int64
(
"processTask: scheduler process PreExecute"
,
t
.
ID
()))
t
.
P
reExecute
(
ctx
)
taskInfoKey
=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
ID
())
span
.
LogFields
(
oplog
.
Int64
(
"processTask: scheduler process PreExecute"
,
t
.
getTask
ID
()))
t
.
p
reExecute
(
ctx
)
taskInfoKey
=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
getTask
ID
())
err
=
scheduler
.
client
.
Save
(
taskInfoKey
,
strconv
.
Itoa
(
int
(
taskDoing
)))
if
err
!=
nil
{
trace
.
LogError
(
span
,
err
)
t
.
S
etResultInfo
(
err
)
t
.
s
etResultInfo
(
err
)
return
err
}
t
.
S
etState
(
taskDoing
)
t
.
s
etState
(
taskDoing
)
// task execute
span
.
LogFields
(
oplog
.
Int64
(
"processTask: scheduler process Execute"
,
t
.
ID
()))
err
=
t
.
E
xecute
(
ctx
)
span
.
LogFields
(
oplog
.
Int64
(
"processTask: scheduler process Execute"
,
t
.
getTask
ID
()))
err
=
t
.
e
xecute
(
ctx
)
if
err
!=
nil
{
trace
.
LogError
(
span
,
err
)
return
err
...
...
@@ -498,16 +498,16 @@ func (scheduler *TaskScheduler) processTask(t task) error {
err
=
updateKVFn
(
t
)
if
err
!=
nil
{
trace
.
LogError
(
span
,
err
)
t
.
S
etResultInfo
(
err
)
t
.
s
etResultInfo
(
err
)
return
err
}
log
.
Debug
(
"processTask: update etcd success"
,
zap
.
Int64
(
"parent taskID"
,
t
.
ID
()))
if
t
.
Type
()
==
commonpb
.
MsgType_LoadCollection
||
t
.
Type
()
==
commonpb
.
MsgType_LoadPartitions
{
t
.
N
otify
(
nil
)
log
.
Debug
(
"processTask: update etcd success"
,
zap
.
Int64
(
"parent taskID"
,
t
.
getTask
ID
()))
if
t
.
msgType
()
==
commonpb
.
MsgType_LoadCollection
||
t
.
msg
Type
()
==
commonpb
.
MsgType_LoadPartitions
{
t
.
n
otify
(
nil
)
}
t
.
S
etState
(
taskDone
)
t
.
U
pdateTaskProcess
()
t
.
s
etState
(
taskDone
)
t
.
u
pdateTaskProcess
()
return
nil
}
...
...
@@ -521,7 +521,7 @@ func (scheduler *TaskScheduler) scheduleLoop() {
log
.
Debug
(
"scheduleLoop: num of child task"
,
zap
.
Int
(
"num child task"
,
len
(
activateTasks
)))
for
_
,
childTask
:=
range
activateTasks
{
if
childTask
!=
nil
{
log
.
Debug
(
"scheduleLoop: add a activate task to activateChan"
,
zap
.
Int64
(
"taskID"
,
childTask
.
ID
()))
log
.
Debug
(
"scheduleLoop: add a activate task to activateChan"
,
zap
.
Int64
(
"taskID"
,
childTask
.
getTask
ID
()))
scheduler
.
activateTaskChan
<-
childTask
activeTaskWg
.
Add
(
1
)
go
scheduler
.
waitActivateTaskDone
(
activeTaskWg
,
childTask
,
triggerTask
)
...
...
@@ -535,10 +535,10 @@ func (scheduler *TaskScheduler) scheduleLoop() {
removes
:=
make
([]
string
,
0
)
childTaskIDs
:=
make
([]
int64
,
0
)
for
_
,
t
:=
range
originInternalTasks
{
childTaskIDs
=
append
(
childTaskIDs
,
t
.
ID
())
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
t
.
ID
())
childTaskIDs
=
append
(
childTaskIDs
,
t
.
getTask
ID
())
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
t
.
getTask
ID
())
removes
=
append
(
removes
,
taskKey
)
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
ID
())
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
getTask
ID
())
removes
=
append
(
removes
,
stateKey
)
}
...
...
@@ -547,14 +547,14 @@ func (scheduler *TaskScheduler) scheduleLoop() {
if
err
!=
nil
{
return
err
}
t
.
Set
ID
(
id
)
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
t
.
ID
())
blobs
,
err
:=
t
.
M
arshal
()
t
.
setTask
ID
(
id
)
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
t
.
getTask
ID
())
blobs
,
err
:=
t
.
m
arshal
()
if
err
!=
nil
{
return
err
}
saves
[
taskKey
]
=
string
(
blobs
)
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
ID
())
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
getTask
ID
())
saves
[
stateKey
]
=
strconv
.
Itoa
(
int
(
taskUndo
))
}
...
...
@@ -563,10 +563,10 @@ func (scheduler *TaskScheduler) scheduleLoop() {
return
err
}
for
_
,
taskID
:=
range
childTaskIDs
{
triggerTask
.
R
emoveChildTaskByID
(
taskID
)
triggerTask
.
r
emoveChildTaskByID
(
taskID
)
}
for
_
,
t
:=
range
rollBackTasks
{
triggerTask
.
A
ddChildTask
(
t
)
triggerTask
.
a
ddChildTask
(
t
)
}
return
nil
...
...
@@ -574,14 +574,14 @@ func (scheduler *TaskScheduler) scheduleLoop() {
removeTaskFromKVFn
:=
func
(
triggerTask
task
)
error
{
keys
:=
make
([]
string
,
0
)
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
triggerTaskPrefix
,
triggerTask
.
ID
())
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
triggerTask
.
ID
())
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
triggerTaskPrefix
,
triggerTask
.
getTask
ID
())
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
triggerTask
.
getTask
ID
())
keys
=
append
(
keys
,
taskKey
)
keys
=
append
(
keys
,
stateKey
)
childTasks
:=
triggerTask
.
G
etChildTask
()
childTasks
:=
triggerTask
.
g
etChildTask
()
for
_
,
t
:=
range
childTasks
{
taskKey
=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
t
.
ID
())
stateKey
=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
ID
())
taskKey
=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
t
.
getTask
ID
())
stateKey
=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
getTask
ID
())
keys
=
append
(
keys
,
taskKey
)
keys
=
append
(
keys
,
stateKey
)
}
...
...
@@ -600,36 +600,36 @@ func (scheduler *TaskScheduler) scheduleLoop() {
return
case
<-
scheduler
.
triggerTaskQueue
.
Chan
()
:
triggerTask
=
scheduler
.
triggerTaskQueue
.
PopTask
()
log
.
Debug
(
"scheduleLoop: pop a triggerTask from triggerTaskQueue"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()))
log
.
Debug
(
"scheduleLoop: pop a triggerTask from triggerTaskQueue"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()))
alreadyNotify
:=
true
if
triggerTask
.
State
()
==
taskUndo
||
triggerTask
.
State
()
==
taskDoing
{
if
triggerTask
.
getState
()
==
taskUndo
||
triggerTask
.
get
State
()
==
taskDoing
{
err
=
scheduler
.
processTask
(
triggerTask
)
if
err
!=
nil
{
log
.
Debug
(
"scheduleLoop: process triggerTask failed"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Error
(
err
))
log
.
Debug
(
"scheduleLoop: process triggerTask failed"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Error
(
err
))
alreadyNotify
=
false
}
}
if
triggerTask
.
Type
()
!=
commonpb
.
MsgType_LoadCollection
&&
triggerTask
.
Type
()
!=
commonpb
.
MsgType_LoadPartitions
{
if
triggerTask
.
msgType
()
!=
commonpb
.
MsgType_LoadCollection
&&
triggerTask
.
msg
Type
()
!=
commonpb
.
MsgType_LoadPartitions
{
alreadyNotify
=
false
}
childTasks
:=
triggerTask
.
G
etChildTask
()
childTasks
:=
triggerTask
.
g
etChildTask
()
if
len
(
childTasks
)
!=
0
{
activateTasks
:=
make
([]
task
,
len
(
childTasks
))
copy
(
activateTasks
,
childTasks
)
processInternalTaskFn
(
activateTasks
,
triggerTask
)
resultStatus
:=
triggerTask
.
G
etResultInfo
()
resultStatus
:=
triggerTask
.
g
etResultInfo
()
if
resultStatus
.
ErrorCode
!=
commonpb
.
ErrorCode_Success
{
rollBackTasks
:=
triggerTask
.
R
ollBack
(
scheduler
.
ctx
)
rollBackTasks
:=
triggerTask
.
r
ollBack
(
scheduler
.
ctx
)
log
.
Debug
(
"scheduleLoop: start rollBack after triggerTask failed"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Any
(
"rollBackTasks"
,
rollBackTasks
))
err
=
rollBackInterTaskFn
(
triggerTask
,
childTasks
,
rollBackTasks
)
if
err
!=
nil
{
log
.
Error
(
"scheduleLoop: rollBackInternalTask error"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Error
(
err
))
triggerTask
.
S
etResultInfo
(
err
)
triggerTask
.
s
etResultInfo
(
err
)
}
else
{
processInternalTaskFn
(
rollBackTasks
,
triggerTask
)
}
...
...
@@ -638,23 +638,23 @@ func (scheduler *TaskScheduler) scheduleLoop() {
err
=
removeTaskFromKVFn
(
triggerTask
)
if
err
!=
nil
{
log
.
Error
(
"scheduleLoop: error when remove trigger and internal tasks from etcd"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Error
(
err
))
triggerTask
.
S
etResultInfo
(
err
)
log
.
Error
(
"scheduleLoop: error when remove trigger and internal tasks from etcd"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Error
(
err
))
triggerTask
.
s
etResultInfo
(
err
)
}
else
{
log
.
Debug
(
"scheduleLoop: trigger task done and delete from etcd"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()))
log
.
Debug
(
"scheduleLoop: trigger task done and delete from etcd"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()))
}
resultStatus
:=
triggerTask
.
G
etResultInfo
()
resultStatus
:=
triggerTask
.
g
etResultInfo
()
if
resultStatus
.
ErrorCode
!=
commonpb
.
ErrorCode_Success
{
triggerTask
.
S
etState
(
taskFailed
)
triggerTask
.
s
etState
(
taskFailed
)
if
!
alreadyNotify
{
triggerTask
.
N
otify
(
errors
.
New
(
resultStatus
.
Reason
))
triggerTask
.
n
otify
(
errors
.
New
(
resultStatus
.
Reason
))
}
}
else
{
triggerTask
.
U
pdateTaskProcess
()
triggerTask
.
S
etState
(
taskExpired
)
triggerTask
.
u
pdateTaskProcess
()
triggerTask
.
s
etState
(
taskExpired
)
if
!
alreadyNotify
{
triggerTask
.
N
otify
(
nil
)
triggerTask
.
n
otify
(
nil
)
}
}
}
...
...
@@ -665,23 +665,23 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
defer
wg
.
Done
()
var
err
error
redoFunc1
:=
func
()
{
if
!
t
.
IsValid
()
||
!
t
.
I
sRetryable
()
{
if
!
t
.
isValid
()
||
!
t
.
i
sRetryable
()
{
log
.
Debug
(
"waitActivateTaskDone: reSchedule the activate task"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()))
reScheduledTasks
,
err
:=
t
.
R
eschedule
(
scheduler
.
ctx
)
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()))
reScheduledTasks
,
err
:=
t
.
r
eschedule
(
scheduler
.
ctx
)
if
err
!=
nil
{
log
.
Error
(
"waitActivateTaskDone: reschedule task error"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Error
(
err
))
triggerTask
.
S
etResultInfo
(
err
)
triggerTask
.
s
etResultInfo
(
err
)
return
}
removes
:=
make
([]
string
,
0
)
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
t
.
ID
())
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
t
.
getTask
ID
())
removes
=
append
(
removes
,
taskKey
)
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
ID
())
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
t
.
getTask
ID
())
removes
=
append
(
removes
,
stateKey
)
saves
:=
make
(
map
[
string
]
string
)
...
...
@@ -690,44 +690,44 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
id
,
err
:=
scheduler
.
taskIDAllocator
()
if
err
!=
nil
{
log
.
Error
(
"waitActivateTaskDone: allocate id error"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Error
(
err
))
triggerTask
.
S
etResultInfo
(
err
)
triggerTask
.
s
etResultInfo
(
err
)
return
}
rt
.
Set
ID
(
id
)
log
.
Debug
(
"waitActivateTaskDone: reScheduler set id"
,
zap
.
Int64
(
"id"
,
rt
.
ID
()))
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
rt
.
ID
())
blobs
,
err
:=
rt
.
M
arshal
()
rt
.
setTask
ID
(
id
)
log
.
Debug
(
"waitActivateTaskDone: reScheduler set id"
,
zap
.
Int64
(
"id"
,
rt
.
getTask
ID
()))
taskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
rt
.
getTask
ID
())
blobs
,
err
:=
rt
.
m
arshal
()
if
err
!=
nil
{
log
.
Error
(
"waitActivateTaskDone: error when marshal active task"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Error
(
err
))
triggerTask
.
S
etResultInfo
(
err
)
triggerTask
.
s
etResultInfo
(
err
)
return
}
saves
[
taskKey
]
=
string
(
blobs
)
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
rt
.
ID
())
stateKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
taskInfoPrefix
,
rt
.
getTask
ID
())
saves
[
stateKey
]
=
strconv
.
Itoa
(
int
(
taskUndo
))
}
}
//TODO::queryNode auto watch queryChannel, then update etcd use same id directly
err
=
scheduler
.
client
.
MultiSaveAndRemove
(
saves
,
removes
)
if
err
!=
nil
{
log
.
Error
(
"waitActivateTaskDone: error when save and remove task from etcd"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()))
triggerTask
.
S
etResultInfo
(
err
)
log
.
Error
(
"waitActivateTaskDone: error when save and remove task from etcd"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()))
triggerTask
.
s
etResultInfo
(
err
)
return
}
triggerTask
.
RemoveChildTaskByID
(
t
.
ID
())
triggerTask
.
removeChildTaskByID
(
t
.
getTask
ID
())
log
.
Debug
(
"waitActivateTaskDone: delete failed active task and save reScheduled task to etcd"
,
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Int64
(
"failed taskID"
,
t
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Int64
(
"failed taskID"
,
t
.
getTask
ID
()),
zap
.
Any
(
"reScheduled tasks"
,
reScheduledTasks
))
for
_
,
rt
:=
range
reScheduledTasks
{
if
rt
!=
nil
{
triggerTask
.
A
ddChildTask
(
rt
)
log
.
Debug
(
"waitActivateTaskDone: add a reScheduled active task to activateChan"
,
zap
.
Int64
(
"taskID"
,
rt
.
ID
()))
triggerTask
.
a
ddChildTask
(
rt
)
log
.
Debug
(
"waitActivateTaskDone: add a reScheduled active task to activateChan"
,
zap
.
Int64
(
"taskID"
,
rt
.
getTask
ID
()))
scheduler
.
activateTaskChan
<-
rt
wg
.
Add
(
1
)
go
scheduler
.
waitActivateTaskDone
(
wg
,
rt
,
triggerTask
)
...
...
@@ -736,8 +736,8 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
//delete task from etcd
}
else
{
log
.
Debug
(
"waitActivateTaskDone: retry the active task"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()))
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()))
scheduler
.
activateTaskChan
<-
t
wg
.
Add
(
1
)
go
scheduler
.
waitActivateTaskDone
(
wg
,
t
,
triggerTask
)
...
...
@@ -745,30 +745,30 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
}
redoFunc2
:=
func
(
err
error
)
{
if
t
.
I
sValid
()
{
if
!
t
.
I
sRetryable
()
{
if
t
.
i
sValid
()
{
if
!
t
.
i
sRetryable
()
{
log
.
Error
(
"waitActivateTaskDone: activate task failed after retry"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()))
triggerTask
.
S
etResultInfo
(
err
)
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()))
triggerTask
.
s
etResultInfo
(
err
)
return
}
log
.
Debug
(
"waitActivateTaskDone: retry the active task"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()))
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()))
scheduler
.
activateTaskChan
<-
t
wg
.
Add
(
1
)
go
scheduler
.
waitActivateTaskDone
(
wg
,
t
,
triggerTask
)
}
}
err
=
t
.
W
aitToFinish
()
err
=
t
.
w
aitToFinish
()
if
err
!=
nil
{
log
.
Debug
(
"waitActivateTaskDone: activate task return err"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()),
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()),
zap
.
Error
(
err
))
switch
t
.
Type
()
{
switch
t
.
msg
Type
()
{
case
commonpb
.
MsgType_LoadSegments
:
redoFunc1
()
case
commonpb
.
MsgType_WatchDmChannels
:
...
...
@@ -786,8 +786,8 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
}
}
else
{
log
.
Debug
(
"waitActivateTaskDone: one activate task done"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
ID
()))
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()),
zap
.
Int64
(
"triggerTaskID"
,
triggerTask
.
getTask
ID
()))
}
}
...
...
@@ -801,14 +801,14 @@ func (scheduler *TaskScheduler) processActivateTaskLoop() {
case
t
:=
<-
scheduler
.
activateTaskChan
:
if
t
==
nil
{
log
.
Error
(
"processActivateTaskLoop: pop a nil active task"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()))
log
.
Error
(
"processActivateTaskLoop: pop a nil active task"
,
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()))
continue
}
log
.
Debug
(
"processActivateTaskLoop: pop a active task from activateChan"
,
zap
.
Int64
(
"taskID"
,
t
.
ID
()))
log
.
Debug
(
"processActivateTaskLoop: pop a active task from activateChan"
,
zap
.
Int64
(
"taskID"
,
t
.
getTask
ID
()))
go
func
()
{
err
:=
scheduler
.
processTask
(
t
)
t
.
N
otify
(
err
)
t
.
n
otify
(
err
)
}()
}
}
...
...
internal/querycoord/task_scheduler_test.go
浏览文件 @
e80fbd1f
...
...
@@ -32,29 +32,29 @@ type testTask struct {
nodeID
int64
}
func
(
tt
*
testTask
)
M
sgBase
()
*
commonpb
.
MsgBase
{
func
(
tt
*
testTask
)
m
sgBase
()
*
commonpb
.
MsgBase
{
return
tt
.
baseMsg
}
func
(
tt
*
testTask
)
M
arshal
()
([]
byte
,
error
)
{
func
(
tt
*
testTask
)
m
arshal
()
([]
byte
,
error
)
{
return
[]
byte
{},
nil
}
func
(
tt
*
testTask
)
Type
()
commonpb
.
MsgType
{
func
(
tt
*
testTask
)
msg
Type
()
commonpb
.
MsgType
{
return
tt
.
baseMsg
.
MsgType
}
func
(
tt
*
testTask
)
T
imestamp
()
Timestamp
{
func
(
tt
*
testTask
)
t
imestamp
()
Timestamp
{
return
tt
.
baseMsg
.
Timestamp
}
func
(
tt
*
testTask
)
P
reExecute
(
ctx
context
.
Context
)
error
{
tt
.
S
etResultInfo
(
nil
)
func
(
tt
*
testTask
)
p
reExecute
(
ctx
context
.
Context
)
error
{
tt
.
s
etResultInfo
(
nil
)
log
.
Debug
(
"test task preExecute..."
)
return
nil
}
func
(
tt
*
testTask
)
E
xecute
(
ctx
context
.
Context
)
error
{
func
(
tt
*
testTask
)
e
xecute
(
ctx
context
.
Context
)
error
{
log
.
Debug
(
"test task execute..."
)
switch
tt
.
baseMsg
.
MsgType
{
...
...
@@ -75,7 +75,7 @@ func (tt *testTask) Execute(ctx context.Context) error {
cluster
:
tt
.
cluster
,
excludeNodeIDs
:
[]
int64
{},
}
tt
.
A
ddChildTask
(
childTask
)
tt
.
a
ddChildTask
(
childTask
)
case
commonpb
.
MsgType_WatchDmChannels
:
childTask
:=
&
WatchDmChannelTask
{
BaseTask
:
&
BaseTask
{
...
...
@@ -93,7 +93,7 @@ func (tt *testTask) Execute(ctx context.Context) error {
meta
:
tt
.
meta
,
excludeNodeIDs
:
[]
int64
{},
}
tt
.
A
ddChildTask
(
childTask
)
tt
.
a
ddChildTask
(
childTask
)
case
commonpb
.
MsgType_WatchQueryChannels
:
childTask
:=
&
WatchQueryChannelTask
{
BaseTask
:
&
BaseTask
{
...
...
@@ -109,13 +109,13 @@ func (tt *testTask) Execute(ctx context.Context) error {
},
cluster
:
tt
.
cluster
,
}
tt
.
A
ddChildTask
(
childTask
)
tt
.
a
ddChildTask
(
childTask
)
}
return
nil
}
func
(
tt
*
testTask
)
P
ostExecute
(
ctx
context
.
Context
)
error
{
func
(
tt
*
testTask
)
p
ostExecute
(
ctx
context
.
Context
)
error
{
log
.
Debug
(
"test task postExecute..."
)
return
nil
}
...
...
@@ -181,7 +181,7 @@ func TestUnMarshalTask(t *testing.T) {
},
},
}
blobs
,
err
:=
loadTask
.
M
arshal
()
blobs
,
err
:=
loadTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalLoadCollection"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -191,7 +191,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1000
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_LoadCollection
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_LoadCollection
)
})
t
.
Run
(
"Test LoadPartitionsTask"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -202,7 +202,7 @@ func TestUnMarshalTask(t *testing.T) {
},
},
}
blobs
,
err
:=
loadTask
.
M
arshal
()
blobs
,
err
:=
loadTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalLoadPartition"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -212,7 +212,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1001
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_LoadPartitions
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_LoadPartitions
)
})
t
.
Run
(
"Test ReleaseCollectionTask"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -223,7 +223,7 @@ func TestUnMarshalTask(t *testing.T) {
},
},
}
blobs
,
err
:=
releaseTask
.
M
arshal
()
blobs
,
err
:=
releaseTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalReleaseCollection"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -233,7 +233,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1002
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_ReleaseCollection
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_ReleaseCollection
)
})
t
.
Run
(
"Test ReleasePartitionTask"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -244,7 +244,7 @@ func TestUnMarshalTask(t *testing.T) {
},
},
}
blobs
,
err
:=
releaseTask
.
M
arshal
()
blobs
,
err
:=
releaseTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalReleasePartition"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -254,7 +254,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1003
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_ReleasePartitions
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_ReleasePartitions
)
})
t
.
Run
(
"Test LoadSegmentTask"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -265,7 +265,7 @@ func TestUnMarshalTask(t *testing.T) {
},
},
}
blobs
,
err
:=
loadTask
.
M
arshal
()
blobs
,
err
:=
loadTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalLoadSegment"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -275,7 +275,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1004
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_LoadSegments
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_LoadSegments
)
})
t
.
Run
(
"Test ReleaseSegmentTask"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -286,7 +286,7 @@ func TestUnMarshalTask(t *testing.T) {
},
},
}
blobs
,
err
:=
releaseTask
.
M
arshal
()
blobs
,
err
:=
releaseTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalReleaseSegment"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -296,7 +296,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1005
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_ReleaseSegments
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_ReleaseSegments
)
})
t
.
Run
(
"Test WatchDmChannelTask"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -307,7 +307,7 @@ func TestUnMarshalTask(t *testing.T) {
},
},
}
blobs
,
err
:=
watchTask
.
M
arshal
()
blobs
,
err
:=
watchTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalWatchDmChannel"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -317,7 +317,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1006
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_WatchDmChannels
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_WatchDmChannels
)
})
t
.
Run
(
"Test WatchQueryChannelTask"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -328,7 +328,7 @@ func TestUnMarshalTask(t *testing.T) {
},
},
}
blobs
,
err
:=
watchTask
.
M
arshal
()
blobs
,
err
:=
watchTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalWatchQueryChannel"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -338,7 +338,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1007
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_WatchQueryChannels
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_WatchQueryChannels
)
})
t
.
Run
(
"Test LoadBalanceTask"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -350,7 +350,7 @@ func TestUnMarshalTask(t *testing.T) {
},
}
blobs
,
err
:=
loadBalanceTask
.
M
arshal
()
blobs
,
err
:=
loadBalanceTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
err
=
kv
.
Save
(
"testMarshalLoadBalanceTask"
,
string
(
blobs
))
assert
.
Nil
(
t
,
err
)
...
...
@@ -360,7 +360,7 @@ func TestUnMarshalTask(t *testing.T) {
task
,
err
:=
taskScheduler
.
unmarshalTask
(
1008
,
value
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
task
.
Type
(),
commonpb
.
MsgType_LoadBalanceSegments
)
assert
.
Equal
(
t
,
task
.
msg
Type
(),
commonpb
.
MsgType_LoadBalanceSegments
)
})
taskScheduler
.
Close
()
...
...
@@ -387,7 +387,7 @@ func TestReloadTaskFromKV(t *testing.T) {
},
},
}
triggerBlobs
,
err
:=
triggerTask
.
M
arshal
()
triggerBlobs
,
err
:=
triggerTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
triggerTaskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
triggerTaskPrefix
,
100
)
kvs
[
triggerTaskKey
]
=
string
(
triggerBlobs
)
...
...
@@ -400,7 +400,7 @@ func TestReloadTaskFromKV(t *testing.T) {
},
},
}
activeBlobs
,
err
:=
activeTask
.
M
arshal
()
activeBlobs
,
err
:=
activeTask
.
m
arshal
()
assert
.
Nil
(
t
,
err
)
activeTaskKey
:=
fmt
.
Sprintf
(
"%s/%d"
,
activeTaskPrefix
,
101
)
kvs
[
activeTaskKey
]
=
string
(
activeBlobs
)
...
...
@@ -413,6 +413,6 @@ func TestReloadTaskFromKV(t *testing.T) {
taskScheduler
.
reloadFromKV
()
task
:=
taskScheduler
.
triggerTaskQueue
.
PopTask
()
assert
.
Equal
(
t
,
taskDone
,
task
.
State
())
assert
.
Equal
(
t
,
1
,
len
(
task
.
G
etChildTask
()))
assert
.
Equal
(
t
,
taskDone
,
task
.
get
State
())
assert
.
Equal
(
t
,
1
,
len
(
task
.
g
etChildTask
()))
}
internal/querycoord/task_test.go
浏览文件 @
e80fbd1f
...
...
@@ -159,10 +159,10 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
meta
:
queryCoord
.
meta
,
cluster
:
queryCoord
.
cluster
,
}
parentTask
.
S
etState
(
taskDone
)
parentTask
.
S
etResultInfo
(
nil
)
parentTask
.
A
ddChildTask
(
watchDmChannelTask
)
watchDmChannelTask
.
S
etParentTask
(
parentTask
)
parentTask
.
s
etState
(
taskDone
)
parentTask
.
s
etResultInfo
(
nil
)
parentTask
.
a
ddChildTask
(
watchDmChannelTask
)
watchDmChannelTask
.
s
etParentTask
(
parentTask
)
queryCoord
.
meta
.
addCollection
(
defaultCollectionID
,
schema
)
return
watchDmChannelTask
...
...
@@ -209,10 +209,10 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6
meta
:
queryCoord
.
meta
,
cluster
:
queryCoord
.
cluster
,
}
parentTask
.
S
etState
(
taskDone
)
parentTask
.
S
etResultInfo
(
nil
)
parentTask
.
A
ddChildTask
(
loadSegmentTask
)
loadSegmentTask
.
S
etParentTask
(
parentTask
)
parentTask
.
s
etState
(
taskDone
)
parentTask
.
s
etResultInfo
(
nil
)
parentTask
.
a
ddChildTask
(
loadSegmentTask
)
loadSegmentTask
.
s
etParentTask
(
parentTask
)
queryCoord
.
meta
.
addCollection
(
defaultCollectionID
,
schema
)
return
loadSegmentTask
...
...
@@ -220,7 +220,7 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6
func
waitTaskFinalState
(
t
task
,
state
taskState
)
{
for
{
if
t
.
State
()
==
state
{
if
t
.
get
State
()
==
state
{
break
}
}
...
...
@@ -291,7 +291,7 @@ func Test_LoadCollectionAfterLoadPartition(t *testing.T) {
err
=
queryCoord
.
scheduler
.
Enqueue
(
releaseCollectionTask
)
assert
.
Nil
(
t
,
err
)
err
=
releaseCollectionTask
.
W
aitToFinish
()
err
=
releaseCollectionTask
.
w
aitToFinish
()
assert
.
Nil
(
t
,
err
)
node
.
stop
()
...
...
@@ -323,7 +323,7 @@ func Test_RepeatLoadCollection(t *testing.T) {
err
=
queryCoord
.
scheduler
.
Enqueue
(
releaseCollectionTask
)
assert
.
Nil
(
t
,
err
)
err
=
releaseCollectionTask
.
W
aitToFinish
()
err
=
releaseCollectionTask
.
w
aitToFinish
()
assert
.
Nil
(
t
,
err
)
node
.
stop
()
...
...
@@ -342,7 +342,7 @@ func Test_LoadCollectionAssignTaskFail(t *testing.T) {
err
=
queryCoord
.
scheduler
.
Enqueue
(
loadCollectionTask
)
assert
.
Nil
(
t
,
err
)
err
=
loadCollectionTask
.
W
aitToFinish
()
err
=
loadCollectionTask
.
w
aitToFinish
()
assert
.
NotNil
(
t
,
err
)
queryCoord
.
Stop
()
...
...
@@ -384,7 +384,7 @@ func Test_LoadPartitionAssignTaskFail(t *testing.T) {
err
=
queryCoord
.
scheduler
.
Enqueue
(
loadPartitionTask
)
assert
.
Nil
(
t
,
err
)
err
=
loadPartitionTask
.
W
aitToFinish
()
err
=
loadPartitionTask
.
w
aitToFinish
()
assert
.
NotNil
(
t
,
err
)
queryCoord
.
Stop
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录