Unverified commit 1647beba authored by zhenshan.cao, committed by GitHub

Count growing segment number to estimate cpu usage (#17820) (#18027)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
Parent 9bd35dd8
......@@ -183,15 +183,17 @@ queryNode:
enabled: true
memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024
grouping:
enabled: true
scheduler:
receiveChanSize: 10240
unsolvedQueueSize: 10240
maxReadConcurrency: 0 # maximum concurrency of read tasks. If set to less than or equal to 0, there is no upper limit.
cpuRatio: 10.0 # ratio used to estimate read task cpu usage.
grouping:
enabled: true
maxNQ: 1000
topKMergeRatio: 10.0
indexCoord:
address: localhost
port: 31000
......
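The config hunk above moves the read-scheduler knobs out of queryNode.grouping into a new queryNode.scheduler section and adds maxReadConcurrency and cpuRatio. The following sketch, with an illustrative helper name of my own, shows how the maxReadConcurrency value is meant to be interpreted per the comment above and the initMaxReadConcurrency change later in this commit: anything less than or equal to 0 means "no upper limit".

package main

import (
	"fmt"
	"math"
)

// effectiveMaxReadConcurrency is an illustrative helper, not part of the
// commit: a configured value <= 0 means "no upper limit", which the query
// node models as math.MaxInt32.
func effectiveMaxReadConcurrency(configured int32) int32 {
	if configured <= 0 {
		return math.MaxInt32
	}
	return configured
}

func main() {
	fmt.Println(effectiveMaxReadConcurrency(0))  // default: 2147483647, i.e. unlimited
	fmt.Println(effectiveMaxReadConcurrency(16)) // explicit cap of 16 concurrent read tasks
}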
......@@ -80,7 +80,8 @@ type ReplicaInterface interface {
// getSegmentIDs returns segment ids
getSegmentIDs(partitionID UniqueID, segType segmentType) ([]UniqueID, error)
// getSegmentIDsByVChannel returns the ids of segments whose virtual channel is vChannel
getSegmentIDsByVChannel(partitionID UniqueID, vChannel Channel) ([]UniqueID, error)
// if partitionIDs is empty, it means that filtering by partitionIDs is not required.
getSegmentIDsByVChannel(partitionIDs []UniqueID, vChannel Channel, segType segmentType) ([]UniqueID, error)
// segment
// addSegment add a new segment to collectionReplica
......@@ -473,25 +474,40 @@ func (replica *metaReplica) getSegmentIDs(partitionID UniqueID, segType segmentT
}
// getSegmentIDsByVChannel returns the ids of segments whose virtual channel is vChannel
func (replica *metaReplica) getSegmentIDsByVChannel(partitionID UniqueID, vChannel Channel) ([]UniqueID, error) {
// if partitionIDs is empty, it means that filtering by partitionIDs is not required.
func (replica *metaReplica) getSegmentIDsByVChannel(partitionIDs []UniqueID, vChannel Channel, segType segmentType) ([]UniqueID, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
segmentIDs, err := replica.getSegmentIDsPrivate(partitionID, segmentTypeGrowing)
if err != nil {
return nil, err
var segments map[UniqueID]*Segment
var ret []UniqueID
filterPartition := len(partitionIDs) != 0
switch segType {
case segmentTypeGrowing:
segments = replica.growingSegments
case segmentTypeSealed:
segments = replica.sealedSegments
default:
return nil, fmt.Errorf("unexpected segment type, segmentType = %s", segType.String())
}
segmentIDsTmp := make([]UniqueID, 0)
for _, segmentID := range segmentIDs {
segment, err := replica.getSegmentByIDPrivate(segmentID, segmentTypeGrowing)
if err != nil {
return nil, err
}
partitionMap := make(map[UniqueID]struct{}, len(partitionIDs))
for _, partID := range partitionIDs {
partitionMap[partID] = struct{}{}
}
for _, segment := range segments {
if segment.vChannelID == vChannel {
segmentIDsTmp = append(segmentIDsTmp, segment.ID())
if filterPartition {
partitionID := segment.partitionID
if _, ok := partitionMap[partitionID]; !ok {
continue
}
}
ret = append(ret, segment.ID())
}
}
return segmentIDsTmp, nil
return ret, nil
}
// getSegmentIDsPrivate is private function in collectionReplica, it returns segment ids
......
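Below is a minimal, self-contained sketch of the filtering rule the new getSegmentIDsByVChannel implements, using simplified stand-in types rather than the real querynode ones: a nil or empty partition list disables partition filtering, otherwise only segments belonging to the listed partitions and the requested virtual channel are returned.

package main

import "fmt"

// Simplified stand-ins for the real querynode types, for illustration only.
type UniqueID = int64

type segment struct {
	id          UniqueID
	partitionID UniqueID
	vChannelID  string
}

// filterByVChannel mirrors the core of the new getSegmentIDsByVChannel:
// an empty or nil partition list disables partition filtering.
func filterByVChannel(segments map[UniqueID]*segment, partitionIDs []UniqueID, vChannel string) []UniqueID {
	filterPartition := len(partitionIDs) != 0
	partitionSet := make(map[UniqueID]struct{}, len(partitionIDs))
	for _, p := range partitionIDs {
		partitionSet[p] = struct{}{}
	}
	var ret []UniqueID
	for _, s := range segments {
		if s.vChannelID != vChannel {
			continue
		}
		if filterPartition {
			if _, ok := partitionSet[s.partitionID]; !ok {
				continue
			}
		}
		ret = append(ret, s.id)
	}
	return ret
}

func main() {
	segs := map[UniqueID]*segment{
		1: {id: 1, partitionID: 10, vChannelID: "channel1"},
		2: {id: 2, partitionID: 11, vChannelID: "channel2"},
	}
	fmt.Println(filterByVChannel(segs, nil, "channel1"))            // [1]
	fmt.Println(filterByVChannel(segs, []UniqueID{11}, "channel1")) // [] (partition does not match)
}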
......@@ -253,6 +253,73 @@ func TestMetaReplica_segment(t *testing.T) {
}
}
})
t.Run("test getSegmentIDsByVChannel", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
defer replica.freeAll()
schema := genTestCollectionSchema()
collection := replica.addCollection(defaultCollectionID, schema)
replica.addPartition(defaultCollectionID, defaultPartitionID)
replica.addPartition(defaultCollectionID, defaultPartitionID+1)
segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeGrowing)
assert.NoError(t, err)
err = replica.setSegment(segment1)
assert.NoError(t, err)
segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing)
assert.NoError(t, err)
err = replica.setSegment(segment2)
assert.NoError(t, err)
segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing)
assert.NoError(t, err)
err = replica.setSegment(segment3)
assert.NoError(t, err)
segment4, err := newSegment(collection, UniqueID(4), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed)
assert.NoError(t, err)
err = replica.setSegment(segment4)
assert.NoError(t, err)
seg1, err := replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID}, "channel1", segmentTypeGrowing)
assert.Equal(t, 1, len(seg1))
assert.NoError(t, err)
seg1, err = replica.getSegmentIDsByVChannel([]UniqueID{}, "channel1", segmentTypeGrowing)
assert.Equal(t, 1, len(seg1))
assert.NoError(t, err)
seg1, err = replica.getSegmentIDsByVChannel([]UniqueID{}, "channel1", segmentTypeSealed)
assert.Equal(t, 1, len(seg1))
assert.NoError(t, err)
seg1, err = replica.getSegmentIDsByVChannel(nil, "channel1", segmentTypeGrowing)
assert.Equal(t, 1, len(seg1))
assert.NoError(t, err)
seg1, err = replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID}, "channel1", segmentTypeSealed)
assert.Equal(t, 1, len(seg1))
assert.NoError(t, err)
seg1, err = replica.getSegmentIDsByVChannel(nil, "channel1", segmentTypeSealed)
assert.Equal(t, 1, len(seg1))
assert.NoError(t, err)
seg0, err := replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID}, "channel2", segmentTypeSealed)
assert.Equal(t, 0, len(seg0))
assert.NoError(t, err)
seg0, err = replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID}, "channel2", segmentTypeGrowing)
assert.Equal(t, 0, len(seg0))
assert.NoError(t, err)
seg2, err := replica.getSegmentIDsByVChannel([]UniqueID{defaultPartitionID + 1}, "channel2", segmentTypeGrowing)
assert.Equal(t, 2, len(seg2))
assert.NoError(t, err)
seg2, err = replica.getSegmentIDsByVChannel([]UniqueID{}, "channel2", segmentTypeGrowing)
assert.Equal(t, 2, len(seg2))
assert.NoError(t, err)
seg2, err = replica.getSegmentIDsByVChannel(nil, "channel2", segmentTypeGrowing)
assert.Equal(t, 2, len(seg2))
assert.NoError(t, err)
})
}
func TestMetaReplica_freeAll(t *testing.T) {
......
......@@ -4,13 +4,13 @@ import (
"container/list"
)
type scheduleReadTaskPolicy func(sqTasks *list.List, targetUsage int32) ([]readTask, int32)
type scheduleReadTaskPolicy func(sqTasks *list.List, targetUsage int32, maxNum int32) ([]readTask, int32)
func defaultScheduleReadPolicy(sqTasks *list.List, targetUsage int32) ([]readTask, int32) {
func defaultScheduleReadPolicy(sqTasks *list.List, targetUsage int32, maxNum int32) ([]readTask, int32) {
var ret []readTask
usage := int32(0)
var next *list.Element
for e := sqTasks.Front(); e != nil; e = next {
for e := sqTasks.Front(); e != nil && maxNum > 0; e = next {
next = e.Next()
t, _ := e.Value.(readTask)
tUsage := t.CPUUsage()
......@@ -20,6 +20,7 @@ func defaultScheduleReadPolicy(sqTasks *list.List, targetUsage int32) ([]readTas
usage += tUsage
sqTasks.Remove(e)
ret = append(ret, t)
maxNum--
}
return ret, usage
}
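To make the new maxNum argument concrete, here is a self-contained sketch of the same budget-plus-count limited dequeue, with int32 CPU costs standing in for readTask values; stopping at the first task that exceeds the remaining budget is my assumption about the collapsed lines above.

package main

import (
	"container/list"
	"fmt"
)

// scheduleUpTo pops tasks in FIFO order until either the CPU budget
// (targetUsage) or the task-count limit (maxNum) would be exceeded.
func scheduleUpTo(queue *list.List, targetUsage, maxNum int32) (picked []int32, usage int32) {
	var next *list.Element
	for e := queue.Front(); e != nil && maxNum > 0; e = next {
		next = e.Next()
		cost := e.Value.(int32)
		if usage+cost > targetUsage {
			break // first task that does not fit ends the round
		}
		usage += cost
		queue.Remove(e)
		picked = append(picked, cost)
		maxNum--
	}
	return picked, usage
}

func main() {
	q := list.New()
	for _, c := range []int32{10, 20, 30, 40} {
		q.PushBack(c)
	}
	// Budget of 100 CPU units but at most 2 tasks: only the first two are taken.
	tasks, used := scheduleUpTo(q, 100, 2)
	fmt.Println(tasks, used) // [10 20] 30
}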
package querynode
import (
"container/list"
"math"
"testing"
"github.com/stretchr/testify/assert"
)
func TestScheduler_defaultScheduleReadPolicy(t *testing.T) {
readyReadTasks := list.New()
for i := 1; i <= 10; i++ {
t := mockReadTask{
cpuUsage: int32(i * 10),
}
readyReadTasks.PushBack(&t)
}
scheduleFunc := defaultScheduleReadPolicy
targetUsage := int32(100)
maxNum := int32(2)
tasks, cur := scheduleFunc(readyReadTasks, targetUsage, maxNum)
assert.Equal(t, int32(30), cur)
assert.Equal(t, int32(2), int32(len(tasks)))
targetUsage = 300
maxNum = 0
tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
assert.Equal(t, int32(0), cur)
assert.Equal(t, 0, len(tasks))
targetUsage = 0
maxNum = 0
tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
assert.Equal(t, int32(0), cur)
assert.Equal(t, 0, len(tasks))
targetUsage = 0
maxNum = 300
tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
assert.Equal(t, int32(0), cur)
assert.Equal(t, 0, len(tasks))
actual := int32(180) // sum(3..6) * 10 = (3 + 4 + 5 + 6) * 10
targetUsage = int32(190) // > actual
maxNum = math.MaxInt32
tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
assert.Equal(t, actual, cur)
assert.Equal(t, 4, len(tasks))
actual = 340 // sum(7..10) * 10 = (7 + 8 + 9 + 10) * 10
targetUsage = 340
maxNum = 4
tasks, cur = scheduleFunc(readyReadTasks, targetUsage, maxNum)
assert.Equal(t, actual, cur)
assert.Equal(t, 4, len(tasks))
}
......@@ -34,7 +34,6 @@ type readTask interface {
Ctx() context.Context
GetTimeRecorder() *timerecord.TimeRecorder
GetCollectionID() UniqueID
Ready() (bool, error)
......@@ -43,7 +42,7 @@ type readTask interface {
CPUUsage() int32
Timeout() bool
SetMaxCPUUSage(int32)
SetMaxCPUUsage(int32)
SetStep(step TaskStep)
}
......@@ -84,7 +83,7 @@ func (b *baseReadTask) OnEnqueue() error {
return nil
}
func (b *baseReadTask) SetMaxCPUUSage(cpu int32) {
func (b *baseReadTask) SetMaxCPUUsage(cpu int32) {
b.maxCPU = cpu
}
......@@ -116,10 +115,6 @@ func (b *baseReadTask) GetCollectionID() UniqueID {
return b.CollectionID
}
func (b *baseReadTask) GetTimeRecorder() *timerecord.TimeRecorder {
return b.tr
}
func (b *baseReadTask) CanMergeWith(t readTask) bool {
return false
}
......
......@@ -210,7 +210,7 @@ func (s *taskScheduler) scheduleReadTasks() {
}
func (s *taskScheduler) AddReadTask(ctx context.Context, t readTask) error {
t.SetMaxCPUUSage(s.maxCPUUsage)
t.SetMaxCPUUsage(s.maxCPUUsage)
t.OnEnqueue()
select {
case <-ctx.Done():
......@@ -223,6 +223,8 @@ func (s *taskScheduler) AddReadTask(ctx context.Context, t readTask) error {
}
func (s *taskScheduler) popAndAddToExecute() {
readConcurrency := atomic.LoadInt32(&s.readConcurrency)
metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(readConcurrency))
if s.readyReadTasks.Len() == 0 {
return
}
......@@ -235,7 +237,13 @@ func (s *taskScheduler) popAndAddToExecute() {
if targetUsage <= 0 {
return
}
tasks, deltaUsage := s.schedule(s.readyReadTasks, targetUsage)
remain := Params.QueryNodeCfg.MaxReadConcurrency - readConcurrency
if remain <= 0 {
return
}
tasks, deltaUsage := s.schedule(s.readyReadTasks, targetUsage, remain)
atomic.AddInt32(&s.cpuUsage, deltaUsage)
for _, t := range tasks {
s.executeReadTaskChan <- t
......@@ -358,6 +366,4 @@ func (s *taskScheduler) tryMergeReadTasks() {
}
metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.unsolvedReadTasks.Len()))
metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.readyReadTasks.Len()))
readConcurrency := atomic.LoadInt32(&s.readConcurrency)
metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(readConcurrency))
}
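The popAndAddToExecute change above adds a second gate besides the CPU budget: the number of tasks popped per round is now capped by the remaining read concurrency. A tiny sketch of that gate, with illustrative names rather than the real scheduler fields:

package main

import "fmt"

// remainingSlots reports how many more read tasks may be scheduled this
// round, given the configured cap and the number already running.
func remainingSlots(maxReadConcurrency, runningReadTasks int32) (int32, bool) {
	remain := maxReadConcurrency - runningReadTasks
	if remain <= 0 {
		return 0, false // concurrency limit reached, schedule nothing this round
	}
	return remain, true
}

func main() {
	if n, ok := remainingSlots(8, 5); ok {
		fmt.Println("can pop up to", n, "tasks") // can pop up to 3 tasks
	}
	if _, ok := remainingSlots(8, 8); !ok {
		fmt.Println("read concurrency saturated; wait for running tasks to finish")
	}
}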
......@@ -55,6 +55,52 @@ func (m *mockTask) PostExecute(ctx context.Context) error {
return nil
}
var _ readTask = (*mockReadTask)(nil)
type mockReadTask struct {
mockTask
cpuUsage int32
maxCPU int32
collectionID UniqueID
ready bool
canMerge bool
timeout bool
step TaskStep
readyError error
}
func (m *mockReadTask) GetCollectionID() UniqueID {
return m.collectionID
}
func (m *mockReadTask) Ready() (bool, error) {
return m.ready, m.readyError
}
func (m *mockReadTask) Merge(o readTask) {
}
func (m *mockReadTask) CPUUsage() int32 {
return m.cpuUsage
}
func (m *mockReadTask) Timeout() bool {
return m.timeout
}
func (m *mockReadTask) SetMaxCPUUsage(cpu int32) {
m.maxCPU = cpu
}
func (m *mockReadTask) SetStep(step TaskStep) {
m.step = step
}
func (m *mockReadTask) CanMergeWith(o readTask) bool {
return m.canMerge
}
func TestTaskScheduler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
......
......@@ -177,13 +177,24 @@ func (s *searchTask) Notify(err error) {
}
func (s *searchTask) estimateCPUUsage() {
var segmentNum int64
if s.DataScope == querypb.DataScope_Streaming {
// assume growing segments num is 5
s.cpu = int32(s.NQ) * 5 / 2
partitionIDs := s.iReq.GetPartitionIDs()
channel := s.req.GetDmlChannel()
segIDs, err := s.QS.metaReplica.getSegmentIDsByVChannel(partitionIDs, channel, segmentTypeGrowing)
if err != nil {
log.Error("searchTask estimateCPUUsage", zap.Error(err))
}
segmentNum = int64(len(segIDs))
if segmentNum <= 0 {
segmentNum = 1
}
} else if s.DataScope == querypb.DataScope_Historical {
segmentNum := int64(len(s.req.GetSegmentIDs()))
s.cpu = int32(s.NQ * segmentNum / 2)
segmentNum = int64(len(s.req.GetSegmentIDs()))
}
cpu := float64(s.NQ*segmentNum) * Params.QueryNodeCfg.CPURatio
s.cpu = int32(cpu)
if s.cpu <= 0 {
s.cpu = 5
} else if s.cpu > s.maxCPU {
......
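Putting the streaming and historical branches together, the new estimate is cpu = NQ * segmentNum * cpuRatio, where the streaming branch floors the growing-segment count at 1. The standalone sketch below assumes only the clamping visible in the diff (floor of 5, cap at the task's maxCPU); estimateCPU and its parameters are stand-ins, not the real method.

package main

import "fmt"

// estimateCPU is an illustrative stand-in for searchTask.estimateCPUUsage:
// cpu = NQ * segmentNum * cpuRatio, floored at 5 and capped at maxCPU.
func estimateCPU(nq, segmentNum int64, cpuRatio float64, maxCPU int32) int32 {
	cpu := int32(float64(nq*segmentNum) * cpuRatio)
	if cpu <= 0 {
		cpu = 5 // never schedule a task with a zero or negative estimate
	} else if cpu > maxCPU {
		cpu = maxCPU // never claim more than the scheduler's CPU budget
	}
	return cpu
}

func main() {
	// 8 queries against 3 growing segments with the default cpuRatio of 10.0,
	// under an assumed per-task cap of 200 CPU units.
	fmt.Println(estimateCPU(8, 3, 10.0, 200)) // 200 (8*3*10 = 240, capped)
}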
......@@ -122,17 +122,12 @@ func validateOnStreamReplica(replica ReplicaInterface, collectionID UniqueID, pa
return searchPartIDs, segmentIDs, nil
}
for _, partID := range searchPartIDs {
segIDs, err2 := replica.getSegmentIDsByVChannel(partID, vChannel)
log.Debug("get segmentIDs by vChannel",
zap.Any("collectionID", collectionID),
zap.Any("vChannel", vChannel),
zap.Any("partitionID", partID),
zap.Any("segmentIDs", segIDs))
if err2 != nil {
return searchPartIDs, segmentIDs, err2
}
segmentIDs = append(segmentIDs, segIDs...)
}
segmentIDs, err = replica.getSegmentIDsByVChannel(searchPartIDs, vChannel, segmentTypeGrowing)
log.Debug("validateOnStreamReplica getSegmentIDsByVChannel",
zap.Any("collectionID", collectionID),
zap.Any("vChannel", vChannel),
zap.Any("partitionIDs", searchPartIDs),
zap.Any("segmentIDs", segmentIDs),
zap.Error(err))
return searchPartIDs, segmentIDs, nil
}
......@@ -709,8 +709,10 @@ type queryNodeConfig struct {
GroupEnabled bool
MaxReceiveChanSize int32
MaxUnsolvedQueueSize int32
MaxReadConcurrency int32
MaxGroupNQ int64
TopKMergeRatio float64
CPURatio float64
}
func (p *queryNodeConfig) init(base *BaseTable) {
......@@ -733,9 +735,11 @@ func (p *queryNodeConfig) init(base *BaseTable) {
p.initGroupEnabled()
p.initMaxReceiveChanSize()
p.initMaxReadConcurrency()
p.initMaxUnsolvedQueueSize()
p.initMaxGroupNQ()
p.initTopKMergeRatio()
p.initCPURatio()
}
// InitAlias initializes an alias for the QueryNode role.
......@@ -850,11 +854,22 @@ func (p *queryNodeConfig) initGroupEnabled() {
}
func (p *queryNodeConfig) initMaxReceiveChanSize() {
p.MaxReceiveChanSize = p.Base.ParseInt32WithDefault("queryNode.grouping.receiveChanSize", 10240)
p.MaxReceiveChanSize = p.Base.ParseInt32WithDefault("queryNode.scheduler.receiveChanSize", 10240)
}
func (p *queryNodeConfig) initMaxUnsolvedQueueSize() {
p.MaxUnsolvedQueueSize = p.Base.ParseInt32WithDefault("queryNode.grouping.unsolvedQueueSize", 10240)
p.MaxUnsolvedQueueSize = p.Base.ParseInt32WithDefault("queryNode.scheduler.unsolvedQueueSize", 10240)
}
func (p *queryNodeConfig) initCPURatio() {
p.CPURatio = p.Base.ParseFloatWithDefault("queryNode.scheduler.cpuRatio", 10.0)
}
func (p *queryNodeConfig) initMaxReadConcurrency() {
p.MaxReadConcurrency = p.Base.ParseInt32WithDefault("queryNode.scheduler.maxReadConcurrency", 0)
if p.MaxReadConcurrency <= 0 {
p.MaxReadConcurrency = math.MaxInt32
}
}
func (p *queryNodeConfig) initMaxGroupNQ() {
......
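The initializers above read the new queryNode.scheduler.* keys with defaults (cpuRatio 10.0, maxReadConcurrency 0 meaning unlimited). As a toy illustration of the fallback behaviour they rely on, here is a simplified stand-in for a ParseFloatWithDefault-style helper; the real BaseTable reads the milvus.yaml config rather than a map.

package main

import (
	"fmt"
	"strconv"
)

// parseFloatWithDefault returns the configured value for key, or def when the
// key is missing, empty, or not a valid float. Illustrative only.
func parseFloatWithDefault(values map[string]string, key string, def float64) float64 {
	raw, ok := values[key]
	if !ok || raw == "" {
		return def
	}
	v, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return def
	}
	return v
}

func main() {
	cfg := map[string]string{"queryNode.scheduler.cpuRatio": "20.0"}
	fmt.Println(parseFloatWithDefault(cfg, "queryNode.scheduler.cpuRatio", 10.0))          // 20
	fmt.Println(parseFloatWithDefault(cfg, "queryNode.scheduler.topKMergeRatio", 10.0))    // 10 (key missing, default used)
}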
......@@ -12,6 +12,7 @@
package paramtable
import (
"math"
"os"
"path"
"testing"
......@@ -244,8 +245,10 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, true, Params.GroupEnabled)
assert.Equal(t, int32(10240), Params.MaxReceiveChanSize)
assert.Equal(t, int32(10240), Params.MaxUnsolvedQueueSize)
assert.Equal(t, int32(math.MaxInt32), Params.MaxReadConcurrency)
assert.Equal(t, int64(1000), Params.MaxGroupNQ)
assert.Equal(t, 10.0, Params.TopKMergeRatio)
assert.Equal(t, 10.0, Params.CPURatio)
// test small indexNlist/NProbe default
Params.Base.Remove("queryNode.segcore.smallIndex.nlist")
......