diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go
index fb20aa8442b57fd693eac7e49ff82c35297761db..acdc11055d4f4dedfe6fa4bbfc29054e39814aa8 100644
--- a/internal/querynode/flow_graph_query_node.go
+++ b/internal/querynode/flow_graph_query_node.go
@@ -21,6 +21,7 @@ import (
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
+	"github.com/milvus-io/milvus/internal/util/mqclient"
 )
 
 // queryNodeFlowGraph is a TimeTickedFlowGraph in query node
@@ -200,6 +201,20 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS
 	return nil
 }
 
+// consumerFlowGraphLatest subscribes q.dmlStream to channel starting from the latest position.
+func (q *queryNodeFlowGraph) consumerFlowGraphLatest(channel Channel, subName ConsumeSubName) error {
+	if q.dmlStream == nil {
+		return errors.New("null dml message stream in flow graph")
+	}
+	q.dmlStream.AsConsumerWithPosition([]string{channel}, subName, mqclient.SubscriptionPositionLatest)
+	log.Debug("query node flow graph consumes from pChannel at the latest position",
+		zap.Any("collectionID", q.collectionID),
+		zap.Any("channel", channel),
+		zap.Any("subName", subName),
+	)
+	return nil
+}
+
 func (q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosition) error {
 	q.dmlStream.AsConsumer([]string{position.ChannelName}, position.MsgGroup)
 	err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go
index a9db4cc2915ba9187e54da23a1dd87be08de645e..3d801739d0824acd1a85070a624e8aca793c6adc 100644
--- a/internal/querynode/mock_test.go
+++ b/internal/querynode/mock_test.go
@@ -888,7 +888,7 @@ func genSimpleSegmentLoader(ctx context.Context, historicalReplica ReplicaInterf
 	if err != nil {
 		return nil, err
 	}
-	return newSegmentLoader(ctx, newMockRootCoord(), newMockIndexCoord(), historicalReplica, streamingReplica, kv), nil
+	return newSegmentLoader(ctx, newMockRootCoord(), newMockIndexCoord(), historicalReplica, streamingReplica, kv, msgstream.NewPmsFactory()), nil
 }
 
 func genSimpleHistorical(ctx context.Context, tSafeReplica TSafeReplicaInterface) (*historical, error) {
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index b4720024dca42e65424a072dc0f0882da5a844cb..5c37fdf628d31b18cc4744d9e75a92747d2218a0 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -210,7 +210,8 @@ func (node *QueryNode) Init() error {
 		node.indexCoord,
 		node.historical.replica,
 		node.streaming.replica,
-		node.etcdKV)
+		node.etcdKV,
+		node.msFactory)
 
 	node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, node.loader.indexLoader.fieldStatsChan, node.msFactory)
 	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, streamingReplica, historicalReplica, node.tSafeReplica, node.msFactory)
diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go
index 0c48648d061940e52ff26cf98645ba617b1d7fb8..f2da9712a26501b501bbecf2a5eeed572a334c58 100644
--- a/internal/querynode/query_node_test.go
+++ b/internal/querynode/query_node_test.go
@@ -199,7 +199,7 @@ func newQueryNodeMock() *QueryNode {
 	svr.streaming = newStreaming(ctx, streamingReplica, msFactory, etcdKV, tsReplica)
 	svr.dataSyncService = newDataSyncService(ctx, svr.streaming.replica, svr.historical.replica, tsReplica, msFactory)
 	svr.statsService = newStatsService(ctx, svr.historical.replica, nil, msFactory)
-	svr.loader = newSegmentLoader(ctx, nil, nil, svr.historical.replica, svr.streaming.replica, etcdKV)
+	svr.loader = newSegmentLoader(ctx, nil, nil, svr.historical.replica, svr.streaming.replica, etcdKV, msgstream.NewPmsFactory())
 	svr.etcdKV = etcdKV
 
 	return svr
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 9383c731c79bac17481cad392eb6ccb82da7d29c..6bb89fd102bb97997e597ae80f6e81115ca747df 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 	"path"
 	"strconv"
+	"sync"
 
 	"go.uber.org/zap"
 
@@ -30,6 +31,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/rootcoord"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/funcutil"
@@ -46,6 +48,8 @@ type segmentLoader struct {
 	etcdKV *etcdkv.EtcdKV
 
 	indexLoader *indexLoader
+
+	factory msgstream.Factory
 }
 
 func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segmentType segmentType) error {
@@ -443,6 +447,88 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
 	return nil
 }
 
+// FromDmlCPLoadDelete replays delete messages from the dml checkpoint into the loaded historical sealed segments.
+func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition) error {
+	log.Debug("from dml check point load delete", zap.Any("position", position), zap.Any("msg id", position.MsgID))
+	stream, err := loader.factory.NewMsgStream(ctx)
+	if err != nil {
+		return err
+	}
+	pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName)
+	position.ChannelName = pChannelName
+	stream.AsReader([]string{pChannelName})
+	stream.SeekReaders([]*internalpb.MsgPosition{position})
+
+	delData := &deleteData{
+		deleteIDs:        make(map[UniqueID][]int64),
+		deleteTimestamps: make(map[UniqueID][]Timestamp),
+		deleteOffset:     make(map[UniqueID]int64),
+	}
+	log.Debug("start reading msgs from stream reader")
+	for {
+		tsMsg, err := stream.Next(ctx, pChannelName)
+		if err != nil {
+			return err
+		}
+		if tsMsg == nil {
+			break
+		}
+		log.Debug("receive msg", zap.Any("type", tsMsg.Type()))
+
+		if tsMsg.Type() == commonpb.MsgType_Delete {
+			dmsg := tsMsg.(*msgstream.DeleteMsg)
+			if dmsg.CollectionID != collectionID {
+				continue
+			}
+			log.Debug("delete pk", zap.Any("pk", dmsg.PrimaryKeys))
+			processDeleteMessages(loader.historicalReplica, dmsg, delData)
+		}
+	}
+	for segmentID, pks := range delData.deleteIDs {
+		segment, err := loader.historicalReplica.getSegmentByID(segmentID)
+		if err != nil {
+			log.Debug(err.Error())
+			continue
+		}
+		offset := segment.segmentPreDelete(len(pks))
+		delData.deleteOffset[segmentID] = offset
+	}
+
+	wg := sync.WaitGroup{}
+	for segmentID := range delData.deleteOffset {
+		wg.Add(1)
+		go deletePk(loader.historicalReplica, delData, segmentID, &wg)
+	}
+	wg.Wait()
+	return nil
+}
+
+// deletePk applies the buffered delete records of one sealed segment.
+func deletePk(replica ReplicaInterface, deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
+	defer wg.Done()
+	log.Debug("QueryNode::deletePk", zap.Any("SegmentID", segmentID))
+	targetSegment, err := replica.getSegmentByID(segmentID)
+	if err != nil {
+		log.Error(err.Error())
+		return
+	}
+
+	if targetSegment.segmentType != segmentTypeSealed {
+		return
+	}
+
+	ids := deleteData.deleteIDs[segmentID]
+	timestamps := deleteData.deleteTimestamps[segmentID]
+	offset := deleteData.deleteOffset[segmentID]
+
+	err = targetSegment.segmentDelete(offset, &ids, &timestamps)
+	if err != nil {
+		log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err))
+		return
+	}
+	log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
+}
+
 // JoinIDPath joins ids to path format.
 func JoinIDPath(ids ...UniqueID) string {
 	idStr := make([]string, len(ids))
@@ -565,7 +651,8 @@ func newSegmentLoader(ctx context.Context,
 	indexCoord types.IndexCoord,
 	historicalReplica ReplicaInterface,
 	streamingReplica ReplicaInterface,
-	etcdKV *etcdkv.EtcdKV) *segmentLoader {
+	etcdKV *etcdkv.EtcdKV,
+	factory msgstream.Factory) *segmentLoader {
 	option := &minioKV.Option{
 		Address:           Params.MinioEndPoint,
 		AccessKeyID:       Params.MinioAccessKeyID,
@@ -589,5 +676,7 @@ func newSegmentLoader(ctx context.Context,
 		etcdKV:  etcdKV,
 
 		indexLoader: iLoader,
+
+		factory: factory,
 	}
 }
diff --git a/internal/querynode/task.go b/internal/querynode/task.go
index beea33e5e16313856c95777d9bb83c509da04582..57797968cb43872685fdbb95d8f830ba565f8a3e 100644
--- a/internal/querynode/task.go
+++ b/internal/querynode/task.go
@@ -481,7 +481,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
 		for _, fg := range nodeFGs {
 			if fg.channel == channel {
 				// use pChannel to consume
-				err := fg.consumerFlowGraph(VPDeltaChannels[channel], consumeSubName)
+				err := fg.consumerFlowGraphLatest(VPDeltaChannels[channel], consumeSubName)
 				if err != nil {
 					errMsg := "msgStream consume error :" + err.Error()
 					log.Warn(errMsg)
@@ -494,28 +494,13 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
 		zap.Any("collectionID", collectionID),
 		zap.Any("toSubChannels", toSubChannels))
 
-	// TODO: seek with check points
-	/*
-		// seek channel
-		for _, pos := range toSeekChannels {
-			for _, fg := range nodeFGs {
-				if fg.channel == pos.ChannelName {
-					pos.MsgGroup = consumeSubName
-					// use pChannel to seek
-					pos.ChannelName = VPChannels[fg.channel]
-					err := fg.seekQueryNodeFlowGraph(pos)
-					if err != nil {
-						errMsg := "msgStream seek error :" + err.Error()
-						log.Warn(errMsg)
-						return errors.New(errMsg)
-					}
-				}
-			}
-		}
-		log.Debug("Seek all channel done",
-			zap.Any("collectionID", collectionID),
-			zap.Any("toSeekChannels", toSeekChannels))
-	*/
+	for _, info := range w.req.Infos {
+		if err := w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, info.SeekPosition); err != nil {
+			errMsg := "from dml check point load delete error :" + err.Error()
+			log.Warn(errMsg)
+			return errors.New(errMsg)
+		}
+	}
 
 	// start flow graphs
 	err = w.node.dataSyncService.startCollectionDeltaFlowGraph(collectionID, vDeltaChannels)
diff --git a/internal/util/mqclient/rmq_client.go b/internal/util/mqclient/rmq_client.go
index 371adef40df586df48f4ba2b3b2f6a9891ff5453..ac8ee26d3c742041a313678eb112350f1c476ddb 100644
--- a/internal/util/mqclient/rmq_client.go
+++ b/internal/util/mqclient/rmq_client.go
@@ -12,6 +12,7 @@ package mqclient
 
 import (
+	"errors"
 	"strconv"
 
 	"go.uber.org/zap"
 
@@ -47,7 +48,20 @@ func (rc *rmqClient) CreateProducer(options ProducerOptions) (Producer, error) {
 
 //TODO fishpenguin: implementation
 func (rc *rmqClient) CreateReader(options ReaderOptions) (Reader, error) {
-	panic("this method has not been implented")
+	opts := rocksmq.ReaderOptions{
+		Topic:                   options.Topic,
+		StartMessageID:          options.StartMessageID.(*rmqID).messageID,
+		StartMessageIDInclusive: options.StartMessageIDInclusive,
+	}
+	pr, err := rc.client.CreateReader(opts)
+	if err != nil {
+		return nil, err
+	}
+	if pr == nil {
+		return nil, errors.New("rmq is not ready, reader is nil")
+	}
+	reader := &rmqReader{r: pr}
+	return reader, nil
 }
 
 // Subscribe subscribes a consumer in rmq client
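Note (editorial, not part of the patch): a minimal sketch of how the two new entry points are meant to be combined by watchDeltaChannelsTask. The helper name recoverDeltaChannel and its wiring are hypothetical; only consumerFlowGraphLatest and FromDmlCPLoadDelete come from the change above, and the sketch assumes it lives in package querynode next to them.

// Hypothetical helper, assumed to sit in package querynode; it only strings
// together the two functions introduced by this patch.
func recoverDeltaChannel(ctx context.Context, fg *queryNodeFlowGraph, loader *segmentLoader,
	collectionID int64, channel Channel, subName ConsumeSubName, cp *internalpb.MsgPosition) error {
	// 1. Subscribe the delta flow graph at the latest position; delete messages
	//    arriving from now on are handled by the flow graph itself.
	if err := fg.consumerFlowGraphLatest(channel, subName); err != nil {
		return err
	}
	// 2. Replay delete messages between the dml checkpoint and the present
	//    directly into the sealed historical segments, covering the gap that
	//    the latest-position subscription skips.
	return loader.FromDmlCPLoadDelete(ctx, collectionID, cp)
}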