From 74da53c02767bb48387bfc4e08d76df22dafb495 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 30 Mar 2023 10:48:23 +0800 Subject: [PATCH] fix update load percentage (#23054) Signed-off-by: Wei Liu --- .../balance/rowcount_based_balancer.go | 2 +- .../querycoordv2/meta/collection_manager.go | 159 +++++++----------- .../meta/collection_manager_test.go | 97 ++++++++++- .../observers/collection_observer.go | 63 ++----- .../observers/collection_observer_test.go | 4 +- internal/querycoordv2/server_test.go | 4 +- internal/querycoordv2/services.go | 19 ++- internal/querycoordv2/services_test.go | 16 +- 8 files changed, 192 insertions(+), 172 deletions(-) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index b823a6999..8213b1962 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -89,7 +89,7 @@ func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignP // loading collection should skip balance loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool { - return b.meta.GetStatus(cid) == querypb.LoadStatus_Loaded + return b.meta.CalculateLoadStatus(cid) == querypb.LoadStatus_Loaded }) segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 429069a09..04950936d 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -202,11 +203,15 @@ func (m *CollectionManager) GetReplicaNumber(collectionID UniqueID) int32 { return -1 } -// GetCurrentLoadPercentage checks if collection is currently fully loaded. -func (m *CollectionManager) GetCurrentLoadPercentage(collectionID UniqueID) int32 { +// CalculateLoadPercentage checks if collection is currently fully loaded. +func (m *CollectionManager) CalculateLoadPercentage(collectionID UniqueID) int32 { m.rwmutex.RLock() defer m.rwmutex.RUnlock() + return m.calculateLoadPercentage(collectionID) +} + +func (m *CollectionManager) calculateLoadPercentage(collectionID UniqueID) int32 { collection, ok := m.collections[collectionID] if ok { partitions := m.getPartitionsByCollection(collectionID) @@ -223,20 +228,6 @@ func (m *CollectionManager) GetCurrentLoadPercentage(collectionID UniqueID) int3 return -1 } -// GetCollectionLoadPercentage returns collection load percentage. -// Note: collection.LoadPercentage == 100 only means that it used to be fully loaded, and it is queryable, -// to check if it is fully loaded now, use GetCurrentLoadPercentage instead. -func (m *CollectionManager) GetCollectionLoadPercentage(collectionID UniqueID) int32 { - m.rwmutex.RLock() - defer m.rwmutex.RUnlock() - - collection, ok := m.collections[collectionID] - if ok { - return collection.LoadPercentage - } - return -1 -} - func (m *CollectionManager) GetPartitionLoadPercentage(partitionID UniqueID) int32 { m.rwmutex.RLock() defer m.rwmutex.RUnlock() @@ -248,7 +239,7 @@ func (m *CollectionManager) GetPartitionLoadPercentage(partitionID UniqueID) int return -1 } -func (m *CollectionManager) GetStatus(collectionID UniqueID) querypb.LoadStatus { +func (m *CollectionManager) CalculateLoadStatus(collectionID UniqueID) querypb.LoadStatus { m.rwmutex.RLock() defer m.rwmutex.RUnlock() @@ -282,28 +273,6 @@ func (m *CollectionManager) GetFieldIndex(collectionID UniqueID) map[int64]int64 return nil } -// ContainAnyIndex returns true if the loaded collection contains one of the given indexes, -// returns false otherwise. -func (m *CollectionManager) ContainAnyIndex(collectionID int64, indexIDs ...int64) bool { - m.rwmutex.RLock() - defer m.rwmutex.RUnlock() - - for _, indexID := range indexIDs { - if m.containIndex(collectionID, indexID) { - return true - } - } - return false -} - -func (m *CollectionManager) containIndex(collectionID, indexID int64) bool { - collection, ok := m.collections[collectionID] - if ok { - return lo.Contains(lo.Values(collection.GetFieldIndexID()), indexID) - } - return false -} - func (m *CollectionManager) Exist(collectionID UniqueID) bool { m.rwmutex.RLock() defer m.rwmutex.RUnlock() @@ -362,29 +331,11 @@ func (m *CollectionManager) PutCollection(collection *Collection, partitions ... return m.putCollection(true, collection, partitions...) } -func (m *CollectionManager) UpdateCollection(collection *Collection) error { +func (m *CollectionManager) PutCollectionWithoutSave(collection *Collection) error { m.rwmutex.Lock() defer m.rwmutex.Unlock() - _, ok := m.collections[collection.GetCollectionID()] - if !ok { - return merr.WrapErrCollectionNotFound(collection.GetCollectionID()) - } - - return m.putCollection(true, collection) -} - -func (m *CollectionManager) UpdateCollectionInMemory(collection *Collection) bool { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - - _, ok := m.collections[collection.GetCollectionID()] - if !ok { - return false - } - - m.putCollection(false, collection) - return true + return m.putCollection(false, collection) } func (m *CollectionManager) putCollection(withSave bool, collection *Collection, partitions ...*Partition) error { @@ -414,36 +365,11 @@ func (m *CollectionManager) PutPartition(partitions ...*Partition) error { return m.putPartition(partitions, true) } -func (m *CollectionManager) PutPartitionWithoutSave(partitions ...*Partition) { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - - m.putPartition(partitions, false) -} - -func (m *CollectionManager) UpdatePartition(partition *Partition) error { +func (m *CollectionManager) PutPartitionWithoutSave(partitions ...*Partition) error { m.rwmutex.Lock() defer m.rwmutex.Unlock() - _, ok := m.partitions[partition.GetPartitionID()] - if !ok { - return merr.WrapErrPartitionNotFound(partition.GetPartitionID()) - } - - return m.putPartition([]*Partition{partition}, true) -} - -func (m *CollectionManager) UpdatePartitionInMemory(partition *Partition) bool { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - - _, ok := m.partitions[partition.GetPartitionID()] - if !ok { - return false - } - - m.putPartition([]*Partition{partition}, false) - return true + return m.putPartition(partitions, false) } func (m *CollectionManager) putPartition(partitions []*Partition, withSave bool) error { @@ -463,6 +389,55 @@ func (m *CollectionManager) putPartition(partitions []*Partition, withSave bool) return nil } +func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int32) (int32, error) { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() + + oldPartition, ok := m.partitions[partitionID] + if !ok { + return 0, merr.WrapErrPartitionNotFound(partitionID) + } + + if loadPercent <= oldPartition.LoadPercentage { + return m.calculateLoadPercentage(oldPartition.GetCollectionID()), nil + } + + // update partition load percentage + newPartition := oldPartition.Clone() + newPartition.LoadPercentage = loadPercent + savePartition := false + if loadPercent == 100 { + savePartition = true + newPartition.Status = querypb.LoadStatus_Loaded + elapsed := time.Since(newPartition.CreatedAt) + metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds())) + } + err := m.putPartition([]*Partition{newPartition}, savePartition) + if err != nil { + return 0, err + } + + // update collection load percentage + oldCollection, ok := m.collections[newPartition.CollectionID] + if !ok { + return 0, merr.WrapErrCollectionNotFound(newPartition.CollectionID) + } + collectionPercent := m.calculateLoadPercentage(oldCollection.CollectionID) + if collectionPercent <= oldCollection.LoadPercentage { + return m.calculateLoadPercentage(oldPartition.GetCollectionID()), nil + } + newCollection := oldCollection.Clone() + newCollection.LoadPercentage = collectionPercent + saveCollection := false + if collectionPercent == 100 { + saveCollection = true + newCollection.Status = querypb.LoadStatus_Loaded + elapsed := time.Since(newCollection.CreatedAt) + metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds())) + } + return collectionPercent, m.putCollection(saveCollection, newCollection) +} + // RemoveCollection removes collection and its partitions. func (m *CollectionManager) RemoveCollection(collectionID UniqueID) error { m.rwmutex.Lock() @@ -495,18 +470,6 @@ func (m *CollectionManager) RemovePartition(ids ...UniqueID) error { return m.removePartition(ids...) } -func (m *CollectionManager) RemoveCollectionInMemory(id UniqueID) { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - - delete(m.collections, id) - - partitions := m.getPartitionsByCollection(id) - for _, partition := range partitions { - delete(m.partitions, partition.GetPartitionID()) - } -} - func (m *CollectionManager) removePartition(ids ...UniqueID) error { partition := m.partitions[ids[0]] err := m.store.ReleasePartition(partition.CollectionID, ids...) diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index 5a206c57f..24989cf20 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -108,7 +108,7 @@ func (suite *CollectionManagerSuite) TestGetProperty() { for i, collection := range suite.collections { loadType := mgr.GetLoadType(collection) replicaNumber := mgr.GetReplicaNumber(collection) - percentage := mgr.GetCurrentLoadPercentage(collection) + percentage := mgr.CalculateLoadPercentage(collection) exist := mgr.Exist(collection) suite.Equal(suite.loadTypes[i], loadType) suite.Equal(suite.replicaNumber[i], replicaNumber) @@ -119,7 +119,7 @@ func (suite *CollectionManagerSuite) TestGetProperty() { invalidCollection := -1 loadType := mgr.GetLoadType(int64(invalidCollection)) replicaNumber := mgr.GetReplicaNumber(int64(invalidCollection)) - percentage := mgr.GetCurrentLoadPercentage(int64(invalidCollection)) + percentage := mgr.CalculateLoadPercentage(int64(invalidCollection)) exist := mgr.Exist(int64(invalidCollection)) suite.Equal(querypb.LoadType_UnKnownType, loadType) suite.EqualValues(-1, replicaNumber) @@ -176,29 +176,29 @@ func (suite *CollectionManagerSuite) TestUpdate() { for _, collection := range collections { collection := collection.Clone() collection.LoadPercentage = 100 - ok := mgr.UpdateCollectionInMemory(collection) - suite.True(ok) + err := mgr.PutCollectionWithoutSave(collection) + suite.NoError(err) modified := mgr.GetCollection(collection.GetCollectionID()) suite.Equal(collection, modified) suite.EqualValues(100, modified.LoadPercentage) collection.Status = querypb.LoadStatus_Loaded - err := mgr.UpdateCollection(collection) + err = mgr.PutCollection(collection) suite.NoError(err) } for _, partition := range partitions { partition := partition.Clone() partition.LoadPercentage = 100 - ok := mgr.UpdatePartitionInMemory(partition) - suite.True(ok) + err := mgr.PutPartitionWithoutSave(partition) + suite.NoError(err) modified := mgr.GetPartition(partition.GetPartitionID()) suite.Equal(partition, modified) suite.EqualValues(100, modified.LoadPercentage) partition.Status = querypb.LoadStatus_Loaded - err := mgr.UpdatePartition(partition) + err = mgr.PutPartition(partition) suite.NoError(err) } @@ -215,6 +215,25 @@ func (suite *CollectionManagerSuite) TestUpdate() { } } +func (suite *CollectionManagerSuite) TestGetFieldIndex() { + mgr := suite.mgr + mgr.PutCollection(&Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 1, + ReplicaNumber: 1, + Status: querypb.LoadStatus_Loading, + LoadType: querypb.LoadType_LoadCollection, + FieldIndexID: map[int64]int64{1: 1, 2: 2}, + }, + LoadPercentage: 0, + CreatedAt: time.Now(), + }) + indexID := mgr.GetFieldIndex(1) + suite.Len(indexID, 2) + suite.Contains(indexID, int64(1)) + suite.Contains(indexID, int64(2)) +} + func (suite *CollectionManagerSuite) TestRemove() { mgr := suite.mgr @@ -291,6 +310,68 @@ func (suite *CollectionManagerSuite) TestRecover() { } } +func (suite *CollectionManagerSuite) TestUpdateLoadPercentage() { + mgr := suite.mgr + mgr.PutCollection(&Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 1, + ReplicaNumber: 1, + Status: querypb.LoadStatus_Loading, + LoadType: querypb.LoadType_LoadCollection, + }, + LoadPercentage: 0, + CreatedAt: time.Now(), + }) + + partitions := []int64{1, 2} + for _, partition := range partitions { + mgr.PutPartition(&Partition{ + PartitionLoadInfo: &querypb.PartitionLoadInfo{ + CollectionID: 1, + PartitionID: partition, + Status: querypb.LoadStatus_Loading, + }, + LoadPercentage: 0, + CreatedAt: time.Now(), + }) + } + // test update partition load percentage + mgr.UpdateLoadPercent(1, 30) + partition := mgr.GetPartition(1) + suite.Equal(int32(30), partition.LoadPercentage) + suite.Equal(int32(30), mgr.GetPartitionLoadPercentage(partition.PartitionID)) + suite.Equal(querypb.LoadStatus_Loading, partition.Status) + collection := mgr.GetCollection(1) + suite.Equal(int32(15), collection.LoadPercentage) + suite.Equal(querypb.LoadStatus_Loading, collection.Status) + // expect nothing changed + mgr.UpdateLoadPercent(1, 15) + partition = mgr.GetPartition(1) + suite.Equal(int32(30), partition.LoadPercentage) + suite.Equal(querypb.LoadStatus_Loading, partition.Status) + collection = mgr.GetCollection(1) + suite.Equal(int32(15), collection.LoadPercentage) + suite.Equal(querypb.LoadStatus_Loading, collection.Status) + suite.Equal(querypb.LoadStatus_Loading, mgr.CalculateLoadStatus(collection.CollectionID)) + // test update partition load percentage to 100 + mgr.UpdateLoadPercent(1, 100) + partition = mgr.GetPartition(1) + suite.Equal(int32(100), partition.LoadPercentage) + suite.Equal(querypb.LoadStatus_Loaded, partition.Status) + collection = mgr.GetCollection(1) + suite.Equal(int32(50), collection.LoadPercentage) + suite.Equal(querypb.LoadStatus_Loading, collection.Status) + // test update collection load percentage + mgr.UpdateLoadPercent(2, 100) + partition = mgr.GetPartition(1) + suite.Equal(int32(100), partition.LoadPercentage) + suite.Equal(querypb.LoadStatus_Loaded, partition.Status) + collection = mgr.GetCollection(1) + suite.Equal(int32(100), collection.LoadPercentage) + suite.Equal(querypb.LoadStatus_Loaded, collection.Status) + suite.Equal(querypb.LoadStatus_Loaded, mgr.CalculateLoadStatus(collection.CollectionID)) +} + func (suite *CollectionManagerSuite) TestUpgradeRecover() { suite.releaseAll() mgr := suite.mgr diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 4945799c5..9bde3bed1 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -22,7 +22,6 @@ import ( "time" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" @@ -142,38 +141,6 @@ func (ob *CollectionObserver) observeLoadStatus() { replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID()) ob.observePartitionLoadStatus(partition, replicaNum) } - - collections := ob.meta.CollectionManager.GetAllCollections() - for _, collection := range collections { - if collection.LoadPercentage == 100 { - continue - } - ob.observeCollectionLoadStatus(collection) - } -} - -func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Collection) { - log := log.With(zap.Int64("collectionID", collection.GetCollectionID())) - - updated := collection.Clone() - percentage := ob.meta.CollectionManager.GetCurrentLoadPercentage(collection.GetCollectionID()) - if percentage <= updated.LoadPercentage { - return - } - - updated.LoadPercentage = percentage - if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) { - updated.Status = querypb.LoadStatus_Loaded - ob.meta.CollectionManager.UpdateCollection(updated) - - elapsed := time.Since(updated.CreatedAt) - metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds())) - } else { - ob.meta.CollectionManager.UpdateCollectionInMemory(updated) - } - log.Info("collection load status updated", - zap.Int32("loadPercentage", updated.LoadPercentage), - zap.Int32("collectionStatus", int32(updated.GetStatus()))) } func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partition, replicaNum int32) { @@ -193,10 +160,10 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti ) loadedCount := 0 - updated := partition.Clone() + loadPercentage := int32(0) if targetNum == 0 { log.Info("No segment/channel in target need to be loaded!") - updated.LoadPercentage = 100 + loadPercentage = 100 } else { for _, channel := range channelTargets { group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, @@ -216,26 +183,24 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti zap.Int("subChannelCount", subChannelCount), zap.Int("loadSegmentCount", loadedCount-subChannelCount)) } - updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum))) + loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum))) } - if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && - updated.LoadPercentage != 100 { + if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 { ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount return } + ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount - if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) { + if loadPercentage == 100 && ob.targetObserver.Check(partition.GetCollectionID()) { delete(ob.partitionLoadedCount, partition.GetPartitionID()) - updated.Status = querypb.LoadStatus_Loaded - ob.meta.CollectionManager.PutPartition(updated) - - elapsed := time.Since(updated.CreatedAt) - metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds())) - } else { - ob.meta.CollectionManager.UpdatePartitionInMemory(updated) } - log.Info("partition load status updated", - zap.Int32("loadPercentage", updated.LoadPercentage), - zap.Int32("partitionStatus", int32(updated.GetStatus()))) + collectionPercentage, err := ob.meta.CollectionManager.UpdateLoadPercent(partition.PartitionID, loadPercentage) + if err != nil { + log.Warn("failed to update load percentage") + } + log.Info("load status updated", + zap.Int32("partitionLoadPercentage", loadPercentage), + zap.Int32("collectionLoadPercentage", collectionPercentage), + ) } diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index c323cccd8..5440e36e6 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -311,8 +311,8 @@ func (suite *CollectionObserverSuite) TestObservePartition() { func (suite *CollectionObserverSuite) isCollectionLoaded(collection int64) bool { exist := suite.meta.Exist(collection) - percentage := suite.meta.GetCurrentLoadPercentage(collection) - status := suite.meta.GetStatus(collection) + percentage := suite.meta.CalculateLoadPercentage(collection) + status := suite.meta.CalculateLoadStatus(collection) replicas := suite.meta.ReplicaManager.GetByCollection(collection) channels := suite.targetMgr.GetDmChannelsByCollection(collection, meta.CurrentTarget) segments := suite.targetMgr.GetHistoricalSegmentsByCollection(collection, meta.CurrentTarget) diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index f10ce0f2a..36185671c 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -435,7 +435,7 @@ func (suite *ServerSuite) updateCollectionStatus(collectionID int64, status quer collection.LoadPercentage = 100 } collection.CollectionLoadInfo.Status = status - suite.server.meta.UpdateCollection(collection) + suite.server.meta.PutCollection(collection) partitions := suite.server.meta.GetPartitionsByCollection(collectionID) for _, partition := range partitions { @@ -445,7 +445,7 @@ func (suite *ServerSuite) updateCollectionStatus(collectionID int64, status quer partition.LoadPercentage = 100 } partition.PartitionLoadInfo.Status = status - suite.server.meta.UpdatePartition(partition) + suite.server.meta.PutPartition(partition) } } } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 313a6fdc4..d012dc2f5 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -89,8 +89,8 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio for _, collectionID := range collections { log := log.With(zap.Int64("collectionID", collectionID)) - percentage := s.meta.CollectionManager.GetCollectionLoadPercentage(collectionID) - if percentage < 0 { + collInfo := s.meta.CollectionManager.GetCollection(collectionID) + if collInfo == nil { if isGetAll { // The collection is released during this, // ignore it @@ -112,6 +112,13 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio Status: merr.Status(err), }, nil } + + percentage := s.meta.CollectionManager.CalculateLoadPercentage(collectionID) + if collInfo.Status == querypb.LoadStatus_Loaded { + // when collection is loaded, regard collection as readable, set percentage == 100 + percentage = 100 + } + resp.CollectionIDs = append(resp.CollectionIDs, collectionID) resp.InMemoryPercentages = append(resp.InMemoryPercentages, int64(percentage)) resp.QueryServiceAvailable = append(resp.QueryServiceAvailable, s.checkAnyReplicaAvailable(collectionID)) @@ -540,7 +547,7 @@ func (s *Server) refreshCollection(ctx context.Context, collID int64) (*commonpb } // Check that collection is fully loaded. - if s.meta.CollectionManager.GetCurrentLoadPercentage(collID) != 100 { + if s.meta.CollectionManager.CalculateLoadPercentage(collID) != 100 { errMsg := "a collection must be fully loaded before refreshing" log.Warn(errMsg) return &commonpb.Status{ @@ -594,7 +601,7 @@ func (s *Server) refreshPartitions(ctx context.Context, collID int64, partIDs [] } // Check that all partitions are fully loaded. - if s.meta.CollectionManager.GetCurrentLoadPercentage(collID) != 100 { + if s.meta.CollectionManager.CalculateLoadPercentage(collID) != 100 { errMsg := "partitions must be fully loaded before refreshing" log.Warn(errMsg) return &commonpb.Status{ @@ -664,7 +671,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques log.Warn(msg, zap.Int("source-nodes-num", len(req.GetSourceNodeIDs()))) return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil } - if s.meta.CollectionManager.GetCurrentLoadPercentage(req.GetCollectionID()) < 100 { + if s.meta.CollectionManager.CalculateLoadPercentage(req.GetCollectionID()) < 100 { msg := "can't balance segments of not fully loaded collection" log.Warn(msg) return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil @@ -838,7 +845,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade Status: merr.Status(nil), } - if s.meta.CollectionManager.GetCurrentLoadPercentage(req.GetCollectionID()) < 100 { + if s.meta.CollectionManager.CalculateLoadPercentage(req.GetCollectionID()) < 100 { msg := fmt.Sprintf("collection %v is not fully loaded", req.GetCollectionID()) log.Warn(msg) resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg) diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 4d03ce839..d2f6645e0 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -980,7 +980,8 @@ func (suite *ServiceSuite) TestRefreshCollection() { // Load and explicitly mark load percentage to 100%. collObj := utils.CreateTestCollection(collection, 1) collObj.LoadPercentage = 40 - suite.True(suite.server.meta.CollectionManager.UpdateCollectionInMemory(collObj)) + err = suite.server.meta.CollectionManager.PutCollectionWithoutSave(collObj) + suite.NoError(err) } // Test refresh all collections again when collections are loaded. This time should fail with collection not 100% loaded. @@ -996,7 +997,8 @@ func (suite *ServiceSuite) TestRefreshCollection() { // Load and explicitly mark load percentage to 100%. collObj := utils.CreateTestCollection(collection, 1) collObj.LoadPercentage = 100 - suite.True(suite.server.meta.CollectionManager.UpdateCollectionInMemory(collObj)) + err := suite.server.meta.CollectionManager.PutCollectionWithoutSave(collObj) + suite.NoError(err) } // Test refresh all collections again when collections are loaded. This time should fail with context canceled. @@ -1049,7 +1051,8 @@ func (suite *ServiceSuite) TestRefreshPartitions() { for _, partition := range suite.partitions[collection] { partObj := utils.CreateTestPartition(collection, partition) partObj.LoadPercentage = 40 - suite.True(suite.server.meta.CollectionManager.UpdatePartitionInMemory(partObj)) + err := suite.server.meta.CollectionManager.PutPartitionWithoutSave(partObj) + suite.NoError(err) } } @@ -1070,7 +1073,8 @@ func (suite *ServiceSuite) TestRefreshPartitions() { for _, partition := range suite.partitions[collection] { partObj := utils.CreateTestPartition(collection, partition) partObj.LoadPercentage = 100 - suite.True(suite.server.meta.CollectionManager.UpdatePartitionInMemory(partObj)) + err := suite.server.meta.CollectionManager.PutPartitionWithoutSave(partObj) + suite.NoError(err) } } @@ -1845,7 +1849,7 @@ func (suite *ServiceSuite) updateCollectionStatus(collectionID int64, status que collection.LoadPercentage = 100 } collection.CollectionLoadInfo.Status = status - suite.meta.UpdateCollection(collection) + suite.meta.PutCollection(collection) partitions := suite.meta.GetPartitionsByCollection(collectionID) for _, partition := range partitions { @@ -1855,7 +1859,7 @@ func (suite *ServiceSuite) updateCollectionStatus(collectionID int64, status que partition.LoadPercentage = 100 } partition.PartitionLoadInfo.Status = status - suite.meta.UpdatePartition(partition) + suite.meta.PutPartition(partition) } } } -- GitLab