未验证 提交 3a42122b 编写于 作者: J Jenny Li 提交者: GitHub

Task/Action won't finish util the RPC returned (#20669) (#20691)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
Signed-off-by: Nyah01 <yang.cen@zilliz.com>
Signed-off-by: Nyah01 <yang.cen@zilliz.com>
Co-authored-by: Nyah01 <yang.cen@zilliz.com>
上级 dbcbe4d8
......@@ -31,11 +31,6 @@ import (
"go.uber.org/zap"
)
type actionIndex struct {
Task int64
Step int
}
type Executor struct {
doneCh chan struct{}
wg sync.WaitGroup
......@@ -49,7 +44,7 @@ type Executor struct {
// Merge load segment requests
merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest]
executingActions sync.Map
executingTasks sync.Map
}
func NewExecutor(meta *meta.Meta,
......@@ -68,7 +63,7 @@ func NewExecutor(meta *meta.Meta,
nodeMgr: nodeMgr,
merger: NewMerger[segmentIndex, *querypb.LoadSegmentsRequest](),
executingActions: sync.Map{},
executingTasks: sync.Map{},
}
}
......@@ -86,11 +81,7 @@ func (ex *Executor) Stop() {
// does nothing and returns false if the action is already committed,
// returns true otherwise.
func (ex *Executor) Execute(task Task, step int) bool {
index := actionIndex{
Task: task.ID(),
Step: step,
}
_, exist := ex.executingActions.LoadOrStore(index, struct{}{})
_, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{})
if exist {
return false
}
......@@ -115,6 +106,11 @@ func (ex *Executor) Execute(task Task, step int) bool {
return true
}
func (ex *Executor) Exist(taskID int64) bool {
_, ok := ex.executingTasks.Load(taskID)
return ok
}
func (ex *Executor) scheduleRequests() {
ex.wg.Add(1)
go func() {
......@@ -191,11 +187,7 @@ func (ex *Executor) removeAction(task Task, step int) {
zap.Error(task.Err()))
}
index := actionIndex{
Task: task.ID(),
Step: step,
}
ex.executingActions.Delete(index)
ex.executingTasks.Delete(task.ID())
}
func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
......
......@@ -519,7 +519,7 @@ func (scheduler *taskScheduler) process(task Task) bool {
zap.Int64("source", task.SourceID()),
)
if task.IsFinished(scheduler.distMgr) {
if !scheduler.executor.Exist(task.ID()) && task.IsFinished(scheduler.distMgr) {
task.SetStatus(TaskStatusSucceeded)
} else if scheduler.checkCanceled(task) {
task.SetStatus(TaskStatusCanceled)
......
......@@ -1122,7 +1122,7 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
for start := time.Now(); time.Since(start) < timeout; {
count = 0
keys = make([]any, 0)
suite.scheduler.executor.executingActions.Range(func(key, value any) bool {
suite.scheduler.executor.executingTasks.Range(func(key, value any) bool {
keys = append(keys, key)
count++
return true
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册