From 539585e91b559cd15b94044496854c221684fbd6 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 20 Sep 2022 16:10:50 +0800 Subject: [PATCH] Check whether leader view exists (#19237) Signed-off-by: yah01 Signed-off-by: yah01 --- .../querycoordv2/checkers/segment_checker.go | 10 ++++----- internal/querycoordv2/job/job.go | 22 ++++++++----------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 7ab7eb489..6b42ba0f7 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -164,13 +164,13 @@ func (c *SegmentChecker) findNeedReleasedGrowingSegments(replica *meta.Replica) ret := make(map[int64][]int64, 0) // leaderID -> segment ids leaders := c.dist.ChannelDistManager.GetShardLeadersByReplica(replica) for shard, leaderID := range leaders { - lview := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, shard) - if lview == nil { + leaderView := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, shard) + if leaderView == nil { continue } // find growing segments from leaderview's sealed segments // because growing segments should be released only after loading the compaction created segment successfully. - for sid := range lview.Segments { + for sid := range leaderView.Segments { segment := c.targetMgr.GetSegment(sid) if segment == nil { continue @@ -178,8 +178,8 @@ func (c *SegmentChecker) findNeedReleasedGrowingSegments(replica *meta.Replica) sources := append(segment.GetCompactionFrom(), segment.GetID()) for _, source := range sources { - if lview.GrowingSegments.Contain(source) { - ret[lview.ID] = append(ret[lview.ID], source) + if leaderView.GrowingSegments.Contain(source) { + ret[leaderView.ID] = append(ret[leaderView.ID], source) } } } diff --git a/internal/querycoordv2/job/job.go b/internal/querycoordv2/job/job.go index 8be5b5aa1..e1fcf08c1 100644 --- a/internal/querycoordv2/job/job.go +++ b/internal/querycoordv2/job/job.go @@ -120,8 +120,7 @@ func NewLoadCollectionJob( func (job *LoadCollectionJob) PreExecute() error { req := job.req - log := log.With( - zap.Int64("msgID", req.Base.GetMsgID()), + log := log.Ctx(job.ctx).With( zap.Int64("collectionID", req.GetCollectionID()), ) @@ -153,8 +152,7 @@ func (job *LoadCollectionJob) PreExecute() error { func (job *LoadCollectionJob) Execute() error { req := job.req - log := log.With( - zap.Int64("msgID", req.GetBase().GetMsgID()), + log := log.Ctx(job.ctx).With( zap.Int64("collectionID", req.GetCollectionID()), ) @@ -249,8 +247,7 @@ func NewReleaseCollectionJob(ctx context.Context, func (job *ReleaseCollectionJob) Execute() error { req := job.req - log := log.With( - zap.Int64("msgID", req.GetBase().GetMsgID()), + log := log.Ctx(job.ctx).With( zap.Int64("collectionID", req.GetCollectionID()), ) if !job.meta.CollectionManager.Exist(req.GetCollectionID()) { @@ -314,8 +311,7 @@ func NewLoadPartitionJob( func (job *LoadPartitionJob) PreExecute() error { req := job.req - log := log.With( - zap.Int64("msgID", req.Base.GetMsgID()), + log := log.Ctx(job.ctx).With( zap.Int64("collectionID", req.GetCollectionID()), ) @@ -357,8 +353,7 @@ func (job *LoadPartitionJob) PreExecute() error { func (job *LoadPartitionJob) Execute() error { req := job.req - log := log.With( - zap.Int64("msgID", req.GetBase().GetMsgID()), + log := log.Ctx(job.ctx).With( zap.Int64("collectionID", req.GetCollectionID()), ) @@ -448,6 +443,9 @@ func NewReleasePartitionJob(ctx context.Context, } func (job *ReleasePartitionJob) PreExecute() error { + log := log.Ctx(job.ctx).With( + zap.Int64("collectionID", job.req.GetCollectionID()), + ) if job.meta.CollectionManager.GetLoadType(job.req.GetCollectionID()) == querypb.LoadType_LoadCollection { msg := "releasing some partitions after load collection is not supported" log.Warn(msg) @@ -458,11 +456,9 @@ func (job *ReleasePartitionJob) PreExecute() error { func (job *ReleasePartitionJob) Execute() error { req := job.req - log := log.With( - zap.Int64("msgID", req.GetBase().GetMsgID()), + log := log.Ctx(job.ctx).With( zap.Int64("collectionID", req.GetCollectionID()), ) - if !job.meta.CollectionManager.Exist(req.GetCollectionID()) { log.Info("release collection end, the collection has not been loaded into QueryNode") return nil -- GitLab