未验证 提交 0d51f55c 编写于 作者: W wei liu 提交者: GitHub

enable config each collection has different rate limit (#24139)

Signed-off-by: NWei Liu <wei.liu@zilliz.com>
上级 c882c75b
......@@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
......@@ -88,6 +89,7 @@ type QuotaCenter struct {
proxies *proxyClientManager
queryCoord types.QueryCoord
dataCoord types.DataCoord
meta IMetaTable
// metrics
queryNodeMetrics map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics
......@@ -111,7 +113,7 @@ type QuotaCenter struct {
}
// NewQuotaCenter returns a new QuotaCenter.
func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, dataCoord types.DataCoord, tsoAllocator tso.Allocator) *QuotaCenter {
func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, dataCoord types.DataCoord, tsoAllocator tso.Allocator, meta IMetaTable) *QuotaCenter {
return &QuotaCenter{
proxies: proxies,
queryCoord: queryCoord,
......@@ -119,6 +121,7 @@ func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, da
currentRates: make(map[int64]map[internalpb.RateType]Limit),
quotaStates: make(map[int64]map[milvuspb.QuotaState]commonpb.ErrorCode),
tsoAllocator: tsoAllocator,
meta: meta,
readableCollections: make([]int64, 0),
writableCollections: make([]int64, 0),
......@@ -403,10 +406,14 @@ func (q *QuotaCenter) calculateReadRates() {
zap.Int64("collectionID", collection),
zap.Any("queryRate", q.currentRates[collection][internalpb.RateType_DQLQuery]))
}
collectionProps := q.getCollectionLimitConfig(collection)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMinKey), internalpb.RateType_DQLSearch, collection)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMinKey), internalpb.RateType_DQLQuery, collection)
}
q.guaranteeMinRate(Params.QuotaConfig.DQLMinSearchRatePerCollection.GetAsFloat(), internalpb.RateType_DQLSearch, collections...)
q.guaranteeMinRate(Params.QuotaConfig.DQLMinQueryRatePerCollection.GetAsFloat(), internalpb.RateType_DQLQuery, collections...)
log.RatedInfo(10, "QueryNodeMetrics when cool-off",
zap.Any("metrics", q.queryNodeMetrics))
}
// TODO: unify search and query?
......@@ -417,6 +424,7 @@ func (q *QuotaCenter) calculateReadRates() {
// calculateWriteRates calculates and sets dml rates.
func (q *QuotaCenter) calculateWriteRates() error {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
if Params.QuotaConfig.ForceDenyWriting.GetAsBool() {
q.forceDenyWriting(commonpb.ErrorCode_ForceDeny)
return nil
......@@ -463,8 +471,13 @@ func (q *QuotaCenter) calculateWriteRates() error {
if q.currentRates[collection][internalpb.RateType_DMLDelete] != Inf {
q.currentRates[collection][internalpb.RateType_DMLDelete] *= Limit(factor)
}
q.guaranteeMinRate(Params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat(), internalpb.RateType_DMLInsert)
q.guaranteeMinRate(Params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat(), internalpb.RateType_DMLDelete)
collectionProps := q.getCollectionLimitConfig(collection)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMinKey), internalpb.RateType_DMLInsert, collection)
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMinKey), internalpb.RateType_DMLDelete, collection)
log.RatedDebug(10, "QuotaCenter cool write rates off done",
zap.Int64("collectionID", collection),
zap.Float64("factor", factor))
}
return nil
......@@ -700,23 +713,43 @@ func (q *QuotaCenter) resetCurrentRate(rt internalpb.RateType, collection int64)
if q.quotaStates[collection] == nil {
q.quotaStates[collection] = make(map[milvuspb.QuotaState]commonpb.ErrorCode)
}
collectionProps := q.getCollectionLimitConfig(collection)
switch rt {
case internalpb.RateType_DMLInsert:
q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat())
q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMaxKey))
case internalpb.RateType_DMLDelete:
q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat())
q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMaxKey))
case internalpb.RateType_DMLBulkLoad:
q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat())
q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionBulkLoadRateMaxKey))
case internalpb.RateType_DQLSearch:
q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DQLMaxSearchRatePerCollection.GetAsFloat())
q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMaxKey))
case internalpb.RateType_DQLQuery:
q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DQLMaxQueryRatePerCollection.GetAsFloat())
q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMaxKey))
}
if q.currentRates[collection][rt] < 0 {
q.currentRates[collection][rt] = Inf // no limit
}
}
func (q *QuotaCenter) getCollectionLimitConfig(collection int64) map[string]string {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
collectionInfo, err := q.meta.GetCollectionByID(context.TODO(), collection, typeutil.MaxTimestamp, false)
if err != nil {
log.RatedWarn(10, "failed to get collection rate limit config",
zap.Int64("collectionID", collection),
zap.Error(err))
return make(map[string]string)
}
properties := make(map[string]string)
for _, pair := range collectionInfo.Properties {
properties[pair.GetKey()] = pair.GetValue()
}
return properties
}
// checkDiskQuota checks if disk quota exceeded.
func (q *QuotaCenter) checkDiskQuota() {
q.diskMu.Lock()
......@@ -729,8 +762,9 @@ func (q *QuotaCenter) checkDiskQuota() {
}
collections := typeutil.NewUniqueSet()
totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat()
colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
for collection, binlogSize := range q.dataCoordMetrics.CollectionBinlogSize {
collectionProps := q.getCollectionLimitConfig(collection)
colDiskQuota := getCollectionRateLimitConfig(collectionProps, common.CollectionDiskQuotaKey)
if float64(binlogSize) >= colDiskQuota {
log.RatedWarn(10, "collection disk quota exceeded",
zap.Int64("collection", collection),
......
......@@ -28,8 +28,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/internalpb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
......@@ -69,7 +73,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test QuotaCenter", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
go quotaCenter.run()
time.Sleep(10 * time.Millisecond)
quotaCenter.stop()
......@@ -77,35 +83,39 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test syncMetrics", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{Status: succStatus()}, nil)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
err = quotaCenter.syncMetrics()
assert.Error(t, err) // for empty response
quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
err = quotaCenter.syncMetrics()
assert.Error(t, err)
quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{retFailStatus: true}, core.tsoAllocator)
quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{retFailStatus: true}, core.tsoAllocator, meta)
err = quotaCenter.syncMetrics()
assert.Error(t, err)
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock err"))
quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{retErr: true}, core.tsoAllocator)
quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{retErr: true}, core.tsoAllocator, meta)
err = quotaCenter.syncMetrics()
assert.Error(t, err)
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "mock failure status"),
}, nil)
quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
err = quotaCenter.syncMetrics()
assert.Error(t, err)
})
t.Run("test forceDeny", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
quotaCenter.readableCollections = []int64{1, 2, 3}
quotaCenter.resetAllCurrentRates()
quotaCenter.forceDenyReading(commonpb.ErrorCode_ForceDeny, 1, 2, 3, 4)
......@@ -131,7 +141,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test calculateRates", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
err = quotaCenter.calculateRates()
assert.NoError(t, err)
alloc := newMockTsoAllocator()
......@@ -145,7 +157,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test getTimeTickDelayFactor factors", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
type ttCase struct {
maxTtDelay time.Duration
curTt time.Time
......@@ -191,7 +205,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test TimeTickDelayFactor factors", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
type ttCase struct {
delay time.Duration
expectedFactor float64
......@@ -260,7 +276,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test calculateReadRates", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
quotaCenter.readableCollections = []int64{1, 2, 3}
quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{
1: {Rms: []metricsinfo.RateMetric{
......@@ -317,7 +335,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test calculateWriteRates", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
err = quotaCenter.calculateWriteRates()
assert.NoError(t, err)
......@@ -354,7 +374,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test MemoryFactor factors", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
type memCase struct {
lowWater float64
highWater float64
......@@ -407,7 +429,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test GrowingSegmentsSize factors", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
tests := []struct {
low float64
high float64
......@@ -459,7 +483,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test checkDiskQuota", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
quotaCenter.checkDiskQuota()
// total DiskQuota exceeded
......@@ -501,7 +527,9 @@ func TestQuotaCenter(t *testing.T) {
pcm := &proxyClientManager{proxyClient: map[int64]types.Proxy{
TestProxyID: p1,
}}
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
quotaCenter.resetAllCurrentRates()
collectionID := int64(1)
quotaCenter.currentRates[collectionID] = make(map[internalpb.RateType]ratelimitutil.Limit)
......@@ -515,7 +543,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test recordMetrics", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
collectionID := int64(1)
quotaCenter.quotaStates[collectionID] = make(map[milvuspb.QuotaState]commonpb.ErrorCode)
quotaCenter.quotaStates[collectionID][milvuspb.QuotaState_DenyToWrite] = commonpb.ErrorCode_MemoryQuotaExhausted
......@@ -525,7 +555,9 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test guaranteeMinRate", func(t *testing.T) {
qc := types.NewMockQueryCoord(t)
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
quotaCenter.resetAllCurrentRates()
minRate := Limit(100)
collectionID := int64(1)
......@@ -552,7 +584,9 @@ func TestQuotaCenter(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
collection := UniqueID(0)
quotaCenter := NewQuotaCenter(pcm, nil, &dataCoordMockForQuota{}, core.tsoAllocator)
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, nil, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
quotaCenter.resetAllCurrentRates()
quotaBackup := Params.QuotaConfig.DiskQuota
colQuotaBackup := Params.QuotaConfig.DiskQuotaPerCollection
......@@ -570,4 +604,56 @@ func TestQuotaCenter(t *testing.T) {
})
}
})
t.Run("test reset current rates", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
quotaCenter := NewQuotaCenter(pcm, nil, &dataCoordMockForQuota{}, core.tsoAllocator, meta)
quotaCenter.readableCollections = []int64{1}
quotaCenter.writableCollections = []int64{1}
quotaCenter.resetAllCurrentRates()
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLInsert]), Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat())
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLDelete]), Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat())
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLBulkLoad]), Params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat())
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DQLSearch]), Params.QuotaConfig.DQLMaxSearchRatePerCollection.GetAsFloat())
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DQLQuery]), Params.QuotaConfig.DQLMaxQueryRatePerCollection.GetAsFloat())
meta.ExpectedCalls = nil
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Collection{
Properties: []*commonpb.KeyValuePair{
{
Key: common.CollectionInsertRateMaxKey,
Value: "1",
},
{
Key: common.CollectionDeleteRateMaxKey,
Value: "2",
},
{
Key: common.CollectionBulkLoadRateMaxKey,
Value: "3",
},
{
Key: common.CollectionQueryRateMaxKey,
Value: "4",
},
{
Key: common.CollectionSearchRateMaxKey,
Value: "5",
},
},
}, nil)
quotaCenter.resetAllCurrentRates()
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLInsert]), float64(1*1024*1024))
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLDelete]), float64(2*1024*1024))
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLBulkLoad]), float64(3*1024*1024))
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DQLQuery]), float64(4))
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DQLSearch]), float64(5))
})
}
......@@ -445,7 +445,7 @@ func (c *Core) initInternal() error {
c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.queryCoord, c.dataCoord, c.tsoAllocator)
c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.queryCoord, c.dataCoord, c.tsoAllocator, c.meta)
log.Debug("RootCoord init QuotaCenter done")
if err := c.initImportManager(); err != nil {
......
......@@ -19,6 +19,7 @@ package rootcoord
import (
"encoding/json"
"fmt"
"strconv"
"go.uber.org/zap"
......@@ -135,3 +136,87 @@ func getTravelTs(req TimeTravelRequest) Timestamp {
func isMaxTs(ts Timestamp) bool {
return ts == typeutil.MaxTimestamp
}
func getCollectionRateLimitConfigDefaultValue(configKey string) float64 {
switch configKey {
case common.CollectionInsertRateMaxKey:
return Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()
case common.CollectionInsertRateMinKey:
return Params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat()
case common.CollectionDeleteRateMaxKey:
return Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat()
case common.CollectionDeleteRateMinKey:
return Params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat()
case common.CollectionBulkLoadRateMaxKey:
return Params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat()
case common.CollectionBulkLoadRateMinKey:
return Params.QuotaConfig.DMLMinBulkLoadRatePerCollection.GetAsFloat()
case common.CollectionQueryRateMaxKey:
return Params.QuotaConfig.DQLMaxQueryRatePerCollection.GetAsFloat()
case common.CollectionQueryRateMinKey:
return Params.QuotaConfig.DQLMinQueryRatePerCollection.GetAsFloat()
case common.CollectionSearchRateMaxKey:
return Params.QuotaConfig.DQLMaxSearchRatePerCollection.GetAsFloat()
case common.CollectionSearchRateMinKey:
return Params.QuotaConfig.DQLMinSearchRatePerCollection.GetAsFloat()
case common.CollectionDiskQuotaKey:
return Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
default:
return float64(0)
}
}
func getCollectionRateLimitConfig(properties map[string]string, configKey string) float64 {
megaBytes2Bytes := func(v float64) float64 {
return v * 1024.0 * 1024.0
}
toBytesIfNecessary := func(rate float64) float64 {
switch configKey {
case common.CollectionInsertRateMaxKey:
return megaBytes2Bytes(rate)
case common.CollectionInsertRateMinKey:
return megaBytes2Bytes(rate)
case common.CollectionDeleteRateMaxKey:
return megaBytes2Bytes(rate)
case common.CollectionDeleteRateMinKey:
return megaBytes2Bytes(rate)
case common.CollectionBulkLoadRateMaxKey:
return megaBytes2Bytes(rate)
case common.CollectionBulkLoadRateMinKey:
return megaBytes2Bytes(rate)
case common.CollectionQueryRateMaxKey:
return rate
case common.CollectionQueryRateMinKey:
return rate
case common.CollectionSearchRateMaxKey:
return rate
case common.CollectionSearchRateMinKey:
return rate
case common.CollectionDiskQuotaKey:
return megaBytes2Bytes(rate)
default:
return float64(0)
}
}
v, ok := properties[configKey]
if ok {
rate, err := strconv.ParseFloat(v, 64)
if err != nil {
log.Warn("invalid configuration for collection dml rate",
zap.String("config item", configKey),
zap.String("config value", v))
return getCollectionRateLimitConfigDefaultValue(configKey)
}
rateInBytes := toBytesIfNecessary(rate)
if rateInBytes < 0 {
return getCollectionRateLimitConfigDefaultValue(configKey)
}
return rateInBytes
}
return getCollectionRateLimitConfigDefaultValue(configKey)
}
......@@ -19,13 +19,13 @@ package rootcoord
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/stretchr/testify/assert"
)
func Test_EqualKeyPairArray(t *testing.T) {
......@@ -148,3 +148,161 @@ func Test_isMaxTs(t *testing.T) {
})
}
}
func Test_getCollectionRateLimitConfig(t *testing.T) {
type args struct {
properties map[string]string
configKey string
}
configMap := map[string]string{
common.CollectionInsertRateMaxKey: "5",
common.CollectionInsertRateMinKey: "5",
common.CollectionDeleteRateMaxKey: "5",
common.CollectionDeleteRateMinKey: "5",
common.CollectionBulkLoadRateMaxKey: "5",
common.CollectionBulkLoadRateMinKey: "5",
common.CollectionQueryRateMaxKey: "5",
common.CollectionQueryRateMinKey: "5",
common.CollectionSearchRateMaxKey: "5",
common.CollectionSearchRateMinKey: "5",
common.CollectionDiskQuotaKey: "5",
}
tests := []struct {
name string
args args
want float64
}{
{
name: "test CollectionInsertRateMaxKey",
args: args{
properties: configMap,
configKey: common.CollectionInsertRateMaxKey,
},
want: float64(5 * 1024 * 1024),
},
{
name: "test CollectionInsertRateMinKey",
args: args{
properties: configMap,
configKey: common.CollectionInsertRateMinKey,
},
want: float64(5 * 1024 * 1024),
},
{
name: "test CollectionDeleteRateMaxKey",
args: args{
properties: configMap,
configKey: common.CollectionDeleteRateMaxKey,
},
want: float64(5 * 1024 * 1024),
},
{
name: "test CollectionDeleteRateMinKey",
args: args{
properties: configMap,
configKey: common.CollectionDeleteRateMinKey,
},
want: float64(5 * 1024 * 1024),
},
{
name: "test CollectionBulkLoadRateMaxKey",
args: args{
properties: configMap,
configKey: common.CollectionBulkLoadRateMaxKey,
},
want: float64(5 * 1024 * 1024),
},
{
name: "test CollectionBulkLoadRateMinKey",
args: args{
properties: configMap,
configKey: common.CollectionBulkLoadRateMinKey,
},
want: float64(5 * 1024 * 1024),
},
{
name: "test CollectionQueryRateMaxKey",
args: args{
properties: configMap,
configKey: common.CollectionQueryRateMaxKey,
},
want: float64(5),
},
{
name: "test CollectionQueryRateMinKey",
args: args{
properties: configMap,
configKey: common.CollectionQueryRateMinKey,
},
want: float64(5),
},
{
name: "test CollectionSearchRateMaxKey",
args: args{
properties: configMap,
configKey: common.CollectionSearchRateMaxKey,
},
want: float64(5),
},
{
name: "test CollectionSearchRateMinKey",
args: args{
properties: configMap,
configKey: common.CollectionSearchRateMinKey,
},
want: float64(5),
},
{
name: "test CollectionDiskQuotaKey",
args: args{
properties: configMap,
configKey: common.CollectionDiskQuotaKey,
},
want: float64(5 * 1024 * 1024),
},
{
name: "test invalid config value",
args: args{
properties: map[string]string{common.CollectionDiskQuotaKey: "invalid value"},
configKey: common.CollectionDiskQuotaKey,
},
want: Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat(),
},
{
name: "test empty config item",
args: args{
properties: map[string]string{},
configKey: common.CollectionDiskQuotaKey,
},
want: Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat(),
},
{
name: "test unknown config item",
args: args{
properties: configMap,
configKey: "",
},
want: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getCollectionRateLimitConfig(tt.args.properties, tt.args.configKey)
if got != tt.want {
t.Errorf("getCollectionRateLimitConfig() = %v, want %v", got, tt.want)
}
})
}
}
......@@ -100,6 +100,19 @@ const (
const (
CollectionTTLConfigKey = "collection.ttl.seconds"
CollectionAutoCompactionKey = "collection.autocompaction.enabled"
// rate limit
CollectionInsertRateMaxKey = "collection.insertRate.max.mb"
CollectionInsertRateMinKey = "collection.insertRate.min.mb"
CollectionDeleteRateMaxKey = "collection.deleteRate.max.mb"
CollectionDeleteRateMinKey = "collection.deleteRate.min.mb"
CollectionBulkLoadRateMaxKey = "collection.bulkLoadRate.max.mb"
CollectionBulkLoadRateMinKey = "collection.bulkLoadRate.min.mb"
CollectionQueryRateMaxKey = "collection.queryRate.max.qps"
CollectionQueryRateMinKey = "collection.queryRate.min.qps"
CollectionSearchRateMaxKey = "collection.searchRate.max.vps"
CollectionSearchRateMinKey = "collection.searchRate.min.vps"
CollectionDiskQuotaKey = "collection.diskProtection.diskQuota.mb"
)
const (
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册