diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index decb8e35f0b54d8a81145a3aafb551e9838068ce..525216da7858494963c328f2c920dea06f2957ad 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -408,12 +408,10 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { return err } - deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) - deltaChannel.ChannelName = deltaChannelName watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } @@ -775,12 +773,10 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { return err } - deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) - deltaChannel.ChannelName = deltaChannelName watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } @@ -1578,12 +1574,10 @@ func (ht *handoffTask) execute(ctx context.Context) error { } if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { return err } - deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) - deltaChannel.ChannelName = deltaChannelName watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } @@ -1770,12 +1764,10 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { return err } - deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) - deltaChannel.ChannelName = deltaChannelName watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } @@ -1975,12 +1967,10 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { return err } - deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) - deltaChannel.ChannelName = deltaChannelName watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } @@ -2213,3 +2203,16 @@ func assignInternalTask(ctx context.Context, func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int { return proto.Size(req) } + +func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) { + deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + if err != nil { + return nil, err + } + deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) + deltaChannel.ChannelName = deltaChannelName + deltaChannel.UnflushedSegments = nil + deltaChannel.FlushedSegments = nil + deltaChannel.DroppedSegments = nil + return deltaChannel, nil +} diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 1699f25edb228b0c9f0e6e61a9b54b4ad3be64c9..6958be63ab62c130aa0ce2821cc961b972c9e0de 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -853,9 +853,10 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { if err != nil { return err } - log.Debug("start release history pids", zap.Any("pids", pids)) + log.Debug("start release history pids", zap.Any("pids", pids), zap.Any("load type", hCol.getLoadType())) if len(pids) == 0 && hCol.getLoadType() == loadTypePartition { r.node.dataSyncService.removeCollectionDeltaFlowGraph(r.req.CollectionID) + log.Debug("release delta channels", zap.Any("deltaChannels", hCol.getVDeltaChannels())) vChannels := hCol.getVDeltaChannels() for _, channel := range vChannels { log.Debug("Releasing tSafe in releasePartitionTask...",