diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 4b2a12af24d7b36100b833e9d419aa0c1c5a5591..ff516dd662560d9bad88cb1fe90e02981544a98f 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -61,7 +61,7 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager { // UpdateCollectionCurrentTarget updates the current target to next target, // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update, // which may make the current target not available -func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64, partitionIDs ...int64) { +func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64, partitionIDs ...int64) bool { mgr.rwMutex.Lock() defer mgr.rwMutex.Unlock() log := log.With(zap.Int64("collectionID", collectionID), @@ -72,7 +72,7 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64, part newTarget := mgr.next.getCollectionTarget(collectionID) if newTarget == nil || newTarget.IsEmpty() { log.Info("next target does not exist, skip it") - return + return false } mgr.current.updateCollectionTarget(collectionID, newTarget) mgr.next.removeCollectionTarget(collectionID) @@ -80,6 +80,7 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64, part log.Debug("finish to update current target for collection", zap.Int64s("segments", newTarget.GetAllSegmentIDs()), zap.Strings("channels", newTarget.GetAllDmChannelNames())) + return true } // UpdateCollectionNextTargetWithPartitions pulls next target from DataCoord, diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 06ae3ca3d8fe80515d1088513445e2db1b396219..fa2b73061ef875e2287319dc3a3fcb6ff8fa62e9 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -273,16 +273,16 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool { func (ob *TargetObserver) updateCurrentTarget(collectionID int64) { log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID)) - ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) - - ob.mut.Lock() - defer ob.mut.Unlock() - notifiers := ob.readyNotifiers[collectionID] - for _, notifier := range notifiers { - close(notifier) - } - // Reuse the capacity of notifiers slice - if notifiers != nil { - ob.readyNotifiers[collectionID] = notifiers[:0] + if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) { + ob.mut.Lock() + defer ob.mut.Unlock() + notifiers := ob.readyNotifiers[collectionID] + for _, notifier := range notifiers { + close(notifier) + } + // Reuse the capacity of notifiers slice + if notifiers != nil { + ob.readyNotifiers[collectionID] = notifiers[:0] + } } }