diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index d62e5329a23573684a86941f7199f30bed0df5f0..d6ce5d4ba09b8381e607cbc4f16ae30d76169a73 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -406,14 +406,12 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - if len(watchDeltaChannels) != len(recoveryInfo.Channels) { - for _, info := range recoveryInfo.Channels { - deltaChannel, err := generateWatchDeltaChannelInfo(info) - if err != nil { - return err - } - watchDeltaChannels = append(watchDeltaChannels, deltaChannel) + for _, info := range recoveryInfo.Channels { + deltaChannel, err := generateWatchDeltaChannelInfo(info) + if err != nil { + return err } + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } for _, info := range recoveryInfo.Channels { @@ -457,12 +455,13 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { } } + mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, - Infos: watchDeltaChannels, + Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule lct.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) @@ -1570,14 +1569,12 @@ func (ht *handoffTask) execute(ctx context.Context) error { } } } - if len(watchDeltaChannels) != len(recoveryInfo.Channels) { - for _, info := range recoveryInfo.Channels { - deltaChannel, err := generateWatchDeltaChannelInfo(info) - if err != nil { - return err - } - watchDeltaChannels = append(watchDeltaChannels, deltaChannel) + for _, info := range recoveryInfo.Channels { + deltaChannel, err := generateWatchDeltaChannelInfo(info) + if err != nil { + return err } + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } if !findBinlog { @@ -1585,12 +1582,13 @@ func (ht *handoffTask) execute(ctx context.Context) error { ht.setResultInfo(err) return err } + mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, - Infos: watchDeltaChannels, + Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule ht.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) @@ -1962,22 +1960,21 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - if len(watchDeltaChannels) != len(recoveryInfo.Channels) { - for _, info := range recoveryInfo.Channels { - deltaChannel, err := generateWatchDeltaChannelInfo(info) - if err != nil { - return err - } - watchDeltaChannels = append(watchDeltaChannels, deltaChannel) + for _, info := range recoveryInfo.Channels { + deltaChannel, err := generateWatchDeltaChannelInfo(info) + if err != nil { + return err } + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } + mergedDeltaChannels := mergeWatchDeltaChannelInfo(watchDeltaChannels) msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ Base: msgBase, CollectionID: collectionID, - Infos: watchDeltaChannels, + Infos: mergedDeltaChannels, } // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)