From 22b32465e7e29a0b919a5a8e8be647eb99e22729 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 14 Oct 2022 14:27:25 +0800 Subject: [PATCH] Make bulkLoad failed if quota exceeds and remove autoindex config (#19770) Signed-off-by: bigsheeper Signed-off-by: bigsheeper --- configs/milvus.yaml | 4 ---- internal/datacoord/meta.go | 9 +++++++ internal/datacoord/meta_test.go | 25 ++++++++++++++++++++ internal/util/paramtable/quota_param.go | 8 ++++--- internal/util/paramtable/quota_param_test.go | 5 ++-- 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 356c2d66d..9b5886bae 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -468,7 +468,3 @@ quotaAndLimits: maxReadResultRate: -1 # MB/s, default no limit # coolOffSpeed is the speed of search&query rates cool off. coolOffSpeed: 0.9 # (0, 1] - -# AutoIndexConfig -autoIndex: - enable: false diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 12e080e8c..346412752 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -477,9 +477,18 @@ func (m *meta) UpdateFlushSegmentsInfo( s.NumOfRows = cp.GetNumOfRows() modSegments[cp.GetSegmentID()] = s } + var totalSize int64 segments := make([]*datapb.SegmentInfo, 0, len(modSegments)) for _, seg := range modSegments { segments = append(segments, seg.SegmentInfo) + totalSize += seg.getSegmentSize() + } + // check disk quota + for _, seg := range m.segments.GetSegments() { + totalSize += seg.getSegmentSize() + } + if float64(totalSize) >= Params.QuotaConfig.DiskQuota { + return fmt.Errorf("UpdateFlushSegmentsInfo failed: disk quota exceeds if update, segID = %d", segmentID) } if err := m.catalog.AlterSegments(m.ctx, segments); err != nil { log.Error("meta update: update flush segments info - failed to store flush segment info into Etcd", diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 0f5db7fec..f9d8e5326 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -514,6 +514,7 @@ func TestGetUnFlushedSegments(t *testing.T) { } func TestUpdateFlushSegmentsInfo(t *testing.T) { + Params.Init() t.Run("normal", func(t *testing.T) { meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") assert.Nil(t, err) @@ -590,6 +591,30 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { assert.Nil(t, segmentInfo.Binlogs) assert.Nil(t, segmentInfo.StartPosition) }) + + t.Run("test exceed disk quota", func(t *testing.T) { + meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") + assert.Nil(t, err) + + diskQuotaBackup := Params.QuotaConfig.DiskQuota + const ( + diskQuota = 5 * 1024 * 1024 * 1024 + segmentSize = 3 * 1024 * 1024 * 1024 + ) + Params.QuotaConfig.DiskQuota = diskQuota + segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, + Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: segmentSize}}}}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog0")}}} + err = meta.AddSegment(segment1) + assert.Nil(t, err) + + err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog1")}, + []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog1")}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: segmentSize}}}}, + []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}) + assert.Error(t, err) + Params.QuotaConfig.DiskQuota = diskQuotaBackup + }) } func TestSaveHandoffMeta(t *testing.T) { diff --git a/internal/util/paramtable/quota_param.go b/internal/util/paramtable/quota_param.go index 754d842fc..afb23c916 100644 --- a/internal/util/paramtable/quota_param.go +++ b/internal/util/paramtable/quota_param.go @@ -437,6 +437,7 @@ func (p *quotaConfig) initTtProtectionEnabled() { func (p *quotaConfig) initMaxTimeTickDelay() { if !p.TtProtectionEnabled { + p.MaxTimeTickDelay = math.MaxInt64 return } const defaultMaxTtDelay = 30.0 @@ -466,6 +467,7 @@ func (p *quotaConfig) initDataNodeMemoryLowWaterLevel() { func (p *quotaConfig) initDataNodeMemoryHighWaterLevel() { if !p.MemProtectionEnabled { + p.DataNodeMemoryHighWaterLevel = 1 return } p.DataNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.dataNodeMemoryHighWaterLevel", defaultHighWaterLevel) @@ -494,6 +496,7 @@ func (p *quotaConfig) initQueryNodeMemoryLowWaterLevel() { func (p *quotaConfig) initQueryNodeMemoryHighWaterLevel() { if !p.MemProtectionEnabled { + p.QueryNodeMemoryLowWaterLevel = defaultLowWaterLevel return } p.QueryNodeMemoryHighWaterLevel = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.memProtection.queryNodeMemoryHighWaterLevel", defaultHighWaterLevel) @@ -537,6 +540,7 @@ func (p *quotaConfig) initQueueProtectionEnabled() { func (p *quotaConfig) initNQInQueueThreshold() { if !p.QueueProtectionEnabled { + p.NQInQueueThreshold = math.MaxInt64 return } p.NQInQueueThreshold = p.Base.ParseInt64WithDefault("quotaAndLimits.limitReading.queueProtection.nqInQueueThreshold", math.MaxInt64) @@ -548,6 +552,7 @@ func (p *quotaConfig) initNQInQueueThreshold() { func (p *quotaConfig) initQueueLatencyThreshold() { if !p.QueueProtectionEnabled { + p.QueueLatencyThreshold = defaultMax return } p.QueueLatencyThreshold = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.queueProtection.queueLatencyThreshold", defaultMax) @@ -579,9 +584,6 @@ func (p *quotaConfig) initMaxReadResultRate() { func (p *quotaConfig) initCoolOffSpeed() { const defaultSpeed = 0.9 p.CoolOffSpeed = defaultSpeed - if !p.QueueProtectionEnabled { - return - } p.CoolOffSpeed = p.Base.ParseFloatWithDefault("quotaAndLimits.limitReading.coolOffSpeed", defaultSpeed) // (0, 1] if p.CoolOffSpeed <= 0 || p.CoolOffSpeed > 1 { diff --git a/internal/util/paramtable/quota_param_test.go b/internal/util/paramtable/quota_param_test.go index d765b3363..da05ef1ca 100644 --- a/internal/util/paramtable/quota_param_test.go +++ b/internal/util/paramtable/quota_param_test.go @@ -17,6 +17,7 @@ package paramtable import ( + "math" "testing" "time" @@ -84,8 +85,8 @@ func TestQuotaParam(t *testing.T) { t.Run("test limit reading", func(t *testing.T) { assert.False(t, qc.ForceDenyReading) assert.Equal(t, false, qc.QueueProtectionEnabled) - assert.Equal(t, int64(0), qc.NQInQueueThreshold) - assert.Equal(t, float64(0), qc.QueueLatencyThreshold) + assert.Equal(t, int64(math.MaxInt64), qc.NQInQueueThreshold) + assert.Equal(t, defaultMax, qc.QueueLatencyThreshold) assert.Equal(t, false, qc.ResultProtectionEnabled) assert.Equal(t, defaultMax, qc.MaxReadResultRate) assert.Equal(t, 0.9, qc.CoolOffSpeed) -- GitLab