Unverified commit 116a503b, authored by godchen, committed by GitHub

From dml cp load delete msg (#12144)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
Parent bf391f24
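In short (a reading of the diff below, not an authoritative description of the change): the watchDeltaChannels task no longer seeks the delta channel to a checkpoint; it consumes the delta channel from the latest position and, separately, replays the physical dml channel from the checkpoint to pick up delete messages for already-sealed segments. The helper below is a simplified, hypothetical sketch of that replay loop, written against the types that appear in this diff (msgstream.Factory, internalpb.MsgPosition, rootcoord.ToPhysicalChannel); the actual implementation is FromDmlCPLoadDelete in segment_loader.go further down.

package querynode

import (
	"context"

	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/rootcoord"
)

// replayDeletes is a hypothetical, simplified version of the replay loop added in this
// commit: it reads the physical dml channel from a checkpoint position and returns the
// delete messages that belong to one collection.
func replayDeletes(ctx context.Context, factory msgstream.Factory, collectionID int64,
	position *internalpb.MsgPosition) ([]*msgstream.DeleteMsg, error) {
	stream, err := factory.NewMsgStream(ctx)
	if err != nil {
		return nil, err
	}
	// the checkpoint stores a virtual channel name; the reader works on the physical channel
	pChannel := rootcoord.ToPhysicalChannel(position.ChannelName)
	position.ChannelName = pChannel
	stream.AsReader([]string{pChannel})
	stream.SeekReaders([]*internalpb.MsgPosition{position})

	var deletes []*msgstream.DeleteMsg
	for {
		msg, err := stream.Next(ctx, pChannel)
		if err != nil {
			return nil, err
		}
		if msg == nil { // no more messages behind the reader
			break
		}
		if msg.Type() != commonpb.MsgType_Delete {
			continue
		}
		dmsg := msg.(*msgstream.DeleteMsg)
		if dmsg.CollectionID == collectionID {
			deletes = append(deletes, dmsg)
		}
	}
	return deletes, nil
}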
@@ -21,6 +21,7 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/mqclient"
)
// queryNodeFlowGraph is a TimeTickedFlowGraph in query node
@@ -200,6 +201,19 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS
return nil
}
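// consumerFlowGraphLatest subscribes the flow graph's dml stream to the given channel and
// starts consuming from the latest position (mqclient.SubscriptionPositionLatest).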
func (q *queryNodeFlowGraph) consumerFlowGraphLatest(channel Channel, subName ConsumeSubName) error {
if q.dmlStream == nil {
return errors.New("null dml message stream in flow graph")
}
q.dmlStream.AsConsumerWithPosition([]string{channel}, subName, mqclient.SubscriptionPositionLatest)
log.Debug("query node flow graph consumes from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
zap.Any("subName", subName),
)
return nil
}
func (q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosition) error {
q.dmlStream.AsConsumer([]string{position.ChannelName}, position.MsgGroup)
err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
......
@@ -888,7 +888,7 @@ func genSimpleSegmentLoader(ctx context.Context, historicalReplica ReplicaInterf
if err != nil {
return nil, err
}
-return newSegmentLoader(ctx, newMockRootCoord(), newMockIndexCoord(), historicalReplica, streamingReplica, kv), nil
+return newSegmentLoader(ctx, newMockRootCoord(), newMockIndexCoord(), historicalReplica, streamingReplica, kv, msgstream.NewPmsFactory()), nil
}
func genSimpleHistorical(ctx context.Context, tSafeReplica TSafeReplicaInterface) (*historical, error) {
......
@@ -210,7 +210,8 @@ func (node *QueryNode) Init() error {
node.indexCoord,
node.historical.replica,
node.streaming.replica,
-node.etcdKV)
+node.etcdKV,
+node.msFactory)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, node.loader.indexLoader.fieldStatsChan, node.msFactory)
node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, streamingReplica, historicalReplica, node.tSafeReplica, node.msFactory)
......
@@ -199,7 +199,7 @@ func newQueryNodeMock() *QueryNode {
svr.streaming = newStreaming(ctx, streamingReplica, msFactory, etcdKV, tsReplica)
svr.dataSyncService = newDataSyncService(ctx, svr.streaming.replica, svr.historical.replica, tsReplica, msFactory)
svr.statsService = newStatsService(ctx, svr.historical.replica, nil, msFactory)
-svr.loader = newSegmentLoader(ctx, nil, nil, svr.historical.replica, svr.streaming.replica, etcdKV)
+svr.loader = newSegmentLoader(ctx, nil, nil, svr.historical.replica, svr.streaming.replica, etcdKV, msgstream.NewPmsFactory())
svr.etcdKV = etcdKV
return svr
......
@@ -17,6 +17,7 @@ import (
"fmt"
"path"
"strconv"
"sync"
"go.uber.org/zap"
@@ -30,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
@@ -46,6 +48,8 @@ type segmentLoader struct {
etcdKV *etcdkv.EtcdKV
indexLoader *indexLoader
factory msgstream.Factory
}
func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segmentType segmentType) error {
@@ -443,6 +447,86 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
return nil
}
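// FromDmlCPLoadDelete replays the physical dml channel from the given checkpoint position,
// collects the delete messages that belong to this collection, and applies them to the
// sealed segments in the historical replica.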
func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition) error {
log.Debug("from dmlcp load delete", zap.Any("position", position), zap.Any("msg id", position.MsgID))
stream, err := loader.factory.NewMsgStream(ctx)
if err != nil {
return err
}
pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName)
position.ChannelName = pChannelName
stream.AsReader([]string{pChannelName})
stream.SeekReaders([]*internalpb.MsgPosition{position})
delData := &deleteData{
deleteIDs: make(map[UniqueID][]int64),
deleteTimestamps: make(map[UniqueID][]Timestamp),
deleteOffset: make(map[UniqueID]int64),
}
log.Debug("start read msg from stream reader")
for {
tsMsg, err := stream.Next(ctx, pChannelName)
if err != nil {
return err
}
if tsMsg == nil {
break
}
log.Debug("receive msg", zap.Any("type", tsMsg.Type()))
if tsMsg.Type() == commonpb.MsgType_Delete {
dmsg := tsMsg.(*msgstream.DeleteMsg)
if dmsg.CollectionID != collectionID {
continue
}
log.Debug("delete pk", zap.Any("pk", dmsg.PrimaryKeys))
processDeleteMessages(loader.historicalReplica, dmsg, delData)
}
}
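// reserve space in each affected segment and record the write offset for its deletes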
for segmentID, pks := range delData.deleteIDs {
segment, err := loader.historicalReplica.getSegmentByID(segmentID)
if err != nil {
log.Debug(err.Error())
continue
}
offset := segment.segmentPreDelete(len(pks))
delData.deleteOffset[segmentID] = offset
}
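// apply the buffered deletes to the sealed segments concurrently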
wg := sync.WaitGroup{}
for segmentID := range delData.deleteOffset {
wg.Add(1)
go deletePk(loader.historicalReplica, delData, segmentID, &wg)
}
wg.Wait()
return nil
}
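// deletePk applies the buffered delete primary keys and timestamps to a single sealed segment.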
func deletePk(replica ReplicaInterface, deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
defer wg.Done()
log.Debug("QueryNode::iNode::delete", zap.Any("SegmentID", segmentID))
targetSegment, err := replica.getSegmentByID(segmentID)
if err != nil {
log.Error(err.Error())
return
}
if targetSegment.segmentType != segmentTypeSealed {
return
}
ids := deleteData.deleteIDs[segmentID]
timestamps := deleteData.deleteTimestamps[segmentID]
offset := deleteData.deleteOffset[segmentID]
err = targetSegment.segmentDelete(offset, &ids, &timestamps)
if err != nil {
log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err))
return
}
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
}
// JoinIDPath joins ids to path format.
func JoinIDPath(ids ...UniqueID) string {
idStr := make([]string, len(ids))
@@ -565,7 +649,8 @@ func newSegmentLoader(ctx context.Context,
indexCoord types.IndexCoord,
historicalReplica ReplicaInterface,
streamingReplica ReplicaInterface,
-etcdKV *etcdkv.EtcdKV) *segmentLoader {
+etcdKV *etcdkv.EtcdKV,
+factory msgstream.Factory) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
@@ -589,5 +674,7 @@ func newSegmentLoader(ctx context.Context,
etcdKV: etcdKV,
indexLoader: iLoader,
factory: factory,
}
}
@@ -481,7 +481,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
for _, fg := range nodeFGs {
if fg.channel == channel {
// use pChannel to consume
-err := fg.consumerFlowGraph(VPDeltaChannels[channel], consumeSubName)
+err := fg.consumerFlowGraphLatest(VPDeltaChannels[channel], consumeSubName)
if err != nil {
errMsg := "msgStream consume error :" + err.Error()
log.Warn(errMsg)
@@ -494,28 +494,9 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
zap.Any("collectionID", collectionID),
zap.Any("toSubChannels", toSubChannels))
// TODO: seek with check points
/*
// seek channel
for _, pos := range toSeekChannels {
for _, fg := range nodeFGs {
if fg.channel == pos.ChannelName {
pos.MsgGroup = consumeSubName
// use pChannel to seek
pos.ChannelName = VPChannels[fg.channel]
err := fg.seekQueryNodeFlowGraph(pos)
if err != nil {
errMsg := "msgStream seek error :" + err.Error()
log.Warn(errMsg)
return errors.New(errMsg)
}
}
}
}
log.Debug("Seek all channel done",
zap.Any("collectionID", collectionID),
zap.Any("toSeekChannels", toSeekChannels))
*/
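// replay delete messages from each channel's seek position before starting the delta flow graphs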
for _, info := range w.req.Infos {
if err := w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, info.SeekPosition); err != nil {
log.Warn("from dml checkpoint load delete failed", zap.Error(err))
}
}
// start flow graphs
err = w.node.dataSyncService.startCollectionDeltaFlowGraph(collectionID, vDeltaChannels)
......
@@ -12,6 +12,7 @@
package mqclient
import (
"errors"
"strconv"
"go.uber.org/zap"
@@ -47,7 +48,20 @@ func (rc *rmqClient) CreateProducer(options ProducerOptions) (Producer, error) {
//TODO fishpenguin: implementation
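// CreateReader creates a rocksmq reader that starts from the message id given in the options.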
func (rc *rmqClient) CreateReader(options ReaderOptions) (Reader, error) {
panic("this method has not been implented")
opts := rocksmq.ReaderOptions{
Topic: options.Topic,
StartMessageID: options.StartMessageID.(*rmqID).messageID,
StartMessageIDInclusive: options.StartMessageIDInclusive,
}
pr, err := rc.client.CreateReader(opts)
if err != nil {
return nil, err
}
if pr == nil {
return nil, errors.New("pulsar is not ready, producer is nil")
}
reader := &rmqReader{r: pr}
return reader, nil
}
// Subscribe subscribes a consumer in rmq client
......