diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index 6ca55923e6d1e1f5b416433c1ac5f9e331ec22de..506ceeedf0efc6c8162a0dcd31c4e4bcc376b18e 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -122,14 +122,12 @@ func (rl *rateLimiter) registerLimiters() { case internalpb.RateType_DQLQuery: r = Params.QuotaConfig.DQLMaxQueryRate } - log.Info("RateLimiter register for rateType", - zap.String("rateType", internalpb.RateType_name[rt]), - zap.String("rate", ratelimitutil.Limit(r).String())) limit := ratelimitutil.Limit(r) - if limit < 0 { - limit = ratelimitutil.Inf - } burst := int(r) // use rate as burst, because Limiter is with punishment mechanism, burst is insignificant. rl.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(limit, burst) + log.Info("RateLimiter register for rateType", + zap.String("rateType", internalpb.RateType_name[rt]), + zap.String("rate", ratelimitutil.Limit(r).String()), + zap.String("burst", fmt.Sprintf("%v", burst))) } } diff --git a/internal/proxy/multi_rate_limiter_test.go b/internal/proxy/multi_rate_limiter_test.go index cdd0c88c6c062f5e787caac37e7bc35937f721b4..c6d1a48eb22f73c2de58fb731a0408ff9a0a7c53 100644 --- a/internal/proxy/multi_rate_limiter_test.go +++ b/internal/proxy/multi_rate_limiter_test.go @@ -49,11 +49,33 @@ func TestMultiRateLimiter(t *testing.T) { multiLimiter := NewMultiRateLimiter() bak := Params.QuotaConfig.QuotaAndLimitsEnabled Params.QuotaConfig.QuotaAndLimitsEnabled = false - ok, r := multiLimiter.Limit(internalpb.RateType(0), 1) - assert.False(t, ok) - assert.NotEqual(t, float64(0), r) + for _, rt := range internalpb.RateType_value { + ok, r := multiLimiter.Limit(internalpb.RateType(rt), 1) + assert.False(t, ok) + assert.NotEqual(t, float64(0), r) + } Params.QuotaConfig.QuotaAndLimitsEnabled = bak }) + + t.Run("test limit", func(t *testing.T) { + run := func(insertRate float64) { + bakInsertRate := Params.QuotaConfig.DMLMaxInsertRate + Params.QuotaConfig.DMLMaxInsertRate = insertRate + multiLimiter := NewMultiRateLimiter() + bak := Params.QuotaConfig.QuotaAndLimitsEnabled + Params.QuotaConfig.QuotaAndLimitsEnabled = true + ok, r := multiLimiter.Limit(internalpb.RateType_DMLInsert, 1*1024*1024) + assert.False(t, ok) + assert.NotEqual(t, float64(0), r) + Params.QuotaConfig.QuotaAndLimitsEnabled = bak + Params.QuotaConfig.DMLMaxInsertRate = bakInsertRate + } + run(math.MaxInt) + run(math.MaxInt / 1.2) + run(math.MaxInt / 2) + run(math.MaxInt / 3) + run(math.MaxInt / 10000) + }) } func TestRateLimiter(t *testing.T) { @@ -90,14 +112,8 @@ func TestRateLimiter(t *testing.T) { assert.NoError(t, err) for _, rt := range internalpb.RateType_value { for i := 0; i < 100; i++ { - if i == 0 { - // Consume token initialed in bucket - ok, _ := limiter.limit(internalpb.RateType(rt), 1) - assert.False(t, ok) - } else { - ok, _ := limiter.limit(internalpb.RateType(rt), 1) - assert.True(t, ok) - } + ok, _ := limiter.limit(internalpb.RateType(rt), 1) + assert.True(t, ok) } } }) diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 64841ace2ad1ff1b4cea1d6de9358551f75b5de3..a4ea80f1a5f710b2c78061631d8f298903e8967b 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -654,7 +654,11 @@ func (q *QuotaCenter) setRates() error { } rates := make([]*internalpb.Rate, 0, len(q.currentRates)) for rt, r := range q.currentRates { - rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r) / float64(proxyNum)}) + if r == Inf { + rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r)}) + } else { + rates = append(rates, &internalpb.Rate{Rt: rt, R: float64(r) / float64(proxyNum)}) + } } return rates } diff --git a/internal/util/paramtable/quota_param.go b/internal/util/paramtable/quota_param.go index b38b11862a71f58df86fbc4f98b6fabea5e3be3e..3e9075d9c7719aa7c582b89b2faf777da2531d22 100644 --- a/internal/util/paramtable/quota_param.go +++ b/internal/util/paramtable/quota_param.go @@ -29,7 +29,7 @@ import ( const ( // defaultMax is the default unlimited rate or threshold. - defaultMax = float64(math.MaxFloat64) + defaultMax = float64(math.MaxInt) // MBSize used to convert megabytes and bytes. MBSize = 1024.0 * 1024.0 // defaultDiskQuotaInMB is the default disk quota in megabytes. diff --git a/internal/util/ratelimitutil/limiter.go b/internal/util/ratelimitutil/limiter.go index f4b33328e40fe3d9c014526a3fb9659cc018a7d6..0e0be448ee5fa3544727feb9de4bba5c48697672 100644 --- a/internal/util/ratelimitutil/limiter.go +++ b/internal/util/ratelimitutil/limiter.go @@ -31,7 +31,7 @@ import ( type Limit float64 // Inf is the infinite rate limit; it allows all events. -const Inf = Limit(math.MaxFloat64) +const Inf = Limit(math.MaxInt) // A Limiter controls how frequently events are allowed to happen. // It implements a "token bucket" of size b, initially full and refilled @@ -113,6 +113,12 @@ func (lim *Limiter) SetLimit(newLimit Limit) { lim.last = now lim.tokens = tokens lim.limit = newLimit + if newLimit >= math.MaxInt { + lim.burst = math.MaxInt + } else { + // use rate as burst, because Limiter is with punishment mechanism, burst is insignificant. + lim.burst = int(newLimit) + } } // advance calculates and returns an updated state for lim resulting from the passage of time. diff --git a/internal/util/ratelimitutil/limiter_test.go b/internal/util/ratelimitutil/limiter_test.go index 0707bb4ac3ed8f8cb0c8c2b4877ba89585bf0a73..e2a1cf9c111375a610aadd0af35ec02463dc2a78 100644 --- a/internal/util/ratelimitutil/limiter_test.go +++ b/internal/util/ratelimitutil/limiter_test.go @@ -18,6 +18,7 @@ package ratelimitutil import ( "fmt" + "math" "sync" "sync/atomic" "testing" @@ -63,6 +64,15 @@ func run(t *testing.T, lim *Limiter, allows []allow) { } } +func runWithoutCheckToken(t *testing.T, lim *Limiter, allows []allow) { + for i, a := range allows { + ok := lim.AllowN(a.t, a.n) + if ok != a.ok { + t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v", i, a.t, a.n, ok, a.ok) + } + } +} + func TestLimit(t *testing.T) { t.Run("test limit", func(t *testing.T) { // test base @@ -119,18 +129,30 @@ func TestLimit(t *testing.T) { {t2, 1, false, -1}, {t2, 1, false, -1}, }) + + limit := Inf + burst := math.MaxInt + limiter := NewLimiter(limit, burst) + runWithoutCheckToken(t, limiter, []allow{ + {t0, 1 * 1024 * 1024, true, 0}, + {t1, 1 * 1024 * 1024, true, 0}, + {t2, 1 * 1024 * 1024, true, 0}, + {t3, 1 * 1024 * 1024, true, 0}, + {t4, 1 * 1024 * 1024, true, 0}, + {t5, 1 * 1024 * 1024, true, 0}, + }) }) t.Run("test SetLimit", func(t *testing.T) { - lim := NewLimiter(10, 2) + lim := NewLimiter(10, 10) run(t, lim, []allow{ - {t0, 5, true, -3}, - {t0, 1, false, -3}, - {t1, 1, false, -3}, + {t0, 5, true, 5}, + {t0, 1, true, 4}, + {t1, 1, true, 4}, }) lim.SetLimit(100) - run(t, lim, []allow{{t2, 10, true, -8}}) + runWithoutCheckToken(t, lim, []allow{{t2, 10, true, 0}}) }) t.Run("test no truncation error", func(t *testing.T) {