未验证 提交 7c502601 编写于 作者: W wei liu 提交者: GitHub

skip balance on loading collection (#20507)

Signed-off-by: NWei Liu <wei.liu@zilliz.com>
Signed-off-by: NWei Liu <wei.liu@zilliz.com>
上级 8f13fcd6
......@@ -19,9 +19,11 @@ package balance
import (
"sort"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/samber/lo"
)
type RowCountBasedBalancer struct {
......@@ -81,8 +83,13 @@ func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem
func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) {
ids := b.meta.CollectionManager.GetAll()
// loading collection should skip balance
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
return b.meta.GetStatus(cid) == querypb.LoadStatus_Loaded
})
segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
for _, cid := range ids {
for _, cid := range loadedCollections {
replicas := b.meta.ReplicaManager.GetByCollection(cid)
for _, replica := range replicas {
splans, cplans := b.balanceReplica(replica)
......
......@@ -21,6 +21,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
......@@ -144,7 +145,52 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
balancer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
collection := utils.CreateTestCollection(1, 1)
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, c.nodes))
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
segmentPlans, channelPlans := balancer.Balance()
suite.Empty(channelPlans)
suite.ElementsMatch(c.expectPlans, segmentPlans)
})
}
}
func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnLoadingCollection() {
cases := []struct {
name string
nodes []int64
distributions map[int64][]*meta.Segment
expectPlans []SegmentAssignPlan
}{
{
name: "normal balance",
nodes: []int64{1, 2},
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
},
},
expectPlans: []SegmentAssignPlan{},
},
}
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
collection := utils.CreateTestCollection(1, 1)
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loading
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, c.nodes))
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册