diff --git a/internal/querycoord/group_balance.go b/internal/querycoord/group_balance.go
index cb9c535d50c0be3740c6a995553ab24824960e5d..2685c70adf4df5a1a915735c8e47a5bd28f0aed5 100644
--- a/internal/querycoord/group_balance.go
+++ b/internal/querycoord/group_balance.go
@@ -1,17 +1,23 @@
 package querycoord
 
-import "sort"
+import (
+	"sort"
+)
 
-type balancer interface {
-	addNode(nodeID int64) ([]*balancePlan, error)
-	removeNode(nodeID int64) []*balancePlan
-	rebalance() []*balancePlan
+type Balancer interface {
+	AddNode(nodeID int64) ([]*balancePlan, error)
+	RemoveNode(nodeID int64) []*balancePlan
+	Rebalance() []*balancePlan
 }
 
+// balancePlan is a plan for adding/removing nodes to/from replicas:
+// the given nodes are added into targetReplica,
+// and removed from sourceReplica.
+// Set either replica ID to invalidReplicaID to skip that half of the plan.
 type balancePlan struct {
-	nodeID        int64
-	sourceReplica int64
-	targetReplica int64
+	nodes         []UniqueID
+	sourceReplica UniqueID
+	targetReplica UniqueID
 }
 
 type replicaBalancer struct {
@@ -23,7 +29,7 @@ func newReplicaBalancer(meta Meta, cluster Cluster) *replicaBalancer {
 	return &replicaBalancer{meta, cluster}
 }
 
-func (b *replicaBalancer) addNode(nodeID int64) ([]*balancePlan, error) {
+func (b *replicaBalancer) AddNode(nodeID int64) ([]*balancePlan, error) {
 	// allocate this node to all collections replicas
 	var ret []*balancePlan
 	collections := b.meta.showCollections()
@@ -36,6 +42,25 @@ func (b *replicaBalancer) addNode(nodeID int64) ([]*balancePlan, error) {
 			continue
 		}
 
+		foundNode := false
+		for _, replica := range replicas {
+			for _, replicaNode := range replica.NodeIds {
+				if replicaNode == nodeID {
+					foundNode = true
+					break
+				}
+			}
+
+			if foundNode {
+				break
+			}
+		}
+
+		// This node is serving this collection
+		if foundNode {
+			continue
+		}
+
 		replicaAvailableMemory := make(map[UniqueID]uint64, len(replicas))
 		for _, replica := range replicas {
 			replicaAvailableMemory[replica.ReplicaID] = getReplicaAvailableMemory(b.cluster, replica)
@@ -48,7 +73,7 @@ func (b *replicaBalancer) addNode(nodeID int64) ([]*balancePlan, error) {
 		})
 
 		ret = append(ret, &balancePlan{
-			nodeID:        nodeID,
+			nodes:         []UniqueID{nodeID},
 			sourceReplica: invalidReplicaID,
 			targetReplica: replicas[0].GetReplicaID(),
 		})
@@ -56,11 +81,11 @@ func (b *replicaBalancer) addNode(nodeID int64) ([]*balancePlan, error) {
 	return ret, nil
 }
 
-func (b *replicaBalancer) removeNode(nodeID int64) []*balancePlan {
+func (b *replicaBalancer) RemoveNode(nodeID int64) []*balancePlan {
 	// for this version, querynode does not support move from a replica to another
 	return nil
 }
 
-func (b *replicaBalancer) rebalance() []*balancePlan {
+func (b *replicaBalancer) Rebalance() []*balancePlan {
 	return nil
 }
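Note on the selection step above: AddNode builds one balancePlan per collection by ranking replicas on available memory (the sort.Slice comparator itself sits outside this hunk). As a minimal, self-contained sketch of that step, with a hypothetical replica struct and pickTargetReplica helper, and assuming the elided comparator orders replicas by ascending available memory so the new node joins the most memory-pressed replica:

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical stand-ins for the Milvus types; the real code uses
// milvuspb.ReplicaInfo and a cluster-backed memory lookup.
type replica struct {
	ID        int64
	Available uint64 // free memory across the replica's nodes, in bytes
}

// pickTargetReplica mirrors the AddNode selection step: sort replicas by
// available memory (ascending, by assumption) and assign the new node to
// the first one, i.e. the replica under the most memory pressure.
func pickTargetReplica(replicas []replica) int64 {
	sort.Slice(replicas, func(i, j int) bool {
		return replicas[i].Available < replicas[j].Available
	})
	return replicas[0].ID
}

func main() {
	replicas := []replica{
		{ID: 1, Available: 8 << 30},
		{ID: 2, Available: 2 << 30},
	}
	fmt.Println(pickTargetReplica(replicas)) // 2
}
```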
diff --git a/internal/querycoord/group_balance_test.go b/internal/querycoord/group_balance_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..35cf2e0b0f56a9cdf19cb804edb33b6d1420812a
--- /dev/null
+++ b/internal/querycoord/group_balance_test.go
@@ -0,0 +1,54 @@
+package querycoord
+
+import (
+	"context"
+	"testing"
+
+	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestAddNode(t *testing.T) {
+	defer removeAllSession()
+
+	ctx := context.Background()
+	coord, err := startQueryCoord(ctx)
+	assert.NoError(t, err)
+	defer coord.Stop()
+
+	node1, err := startQueryNodeServer(ctx)
+	assert.NoError(t, err)
+	defer node1.stop()
+	node2, err := startQueryNodeServer(ctx)
+	assert.NoError(t, err)
+	defer node2.stop()
+	waitQueryNodeOnline(coord.cluster, node1.queryNodeID)
+	waitQueryNodeOnline(coord.cluster, node2.queryNodeID)
+
+	loadCollectionReq := &querypb.LoadCollectionRequest{
+		Base: &commonpb.MsgBase{
+			MsgType: commonpb.MsgType_LoadCollection,
+		},
+		CollectionID:  defaultCollectionID,
+		Schema:        genDefaultCollectionSchema(false),
+		ReplicaNumber: 1,
+	}
+	status, err := coord.LoadCollection(ctx, loadCollectionReq)
+	assert.NoError(t, err)
+	assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
+	waitLoadCollectionDone(ctx, coord, defaultCollectionID)
+
+	plans, err := coord.groupBalancer.AddNode(node1.queryNodeID)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(plans))
+
+	plans, err = coord.groupBalancer.AddNode(node2.queryNodeID)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(plans))
+
+	newNodeID := node2.queryNodeID + 1
+	plans, err = coord.groupBalancer.AddNode(newNodeID)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(plans))
+}
diff --git a/internal/querycoord/impl_test.go b/internal/querycoord/impl_test.go
index 97f7516af54def70fb59337931606bb8f98d9678..92041a3648a17b0f19218db9f52c78c1a0fc10de 100644
--- a/internal/querycoord/impl_test.go
+++ b/internal/querycoord/impl_test.go
@@ -81,7 +81,7 @@ func waitLoadCollectionDone(ctx context.Context, queryCoord *QueryCoord, collect
 			return errors.New("showCollection failed")
 		}
 
-		loadDone := true
+		loadDone := len(res.InMemoryPercentages) > 0
 		for _, percent := range res.InMemoryPercentages {
 			if percent < 100 {
 				loadDone = false
@@ -90,6 +90,8 @@ func waitLoadCollectionDone(ctx context.Context, queryCoord *QueryCoord, collect
 		if loadDone {
 			break
 		}
+
+		time.Sleep(500 * time.Millisecond)
 	}
 
 	return nil
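Both fixes above are instances of the poll-until-done pattern: treat an empty status list as "not done yet", and sleep between probes instead of spinning. A generic version of that loop, as a sketch (waitFor and its parameters are illustrative, not part of the codebase):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond until it returns true, fails, or ctx expires.
// Illustrative helper, not part of querycoord.
func waitFor(ctx context.Context, interval time.Duration, cond func() (bool, error)) error {
	for {
		done, err := cond()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.New("timed out waiting for condition")
		case <-time.After(interval):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var percentages []int64 // imagine this being refreshed by ShowCollections
	err := waitFor(ctx, 500*time.Millisecond, func() (bool, error) {
		percentages = append(percentages, 100) // simulate progress
		// Mirror the fix: an empty list means loading has not even started.
		if len(percentages) == 0 {
			return false, nil
		}
		for _, p := range percentages {
			if p < 100 {
				return false, nil
			}
		}
		return true, nil
	})
	fmt.Println("load done:", err == nil)
}
```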
diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go
index 1d515be86bdf88bde363ffcade89520e6c440119..9df5b87d3b60e84e2ce7d2260399b312264428c8 100644
--- a/internal/querycoord/query_coord.go
+++ b/internal/querycoord/query_coord.go
@@ -97,7 +97,7 @@ type QueryCoord struct {
 	factory      dependency.Factory
 	chunkManager storage.ChunkManager
 
-	groupBalancer balancer
+	groupBalancer Balancer
 }
 
 // Register register query service at etcd
@@ -341,16 +341,37 @@ func (qc *QueryCoord) watchNodeLoop() {
 	defer qc.loopWg.Done()
 	log.Info("QueryCoord start watch node loop")
 
-	unallocatedNodes := qc.getUnallocatedNodes()
-	for _, n := range unallocatedNodes {
-		if err := qc.allocateNode(n); err != nil {
-			log.Warn("unable to allcoate node", zap.Int64("nodeID", n), zap.Error(err))
+	onlineNodes := qc.cluster.OnlineNodeIDs()
+	for _, node := range onlineNodes {
+		if err := qc.allocateNode(node); err != nil {
+			log.Warn("unable to allocate node", zap.Int64("nodeID", node), zap.Error(err))
 		}
 	}
 	go qc.loadBalanceNodeLoop(ctx)
 
-	for _, nodeID := range qc.cluster.OfflineNodeIDs() {
-		qc.offlineNodesChan <- nodeID
+	offlineNodes := make(typeutil.UniqueSet)
+	collections := qc.meta.showCollections()
+	for _, collection := range collections {
+		for _, replicaID := range collection.ReplicaIds {
+			replica, err := qc.meta.getReplicaByID(replicaID)
+			if err != nil {
+				log.Warn("failed to get replica",
+					zap.Int64("replicaID", replicaID),
+					zap.Error(err))
+				continue
+			}
+
+			for _, node := range replica.NodeIds {
+				ok, err := qc.cluster.IsOnline(node)
+				if err != nil || !ok {
+					offlineNodes.Insert(node)
+				}
+			}
+		}
+	}
+
+	for node := range offlineNodes {
+		qc.offlineNodesChan <- node
 	}
 
 	// TODO silverxia add Rewatch logic
@@ -359,7 +380,7 @@ func (qc *QueryCoord) watchNodeLoop() {
 }
 
 func (qc *QueryCoord) allocateNode(nodeID int64) error {
-	plans, err := qc.groupBalancer.addNode(nodeID)
+	plans, err := qc.groupBalancer.AddNode(nodeID)
 	if err != nil {
 		return err
 	}
@@ -371,22 +392,6 @@ func (qc *QueryCoord) allocateNode(nodeID int64) error {
 	return nil
 }
 
-func (qc *QueryCoord) getUnallocatedNodes() []int64 {
-	onlines := qc.cluster.OnlineNodeIDs()
-	var ret []int64
-	for _, n := range onlines {
-		replica, err := qc.meta.getReplicasByNodeID(n)
-		if err != nil {
-			log.Warn("failed to get replica", zap.Int64("nodeID", n), zap.Error(err))
-			continue
-		}
-		if replica == nil {
-			ret = append(ret, n)
-		}
-	}
-	return ret
-}
-
 func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
 	for {
 		select {
@@ -439,6 +444,8 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
 }
 
 func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context) {
+	const LoadBalanceRetryAfter = 100 * time.Millisecond
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -463,13 +470,14 @@ func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context) {
 				meta:    qc.meta,
 			}
 			qc.metricsCacheManager.InvalidateSystemInfoMetrics()
-			//TODO:: deal enqueue error
+
 			err := qc.scheduler.Enqueue(loadBalanceTask)
 			if err != nil {
 				log.Warn("failed to enqueue LoadBalance task into the scheduler",
 					zap.Int64("nodeID", node),
 					zap.Error(err))
 				qc.offlineNodesChan <- node
+				time.Sleep(LoadBalanceRetryAfter)
 				continue
 			}
 
@@ -483,6 +491,7 @@ func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context) {
 				zap.Int64("nodeID", node),
 				zap.Error(err))
 			qc.offlineNodesChan <- node
+			time.Sleep(LoadBalanceRetryAfter)
 			continue
 		}
 
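The new time.Sleep(LoadBalanceRetryAfter) calls matter because a node whose LoadBalance task cannot be enqueued (or fails) is pushed straight back onto offlineNodesChan; without a delay the loop would retry the same node in a hot spin. Reduced to its essentials, the requeue-with-backoff pattern looks like this (a sketch with a toy enqueue function and hypothetical names):

```go
package main

import (
	"fmt"
	"time"
)

const loadBalanceRetryAfter = 100 * time.Millisecond // mirrors LoadBalanceRetryAfter

func main() {
	offlineNodesChan := make(chan int64, 16)
	offlineNodesChan <- 7 // a node that needs to be balanced away

	attempts := 0
	enqueue := func(node int64) error { // toy stand-in for scheduler.Enqueue
		attempts++
		if attempts < 3 {
			return fmt.Errorf("scheduler busy for node %d", node)
		}
		return nil
	}

	for node := range offlineNodesChan {
		if err := enqueue(node); err != nil {
			offlineNodesChan <- node          // requeue for another attempt
			time.Sleep(loadBalanceRetryAfter) // back off instead of spinning hot
			continue
		}
		fmt.Println("balanced node", node, "after", attempts, "attempts")
		break // end the demo; the real loop runs until its context is done
	}
}
```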
diff --git a/internal/querycoord/query_coord_test.go b/internal/querycoord/query_coord_test.go
index 2402803ceba85ecf557da526c6f434e757542ec0..243de2a2ca9950161ad81326dfb8b1faf7489dbc 100644
--- a/internal/querycoord/query_coord_test.go
+++ b/internal/querycoord/query_coord_test.go
@@ -238,7 +238,7 @@ func TestWatchNodeLoop(t *testing.T) {
 	err = removeNodeSession(nodeID)
 	assert.Nil(t, err)
 
-	waitAllQueryNodeOffline(queryCoord.cluster, onlineNodeIDs)
+	waitAllQueryNodeOffline(queryCoord.cluster, onlineNodeIDs...)
 
 	queryCoord.Stop()
 	err = removeAllSession()
@@ -620,6 +620,8 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {
 		if len(segmentInfos) > 0 {
 			break
 		}
+
+		time.Sleep(time.Second)
 	}
 
 	queryCoord.Stop()
diff --git a/internal/querycoord/querynode_test.go b/internal/querycoord/querynode_test.go
index fd0c3672dd24eb34edeb0415fac2707af3e30592..c920b2f75d9f509bcc691af0c710872a52b3be97 100644
--- a/internal/querycoord/querynode_test.go
+++ b/internal/querycoord/querynode_test.go
@@ -65,7 +65,7 @@ func removeAllSession() error {
 	return nil
 }
 
-func waitAllQueryNodeOffline(cluster Cluster, nodeIDs []int64) bool {
+func waitAllQueryNodeOffline(cluster Cluster, nodeIDs ...int64) bool {
 	for {
 		allOffline := true
 		for _, nodeID := range nodeIDs {
@@ -136,7 +136,7 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
 	err = removeNodeSession(queryNode2.queryNodeID)
 	assert.Nil(t, err)
 
-	waitAllQueryNodeOffline(queryCoord.cluster, onlineNodeIDs)
+	waitAllQueryNodeOffline(queryCoord.cluster, onlineNodeIDs...)
 	queryCoord.Stop()
 	err = removeAllSession()
 	assert.Nil(t, err)
@@ -182,7 +182,7 @@ func TestQueryNode_MultiNode_reStart(t *testing.T) {
 	err = removeNodeSession(queryNode3.queryNodeID)
 	assert.Nil(t, err)
 
-	waitAllQueryNodeOffline(queryCoord.cluster, onlineNodeIDs)
+	waitAllQueryNodeOffline(queryCoord.cluster, onlineNodeIDs...)
 	queryCoord.Stop()
 	err = removeAllSession()
 	assert.Nil(t, err)
diff --git a/internal/querycoord/replica.go b/internal/querycoord/replica.go
index 3b4c16af3cccb22939520ae5f0e462ac5fb9350f..867c87ae7f0bbb4e15777becad048ffd06c23c59 100644
--- a/internal/querycoord/replica.go
+++ b/internal/querycoord/replica.go
@@ -169,12 +169,13 @@ func (rep *ReplicaInfos) ApplyBalancePlan(p *balancePlan, kv kv.MetaKv) error {
 	// generate ReplicaInfo to save to MetaKv
 	if sourceReplica != nil {
 		// remove node from replica node list
-		removeNodeFromReplica(sourceReplica, p.nodeID)
+		sourceReplica.NodeIds = removeFromSlice(sourceReplica.NodeIds, p.nodes...)
 		replicasChanged = append(replicasChanged, sourceReplica)
 	}
 
 	if targetReplica != nil {
 		// add node to replica
-		targetReplica.NodeIds = append(targetReplica.NodeIds, p.nodeID)
+		targetReplica.NodeIds = append(targetReplica.NodeIds, p.nodes...)
+		targetReplica.NodeIds = uniqueSlice(targetReplica.NodeIds)
 		replicasChanged = append(replicasChanged, targetReplica)
 	}
@@ -223,18 +224,6 @@ func (rep *ReplicaInfos) UpdateShardLeader(replicaID UniqueID, dmChannel string,
 	return nil
 }
 
-// removeNodeFromReplica helper function to remove nodeID from replica NodeIds list.
-func removeNodeFromReplica(replica *milvuspb.ReplicaInfo, nodeID int64) *milvuspb.ReplicaInfo {
-	for i := 0; i < len(replica.NodeIds); i++ {
-		if replica.NodeIds[i] != nodeID {
-			continue
-		}
-		replica.NodeIds = append(replica.NodeIds[:i], replica.NodeIds[i+1:]...)
-		return replica
-	}
-	return replica
-}
-
 // save the replicas into etcd.
 func saveReplica(meta kv.MetaKv, replicas ...*milvuspb.ReplicaInfo) error {
 	data := make(map[string]string)
diff --git a/internal/querycoord/replica_test.go b/internal/querycoord/replica_test.go
index 8cde1cf9ac31b5ecdcdf0314fcbb6600a1627a3f..59c3acd7e5a959bfccd0ba7617ded45d29959466 100644
--- a/internal/querycoord/replica_test.go
+++ b/internal/querycoord/replica_test.go
@@ -135,7 +135,7 @@ func TestReplicaInfos_ApplyBalancePlan(t *testing.T) {
 	t.Run("source replica not exist", func(t *testing.T) {
 		replicas := NewReplicaInfos()
 		err := replicas.ApplyBalancePlan(&balancePlan{
-			nodeID:        1,
+			nodes:         []UniqueID{1},
 			sourceReplica: 1,
 			targetReplica: invalidReplicaID,
 		}, kv)
@@ -145,7 +145,7 @@ func TestReplicaInfos_ApplyBalancePlan(t *testing.T) {
 	t.Run("target replica not exist", func(t *testing.T) {
 		replicas := NewReplicaInfos()
 		err := replicas.ApplyBalancePlan(&balancePlan{
-			nodeID:        1,
+			nodes:         []UniqueID{1},
 			sourceReplica: invalidReplicaID,
 			targetReplica: 1,
 		}, kv)
@@ -162,7 +162,7 @@ func TestReplicaInfos_ApplyBalancePlan(t *testing.T) {
 		})
 
 		err := replicas.ApplyBalancePlan(&balancePlan{
-			nodeID:        2,
+			nodes:         []UniqueID{2},
 			sourceReplica: invalidReplicaID,
 			targetReplica: 1,
 		}, kv)
@@ -189,7 +189,7 @@ func TestReplicaInfos_ApplyBalancePlan(t *testing.T) {
 		})
 
 		err := replicas.ApplyBalancePlan(&balancePlan{
-			nodeID:        1,
+			nodes:         []UniqueID{1},
 			sourceReplica: 1,
 			targetReplica: invalidReplicaID,
 		}, kv)
@@ -216,7 +216,7 @@ func TestReplicaInfos_ApplyBalancePlan(t *testing.T) {
 		})
 
 		err := replicas.ApplyBalancePlan(&balancePlan{
-			nodeID:        2,
+			nodes:         []UniqueID{2},
 			sourceReplica: 1,
 			targetReplica: invalidReplicaID,
 		}, kv)
@@ -235,7 +235,7 @@ func TestReplicaInfos_ApplyBalancePlan(t *testing.T) {
 		})
 
 		err := replicas.ApplyBalancePlan(&balancePlan{
-			nodeID:        2,
+			nodes:         []UniqueID{2},
 			sourceReplica: invalidReplicaID,
 			targetReplica: 1,
 		}, kv)
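ApplyBalancePlan now moves whole batches of nodes, so it leans on two helpers: removeFromSlice strips every planned node from the source replica, and uniqueSlice deduplicates the target after the append. A rough map-based equivalent, as a sketch (the real helpers use typeutil.UniqueSet and therefore do not guarantee element order; these versions preserve input order):

```go
package main

import "fmt"

// removeFromSlice returns origin without any of the IDs listed in del.
func removeFromSlice(origin []int64, del ...int64) []int64 {
	drop := make(map[int64]struct{}, len(del))
	for _, id := range del {
		drop[id] = struct{}{}
	}
	out := origin[:0:0] // new backing array; never mutate the input
	for _, id := range origin {
		if _, ok := drop[id]; !ok {
			out = append(out, id)
		}
	}
	return out
}

// uniqueSlice returns origin with duplicate IDs removed.
func uniqueSlice(origin []int64) []int64 {
	seen := make(map[int64]struct{}, len(origin))
	out := origin[:0:0]
	for _, id := range origin {
		if _, ok := seen[id]; !ok {
			seen[id] = struct{}{}
			out = append(out, id)
		}
	}
	return out
}

func main() {
	source := []int64{1, 2, 3}
	target := []int64{4, 2}
	nodes := []int64{2, 3} // the plan's node batch

	fmt.Println(removeFromSlice(source, nodes...))      // [1]
	fmt.Println(uniqueSlice(append(target, nodes...))) // [4 2 3]
}
```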
diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go
index 3cd2644324ed95b0d00fbfcf4ce79dc62e6aad92..b039ea9e958716cdc2ad90bfe2cf989a6e964adb 100644
--- a/internal/querycoord/task.go
+++ b/internal/querycoord/task.go
@@ -2306,26 +2306,13 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 			offlineNodes.Insert(nodeID)
 		}
 
-		for _, replica := range replicas {
-			replica := replica
+		for replicaID := range replicas {
+			replicaID := replicaID
 			wg.Go(func() error {
-				onlineNodes := make([]UniqueID, 0, len(replica.NodeIds))
-				for _, nodeID := range replica.NodeIds {
-					if !offlineNodes.Contain(nodeID) {
-						onlineNodes = append(onlineNodes, nodeID)
-					}
-				}
-				replica.NodeIds = onlineNodes
-
-				err := lbt.meta.setReplicaInfo(replica)
-				if err != nil {
-					log.Error("failed to remove offline nodes from replica info",
-						zap.Int64("replicaID", replica.ReplicaID),
-						zap.Error(err))
-					return err
-				}
-
-				return nil
+				return lbt.meta.applyReplicaBalancePlan(&balancePlan{
+					nodes:         lbt.SourceNodeIDs,
+					sourceReplica: replicaID,
+				})
 			})
 		}
 	}
diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go
index 1350bc64ff8a7fb14e25930e2e77a064ec15ece5..25c2f10c8878fc8429a5bcfcb04870eb2980edcf 100644
--- a/internal/querycoord/task_scheduler.go
+++ b/internal/querycoord/task_scheduler.go
@@ -685,12 +685,14 @@ func (scheduler *TaskScheduler) scheduleLoop() {
 			}
 		}
 
-		err = triggerTask.globalPostExecute(triggerTask.traceCtx())
-		if err != nil {
-			log.Error("scheduleLoop: failed to execute globalPostExecute() of task",
-				zap.Int64("taskID", triggerTask.getTaskID()),
-				zap.Error(err))
-			triggerTask.setResultInfo(err)
+		if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success {
+			err = triggerTask.globalPostExecute(triggerTask.traceCtx())
+			if err != nil {
+				log.Error("scheduleLoop: failed to execute globalPostExecute() of task",
+					zap.Int64("taskID", triggerTask.getTaskID()),
+					zap.Error(err))
+				triggerTask.setResultInfo(err)
+			}
 		}
 
 		err = removeTaskFromKVFn(triggerTask)
diff --git a/internal/querycoord/util.go b/internal/querycoord/util.go
index daf5d7c0d9249511ae5cd8e1ec50b501b7c98b1f..b0fd8f30927911fe9baeb480203b75108e246343 100644
--- a/internal/querycoord/util.go
+++ b/internal/querycoord/util.go
@@ -203,6 +203,12 @@ func removeFromSlice(origin []UniqueID, del ...UniqueID) []UniqueID {
 	return set.Collect()
 }
 
+func uniqueSlice(origin []UniqueID) []UniqueID {
+	set := make(typeutil.UniqueSet, len(origin))
+	set.Insert(origin...)
+	return set.Collect()
+}
+
 func getReplicaAvailableMemory(cluster Cluster, replica *milvuspb.ReplicaInfo) uint64 {
 	availableMemory := uint64(0)
 	nodes := getNodeInfos(cluster, replica.NodeIds)
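One subtlety in the task.go hunk worth calling out: the `replicaID := replicaID` re-declaration before wg.Go. Under pre-Go-1.22 semantics, closures launched from a loop share the loop variable, so without the per-iteration copy every goroutine could observe the final replica ID. A standalone illustration, assuming wg is a golang.org/x/sync/errgroup.Group, which the wg.Go(func() error) signature suggests:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	replicas := map[int64]string{1: "r1", 2: "r2", 3: "r3"}

	var wg errgroup.Group
	for replicaID := range replicas {
		replicaID := replicaID // pre-Go 1.22: copy the loop variable per iteration
		wg.Go(func() error {
			// Without the copy above, every goroutine could see the same
			// (final) value of replicaID once the loop has advanced.
			fmt.Println("balancing replica", replicaID)
			return nil
		})
	}
	if err := wg.Wait(); err != nil {
		fmt.Println("balance failed:", err)
	}
}
```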