// Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. package querynode import ( "context" "errors" "fmt" "math/rand" "strconv" "time" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" queryPb "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/rootcoord" ) type task interface { ID() UniqueID // return ReqID SetID(uid UniqueID) // set ReqID Timestamp() Timestamp PreExecute(ctx context.Context) error Execute(ctx context.Context) error PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) OnEnqueue() error } type baseTask struct { done chan error ctx context.Context id UniqueID } type watchDmChannelsTask struct { baseTask req *queryPb.WatchDmChannelsRequest node *QueryNode } type watchDeltaChannelsTask struct { baseTask req *queryPb.WatchDeltaChannelsRequest node *QueryNode } type loadSegmentsTask struct { baseTask req *queryPb.LoadSegmentsRequest node *QueryNode } type releaseCollectionTask struct { baseTask req *queryPb.ReleaseCollectionRequest node *QueryNode } type releasePartitionsTask struct { baseTask req *queryPb.ReleasePartitionsRequest node *QueryNode } func (b *baseTask) ID() UniqueID { return b.id } func (b *baseTask) SetID(uid UniqueID) { b.id = uid } func (b *baseTask) WaitToFinish() error { err := <-b.done return err } func (b *baseTask) Notify(err error) { b.done <- err } // watchDmChannelsTask func (w *watchDmChannelsTask) Timestamp() Timestamp { if w.req.Base == nil { log.Warn("nil base req in watchDmChannelsTask", zap.Any("collectionID", w.req.CollectionID)) return 0 } return w.req.Base.Timestamp } func (w *watchDmChannelsTask) OnEnqueue() error { if w.req == nil || w.req.Base == nil { w.SetID(rand.Int63n(100000000000)) } else { w.SetID(w.req.Base.MsgID) } return nil } func (w *watchDmChannelsTask) PreExecute(ctx context.Context) error { return nil } func (w *watchDmChannelsTask) Execute(ctx context.Context) error { collectionID := w.req.CollectionID partitionID := w.req.PartitionID // if no partitionID is specified, load type is load collection loadPartition := partitionID != 0 // get all vChannels vChannels := make([]Channel, 0) pChannels := make([]Channel, 0) VPChannels := make(map[string]string) // map[vChannel]pChannel for _, info := range w.req.Infos { v := info.ChannelName p := rootcoord.ToPhysicalChannel(info.ChannelName) vChannels = append(vChannels, v) pChannels = append(pChannels, p) VPChannels[v] = p } log.Debug("Starting WatchDmChannels ...", zap.Any("collectionName", w.req.Schema.Name), zap.Any("collectionID", collectionID), zap.Any("vChannels", vChannels), zap.Any("pChannels", pChannels), ) if len(VPChannels) != len(vChannels) { return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID)) } log.Debug("Get physical channels done", zap.Any("collectionID", collectionID), ) // init replica if hasCollectionInStreaming := w.node.streaming.replica.hasCollection(collectionID); !hasCollectionInStreaming { err := w.node.streaming.replica.addCollection(collectionID, w.req.Schema) if err != nil { return err } } // init replica if hasCollectionInHistorical := w.node.historical.replica.hasCollection(collectionID); !hasCollectionInHistorical { err := w.node.historical.replica.addCollection(collectionID, w.req.Schema) if err != nil { return err } } var l loadType if loadPartition { l = loadTypePartition } else { l = loadTypeCollection } sCol, err := w.node.streaming.replica.getCollectionByID(collectionID) if err != nil { return err } sCol.addVChannels(vChannels) sCol.addPChannels(pChannels) sCol.setLoadType(l) hCol, err := w.node.historical.replica.getCollectionByID(collectionID) if err != nil { return err } hCol.addVChannels(vChannels) hCol.addPChannels(pChannels) hCol.setLoadType(l) if loadPartition { sCol.deleteReleasedPartition(partitionID) hCol.deleteReleasedPartition(partitionID) if hasPartitionInStreaming := w.node.streaming.replica.hasPartition(partitionID); !hasPartitionInStreaming { err := w.node.streaming.replica.addPartition(collectionID, partitionID) if err != nil { return err } } if hasPartitionInHistorical := w.node.historical.replica.hasPartition(partitionID); !hasPartitionInHistorical { err := w.node.historical.replica.addPartition(collectionID, partitionID) if err != nil { return err } } } log.Debug("watchDMChannel, init replica done", zap.Any("collectionID", collectionID)) // get subscription name getUniqueSubName := func() string { prefixName := Params.MsgChannelSubName return prefixName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int()) } consumeSubName := getUniqueSubName() // group channels by to seeking or consuming toSeekChannels := make([]*internalpb.MsgPosition, 0) toSubChannels := make([]Channel, 0) for _, info := range w.req.Infos { if info.SeekPosition == nil || len(info.SeekPosition.MsgID) == 0 { toSubChannels = append(toSubChannels, info.ChannelName) continue } info.SeekPosition.MsgGroup = consumeSubName toSeekChannels = append(toSeekChannels, info.SeekPosition) } log.Debug("watchDMChannel, group channels done", zap.Any("collectionID", collectionID)) // add check points info checkPointInfos := make([]*datapb.SegmentInfo, 0) for _, info := range w.req.Infos { checkPointInfos = append(checkPointInfos, info.UnflushedSegments...) } w.node.streaming.replica.addExcludedSegments(collectionID, checkPointInfos) log.Debug("watchDMChannel, add check points info done", zap.Any("collectionID", collectionID)) // create tSafe for _, channel := range vChannels { w.node.tSafeReplica.addTSafe(channel) } // add flow graph if loadPartition { w.node.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels) log.Debug("Query node add partition flow graphs", zap.Any("channels", vChannels)) } else { w.node.dataSyncService.addCollectionFlowGraph(collectionID, vChannels) log.Debug("Query node add collection flow graphs", zap.Any("channels", vChannels)) } // add tSafe watcher if queryCollection exists qc, err := w.node.queryService.getQueryCollection(collectionID) if err == nil { for _, channel := range vChannels { err = qc.addTSafeWatcher(channel) if err != nil { // tSafe have been exist, not error log.Warn(err.Error()) } } } // channels as consumer var nodeFGs map[Channel]*queryNodeFlowGraph if loadPartition { nodeFGs, err = w.node.dataSyncService.getPartitionFlowGraphs(partitionID, vChannels) if err != nil { return err } } else { nodeFGs, err = w.node.dataSyncService.getCollectionFlowGraphs(collectionID, vChannels) if err != nil { return err } } for _, channel := range toSubChannels { for _, fg := range nodeFGs { if fg.channel == channel { // use pChannel to consume err := fg.consumerFlowGraph(VPChannels[channel], consumeSubName) if err != nil { errMsg := "msgStream consume error :" + err.Error() log.Warn(errMsg) return errors.New(errMsg) } } } } log.Debug("as consumer channels", zap.Any("collectionID", collectionID), zap.Any("toSubChannels", toSubChannels)) // seek channel for _, pos := range toSeekChannels { for _, fg := range nodeFGs { if fg.channel == pos.ChannelName { pos.MsgGroup = consumeSubName // use pChannel to seek pos.ChannelName = VPChannels[fg.channel] err := fg.seekQueryNodeFlowGraph(pos) if err != nil { errMsg := "msgStream seek error :" + err.Error() log.Warn(errMsg) return errors.New(errMsg) } } } } log.Debug("Seek all channel done", zap.Any("collectionID", collectionID), zap.Any("toSeekChannels", toSeekChannels)) // load growing segments unFlushedSegments := make([]*queryPb.SegmentLoadInfo, 0) unFlushedSegmentIDs := make([]UniqueID, 0) for _, info := range w.req.Infos { for _, ufInfo := range info.UnflushedSegments { // unFlushed segment may not have binLogs, skip loading if len(ufInfo.Binlogs) > 0 { unFlushedSegments = append(unFlushedSegments, &queryPb.SegmentLoadInfo{ SegmentID: ufInfo.ID, PartitionID: ufInfo.PartitionID, CollectionID: ufInfo.CollectionID, BinlogPaths: ufInfo.Binlogs, NumOfRows: ufInfo.NumOfRows, Statslogs: ufInfo.Statslogs, Deltalogs: ufInfo.Deltalogs, }) unFlushedSegmentIDs = append(unFlushedSegmentIDs, ufInfo.ID) } } } req := &queryPb.LoadSegmentsRequest{ Infos: unFlushedSegments, CollectionID: collectionID, Schema: w.req.Schema, } log.Debug("loading growing segments in WatchDmChannels...", zap.Any("collectionID", collectionID), zap.Any("unFlushedSegmentIDs", unFlushedSegmentIDs), ) err = w.node.loader.loadSegment(req, segmentTypeGrowing) if err != nil { return err } log.Debug("load growing segments done in WatchDmChannels", zap.Any("collectionID", collectionID), zap.Any("unFlushedSegmentIDs", unFlushedSegmentIDs), ) // start flow graphs if loadPartition { err = w.node.dataSyncService.startPartitionFlowGraph(partitionID, vChannels) if err != nil { return err } } else { err = w.node.dataSyncService.startCollectionFlowGraph(collectionID, vChannels) if err != nil { return err } } log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(vChannels))) return nil } func (w *watchDmChannelsTask) PostExecute(ctx context.Context) error { return nil } // watchDeltaChannelsTask func (w *watchDeltaChannelsTask) Timestamp() Timestamp { if w.req.Base == nil { log.Warn("nil base req in watchDeltaChannelsTask", zap.Any("collectionID", w.req.CollectionID)) return 0 } return w.req.Base.Timestamp } func (w *watchDeltaChannelsTask) OnEnqueue() error { if w.req == nil || w.req.Base == nil { w.SetID(rand.Int63n(100000000000)) } else { w.SetID(w.req.Base.MsgID) } return nil } func (w *watchDeltaChannelsTask) PreExecute(ctx context.Context) error { return nil } func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error { collectionID := w.req.CollectionID // get all vChannels vDeltaChannels := make([]Channel, 0) pDeltaChannels := make([]Channel, 0) VPDeltaChannels := make(map[string]string) // map[vChannel]pChannel for _, info := range w.req.Infos { v := info.ChannelName p := rootcoord.ToPhysicalChannel(info.ChannelName) vDeltaChannels = append(vDeltaChannels, v) pDeltaChannels = append(pDeltaChannels, p) VPDeltaChannels[v] = p } log.Debug("Starting WatchDeltaChannels ...", zap.Any("collectionID", collectionID), zap.Any("vDeltaChannels", vDeltaChannels), zap.Any("pChannels", pDeltaChannels), ) if len(VPDeltaChannels) != len(vDeltaChannels) { return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID)) } log.Debug("Get physical channels done", zap.Any("collectionID", collectionID), ) if hasCollectionInHistorical := w.node.historical.replica.hasCollection(collectionID); !hasCollectionInHistorical { return fmt.Errorf("cannot find collection with collectionID, %d", collectionID) } hCol, err := w.node.historical.replica.getCollectionByID(collectionID) if err != nil { return err } // Check if the same deltaChannel has been watched for _, dstChan := range vDeltaChannels { for _, srcChan := range hCol.vDeltaChannels { if dstChan == srcChan { return nil } } } hCol.addVDeltaChannels(vDeltaChannels) hCol.addPDeltaChannels(pDeltaChannels) if hasCollectionInStreaming := w.node.streaming.replica.hasCollection(collectionID); !hasCollectionInStreaming { return fmt.Errorf("cannot find collection with collectionID, %d", collectionID) } sCol, err := w.node.streaming.replica.getCollectionByID(collectionID) if err != nil { return err } sCol.addVDeltaChannels(vDeltaChannels) sCol.addPDeltaChannels(pDeltaChannels) // get subscription name getUniqueSubName := func() string { prefixName := Params.MsgChannelSubName return prefixName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int()) } consumeSubName := getUniqueSubName() // group channels by to seeking or consuming toSubChannels := make([]Channel, 0) for _, info := range w.req.Infos { toSubChannels = append(toSubChannels, info.ChannelName) } log.Debug("watchDeltaChannel, group channels done", zap.Any("collectionID", collectionID)) // create tSafe for _, channel := range vDeltaChannels { w.node.tSafeReplica.addTSafe(channel) } w.node.dataSyncService.addCollectionDeltaFlowGraph(collectionID, vDeltaChannels) // add tSafe watcher if queryCollection exists qc, err := w.node.queryService.getQueryCollection(collectionID) if err == nil { for _, channel := range vDeltaChannels { err = qc.addTSafeWatcher(channel) if err != nil { // tSafe have been exist, not error log.Warn(err.Error()) } } } // channels as consumer var nodeFGs map[Channel]*queryNodeFlowGraph nodeFGs, err = w.node.dataSyncService.getCollectionDeltaFlowGraphs(collectionID, vDeltaChannels) if err != nil { return err } for _, channel := range toSubChannels { for _, fg := range nodeFGs { if fg.channel == channel { // use pChannel to consume err := fg.consumerFlowGraphLatest(VPDeltaChannels[channel], consumeSubName) if err != nil { errMsg := "msgStream consume error :" + err.Error() log.Warn(errMsg) return errors.New(errMsg) } } } } log.Debug("as consumer channels", zap.Any("collectionID", collectionID), zap.Any("toSubChannels", toSubChannels)) for _, info := range w.req.Infos { w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, info.SeekPosition) } // start flow graphs err = w.node.dataSyncService.startCollectionDeltaFlowGraph(collectionID, vDeltaChannels) if err != nil { return err } log.Debug("WatchDeltaChannels done", zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels))) return nil } func (w *watchDeltaChannelsTask) PostExecute(ctx context.Context) error { return nil } // loadSegmentsTask func (l *loadSegmentsTask) Timestamp() Timestamp { if l.req.Base == nil { log.Warn("nil base req in loadSegmentsTask") return 0 } return l.req.Base.Timestamp } func (l *loadSegmentsTask) OnEnqueue() error { if l.req == nil || l.req.Base == nil { l.SetID(rand.Int63n(100000000000)) } else { l.SetID(l.req.Base.MsgID) } return nil } func (l *loadSegmentsTask) PreExecute(ctx context.Context) error { return nil } func (l *loadSegmentsTask) Execute(ctx context.Context) error { // TODO: support db log.Debug("Query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(l.req))) var err error // init meta for _, info := range l.req.Infos { collectionID := info.CollectionID partitionID := info.PartitionID hasCollectionInHistorical := l.node.historical.replica.hasCollection(collectionID) hasPartitionInHistorical := l.node.historical.replica.hasPartition(partitionID) if !hasCollectionInHistorical { err = l.node.historical.replica.addCollection(collectionID, l.req.Schema) if err != nil { return err } } if !hasPartitionInHistorical { err = l.node.historical.replica.addPartition(collectionID, partitionID) if err != nil { return err } } hasCollectionInStreaming := l.node.streaming.replica.hasCollection(collectionID) hasPartitionInStreaming := l.node.streaming.replica.hasPartition(partitionID) if !hasCollectionInStreaming { err = l.node.streaming.replica.addCollection(collectionID, l.req.Schema) if err != nil { return err } } if !hasPartitionInStreaming { err = l.node.streaming.replica.addPartition(collectionID, partitionID) if err != nil { return err } } } err = l.node.loader.loadSegment(l.req, segmentTypeSealed) if err != nil { log.Warn(err.Error()) return err } for _, info := range l.req.Infos { collectionID := info.CollectionID partitionID := info.PartitionID sCol, err := l.node.streaming.replica.getCollectionByID(collectionID) if err != nil { return err } sCol.deleteReleasedPartition(partitionID) hCol, err := l.node.historical.replica.getCollectionByID(collectionID) if err != nil { return err } hCol.deleteReleasedPartition(partitionID) } log.Debug("LoadSegments done", zap.String("SegmentLoadInfos", fmt.Sprintln(l.req.Infos))) return nil } func (l *loadSegmentsTask) PostExecute(ctx context.Context) error { return nil } // releaseCollectionTask func (r *releaseCollectionTask) Timestamp() Timestamp { if r.req.Base == nil { log.Warn("nil base req in releaseCollectionTask", zap.Any("collectionID", r.req.CollectionID)) return 0 } return r.req.Base.Timestamp } func (r *releaseCollectionTask) OnEnqueue() error { if r.req == nil || r.req.Base == nil { r.SetID(rand.Int63n(100000000000)) } else { r.SetID(r.req.Base.MsgID) } return nil } func (r *releaseCollectionTask) PreExecute(ctx context.Context) error { return nil } type ReplicaType int const ( replicaNone ReplicaType = iota replicaStreaming replicaHistorical ) func (r *releaseCollectionTask) Execute(ctx context.Context) error { log.Debug("Execute release collection task", zap.Any("collectionID", r.req.CollectionID)) errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = " log.Debug("release streaming", zap.Any("collectionID", r.req.CollectionID)) err := r.releaseReplica(r.node.streaming.replica, replicaStreaming) if err != nil { return errors.New(errMsg + err.Error()) } // remove collection metas in streaming and historical log.Debug("release historical", zap.Any("collectionID", r.req.CollectionID)) err = r.releaseReplica(r.node.historical.replica, replicaHistorical) if err != nil { return errors.New(errMsg + err.Error()) } r.node.historical.removeGlobalSegmentIDsByCollectionID(r.req.CollectionID) // remove query collection r.node.queryService.stopQueryCollection(r.req.CollectionID) log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID)) return nil } func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replicaType ReplicaType) error { collection, err := replica.getCollectionByID(r.req.CollectionID) if err != nil { return err } // set release time log.Debug("set release time", zap.Any("collectionID", r.req.CollectionID)) collection.setReleaseTime(r.req.Base.Timestamp) // sleep to wait for query tasks done const gracefulReleaseTime = 1 time.Sleep(gracefulReleaseTime * time.Second) log.Debug("Starting release collection...", zap.Any("collectionID", r.req.CollectionID), ) if replicaType == replicaStreaming { r.node.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID) // remove partition flow graphs which partitions belong to the target collection partitionIDs, err := replica.getPartitionIDs(r.req.CollectionID) if err != nil { return err } for _, partitionID := range partitionIDs { r.node.dataSyncService.removePartitionFlowGraph(partitionID) } // remove all tSafes of the target collection for _, channel := range collection.getVChannels() { log.Debug("Releasing tSafe in releaseCollectionTask...", zap.Any("collectionID", r.req.CollectionID), zap.Any("vChannel", channel), ) // no tSafe in tSafeReplica, don't return error err = r.node.tSafeReplica.removeTSafe(channel) if err != nil { log.Warn(err.Error()) } } } else { r.node.dataSyncService.removeCollectionDeltaFlowGraph(r.req.CollectionID) // remove all tSafes of the target collection for _, channel := range collection.getVDeltaChannels() { log.Debug("Releasing tSafe in releaseCollectionTask...", zap.Any("collectionID", r.req.CollectionID), zap.Any("vDeltaChannel", channel), ) // no tSafe in tSafeReplica, don't return error err = r.node.tSafeReplica.removeTSafe(channel) if err != nil { log.Warn(err.Error()) } } } // remove excludedSegments record replica.removeExcludedSegments(r.req.CollectionID) err = replica.removeCollection(r.req.CollectionID) if err != nil { return err } return nil } func (r *releaseCollectionTask) PostExecute(ctx context.Context) error { return nil } // releasePartitionsTask func (r *releasePartitionsTask) Timestamp() Timestamp { if r.req.Base == nil { log.Warn("nil base req in releasePartitionsTask", zap.Any("collectionID", r.req.CollectionID)) return 0 } return r.req.Base.Timestamp } func (r *releasePartitionsTask) OnEnqueue() error { if r.req == nil || r.req.Base == nil { r.SetID(rand.Int63n(100000000000)) } else { r.SetID(r.req.Base.MsgID) } return nil } func (r *releasePartitionsTask) PreExecute(ctx context.Context) error { return nil } func (r *releasePartitionsTask) Execute(ctx context.Context) error { log.Debug("Execute release partition task", zap.Any("collectionID", r.req.CollectionID), zap.Any("partitionIDs", r.req.PartitionIDs)) errMsg := "release partitions failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = " // sleep to wait for query tasks done const gracefulReleaseTime = 1 time.Sleep(gracefulReleaseTime * time.Second) // get collection from streaming and historical hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID) if err != nil { return err } sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID) if err != nil { return err } log.Debug("start release partition", zap.Any("collectionID", r.req.CollectionID)) // release partitions vChannels := sCol.getVChannels() for _, id := range r.req.PartitionIDs { if _, err := r.node.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil { r.node.dataSyncService.removePartitionFlowGraph(id) // remove all tSafes of the target partition for _, channel := range vChannels { log.Debug("Releasing tSafe in releasePartitionTask...", zap.Any("collectionID", r.req.CollectionID), zap.Any("partitionID", id), zap.Any("vChannel", channel), ) // no tSafe in tSafeReplica, don't return error err = r.node.tSafeReplica.removeTSafe(channel) if err != nil { log.Warn(err.Error()) } } } // remove partition from streaming and historical hasPartitionInHistorical := r.node.historical.replica.hasPartition(id) if hasPartitionInHistorical { err := r.node.historical.replica.removePartition(id) if err != nil { // not return, try to release all partitions log.Warn(errMsg + err.Error()) } } hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id) if hasPartitionInStreaming { err := r.node.streaming.replica.removePartition(id) if err != nil { // not return, try to release all partitions log.Warn(errMsg + err.Error()) } } hCol.addReleasedPartition(id) sCol.addReleasedPartition(id) } pids, err := r.node.historical.replica.getPartitionIDs(r.req.CollectionID) if err != nil { return err } log.Debug("start release history pids", zap.Any("pids", pids)) if len(pids) == 0 && hCol.getLoadType() == loadTypePartition { r.node.dataSyncService.removeCollectionDeltaFlowGraph(r.req.CollectionID) vChannels := hCol.getVDeltaChannels() for _, channel := range vChannels { log.Debug("Releasing tSafe in releasePartitionTask...", zap.Any("collectionID", r.req.CollectionID), zap.Any("vChannel", channel), ) // no tSafe in tSafeReplica, don't return error err = r.node.tSafeReplica.removeTSafe(channel) if err != nil { log.Warn(err.Error()) } } } // release global segment info r.node.historical.removeGlobalSegmentIDsByPartitionIds(r.req.PartitionIDs) log.Debug("Release partition task done", zap.Any("collectionID", r.req.CollectionID), zap.Any("partitionIDs", r.req.PartitionIDs)) return nil } func (r *releasePartitionsTask) PostExecute(ctx context.Context) error { return nil }