From 4f6b87311e5e99295b12d2bc7b477a77cfb6a87f Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Tue, 1 Nov 2022 14:27:34 +0800 Subject: [PATCH] Refine request merger (#20189) Signed-off-by: xiaofan-luan Signed-off-by: xiaofan-luan --- internal/querycoordv2/meta/target_manager.go | 2 +- internal/querycoordv2/task/executor.go | 20 +++- internal/querycoordv2/task/merger.go | 110 +++++++++---------- internal/querycoordv2/task/merger_test.go | 3 + 4 files changed, 68 insertions(+), 67 deletions(-) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index db0e0ddeb..2fdf0a802 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -83,7 +83,7 @@ func (mgr *TargetManager) RemoveSegment(segmentID int64) { func (mgr *TargetManager) removeSegment(segmentID int64) { delete(mgr.segments, segmentID) - log.Info("segment removed from targets") + log.Info("segment removed from targets", zap.Int64("segment", segmentID)) } // AddSegment adds segment into target set, diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 59974e67b..6b154cd09 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -133,6 +133,7 @@ func (ex *Executor) scheduleRequests() { } func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { + startTs := time.Now() task := mergeTask.tasks[0] action := task.Actions()[mergeTask.steps[0]] @@ -152,6 +153,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { log := log.With( zap.Int64s("taskIDs", taskIDs), zap.Int64("collectionID", task.CollectionID()), + zap.String("shard", task.Shard()), zap.Int64s("segmentIDs", segments), zap.Int64("nodeID", action.Node()), zap.Int64("source", task.SourceID()), @@ -177,7 +179,8 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { log.Warn("failed to load segment", zap.String("reason", status.GetReason())) return } - log.Info("load segments done") + elapsed := time.Since(startTs) + log.Info("load segments done", zap.Int64("taskID", task.ID()), zap.Duration("timeTaken", elapsed)) } func (ex *Executor) removeAction(task Task, step int) { @@ -275,7 +278,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { func (ex *Executor) releaseSegment(task *SegmentTask, step int) { defer ex.removeAction(task, step) - + startTs := time.Now() action := task.Actions()[step].(*SegmentAction) defer action.isReleaseCommitted.Store(true) @@ -332,7 +335,8 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) { log.Warn("failed to release segment", zap.String("reason", status.GetReason())) return } - log.Info("release segment done") + elapsed := time.Since(startTs) + log.Info("release segment done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed)) } func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) { @@ -347,7 +351,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) { func (ex *Executor) subDmChannel(task *ChannelTask, step int) error { defer ex.removeAction(task, step) - + startTs := time.Now() action := task.Actions()[step].(*ChannelAction) log := log.With( zap.Int64("taskID", task.ID()), @@ -407,13 +411,14 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error { log.Warn("failed to subscribe DmChannel", zap.String("reason", status.GetReason())) return err } - log.Info("subscribe DmChannel done") + elapsed := time.Since(startTs) + log.Info("subscribe DmChannel done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed)) return nil } func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error { defer ex.removeAction(task, step) - + startTs := time.Now() action := task.Actions()[step].(*ChannelAction) log := log.With( zap.Int64("taskID", task.ID()), @@ -444,5 +449,8 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error { log.Warn("failed to unsubscribe DmChannel", zap.String("reason", status.GetReason())) return err } + + elapsed := time.Since(startTs) + log.Info("unsubscribe DmChannel done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed)) return nil } diff --git a/internal/querycoordv2/task/merger.go b/internal/querycoordv2/task/merger.go index 89b5225da..3e7e7acdb 100644 --- a/internal/querycoordv2/task/merger.go +++ b/internal/querycoordv2/task/merger.go @@ -23,7 +23,6 @@ import ( "github.com/milvus-io/milvus/internal/log" . "github.com/milvus-io/milvus/internal/querycoordv2/params" - "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" ) @@ -31,24 +30,21 @@ import ( const waitQueueCap = 256 type Merger[K comparable, R any] struct { - stopCh chan struct{} - wg sync.WaitGroup - - processors *typeutil.ConcurrentSet[K] // Tasks of having processor - queues map[K]chan MergeableTask[K, R] // TaskID -> Queue - waitQueue chan MergeableTask[K, R] - outCh chan MergeableTask[K, R] + stopCh chan struct{} + wg sync.WaitGroup + queues map[K][]MergeableTask[K, R] // TaskID -> Queue + waitQueue chan MergeableTask[K, R] + outCh chan MergeableTask[K, R] stopOnce sync.Once } func NewMerger[K comparable, R any]() *Merger[K, R] { return &Merger[K, R]{ - stopCh: make(chan struct{}), - processors: typeutil.NewConcurrentSet[K](), - queues: make(map[K]chan MergeableTask[K, R]), - waitQueue: make(chan MergeableTask[K, R], waitQueueCap), - outCh: make(chan MergeableTask[K, R], Params.QueryCoordCfg.TaskMergeCap), + stopCh: make(chan struct{}), + queues: make(map[K][]MergeableTask[K, R]), + waitQueue: make(chan MergeableTask[K, R], waitQueueCap), + outCh: make(chan MergeableTask[K, R], Params.QueryCoordCfg.TaskMergeCap), } } @@ -60,7 +56,6 @@ func (merger *Merger[K, R]) Stop() { merger.stopOnce.Do(func() { close(merger.stopCh) merger.wg.Wait() - close(merger.outCh) }) } @@ -73,40 +68,23 @@ func (merger *Merger[K, R]) schedule(ctx context.Context) { go func() { defer merger.wg.Done() ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() for { select { case <-ctx.Done(): + close(merger.outCh) log.Info("Merger stopped due to context canceled") return case <-merger.stopCh: + close(merger.outCh) log.Info("Merger stopped") return - case task := <-merger.waitQueue: - queue, ok := merger.queues[task.ID()] - if !ok { - queue = make(chan MergeableTask[K, R], Params.QueryCoordCfg.TaskMergeCap) - merger.queues[task.ID()] = queue - } - outer: - for { - select { - case queue <- task: - break outer - default: // Queue full, flush and retry - merger.merge(task.ID(), queue) - } - } - case <-ticker.C: - for id, queue := range merger.queues { - if len(queue) > 0 { - merger.merge(id, queue) - } else { - // Release resource if no task for the queue - delete(merger.queues, id) - } + merger.drain() + for id := range merger.queues { + merger.triggerExecution(id) } } } @@ -126,32 +104,44 @@ func (merger *Merger[K, R]) Add(task MergeableTask[K, R]) { merger.waitQueue <- task } -func (merger *Merger[K, R]) merge(id K, queue chan MergeableTask[K, R]) { - if merger.isStopped() { - return - } - if !merger.processors.Insert(id) { - return +func (merger *Merger[K, R]) drain() { + for { + select { + case task := <-merger.waitQueue: + queue, ok := merger.queues[task.ID()] + if !ok { + queue = []MergeableTask[K, R]{} + } + queue = append(queue, task) + merger.queues[task.ID()] = queue + default: + return + } } - - merger.wg.Add(1) - go merger.mergeQueue(id, queue) } -// mergeQueue merges tasks in the given queue, -// it only processes tasks with the number of the length of queue at the time, -// to avoid leaking goroutines -func (merger *Merger[K, R]) mergeQueue(id K, queue chan MergeableTask[K, R]) { - defer merger.wg.Done() - defer merger.processors.Remove(id) - - len := len(queue) - task := <-queue - for i := 1; i < len; i++ { - task.Merge(<-queue) +func (merger *Merger[K, R]) triggerExecution(id K) { + tasks := merger.queues[id] + delete(merger.queues, id) + + var task MergeableTask[K, R] + merged := 0 + for i := 0; i < len(tasks); i++ { + if merged == 0 { + task = tasks[i] + } else { + task.Merge(tasks[i]) + } + merged++ + if merged >= int(Params.QueryCoordCfg.TaskMergeCap) { + merger.outCh <- task + merged = 0 + } + } + + if merged != 0 { + merger.outCh <- task } - log.Info("merge tasks done", - zap.Any("mergeID", task.ID())) - merger.outCh <- task + log.Info("merge tasks done, trigger execution", zap.Any("mergeID", task.ID())) } diff --git a/internal/querycoordv2/task/merger_test.go b/internal/querycoordv2/task/merger_test.go index d302ac530..91758a2bd 100644 --- a/internal/querycoordv2/task/merger_test.go +++ b/internal/querycoordv2/task/merger_test.go @@ -134,6 +134,9 @@ func (suite *MergerSuite) TestMerge() { suite.Len(task.steps, 3) suite.EqualValues(1, task.Result().DeltaPositions[0].Timestamp) suite.EqualValues(1, task.Result().DeltaPositions[1].Timestamp) + suite.merger.Stop() + _, ok := <-suite.merger.Chan() + suite.Equal(ok, false) } func TestMerger(t *testing.T) { -- GitLab