From c71c6378ff18a602858230d74db3fc2ba567b5ea Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 10 Nov 2022 17:01:03 +0800 Subject: [PATCH] Clear stale replicas (#20456) Signed-off-by: yah01 Signed-off-by: yah01 --- internal/querycoordv2/job/job.go | 16 ++++++++-- internal/querycoordv2/meta/replica_manager.go | 30 ++++++++++++++++--- .../querycoordv2/meta/replica_manager_test.go | 8 ++--- internal/querycoordv2/server.go | 2 +- 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/internal/querycoordv2/job/job.go b/internal/querycoordv2/job/job.go index 102222b19..c4dbddf38 100644 --- a/internal/querycoordv2/job/job.go +++ b/internal/querycoordv2/job/job.go @@ -180,8 +180,14 @@ func (job *LoadCollectionJob) Execute() error { zap.Int64("collectionID", req.GetCollectionID()), ) + // Clear stale replicas + err := job.meta.ReplicaManager.RemoveCollection(req.GetCollectionID()) + if err != nil { + log.Warn("failed to clear stale replicas", zap.Error(err)) + return err + } + // Create replicas - // TODO(yah01): store replicas and collection atomically replicas, err := utils.SpawnReplicas(job.meta.ReplicaManager, job.nodeMgr, req.GetCollectionID(), @@ -381,8 +387,14 @@ func (job *LoadPartitionJob) Execute() error { zap.Int64s("partitionIDs", req.GetPartitionIDs()), ) + // Clear stale replicas + err := job.meta.ReplicaManager.RemoveCollection(req.GetCollectionID()) + if err != nil { + log.Warn("failed to clear stale replicas", zap.Error(err)) + return err + } + // Create replicas - // TODO(yah01): store replicas and collection atomically replicas, err := utils.SpawnReplicas(job.meta.ReplicaManager, job.nodeMgr, req.GetCollectionID(), diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index a131bea1f..48c9d3757 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -21,8 +21,11 @@ import ( "sync" "github.com/golang/protobuf/proto" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/typeutil" . "github.com/milvus-io/milvus/internal/util/typeutil" + "go.uber.org/zap" ) type Replica struct { @@ -64,15 +67,34 @@ func NewReplicaManager(idAllocator func() (int64, error), store Store) *ReplicaM } // Recover recovers the replicas for given collections from meta store -func (m *ReplicaManager) Recover() error { +func (m *ReplicaManager) Recover(collections []int64) error { replicas, err := m.store.GetReplicas() if err != nil { return fmt.Errorf("failed to recover replicas, err=%w", err) } + + collectionSet := typeutil.NewUniqueSet(collections...) for _, replica := range replicas { - m.replicas[replica.GetID()] = &Replica{ - Replica: replica, - Nodes: NewUniqueSet(replica.GetNodes()...), + if collectionSet.Contain(replica.GetCollectionID()) { + m.replicas[replica.GetID()] = &Replica{ + Replica: replica, + Nodes: NewUniqueSet(replica.GetNodes()...), + } + log.Info("recover replica", + zap.Int64("collectionID", replica.GetCollectionID()), + zap.Int64("replicaID", replica.GetID()), + zap.Int64s("nodes", replica.GetNodes()), + ) + } else { + err := m.store.ReleaseReplica(replica.GetCollectionID(), replica.GetID()) + if err != nil { + return err + } + log.Info("clear stale replica", + zap.Int64("collectionID", replica.GetCollectionID()), + zap.Int64("replicaID", replica.GetID()), + zap.Int64s("nodes", replica.GetNodes()), + ) } } return nil diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index d6d7d8c1d..397e81203 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -111,7 +111,7 @@ func (suite *ReplicaManagerSuite) TestRecover() { // Clear data in memory, and then recover from meta store suite.clearMemory() - mgr.Recover() + mgr.Recover(suite.collections) suite.TestGet() // Test recover from 2.1 meta store @@ -125,7 +125,7 @@ func (suite *ReplicaManagerSuite) TestRecover() { suite.kv.Save(ReplicaMetaPrefixV1+"/2100", string(value)) suite.clearMemory() - mgr.Recover() + mgr.Recover(append(suite.collections, 1000)) replica := mgr.Get(2100) suite.NotNil(replica) suite.EqualValues(1000, replica.CollectionID) @@ -148,7 +148,7 @@ func (suite *ReplicaManagerSuite) TestRemove() { } // Check whether the replicas are also removed from meta store - mgr.Recover() + mgr.Recover(suite.collections) for _, collection := range suite.collections { replicas := mgr.GetByCollection(collection) suite.Empty(replicas) @@ -179,7 +179,7 @@ func (suite *ReplicaManagerSuite) TestNodeManipulate() { // Check these modifications are applied to meta store suite.clearMemory() - mgr.Recover() + mgr.Recover(suite.collections) for _, collection := range suite.collections { replica := mgr.GetByCollectionAndNode(collection, firstNode) suite.Nil(replica) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 7959f7779..32d682c44 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -256,7 +256,7 @@ func (s *Server) initMeta() error { } metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(s.meta.GetAll()))) - err = s.meta.ReplicaManager.Recover() + err = s.meta.ReplicaManager.Recover(s.meta.CollectionManager.GetAll()) if err != nil { log.Error("failed to recover replicas") return err -- GitLab