Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
8729f82e
M
milvus
项目概览
milvus
/
milvus
9 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
8729f82e
编写于
5月 29, 2023
作者:
Y
yihao.dai
提交者:
GitHub
5月 29, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Improve rate limiter log (#24460)
Signed-off-by:
N
bigsheeper
<
yihao.dai@zilliz.com
>
上级
1dde46bd
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
8 addition
and
13 deletion
+8
-13
internal/proxy/multi_rate_limiter.go
internal/proxy/multi_rate_limiter.go
+2
-2
internal/rootcoord/quota_center.go
internal/rootcoord/quota_center.go
+6
-11
未找到文件。
internal/proxy/multi_rate_limiter.go
浏览文件 @
8729f82e
...
@@ -203,7 +203,7 @@ func (rl *rateLimiter) setRates(collectionRate *proxypb.CollectionRate) error {
...
@@ -203,7 +203,7 @@ func (rl *rateLimiter) setRates(collectionRate *proxypb.CollectionRate) error {
}
else
{
}
else
{
return
fmt
.
Errorf
(
"unregister rateLimiter for rateType %s"
,
r
.
GetRt
()
.
String
())
return
fmt
.
Errorf
(
"unregister rateLimiter for rateType %s"
,
r
.
GetRt
()
.
String
())
}
}
log
.
Rated
Info
(
30
,
"current collection rates in proxy"
,
log
.
Rated
Debug
(
30
,
"current collection rates in proxy"
,
zap
.
String
(
"rateType"
,
r
.
Rt
.
String
()),
zap
.
String
(
"rateType"
,
r
.
Rt
.
String
()),
zap
.
String
(
"rateLimit"
,
ratelimitutil
.
Limit
(
r
.
GetR
())
.
String
()),
zap
.
String
(
"rateLimit"
,
ratelimitutil
.
Limit
(
r
.
GetR
())
.
String
()),
)
)
...
@@ -314,7 +314,7 @@ func (rl *rateLimiter) registerLimiters(globalLevel bool) {
...
@@ -314,7 +314,7 @@ func (rl *rateLimiter) registerLimiters(globalLevel bool) {
limit
:=
ratelimitutil
.
Limit
(
r
)
limit
:=
ratelimitutil
.
Limit
(
r
)
burst
:=
r
// use rate as burst, because Limiter is with punishment mechanism, burst is insignificant.
burst
:=
r
// use rate as burst, because Limiter is with punishment mechanism, burst is insignificant.
rl
.
limiters
.
GetOrInsert
(
internalpb
.
RateType
(
rt
),
ratelimitutil
.
NewLimiter
(
limit
,
burst
))
rl
.
limiters
.
GetOrInsert
(
internalpb
.
RateType
(
rt
),
ratelimitutil
.
NewLimiter
(
limit
,
burst
))
log
.
Info
(
"RateLimiter register for rateType"
,
log
.
RatedDebug
(
30
,
"RateLimiter register for rateType"
,
zap
.
String
(
"rateType"
,
internalpb
.
RateType_name
[
rt
]),
zap
.
String
(
"rateType"
,
internalpb
.
RateType_name
[
rt
]),
zap
.
String
(
"rate"
,
ratelimitutil
.
Limit
(
r
)
.
String
()),
zap
.
String
(
"rate"
,
ratelimitutil
.
Limit
(
r
)
.
String
()),
zap
.
String
(
"burst"
,
fmt
.
Sprintf
(
"%v"
,
burst
)))
zap
.
String
(
"burst"
,
fmt
.
Sprintf
(
"%v"
,
burst
)))
...
...
internal/rootcoord/quota_center.go
浏览文件 @
8729f82e
...
@@ -285,7 +285,7 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, collections
...
@@ -285,7 +285,7 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, collections
q
.
currentRates
[
collection
][
internalpb
.
RateType_DMLBulkLoad
]
=
0
q
.
currentRates
[
collection
][
internalpb
.
RateType_DMLBulkLoad
]
=
0
q
.
quotaStates
[
collection
][
milvuspb
.
QuotaState_DenyToWrite
]
=
errorCode
q
.
quotaStates
[
collection
][
milvuspb
.
QuotaState_DenyToWrite
]
=
errorCode
}
}
log
.
Warn
(
"QuotaCenter force to deny writing"
,
log
.
RatedWarn
(
10
,
"QuotaCenter force to deny writing"
,
zap
.
Int64s
(
"collectionIDs"
,
collections
),
zap
.
Int64s
(
"collectionIDs"
,
collections
),
zap
.
String
(
"reason"
,
errorCode
.
String
()))
zap
.
String
(
"reason"
,
errorCode
.
String
()))
}
}
...
@@ -334,6 +334,7 @@ func (q *QuotaCenter) guaranteeMinRate(minRate float64, rateType internalpb.Rate
...
@@ -334,6 +334,7 @@ func (q *QuotaCenter) guaranteeMinRate(minRate float64, rateType internalpb.Rate
// calculateReadRates calculates and sets dql rates.
// calculateReadRates calculates and sets dql rates.
func
(
q
*
QuotaCenter
)
calculateReadRates
()
{
func
(
q
*
QuotaCenter
)
calculateReadRates
()
{
log
:=
log
.
Ctx
(
context
.
Background
())
.
WithRateGroup
(
"rootcoord.QuotaCenter"
,
1.0
,
60.0
)
if
Params
.
QuotaConfig
.
ForceDenyReading
{
if
Params
.
QuotaConfig
.
ForceDenyReading
{
q
.
forceDenyReading
(
commonpb
.
ErrorCode_ForceDeny
)
q
.
forceDenyReading
(
commonpb
.
ErrorCode_ForceDeny
)
return
return
...
@@ -393,13 +394,13 @@ func (q *QuotaCenter) calculateReadRates() {
...
@@ -393,13 +394,13 @@ func (q *QuotaCenter) calculateReadRates() {
for
_
,
collection
:=
range
collections
{
for
_
,
collection
:=
range
collections
{
if
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLSearch
]
!=
Inf
&&
realTimeSearchRate
>
0
{
if
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLSearch
]
!=
Inf
&&
realTimeSearchRate
>
0
{
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLSearch
]
=
Limit
(
realTimeSearchRate
*
coolOffSpeed
)
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLSearch
]
=
Limit
(
realTimeSearchRate
*
coolOffSpeed
)
log
.
Warn
(
"QuotaCenter cool read rates off done"
,
log
.
RatedWarn
(
10
,
"QuotaCenter cool read rates off done"
,
zap
.
Int64
(
"collectionID"
,
collection
),
zap
.
Int64
(
"collectionID"
,
collection
),
zap
.
Any
(
"searchRate"
,
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLSearch
]))
zap
.
Any
(
"searchRate"
,
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLSearch
]))
}
}
if
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLQuery
]
!=
Inf
&&
realTimeQueryRate
>
0
{
if
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLQuery
]
!=
Inf
&&
realTimeQueryRate
>
0
{
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLQuery
]
=
Limit
(
realTimeQueryRate
*
coolOffSpeed
)
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLQuery
]
=
Limit
(
realTimeQueryRate
*
coolOffSpeed
)
log
.
Warn
(
"QuotaCenter cool read rates off done"
,
log
.
RatedWarn
(
10
,
"QuotaCenter cool read rates off done"
,
zap
.
Int64
(
"collectionID"
,
collection
),
zap
.
Int64
(
"collectionID"
,
collection
),
zap
.
Any
(
"queryRate"
,
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLQuery
]))
zap
.
Any
(
"queryRate"
,
q
.
currentRates
[
collection
][
internalpb
.
RateType_DQLQuery
]))
}
}
...
@@ -407,8 +408,6 @@ func (q *QuotaCenter) calculateReadRates() {
...
@@ -407,8 +408,6 @@ func (q *QuotaCenter) calculateReadRates() {
q
.
guaranteeMinRate
(
Params
.
QuotaConfig
.
DQLMinSearchRatePerCollection
,
internalpb
.
RateType_DQLSearch
,
collections
...
)
q
.
guaranteeMinRate
(
Params
.
QuotaConfig
.
DQLMinSearchRatePerCollection
,
internalpb
.
RateType_DQLSearch
,
collections
...
)
q
.
guaranteeMinRate
(
Params
.
QuotaConfig
.
DQLMinQueryRatePerCollection
,
internalpb
.
RateType_DQLQuery
,
collections
...
)
q
.
guaranteeMinRate
(
Params
.
QuotaConfig
.
DQLMinQueryRatePerCollection
,
internalpb
.
RateType_DQLQuery
,
collections
...
)
log
.
Info
(
"QueryNodeMetrics when cool-off"
,
zap
.
Any
(
"metrics"
,
q
.
queryNodeMetrics
))
}
}
// TODO: unify search and query?
// TODO: unify search and query?
...
@@ -419,7 +418,6 @@ func (q *QuotaCenter) calculateReadRates() {
...
@@ -419,7 +418,6 @@ func (q *QuotaCenter) calculateReadRates() {
// calculateWriteRates calculates and sets dml rates.
// calculateWriteRates calculates and sets dml rates.
func
(
q
*
QuotaCenter
)
calculateWriteRates
()
error
{
func
(
q
*
QuotaCenter
)
calculateWriteRates
()
error
{
log
:=
log
.
Ctx
(
context
.
Background
())
.
WithRateGroup
(
"rootcoord.QuotaCenter"
,
1.0
,
60.0
)
if
Params
.
QuotaConfig
.
ForceDenyWriting
{
if
Params
.
QuotaConfig
.
ForceDenyWriting
{
q
.
forceDenyWriting
(
commonpb
.
ErrorCode_ForceDeny
)
q
.
forceDenyWriting
(
commonpb
.
ErrorCode_ForceDeny
)
return
nil
return
nil
...
@@ -468,9 +466,6 @@ func (q *QuotaCenter) calculateWriteRates() error {
...
@@ -468,9 +466,6 @@ func (q *QuotaCenter) calculateWriteRates() error {
}
}
q
.
guaranteeMinRate
(
Params
.
QuotaConfig
.
DMLMinInsertRatePerCollection
,
internalpb
.
RateType_DMLInsert
)
q
.
guaranteeMinRate
(
Params
.
QuotaConfig
.
DMLMinInsertRatePerCollection
,
internalpb
.
RateType_DMLInsert
)
q
.
guaranteeMinRate
(
Params
.
QuotaConfig
.
DMLMinDeleteRatePerCollection
,
internalpb
.
RateType_DMLDelete
)
q
.
guaranteeMinRate
(
Params
.
QuotaConfig
.
DMLMinDeleteRatePerCollection
,
internalpb
.
RateType_DMLDelete
)
log
.
RatedDebug
(
10
,
"QuotaCenter cool write rates off done"
,
zap
.
Int64
(
"collectionID"
,
collection
),
zap
.
Float64
(
"factor"
,
factor
))
}
}
return
nil
return
nil
...
@@ -738,7 +733,7 @@ func (q *QuotaCenter) checkDiskQuota() {
...
@@ -738,7 +733,7 @@ func (q *QuotaCenter) checkDiskQuota() {
colDiskQuota
:=
Params
.
QuotaConfig
.
DiskQuotaPerCollection
colDiskQuota
:=
Params
.
QuotaConfig
.
DiskQuotaPerCollection
for
collection
,
binlogSize
:=
range
q
.
dataCoordMetrics
.
CollectionBinlogSize
{
for
collection
,
binlogSize
:=
range
q
.
dataCoordMetrics
.
CollectionBinlogSize
{
if
float64
(
binlogSize
)
>=
colDiskQuota
{
if
float64
(
binlogSize
)
>=
colDiskQuota
{
log
.
Warn
(
"collection disk quota exceeded"
,
log
.
RatedWarn
(
10
,
"collection disk quota exceeded"
,
zap
.
Int64
(
"collection"
,
collection
),
zap
.
Int64
(
"collection"
,
collection
),
zap
.
Int64
(
"coll disk usage"
,
binlogSize
),
zap
.
Int64
(
"coll disk usage"
,
binlogSize
),
zap
.
Float64
(
"coll disk quota"
,
colDiskQuota
))
zap
.
Float64
(
"coll disk quota"
,
colDiskQuota
))
...
@@ -750,7 +745,7 @@ func (q *QuotaCenter) checkDiskQuota() {
...
@@ -750,7 +745,7 @@ func (q *QuotaCenter) checkDiskQuota() {
}
}
total
:=
q
.
dataCoordMetrics
.
TotalBinlogSize
total
:=
q
.
dataCoordMetrics
.
TotalBinlogSize
if
float64
(
total
)
>=
totalDiskQuota
{
if
float64
(
total
)
>=
totalDiskQuota
{
log
.
Warn
(
"total disk quota exceeded"
,
log
.
RatedWarn
(
10
,
"total disk quota exceeded"
,
zap
.
Int64
(
"total disk usage"
,
total
),
zap
.
Int64
(
"total disk usage"
,
total
),
zap
.
Float64
(
"total disk quota"
,
totalDiskQuota
))
zap
.
Float64
(
"total disk quota"
,
totalDiskQuota
))
q
.
forceDenyWriting
(
commonpb
.
ErrorCode_DiskQuotaExhausted
)
q
.
forceDenyWriting
(
commonpb
.
ErrorCode_DiskQuotaExhausted
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录