diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 6b154cd0971afba2167970e957090b1fd896ca56..ebf4d6a609fe9e6bbca5c75dac059944581c6edf 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -31,11 +31,6 @@ import ( "go.uber.org/zap" ) -type actionIndex struct { - Task int64 - Step int -} - type Executor struct { doneCh chan struct{} wg sync.WaitGroup @@ -49,7 +44,7 @@ type Executor struct { // Merge load segment requests merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest] - executingActions sync.Map + executingTasks sync.Map } func NewExecutor(meta *meta.Meta, @@ -68,7 +63,7 @@ func NewExecutor(meta *meta.Meta, nodeMgr: nodeMgr, merger: NewMerger[segmentIndex, *querypb.LoadSegmentsRequest](), - executingActions: sync.Map{}, + executingTasks: sync.Map{}, } } @@ -86,11 +81,7 @@ func (ex *Executor) Stop() { // does nothing and returns false if the action is already committed, // returns true otherwise. func (ex *Executor) Execute(task Task, step int) bool { - index := actionIndex{ - Task: task.ID(), - Step: step, - } - _, exist := ex.executingActions.LoadOrStore(index, struct{}{}) + _, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{}) if exist { return false } @@ -115,6 +106,11 @@ func (ex *Executor) Execute(task Task, step int) bool { return true } +func (ex *Executor) Exist(taskID int64) bool { + _, ok := ex.executingTasks.Load(taskID) + return ok +} + func (ex *Executor) scheduleRequests() { ex.wg.Add(1) go func() { @@ -191,11 +187,7 @@ func (ex *Executor) removeAction(task Task, step int) { zap.Error(task.Err())) } - index := actionIndex{ - Task: task.ID(), - Step: step, - } - ex.executingActions.Delete(index) + ex.executingTasks.Delete(task.ID()) } func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) { diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 2c54bc8325bc8cd1f0c55a3fa214fbdc2d2f7eb5..da829276266d5322921a874ba9fab5a5e23e49b7 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -519,7 +519,7 @@ func (scheduler *taskScheduler) process(task Task) bool { zap.Int64("source", task.SourceID()), ) - if task.IsFinished(scheduler.distMgr) { + if !scheduler.executor.Exist(task.ID()) && task.IsFinished(scheduler.distMgr) { task.SetStatus(TaskStatusSucceeded) } else if scheduler.checkCanceled(task) { task.SetStatus(TaskStatusCanceled) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index eff2a25b1dcde54c7a5675df3b5d93c783ecf2ec..025090ee058e4a9dd64f2558c16029a83218ff88 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1122,7 +1122,7 @@ func (suite *TaskSuite) dispatchAndWait(node int64) { for start := time.Now(); time.Since(start) < timeout; { count = 0 keys = make([]any, 0) - suite.scheduler.executor.executingActions.Range(func(key, value any) bool { + suite.scheduler.executor.executingTasks.Range(func(key, value any) bool { keys = append(keys, key) count++ return true