Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
6534396b
M
milvus
项目概览
milvus
/
milvus
10 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
6534396b
编写于
7月 19, 2023
作者:
W
wei liu
提交者:
GitHub
7月 19, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enable config different interval for different checker (#25514)
Signed-off-by:
N
Wei Liu
<
wei.liu@zilliz.com
>
上级
9a4761dc
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
265 addition
and
58 deletion
+265
-58
internal/querycoordv2/checkers/controller.go
internal/querycoordv2/checkers/controller.go
+89
-52
internal/querycoordv2/checkers/controller_test.go
internal/querycoordv2/checkers/controller_test.go
+132
-0
pkg/util/paramtable/component_param.go
pkg/util/paramtable/component_param.go
+40
-6
pkg/util/paramtable/component_param_test.go
pkg/util/paramtable/component_param_test.go
+4
-0
未找到文件。
internal/querycoordv2/checkers/controller.go
浏览文件 @
6534396b
...
...
@@ -27,24 +27,31 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/pkg/log"
"go.uber.org/zap"
)
// checkRoundTaskNumLimit is a package-level numeric limit set to 256.
// NOTE(review): the name suggests a cap on tasks produced per check round;
// its use is not visible in this excerpt — confirm against the check loop.
var (
	checkRoundTaskNumLimit = 256
)
// Checker type names. They are used as the keys of CheckerController.checkers
// and CheckerController.manualCheckChs, and getCheckerInterval switches on
// them to pick the per-checker interval.
var (
	Segment_Checker = "segment_checker"
	Channel_Checker = "channel_checker"
	Balance_Checker = "balance_checker"
)
type
CheckerController
struct
{
stopCh
chan
struct
{}
checkCh
chan
struct
{}
meta
*
meta
.
Meta
dist
*
meta
.
DistributionManager
targetMgr
*
meta
.
TargetManager
broker
*
meta
.
CoordinatorBroker
nodeMgr
*
session
.
NodeManager
balancer
balance
.
Balance
stopCh
chan
struct
{}
manualCheckChs
map
[
string
]
chan
struct
{}
meta
*
meta
.
Meta
dist
*
meta
.
DistributionManager
targetMgr
*
meta
.
TargetManager
broker
*
meta
.
CoordinatorBroker
nodeMgr
*
session
.
NodeManager
balancer
balance
.
Balance
scheduler
task
.
Scheduler
checkers
[
]
Checker
checkers
map
[
string
]
Checker
stopOnce
sync
.
Once
}
...
...
@@ -59,50 +66,80 @@ func NewCheckerController(
// CheckerController runs checkers with the order,
// the former checker has higher priority
checkers
:=
[
]
Checker
{
NewChannelChecker
(
meta
,
dist
,
targetMgr
,
balancer
),
NewSegmentChecker
(
meta
,
dist
,
targetMgr
,
balancer
,
nodeMgr
),
NewBalanceChecker
(
meta
,
balancer
,
nodeMgr
,
scheduler
),
checkers
:=
map
[
string
]
Checker
{
Channel_Checker
:
NewChannelChecker
(
meta
,
dist
,
targetMgr
,
balancer
),
Segment_Checker
:
NewSegmentChecker
(
meta
,
dist
,
targetMgr
,
balancer
,
nodeMgr
),
Balance_Checker
:
NewBalanceChecker
(
meta
,
balancer
,
nodeMgr
,
scheduler
),
}
for
i
,
checker
:=
range
checkers
{
checker
.
SetID
(
int64
(
i
+
1
))
id
:=
0
for
_
,
checker
:=
range
checkers
{
checker
.
SetID
(
int64
(
id
+
1
))
}
manualCheckChs
:=
map
[
string
]
chan
struct
{}{
Channel_Checker
:
make
(
chan
struct
{},
1
),
Segment_Checker
:
make
(
chan
struct
{},
1
),
Balance_Checker
:
make
(
chan
struct
{},
1
),
}
return
&
CheckerController
{
stopCh
:
make
(
chan
struct
{}),
checkCh
:
make
(
chan
struct
{},
1
)
,
meta
:
meta
,
dist
:
dist
,
targetMgr
:
targetMgr
,
scheduler
:
scheduler
,
checkers
:
checkers
,
stopCh
:
make
(
chan
struct
{}),
manualCheckChs
:
manualCheckChs
,
meta
:
meta
,
dist
:
dist
,
targetMgr
:
targetMgr
,
scheduler
:
scheduler
,
checkers
:
checkers
,
}
}
func
(
controller
*
CheckerController
)
Start
(
ctx
context
.
Context
)
{
go
func
()
{
ticker
:=
time
.
NewTicker
(
Params
.
QueryCoordCfg
.
CheckInterval
.
GetAsDuration
(
time
.
Millisecond
))
defer
ticker
.
Stop
()
for
{
select
{
case
<-
ctx
.
Done
()
:
log
.
Info
(
"CheckerController stopped due to context canceled"
)
return
case
<-
controller
.
stopCh
:
log
.
Info
(
"CheckerController stopped"
)
return
case
<-
ticker
.
C
:
controller
.
check
(
ctx
)
case
<-
controller
.
checkCh
:
ticker
.
Stop
()
controller
.
check
(
ctx
)
ticker
.
Reset
(
Params
.
QueryCoordCfg
.
CheckInterval
.
GetAsDuration
(
time
.
Millisecond
))
}
for
checkerType
:=
range
controller
.
checkers
{
go
controller
.
StartChecker
(
ctx
,
checkerType
)
}
}
func
getCheckerInterval
(
checkerType
string
)
time
.
Duration
{
switch
checkerType
{
case
Segment_Checker
:
return
Params
.
QueryCoordCfg
.
SegmentCheckInterval
.
GetAsDuration
(
time
.
Millisecond
)
case
Channel_Checker
:
return
Params
.
QueryCoordCfg
.
ChannelCheckInterval
.
GetAsDuration
(
time
.
Millisecond
)
case
Balance_Checker
:
return
Params
.
QueryCoordCfg
.
BalanceCheckInterval
.
GetAsDuration
(
time
.
Millisecond
)
default
:
return
Params
.
QueryCoordCfg
.
CheckInterval
.
GetAsDuration
(
time
.
Millisecond
)
}
}
func
(
controller
*
CheckerController
)
StartChecker
(
ctx
context
.
Context
,
checkerType
string
)
{
interval
:=
getCheckerInterval
(
checkerType
)
ticker
:=
time
.
NewTicker
(
interval
)
defer
ticker
.
Stop
()
for
{
select
{
case
<-
ctx
.
Done
()
:
log
.
Info
(
"Checker stopped due to context canceled"
,
zap
.
String
(
"type"
,
checkerType
))
return
case
<-
controller
.
stopCh
:
log
.
Info
(
"Checker stopped"
,
zap
.
String
(
"type"
,
checkerType
))
return
case
<-
ticker
.
C
:
controller
.
check
(
ctx
,
checkerType
)
case
<-
controller
.
manualCheckChs
[
checkerType
]
:
ticker
.
Stop
()
controller
.
check
(
ctx
,
checkerType
)
ticker
.
Reset
(
Params
.
QueryCoordCfg
.
CheckInterval
.
GetAsDuration
(
time
.
Millisecond
))
}
}
()
}
}
func
(
controller
*
CheckerController
)
Stop
()
{
...
...
@@ -112,18 +149,18 @@ func (controller *CheckerController) Stop() {
}
func
(
controller
*
CheckerController
)
Check
()
{
select
{
case
controller
.
checkCh
<-
struct
{}{}
:
default
:
for
_
,
checkCh
:=
range
controller
.
manualCheckChs
{
select
{
case
checkCh
<-
struct
{}{}
:
default
:
}
}
}
// check is the real implementation of Check
func
(
controller
*
CheckerController
)
check
(
ctx
context
.
Context
)
{
tasks
:=
make
([]
task
.
Task
,
0
)
for
_
,
checker
:=
range
controller
.
checkers
{
tasks
=
append
(
tasks
,
checker
.
Check
(
ctx
)
...
)
}
func
(
controller
*
CheckerController
)
check
(
ctx
context
.
Context
,
checkerType
string
)
{
checker
:=
controller
.
checkers
[
checkerType
]
tasks
:=
checker
.
Check
(
ctx
)
for
_
,
task
:=
range
tasks
{
err
:=
controller
.
scheduler
.
Add
(
task
)
...
...
internal/querycoordv2/checkers/controller_test.go
0 → 100644
浏览文件 @
6534396b
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
checkers
import
(
"context"
"testing"
"time"
"github.com/milvus-io/milvus/internal/kv"
etcdkv
"github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
.
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
)
type
CheckerControllerSuite
struct
{
suite
.
Suite
kv
kv
.
MetaKv
meta
*
meta
.
Meta
broker
*
meta
.
MockBroker
nodeMgr
*
session
.
NodeManager
dist
*
meta
.
DistributionManager
targetManager
*
meta
.
TargetManager
scheduler
*
task
.
MockScheduler
balancer
*
balance
.
MockBalancer
controller
*
CheckerController
}
// SetupSuite initializes Params (dot-imported from querycoordv2/params) so
// that configuration values are available to the suite.
func (suite *CheckerControllerSuite) SetupSuite() {
	Params.Init()
}
func
(
suite
*
CheckerControllerSuite
)
SetupTest
()
{
var
err
error
config
:=
GenerateEtcdConfig
()
cli
,
err
:=
etcd
.
GetEtcdClient
(
config
.
UseEmbedEtcd
.
GetAsBool
(),
config
.
EtcdUseSSL
.
GetAsBool
(),
config
.
Endpoints
.
GetAsStrings
(),
config
.
EtcdTLSCert
.
GetValue
(),
config
.
EtcdTLSKey
.
GetValue
(),
config
.
EtcdTLSCACert
.
GetValue
(),
config
.
EtcdTLSMinVersion
.
GetValue
())
suite
.
Require
()
.
NoError
(
err
)
suite
.
kv
=
etcdkv
.
NewEtcdKV
(
cli
,
config
.
MetaRootPath
.
GetValue
())
// meta
store
:=
meta
.
NewMetaStore
(
suite
.
kv
)
idAllocator
:=
RandomIncrementIDAllocator
()
suite
.
nodeMgr
=
session
.
NewNodeManager
()
suite
.
meta
=
meta
.
NewMeta
(
idAllocator
,
store
,
suite
.
nodeMgr
)
suite
.
dist
=
meta
.
NewDistributionManager
()
suite
.
broker
=
meta
.
NewMockBroker
(
suite
.
T
())
suite
.
targetManager
=
meta
.
NewTargetManager
(
suite
.
broker
,
suite
.
meta
)
suite
.
balancer
=
balance
.
NewMockBalancer
(
suite
.
T
())
suite
.
scheduler
=
task
.
NewMockScheduler
(
suite
.
T
())
suite
.
controller
=
NewCheckerController
(
suite
.
meta
,
suite
.
dist
,
suite
.
targetManager
,
suite
.
balancer
,
suite
.
nodeMgr
,
suite
.
scheduler
)
}
func
(
suite
*
CheckerControllerSuite
)
TestBasic
()
{
// set meta
suite
.
meta
.
CollectionManager
.
PutCollection
(
utils
.
CreateTestCollection
(
1
,
1
))
suite
.
meta
.
ReplicaManager
.
Put
(
utils
.
CreateTestReplica
(
1
,
1
,
[]
int64
{
1
,
2
}))
suite
.
nodeMgr
.
Add
(
session
.
NewNodeInfo
(
1
,
"localhost"
))
suite
.
nodeMgr
.
Add
(
session
.
NewNodeInfo
(
2
,
"localhost"
))
suite
.
meta
.
ResourceManager
.
AssignNode
(
meta
.
DefaultResourceGroupName
,
1
)
suite
.
meta
.
ResourceManager
.
AssignNode
(
meta
.
DefaultResourceGroupName
,
2
)
// set target
segments
:=
[]
*
datapb
.
SegmentInfo
{
{
ID
:
1
,
PartitionID
:
1
,
InsertChannel
:
"test-insert-channel"
,
},
}
suite
.
broker
.
EXPECT
()
.
GetRecoveryInfoV2
(
mock
.
Anything
,
int64
(
1
))
.
Return
(
nil
,
segments
,
nil
)
suite
.
targetManager
.
UpdateCollectionNextTargetWithPartitions
(
int64
(
1
),
int64
(
1
))
// set dist
suite
.
dist
.
ChannelDistManager
.
Update
(
2
,
utils
.
CreateTestChannel
(
1
,
2
,
1
,
"test-insert-channel"
))
suite
.
dist
.
LeaderViewManager
.
Update
(
2
,
utils
.
CreateTestLeaderView
(
2
,
1
,
"test-insert-channel"
,
map
[
int64
]
int64
{},
map
[
int64
]
*
meta
.
Segment
{}))
counter
:=
atomic
.
NewInt64
(
0
)
suite
.
scheduler
.
EXPECT
()
.
Add
(
mock
.
Anything
)
.
Run
(
func
(
task
task
.
Task
)
{
counter
.
Inc
()
})
.
Return
(
nil
)
suite
.
scheduler
.
EXPECT
()
.
GetSegmentTaskNum
()
.
Return
(
0
)
.
Maybe
()
suite
.
scheduler
.
EXPECT
()
.
GetChannelTaskNum
()
.
Return
(
0
)
.
Maybe
()
suite
.
balancer
.
EXPECT
()
.
AssignSegment
(
mock
.
Anything
,
mock
.
Anything
,
mock
.
Anything
)
.
Return
(
nil
)
suite
.
balancer
.
EXPECT
()
.
AssignChannel
(
mock
.
Anything
,
mock
.
Anything
)
.
Return
(
nil
)
ctx
:=
context
.
Background
()
suite
.
controller
.
Start
(
ctx
)
defer
suite
.
controller
.
Stop
()
suite
.
Eventually
(
func
()
bool
{
suite
.
controller
.
Check
()
return
counter
.
Load
()
>
0
},
5
*
time
.
Second
,
1
*
time
.
Second
)
}
func
TestCheckControllerSuite
(
t
*
testing
.
T
)
{
suite
.
Run
(
t
,
new
(
CheckerControllerSuite
))
}
pkg/util/paramtable/component_param.go
浏览文件 @
6534396b
...
...
@@ -1152,16 +1152,23 @@ type queryCoordConfig struct {
OverloadedMemoryThresholdPercentage
ParamItem
`refreshable:"true"`
BalanceIntervalSeconds
ParamItem
`refreshable:"true"`
MemoryUsageMaxDifferencePercentage
ParamItem
`refreshable:"true"`
CheckInterval
ParamItem
`refreshable:"true"`
ChannelTaskTimeout
ParamItem
`refreshable:"true"`
SegmentTaskTimeout
ParamItem
`refreshable:"true"`
DistPullInterval
ParamItem
`refreshable:"false"`
HeartbeatAvailableInterval
ParamItem
`refreshable:"true"`
LoadTimeoutSeconds
ParamItem
`refreshable:"true"`
SegmentCheckInterval
ParamItem
`refreshable:"true"`
ChannelCheckInterval
ParamItem
`refreshable:"true"`
BalanceCheckInterval
ParamItem
`refreshable:"true"`
ChannelTaskTimeout
ParamItem
`refreshable:"true"`
SegmentTaskTimeout
ParamItem
`refreshable:"true"`
DistPullInterval
ParamItem
`refreshable:"false"`
HeartbeatAvailableInterval
ParamItem
`refreshable:"true"`
LoadTimeoutSeconds
ParamItem
`refreshable:"true"`
// Deprecated: Since 2.2.2, QueryCoord do not use HandOff logic anymore
CheckHandoffInterval
ParamItem
`refreshable:"true"`
EnableActiveStandby
ParamItem
`refreshable:"false"`
// Deprecated: Since 2.2.2, use different interval for different checker
CheckInterval
ParamItem
`refreshable:"true"`
NextTargetSurviveTime
ParamItem
`refreshable:"true"`
UpdateNextTargetInterval
ParamItem
`refreshable:"false"`
CheckNodeInReplicaInterval
ParamItem
`refreshable:"false"`
...
...
@@ -1300,6 +1307,33 @@ func (p *queryCoordConfig) init(base *BaseTable) {
}
p
.
CheckInterval
.
Init
(
base
.
mgr
)
p
.
SegmentCheckInterval
=
ParamItem
{
Key
:
"queryCoord.checkSegmentInterval"
,
Version
:
"2.3.0"
,
DefaultValue
:
"1000"
,
PanicIfEmpty
:
true
,
Export
:
true
,
}
p
.
SegmentCheckInterval
.
Init
(
base
.
mgr
)
p
.
ChannelCheckInterval
=
ParamItem
{
Key
:
"queryCoord.checkChannelInterval"
,
Version
:
"2.3.0"
,
DefaultValue
:
"1000"
,
PanicIfEmpty
:
true
,
Export
:
true
,
}
p
.
ChannelCheckInterval
.
Init
(
base
.
mgr
)
p
.
BalanceCheckInterval
=
ParamItem
{
Key
:
"queryCoord.checkChannelInterval"
,
Version
:
"2.3.0"
,
DefaultValue
:
"10000"
,
PanicIfEmpty
:
true
,
Export
:
true
,
}
p
.
BalanceCheckInterval
.
Init
(
base
.
mgr
)
p
.
ChannelTaskTimeout
=
ParamItem
{
Key
:
"queryCoord.channelTaskTimeout"
,
Version
:
"2.0.0"
,
...
...
pkg/util/paramtable/component_param_test.go
浏览文件 @
6534396b
...
...
@@ -292,6 +292,10 @@ func TestComponentParam(t *testing.T) {
assert
.
Equal
(
t
,
1.3
,
Params
.
ReverseUnbalanceTolerationFactor
.
GetAsFloat
())
params
.
Save
(
"queryCoord.reverseUnBalanceTolerationFactor"
,
"1.5"
)
assert
.
Equal
(
t
,
1.5
,
Params
.
ReverseUnbalanceTolerationFactor
.
GetAsFloat
())
assert
.
Equal
(
t
,
1000
,
Params
.
SegmentCheckInterval
.
GetAsInt
())
assert
.
Equal
(
t
,
1000
,
Params
.
ChannelCheckInterval
.
GetAsInt
())
assert
.
Equal
(
t
,
10000
,
Params
.
BalanceCheckInterval
.
GetAsInt
())
})
t
.
Run
(
"test queryNodeConfig"
,
func
(
t
*
testing
.
T
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录