Unverified commit 4f6b8731, authored by Xiaofan, committed by GitHub

Refine request merger (#20189)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
Parent d737a661
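
This commit reworks the querycoord request Merger: the per-ID buffered channels and their helper goroutines are replaced by plain slices that a single scheduling goroutine drains and flushes on a timer, and the executor gains per-operation timing logs. As a minimal sketch (not the actual Milvus source), the task shape the Merger operates on can be inferred from the calls visible in this diff: a key used for grouping, and a Merge method that folds one request into another.

```go
// Hypothetical sketch of the interface assumed by the Merger below, inferred
// only from the calls that appear in this diff (task.ID(), task.Merge(...)).
// The real definition lives elsewhere in the Milvus tree and may differ.
package task

type MergeableTask[K comparable, R any] interface {
	// ID returns the key used to group tasks into per-ID queues.
	ID() K
	// Merge folds another task's request into this one so that a single
	// RPC can serve both.
	Merge(other MergeableTask[K, R])
}
```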
@@ -83,7 +83,7 @@ func (mgr *TargetManager) RemoveSegment(segmentID int64) {
func (mgr *TargetManager) removeSegment(segmentID int64) {
delete(mgr.segments, segmentID)
log.Info("segment removed from targets")
log.Info("segment removed from targets", zap.Int64("segment", segmentID))
}
// AddSegment adds segment into target set,
......
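
The first hunk is a small logging change: the bare message in removeSegment now carries the segment ID as a structured zap field. A minimal standalone illustration of the pattern, using a locally constructed zap logger rather than Milvus's shared internal/log package (the segment ID here is a hypothetical value):

```go
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	segmentID := int64(42) // hypothetical value for illustration
	// Before: logger.Info("segment removed from targets")
	// After: the segment ID travels with the message as a typed field,
	// so log search and aggregation can filter on it directly.
	logger.Info("segment removed from targets", zap.Int64("segment", segmentID))
}
```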
@@ -133,6 +133,7 @@ func (ex *Executor) scheduleRequests() {
}
func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
startTs := time.Now()
task := mergeTask.tasks[0]
action := task.Actions()[mergeTask.steps[0]]
@@ -152,6 +153,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
log := log.With(
zap.Int64s("taskIDs", taskIDs),
zap.Int64("collectionID", task.CollectionID()),
zap.String("shard", task.Shard()),
zap.Int64s("segmentIDs", segments),
zap.Int64("nodeID", action.Node()),
zap.Int64("source", task.SourceID()),
@@ -177,7 +179,8 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
log.Warn("failed to load segment", zap.String("reason", status.GetReason()))
return
}
log.Info("load segments done")
elapsed := time.Since(startTs)
log.Info("load segments done", zap.Int64("taskID", task.ID()), zap.Duration("timeTaken", elapsed))
}
func (ex *Executor) removeAction(task Task, step int) {
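
The executor hunks in this file all apply the same instrumentation pattern: capture a start timestamp at the top of the operation (processMergeTask, releaseSegment, subDmChannel, unsubDmChannel) and attach the elapsed duration to the completion log. A hedged sketch of that pattern with stand-in names; timedLoad and doLoad are not Milvus APIs:

```go
package main

import (
	"errors"
	"time"

	"go.uber.org/zap"
)

// timedLoad shows the shape of the added instrumentation: record startTs up
// front, log time.Since(startTs) on the success path.
func timedLoad(logger *zap.Logger, doLoad func() error) error {
	startTs := time.Now()
	if err := doLoad(); err != nil {
		logger.Warn("failed to load segment", zap.Error(err))
		return err
	}
	logger.Info("load segments done", zap.Duration("timeTaken", time.Since(startTs)))
	return nil
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	_ = timedLoad(logger, func() error { return nil })
	_ = timedLoad(logger, func() error { return errors.New("node unreachable") })
}
```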
@@ -275,7 +278,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
defer ex.removeAction(task, step)
startTs := time.Now()
action := task.Actions()[step].(*SegmentAction)
defer action.isReleaseCommitted.Store(true)
@@ -332,7 +335,8 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
log.Warn("failed to release segment", zap.String("reason", status.GetReason()))
return
}
log.Info("release segment done")
elapsed := time.Since(startTs)
log.Info("release segment done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed))
}
func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
@@ -347,7 +351,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
defer ex.removeAction(task, step)
startTs := time.Now()
action := task.Actions()[step].(*ChannelAction)
log := log.With(
zap.Int64("taskID", task.ID()),
@@ -407,13 +411,14 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
log.Warn("failed to subscribe DmChannel", zap.String("reason", status.GetReason()))
return err
}
log.Info("subscribe DmChannel done")
elapsed := time.Since(startTs)
log.Info("subscribe DmChannel done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed))
return nil
}
func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
defer ex.removeAction(task, step)
startTs := time.Now()
action := task.Actions()[step].(*ChannelAction)
log := log.With(
zap.Int64("taskID", task.ID()),
@@ -444,5 +449,8 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
log.Warn("failed to unsubscribe DmChannel", zap.String("reason", status.GetReason()))
return err
}
elapsed := time.Since(startTs)
log.Info("unsubscribe DmChannel done", zap.Int64("taskID", task.ID()), zap.Duration("time taken", elapsed))
return nil
}
@@ -23,7 +23,6 @@ import (
"github.com/milvus-io/milvus/internal/log"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
@@ -31,24 +30,21 @@ import (
const waitQueueCap = 256
type Merger[K comparable, R any] struct {
stopCh chan struct{}
wg sync.WaitGroup
processors *typeutil.ConcurrentSet[K] // Tasks of having processor
queues map[K]chan MergeableTask[K, R] // TaskID -> Queue
waitQueue chan MergeableTask[K, R]
outCh chan MergeableTask[K, R]
stopCh chan struct{}
wg sync.WaitGroup
queues map[K][]MergeableTask[K, R] // TaskID -> Queue
waitQueue chan MergeableTask[K, R]
outCh chan MergeableTask[K, R]
stopOnce sync.Once
}
func NewMerger[K comparable, R any]() *Merger[K, R] {
return &Merger[K, R]{
stopCh: make(chan struct{}),
processors: typeutil.NewConcurrentSet[K](),
queues: make(map[K]chan MergeableTask[K, R]),
waitQueue: make(chan MergeableTask[K, R], waitQueueCap),
outCh: make(chan MergeableTask[K, R], Params.QueryCoordCfg.TaskMergeCap),
stopCh: make(chan struct{}),
queues: make(map[K][]MergeableTask[K, R]),
waitQueue: make(chan MergeableTask[K, R], waitQueueCap),
outCh: make(chan MergeableTask[K, R], Params.QueryCoordCfg.TaskMergeCap),
}
}
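
The struct and constructor change above swaps the per-ID buffered channels (map[K]chan MergeableTask[K, R]) and the processors ConcurrentSet for plain slices keyed by task ID; since only the scheduling goroutine touches the map, no concurrent set or per-queue goroutine is needed anymore. A toy illustration of the new bookkeeping, with a hypothetical Task type standing in for MergeableTask:

```go
package main

import "fmt"

type Task struct {
	ID      int64
	Payload string
}

func main() {
	queues := make(map[int64][]Task) // TaskID -> pending tasks

	for _, t := range []Task{{1, "a"}, {2, "b"}, {1, "c"}} {
		// Appending to a slice never blocks and has no fixed capacity,
		// unlike sending into a buffered channel that may be full.
		queues[t.ID] = append(queues[t.ID], t)
	}

	for id, pending := range queues {
		fmt.Printf("id=%d pending=%d\n", id, len(pending))
		delete(queues, id) // flushed queues release their entries immediately
	}
}
```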
@@ -60,7 +56,6 @@ func (merger *Merger[K, R]) Stop() {
merger.stopOnce.Do(func() {
close(merger.stopCh)
merger.wg.Wait()
close(merger.outCh)
})
}
@@ -73,40 +68,23 @@ func (merger *Merger[K, R]) schedule(ctx context.Context) {
go func() {
defer merger.wg.Done()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
close(merger.outCh)
log.Info("Merger stopped due to context canceled")
return
case <-merger.stopCh:
close(merger.outCh)
log.Info("Merger stopped")
return
case task := <-merger.waitQueue:
queue, ok := merger.queues[task.ID()]
if !ok {
queue = make(chan MergeableTask[K, R], Params.QueryCoordCfg.TaskMergeCap)
merger.queues[task.ID()] = queue
}
outer:
for {
select {
case queue <- task:
break outer
default: // Queue full, flush and retry
merger.merge(task.ID(), queue)
}
}
case <-ticker.C:
for id, queue := range merger.queues {
if len(queue) > 0 {
merger.merge(id, queue)
} else {
// Release resource if no task for the queue
delete(merger.queues, id)
}
merger.drain()
for id := range merger.queues {
merger.triggerExecution(id)
}
}
}
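
In the reworked schedule loop, the output channel is closed inside the loop on stop or context cancellation, and every 500 ms tick first drains the wait queue into the per-ID slices and then triggers execution for each ID. A simplified, runnable sketch of that drain-on-tick structure with plain ints instead of Milvus tasks (all names here are stand-ins):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	waitQueue := make(chan int, 16)
	queues := make(map[int][]int) // key -> pending items
	stopCh := make(chan struct{})

	for i := 0; i < 5; i++ {
		waitQueue <- i % 2 // enqueue a few items under two keys
	}
	go func() { time.Sleep(1200 * time.Millisecond); close(stopCh) }()

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			fmt.Println("merger stopped")
			return
		case <-ticker.C:
		drain:
			for { // non-blocking drain of the intake channel
				select {
				case v := <-waitQueue:
					queues[v] = append(queues[v], v)
				default:
					break drain
				}
			}
			for k, items := range queues { // trigger execution per key
				fmt.Printf("key=%d flushing %d item(s)\n", k, len(items))
				delete(queues, k)
			}
		}
	}
}
```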
@@ -126,32 +104,44 @@ func (merger *Merger[K, R]) Add(task MergeableTask[K, R]) {
merger.waitQueue <- task
}
func (merger *Merger[K, R]) merge(id K, queue chan MergeableTask[K, R]) {
if merger.isStopped() {
return
}
if !merger.processors.Insert(id) {
return
func (merger *Merger[K, R]) drain() {
for {
select {
case task := <-merger.waitQueue:
queue, ok := merger.queues[task.ID()]
if !ok {
queue = []MergeableTask[K, R]{}
}
queue = append(queue, task)
merger.queues[task.ID()] = queue
default:
return
}
}
merger.wg.Add(1)
go merger.mergeQueue(id, queue)
}
// mergeQueue merges tasks in the given queue,
// it only processes tasks with the number of the length of queue at the time,
// to avoid leaking goroutines
func (merger *Merger[K, R]) mergeQueue(id K, queue chan MergeableTask[K, R]) {
defer merger.wg.Done()
defer merger.processors.Remove(id)
len := len(queue)
task := <-queue
for i := 1; i < len; i++ {
task.Merge(<-queue)
func (merger *Merger[K, R]) triggerExecution(id K) {
tasks := merger.queues[id]
delete(merger.queues, id)
var task MergeableTask[K, R]
merged := 0
for i := 0; i < len(tasks); i++ {
if merged == 0 {
task = tasks[i]
} else {
task.Merge(tasks[i])
}
merged++
if merged >= int(Params.QueryCoordCfg.TaskMergeCap) {
merger.outCh <- task
merged = 0
}
}
if merged != 0 {
merger.outCh <- task
}
log.Info("merge tasks done",
zap.Any("mergeID", task.ID()))
merger.outCh <- task
log.Info("merge tasks done, trigger execution", zap.Any("mergeID", task.ID()))
}
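
triggerExecution now caps how many tasks are folded into one merged task: once merged reaches TaskMergeCap the accumulated task is emitted and a fresh one is started, and any partial remainder is flushed at the end. The same batching arithmetic in isolation, using ints instead of MergeableTask (a sketch, not the Milvus code):

```go
package main

import "fmt"

// batch folds items into groups of at most mergeCap, flushing the partial
// group at the end -- the same shape as the `merged >= cap` / `merged != 0`
// logic above.
func batch(items []int, mergeCap int) [][]int {
	var out [][]int
	var cur []int
	for _, it := range items {
		cur = append(cur, it)
		if len(cur) >= mergeCap {
			out = append(out, cur)
			cur = nil
		}
	}
	if len(cur) > 0 {
		out = append(out, cur)
	}
	return out
}

func main() {
	// With a merge cap of 3 and 7 queued tasks, three merged tasks come out:
	// two full batches of 3 and one partial batch of 1.
	fmt.Println(batch([]int{1, 2, 3, 4, 5, 6, 7}, 3))
}
```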
@@ -134,6 +134,9 @@ func (suite *MergerSuite) TestMerge() {
suite.Len(task.steps, 3)
suite.EqualValues(1, task.Result().DeltaPositions[0].Timestamp)
suite.EqualValues(1, task.Result().DeltaPositions[1].Timestamp)
suite.merger.Stop()
_, ok := <-suite.merger.Chan()
suite.Equal(ok, false)
}
func TestMerger(t *testing.T) {
......
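
The added test assertions exercise the new shutdown path: after Stop(), the merger's output channel is closed, so a receive on Chan() returns with ok == false. For reference, a tiny example of the closed-channel receive semantics the assertion relies on:

```go
package main

import "fmt"

func main() {
	out := make(chan int, 1)
	out <- 7
	close(out)

	v, ok := <-out
	fmt.Println(v, ok) // 7 true  -- buffered value is still delivered
	v, ok = <-out
	fmt.Println(v, ok) // 0 false -- channel closed and drained
}
```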