From 1647bebaf4e70407216ad550041cd8d949929ef6 Mon Sep 17 00:00:00 2001
From: "zhenshan.cao"
Date: Mon, 4 Jul 2022 15:10:20 +0800
Subject: [PATCH] Count growing segment number to estimate cpu usage (#17820) (#18027)

Signed-off-by: zhenshan.cao
---
 configs/milvus.yaml                         | 10 +--
 internal/querynode/meta_replica.go          | 44 ++++++++----
 internal/querynode/meta_replica_test.go     | 67 +++++++++++++++++++
 internal/querynode/scheduler_policy.go      |  7 +-
 internal/querynode/scheduler_policy_test.go | 60 +++++++++++++++++
 internal/querynode/task_read.go             |  9 +--
 internal/querynode/task_scheduler.go        | 14 ++--
 internal/querynode/task_scheduler_test.go   | 46 +++++++++++++
 internal/querynode/task_search.go           | 17 ++++-
 internal/querynode/validate.go              | 19 ++----
 internal/util/paramtable/component_param.go | 19 +++++-
 .../util/paramtable/component_param_test.go |  3 +
 12 files changed, 266 insertions(+), 49 deletions(-)
 create mode 100644 internal/querynode/scheduler_policy_test.go

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 04b9601fb..6a7b968ca 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -183,15 +183,17 @@ queryNode:
     enabled: true
     memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024
 
-  grouping:
-    enabled: true
+  scheduler:
     receiveChanSize: 10240
     unsolvedQueueSize: 10240
+    maxReadConcurrency: 0 # maximum concurrency of read tasks. if set to less than or equal to 0, it means no upper limit.
+    cpuRatio: 10.0 # ratio used to estimate read task cpu usage.
+
+  grouping:
+    enabled: true
     maxNQ: 1000
     topKMergeRatio: 10.0
 
-
-
 indexCoord:
   address: localhost
   port: 31000
diff --git a/internal/querynode/meta_replica.go b/internal/querynode/meta_replica.go
index e8ceccd6c..0d340f5eb 100644
--- a/internal/querynode/meta_replica.go
+++ b/internal/querynode/meta_replica.go
@@ -80,7 +80,8 @@ type ReplicaInterface interface {
 	// getSegmentIDs returns segment ids
 	getSegmentIDs(partitionID UniqueID, segType segmentType) ([]UniqueID, error)
 	// getSegmentIDsByVChannel returns segment ids which virtual channel is vChannel
-	getSegmentIDsByVChannel(partitionID UniqueID, vChannel Channel) ([]UniqueID, error)
+	// if partitionIDs is empty, it means that filtering by partitionIDs is not required.
+	getSegmentIDsByVChannel(partitionIDs []UniqueID, vChannel Channel, segType segmentType) ([]UniqueID, error)
 
 	// segment
 	// addSegment add a new segment to collectionReplica
@@ -473,25 +474,40 @@ func (replica *metaReplica) getSegmentIDs(partitionID UniqueID, segType segmentT
 }
 
 // getSegmentIDsByVChannel returns segment ids which virtual channel is vChannel
-func (replica *metaReplica) getSegmentIDsByVChannel(partitionID UniqueID, vChannel Channel) ([]UniqueID, error) {
+// if partitionIDs is empty, it means that filtering by partitionIDs is not required.
+func (replica *metaReplica) getSegmentIDsByVChannel(partitionIDs []UniqueID, vChannel Channel, segType segmentType) ([]UniqueID, error) {
 	replica.mu.RLock()
 	defer replica.mu.RUnlock()
-	segmentIDs, err := replica.getSegmentIDsPrivate(partitionID, segmentTypeGrowing)
-	if err != nil {
-		return nil, err
+
+	var segments map[UniqueID]*Segment
+	var ret []UniqueID
+
+	filterPartition := len(partitionIDs) != 0
+	switch segType {
+	case segmentTypeGrowing:
+		segments = replica.growingSegments
+	case segmentTypeSealed:
+		segments = replica.sealedSegments
+	default:
+		return nil, fmt.Errorf("unexpected segment type, segmentType = %s", segType.String())
 	}
-	segmentIDsTmp := make([]UniqueID, 0)
-	for _, segmentID := range segmentIDs {
-		segment, err := replica.getSegmentByIDPrivate(segmentID, segmentTypeGrowing)
-		if err != nil {
-			return nil, err
-		}
+
+	partitionMap := make(map[UniqueID]struct{}, len(partitionIDs))
+	for _, partID := range partitionIDs {
+		partitionMap[partID] = struct{}{}
+	}
+	for _, segment := range segments {
 		if segment.vChannelID == vChannel {
-			segmentIDsTmp = append(segmentIDsTmp, segment.ID())
+			if filterPartition {
+				partitionID := segment.partitionID
+				if _, ok := partitionMap[partitionID]; !ok {
+					continue
+				}
+			}
+			ret = append(ret, segment.ID())
 		}
 	}
-
-	return segmentIDsTmp, nil
+	return ret, nil
 }
 
 // getSegmentIDsPrivate is private function in collectionReplica, it returns segment ids
diff --git a/internal/querynode/meta_replica_test.go b/internal/querynode/meta_replica_test.go
index 77fe3efdb..004a6f527 100644
--- a/internal/querynode/meta_replica_test.go
+++ b/internal/querynode/meta_replica_test.go
@@ -253,6 +253,73 @@ func TestMetaReplica_segment(t *testing.T) {
 			}
 		}
 	})
+
+	t.Run("test getSegmentIDsByVChannel", func(t *testing.T) {
+		replica, err := genSimpleReplica()
+		assert.NoError(t, err)
+		defer replica.freeAll()
+
+		schema := genTestCollectionSchema()
+		collection := replica.addCollection(defaultCollectionID, schema)
+		replica.addPartition(defaultCollectionID, defaultPartitionID)
+		replica.addPartition(defaultCollectionID, defaultPartitionID+1)
+
+		segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeGrowing)
+		assert.NoError(t, err)
+		err = replica.setSegment(segment1)
+		assert.NoError(t, err)
+
+		segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing)
+		assert.NoError(t, err)
+		err = replica.setSegment(segment2)
+		assert.NoError(t, err)
+
+		segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing)
+		assert.NoError(t, err)
+		err = replica.setSegment(segment3)
+		assert.NoError(t, err)
+
+		segment4, err := newSegment(collection, UniqueID(4), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed)
+		assert.NoError(t, err)
+		err = replica.setSegment(segment4)
+		assert.NoError(t, err)
+
+		seg1, err := replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID}, "channel1", segmentTypeGrowing)
+		assert.Equal(t, 1, len(seg1))
+		assert.NoError(t, err)
+		seg1, err = replica.getSegmentIDsByVChannel([]UniqueID{}, "channel1", segmentTypeGrowing)
+		assert.Equal(t, 1, len(seg1))
+		assert.NoError(t, err)
+		seg1, err = replica.getSegmentIDsByVChannel([]UniqueID{}, "channel1", segmentTypeSealed)
+		assert.Equal(t, 1, len(seg1))
+		assert.NoError(t, err)
+		seg1, err = replica.getSegmentIDsByVChannel(nil, "channel1", segmentTypeGrowing)
+		assert.Equal(t, 1, len(seg1))
+		assert.NoError(t, err)
+		seg1, err = replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID}, "channel1", segmentTypeSealed)
+		assert.Equal(t, 1, len(seg1))
+		assert.NoError(t, err)
+		seg1, err = replica.getSegmentIDsByVChannel(nil, "channel1", segmentTypeSealed)
+		assert.Equal(t, 1, len(seg1))
+		assert.NoError(t, err)
+
+		seg0, err := replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID}, "channel2", segmentTypeSealed)
+		assert.Equal(t, 0, len(seg0))
+		assert.NoError(t, err)
+		seg0, err = replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID}, "channel2", segmentTypeGrowing)
+		assert.Equal(t, 0, len(seg0))
+		assert.NoError(t, err)
+
+		seg2, err := replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID + 1}, "channel2", segmentTypeGrowing)
+		assert.Equal(t, 2, len(seg2))
+		assert.NoError(t, err)
+		seg2, err = replica.getSegmentIDsByVChannel([]UniqueID{}, "channel2", segmentTypeGrowing)
+		assert.Equal(t, 2, len(seg2))
+		assert.NoError(t, err)
+		seg2, err = replica.getSegmentIDsByVChannel(nil, "channel2", segmentTypeGrowing)
+		assert.Equal(t, 2, len(seg2))
+		assert.NoError(t, err)
+	})
 }
 
 func TestMetaReplica_freeAll(t *testing.T) {
diff --git a/internal/querynode/scheduler_policy.go b/internal/querynode/scheduler_policy.go
index 2ba517276..ed1533824 100644
--- a/internal/querynode/scheduler_policy.go
+++ b/internal/querynode/scheduler_policy.go
@@ -4,13 +4,13 @@ import (
 	"container/list"
 )
 
-type scheduleReadTaskPolicy func(sqTasks *list.List, targetUsage int32) ([]readTask, int32)
+type scheduleReadTaskPolicy func(sqTasks *list.List, targetUsage int32, maxNum int32) ([]readTask, int32)
 
-func defaultScheduleReadPolicy(sqTasks *list.List, targetUsage int32) ([]readTask, int32) {
+func defaultScheduleReadPolicy(sqTasks *list.List, targetUsage int32, maxNum int32) ([]readTask, int32) {
 	var ret []readTask
 	usage := int32(0)
 	var next *list.Element
-	for e := sqTasks.Front(); e != nil; e = next {
+	for e := sqTasks.Front(); e != nil && maxNum > 0; e = next {
 		next = e.Next()
 		t, _ := e.Value.(readTask)
 		tUsage := t.CPUUsage()
@@ -20,6 +20,7 @@ func defaultScheduleReadPolicy(sqTasks *list.List, targetUsage int32) ([]readTas
 		usage += tUsage
 		sqTasks.Remove(e)
 		ret = append(ret, t)
+		maxNum--
 	}
 	return ret, usage
 }
diff --git a/internal/querynode/scheduler_policy_test.go b/internal/querynode/scheduler_policy_test.go
new file mode 100644
index 000000000..9a14ebbb0
--- /dev/null
+++ b/internal/querynode/scheduler_policy_test.go
@@ -0,0 +1,60 @@
+package querynode
+
+import (
+	"container/list"
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestScheduler_defaultScheduleReadPolicy(t *testing.T) {
+	readyReadTasks := list.New()
+	for i := 1; i <= 10; i++ {
+		t := mockReadTask{
+			cpuUsage: int32(i * 10),
+		}
+		readyReadTasks.PushBack(&t)
+	}
+
+	scheduleFunc := defaultScheduleReadPolicy
+
+	targetUsage := int32(100)
+	maxNum := int32(2)
+
+	tasks, cur := scheduleFunc(readyReadTasks, targetUsage, maxNum)
+	assert.Equal(t, int32(30), cur)
+	assert.Equal(t, int32(2), int32(len(tasks)))
+
+	targetUsage = 300
+	maxNum = 0
+	tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
+	assert.Equal(t, int32(0), cur)
+	assert.Equal(t, 0, len(tasks))
+
+	targetUsage = 0
+	maxNum = 0
+	tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
+	assert.Equal(t, int32(0), cur)
+	assert.Equal(t, 0, len(tasks))
+
+	targetUsage = 0
+	maxNum = 300
+	tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
+	assert.Equal(t, int32(0), cur)
+	assert.Equal(t, 0, len(tasks))
+
+	actual := int32(180) // sum(3..6) * 10 = (3 + 4 + 5 + 6) * 10
+	targetUsage = int32(190) // > actual
+	maxNum = math.MaxInt32
+	tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
+	assert.Equal(t, actual, cur)
+	assert.Equal(t, 4, len(tasks))
+
+	actual = 340 // sum(7..10) * 10 = (7 + 8 + 9 + 10) * 10
+	targetUsage = 340
+	maxNum = 4
+	tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
+	assert.Equal(t, actual, cur)
+	assert.Equal(t, 4, len(tasks))
+}
diff --git a/internal/querynode/task_read.go b/internal/querynode/task_read.go
index 7a7d0d727..67319d01a 100644
--- a/internal/querynode/task_read.go
+++ b/internal/querynode/task_read.go
@@ -34,7 +34,6 @@ type readTask interface {
 
 	Ctx() context.Context
 
-	GetTimeRecorder() *timerecord.TimeRecorder
 	GetCollectionID() UniqueID
 
 	Ready() (bool, error)
@@ -43,7 +42,7 @@ type readTask interface {
 	CPUUsage() int32
 
 	Timeout() bool
-	SetMaxCPUUSage(int32)
+	SetMaxCPUUsage(int32)
 	SetStep(step TaskStep)
 }
 
@@ -84,7 +83,7 @@ func (b *baseReadTask) OnEnqueue() error {
 	return nil
 }
 
-func (b *baseReadTask) SetMaxCPUUSage(cpu int32) {
+func (b *baseReadTask) SetMaxCPUUsage(cpu int32) {
 	b.maxCPU = cpu
 }
 
@@ -116,10 +115,6 @@ func (b *baseReadTask) GetCollectionID() UniqueID {
 	return b.CollectionID
 }
 
-func (b *baseReadTask) GetTimeRecorder() *timerecord.TimeRecorder {
-	return b.tr
-}
-
 func (b *baseReadTask) CanMergeWith(t readTask) bool {
 	return false
 }
diff --git a/internal/querynode/task_scheduler.go b/internal/querynode/task_scheduler.go
index 4c297d4d2..e3e7530c4 100644
--- a/internal/querynode/task_scheduler.go
+++ b/internal/querynode/task_scheduler.go
@@ -210,7 +210,7 @@ func (s *taskScheduler) scheduleReadTasks() {
 }
 
 func (s *taskScheduler) AddReadTask(ctx context.Context, t readTask) error {
-	t.SetMaxCPUUSage(s.maxCPUUsage)
+	t.SetMaxCPUUsage(s.maxCPUUsage)
 	t.OnEnqueue()
 	select {
 	case <-ctx.Done():
@@ -223,6 +223,8 @@ func (s *taskScheduler) AddReadTask(ctx context.Context, t readTask) error {
 }
 
 func (s *taskScheduler) popAndAddToExecute() {
+	readConcurrency := atomic.LoadInt32(&s.readConcurrency)
+	metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(readConcurrency))
 	if s.readyReadTasks.Len() == 0 {
 		return
 	}
@@ -235,7 +237,13 @@ func (s *taskScheduler) popAndAddToExecute() {
 	if targetUsage <= 0 {
 		return
 	}
-	tasks, deltaUsage := s.schedule(s.readyReadTasks, targetUsage)
+
+	remain := Params.QueryNodeCfg.MaxReadConcurrency - readConcurrency
+	if remain <= 0 {
+		return
+	}
+
+	tasks, deltaUsage := s.schedule(s.readyReadTasks, targetUsage, remain)
 	atomic.AddInt32(&s.cpuUsage, deltaUsage)
 	for _, t := range tasks {
 		s.executeReadTaskChan <- t
@@ -358,6 +366,4 @@ func (s *taskScheduler) tryMergeReadTasks() {
 	}
 	metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.unsolvedReadTasks.Len()))
 	metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.readyReadTasks.Len()))
-	readConcurrency := atomic.LoadInt32(&s.readConcurrency)
-	metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(readConcurrency))
 }
diff --git a/internal/querynode/task_scheduler_test.go b/internal/querynode/task_scheduler_test.go
index be125a570..50ae6fdb7 100644
--- a/internal/querynode/task_scheduler_test.go
+++ b/internal/querynode/task_scheduler_test.go
@@ -55,6 +55,52 @@ func (m *mockTask) PostExecute(ctx context.Context) error {
 	return nil
 }
 
+var _ readTask = (*mockReadTask)(nil)
+
+type mockReadTask struct {
+	mockTask
+	cpuUsage     int32
+	maxCPU       int32
+	collectionID UniqueID
+	ready        bool
+	canMerge     bool
+	timeout      bool
+	step         TaskStep
+	readyError   error
+}
+
+func (m *mockReadTask) GetCollectionID() UniqueID {
+	return m.collectionID
+}
+
+func (m *mockReadTask) Ready() (bool, error) {
+	return m.ready, m.readyError
+}
+
+func (m *mockReadTask) Merge(o readTask) {
+
+}
+
+func (m *mockReadTask) CPUUsage() int32 {
+	return m.cpuUsage
+}
+
+func (m *mockReadTask) Timeout() bool {
+	return m.timeout
+}
+
+func (m *mockReadTask) SetMaxCPUUsage(cpu int32) {
+	m.maxCPU = cpu
+}
+
+func (m *mockReadTask) SetStep(step TaskStep) {
+	m.step = step
+}
+
+func (m *mockReadTask) CanMergeWith(o readTask) bool {
+	return m.canMerge
+}
+
 func TestTaskScheduler(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/internal/querynode/task_search.go b/internal/querynode/task_search.go
index 536c0e491..d1287aeb2 100644
--- a/internal/querynode/task_search.go
+++ b/internal/querynode/task_search.go
@@ -177,13 +177,24 @@ func (s *searchTask) Notify(err error) {
 }
 
 func (s *searchTask) estimateCPUUsage() {
+	var segmentNum int64
 	if s.DataScope == querypb.DataScope_Streaming {
 		// assume growing segments num is 5
-		s.cpu = int32(s.NQ) * 5 / 2
+		partitionIDs := s.iReq.GetPartitionIDs()
+		channel := s.req.GetDmlChannel()
+		segIDs, err := s.QS.metaReplica.getSegmentIDsByVChannel(partitionIDs, channel, segmentTypeGrowing)
+		if err != nil {
+			log.Error("searchTask estimateCPUUsage", zap.Error(err))
+		}
+		segmentNum = int64(len(segIDs))
+		if segmentNum <= 0 {
+			segmentNum = 1
+		}
 	} else if s.DataScope == querypb.DataScope_Historical {
-		segmentNum := int64(len(s.req.GetSegmentIDs()))
-		s.cpu = int32(s.NQ * segmentNum / 2)
+		segmentNum = int64(len(s.req.GetSegmentIDs()))
 	}
+	cpu := float64(s.NQ*segmentNum) * Params.QueryNodeCfg.CPURatio
+	s.cpu = int32(cpu)
 	if s.cpu <= 0 {
 		s.cpu = 5
 	} else if s.cpu > s.maxCPU {
diff --git a/internal/querynode/validate.go b/internal/querynode/validate.go
index b186ffe52..e1bd60c0d 100644
--- a/internal/querynode/validate.go
+++ b/internal/querynode/validate.go
@@ -122,17 +122,12 @@ func validateOnStreamReplica(replica ReplicaInterface, collectionID UniqueID, pa
 		return searchPartIDs, segmentIDs, nil
 	}
 
-	for _, partID := range searchPartIDs {
-		segIDs, err2 := replica.getSegmentIDsByVChannel(partID, vChannel)
-		log.Debug("get segmentIDs by vChannel",
-			zap.Any("collectionID", collectionID),
-			zap.Any("vChannel", vChannel),
-			zap.Any("partitionID", partID),
-			zap.Any("segmentIDs", segIDs))
-		if err2 != nil {
-			return searchPartIDs, segmentIDs, err2
-		}
-		segmentIDs = append(segmentIDs, segIDs...)
-	}
+	segmentIDs, err = replica.getSegmentIDsByVChannel(searchPartIDs, vChannel, segmentTypeGrowing)
+	log.Debug("validateOnStreamReplica getSegmentIDsByVChannel",
+		zap.Any("collectionID", collectionID),
+		zap.Any("vChannel", vChannel),
+		zap.Any("partitionIDs", searchPartIDs),
+		zap.Any("segmentIDs", segmentIDs),
+		zap.Error(err))
 	return searchPartIDs, segmentIDs, nil
 }
diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go
index b9c10a1d3..b6516d36a 100644
--- a/internal/util/paramtable/component_param.go
+++ b/internal/util/paramtable/component_param.go
@@ -709,8 +709,10 @@ type queryNodeConfig struct {
 	GroupEnabled         bool
 	MaxReceiveChanSize   int32
 	MaxUnsolvedQueueSize int32
+	MaxReadConcurrency   int32
 	MaxGroupNQ           int64
 	TopKMergeRatio       float64
+	CPURatio             float64
 }
 
 func (p *queryNodeConfig) init(base *BaseTable) {
@@ -733,9 +735,11 @@ func (p *queryNodeConfig) init(base *BaseTable) {
 
 	p.initGroupEnabled()
 	p.initMaxReceiveChanSize()
+	p.initMaxReadConcurrency()
 	p.initMaxUnsolvedQueueSize()
 	p.initMaxGroupNQ()
 	p.initTopKMergeRatio()
+	p.initCPURatio()
 }
 
 // InitAlias initializes an alias for the QueryNode role.
@@ -850,11 +854,22 @@ func (p *queryNodeConfig) initGroupEnabled() {
 }
 
 func (p *queryNodeConfig) initMaxReceiveChanSize() {
-	p.MaxReceiveChanSize = p.Base.ParseInt32WithDefault("queryNode.grouping.receiveChanSize", 10240)
+	p.MaxReceiveChanSize = p.Base.ParseInt32WithDefault("queryNode.scheduler.receiveChanSize", 10240)
 }
 
 func (p *queryNodeConfig) initMaxUnsolvedQueueSize() {
-	p.MaxUnsolvedQueueSize = p.Base.ParseInt32WithDefault("queryNode.grouping.unsolvedQueueSize", 10240)
+	p.MaxUnsolvedQueueSize = p.Base.ParseInt32WithDefault("queryNode.scheduler.unsolvedQueueSize", 10240)
+}
+
+func (p *queryNodeConfig) initCPURatio() {
+	p.CPURatio = p.Base.ParseFloatWithDefault("queryNode.scheduler.cpuRatio", 10.0)
+}
+
+func (p *queryNodeConfig) initMaxReadConcurrency() {
+	p.MaxReadConcurrency = p.Base.ParseInt32WithDefault("queryNode.scheduler.maxReadConcurrency", 0)
+	if p.MaxReadConcurrency <= 0 {
+		p.MaxReadConcurrency = math.MaxInt32
+	}
 }
 
 func (p *queryNodeConfig) initMaxGroupNQ() {
diff --git a/internal/util/paramtable/component_param_test.go b/internal/util/paramtable/component_param_test.go
index 5a3a22423..f3cf45d1f 100644
--- a/internal/util/paramtable/component_param_test.go
+++ b/internal/util/paramtable/component_param_test.go
@@ -12,6 +12,7 @@ package paramtable
 
 import (
+	"math"
 	"os"
 	"path"
 	"testing"
@@ -244,8 +245,10 @@ func TestComponentParam(t *testing.T) {
 	assert.Equal(t, true, Params.GroupEnabled)
 	assert.Equal(t, int32(10240), Params.MaxReceiveChanSize)
 	assert.Equal(t, int32(10240), Params.MaxUnsolvedQueueSize)
+	assert.Equal(t, int32(math.MaxInt32), Params.MaxReadConcurrency)
 	assert.Equal(t, int64(1000), Params.MaxGroupNQ)
 	assert.Equal(t, 10.0, Params.TopKMergeRatio)
+	assert.Equal(t, 10.0, Params.CPURatio)
 
 	// test small indexNlist/NProbe default
 	Params.Base.Remove("queryNode.segcore.smallIndex.nlist")
-- 
GitLab