From 48bdeb56d8267bf776c2ca5138a9481f0bc09965 Mon Sep 17 00:00:00 2001 From: godchen Date: Fri, 26 Nov 2021 22:47:17 +0800 Subject: [PATCH] Merge watch delta channel positions (#12309) Signed-off-by: godchen --- internal/querycoord/task.go | 45 +++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index d62e5329a..d6ce5d4ba 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -406,14 +406,12 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - if len(watchDeltaChannels) != len(recoveryInfo.Channels) { - for _, info := range recoveryInfo.Channels { - deltaChannel, err := generateWatchDeltaChannelInfo(info) - if err != nil { - return err - } - watchDeltaChannels = append(watchDeltaChannels, deltaChannel) + for _, info := range recoveryInfo.Channels { + deltaChannel, err := generateWatchDeltaChannelInfo(info) + if err != nil { + return err } + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } for _, info := range recoveryInfo.Channels { @@ -457,12 +455,13 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { } } + mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, - Infos: watchDeltaChannels, + Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule lct.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) @@ -1570,14 +1569,12 @@ func (ht *handoffTask) execute(ctx context.Context) error { } } } - if len(watchDeltaChannels) != len(recoveryInfo.Channels) { - for _, info := range recoveryInfo.Channels { - deltaChannel, err := generateWatchDeltaChannelInfo(info) - if err != nil { - return err - } - watchDeltaChannels = append(watchDeltaChannels, deltaChannel) + for _, info := range recoveryInfo.Channels { + deltaChannel, err := generateWatchDeltaChannelInfo(info) + if err != nil { + return err } + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } if !findBinlog { @@ -1585,12 +1582,13 @@ func (ht *handoffTask) execute(ctx context.Context) error { ht.setResultInfo(err) return err } + mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, - Infos: watchDeltaChannels, + Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule ht.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) @@ -1962,22 +1960,21 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - if len(watchDeltaChannels) != len(recoveryInfo.Channels) { - for _, info := range recoveryInfo.Channels { - deltaChannel, err := generateWatchDeltaChannelInfo(info) - if err != nil { - return err - } - watchDeltaChannels = append(watchDeltaChannels, deltaChannel) + for _, info := range recoveryInfo.Channels { + deltaChannel, err := generateWatchDeltaChannelInfo(info) + if err != nil { + return err } + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } + mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, - Infos: watchDeltaChannels, + Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) -- GitLab