From 3c70675313a414760971dfacb1a69eb97f05f769 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Fri, 4 Jun 2021 19:20:34 +0800 Subject: [PATCH] data node, save binlog to minIO, and let data service save these meta (#5618) Signed-off-by: yefu.chen --- internal/datanode/data_sync_service.go | 43 ++++++++++++++- .../datanode/flow_graph_insert_buffer_node.go | 54 +++++++++++++------ 2 files changed, 79 insertions(+), 18 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 306390639..71efc7267 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -13,10 +13,13 @@ package datanode import ( "context" + "fmt" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/flowgraph" "go.uber.org/zap" @@ -30,6 +33,7 @@ type dataSyncService struct { idAllocator allocatorInterface msFactory msgstream.Factory collectionID UniqueID + dataService types.DataService } func newDataSyncService(ctx context.Context, @@ -83,6 +87,43 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) { panic(err) } + saveBinlog := func(fu *autoFlushUnit) error { + id2path := []*datapb.ID2PathList{} + checkPoints := []*datapb.CheckPoint{} + for k, v := range fu.field2Path { + id2path = append(id2path, &datapb.ID2PathList{ID: k, Paths: []string{v}}) + } + for k, v := range fu.openSegCheckpoints { + v := v + checkPoints = append(checkPoints, &datapb.CheckPoint{ + SegmentID: k, + NumOfRows: fu.numRows[k], + Position: &v, + }) + } + + req := &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TOD msg type + MsgID: 0, //TODO,msg id + Timestamp: 0, //TODO, time stamp + SourceID: Params.NodeID, + }, + SegmentID: fu.segID, + CollectionID: 0, //TODO + Field2BinlogPaths: id2path, + CheckPoints: checkPoints, + Flushed: fu.flushed, + } + rsp, err := dsService.dataService.SaveBinlogPaths(dsService.ctx, req) + if err != nil { + return fmt.Errorf("data service save bin log path failed, err = %w", err) + } + if rsp.ErrorCode != commonpb.ErrorCode_Success { + return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason) + } + return nil + } var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetChannelName(), vchanPair.GetCheckPoints()) var ddNode Node = newDDNode() var insertBufferNode Node = newInsertBufferNode( @@ -91,7 +132,7 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) { dsService.msFactory, dsService.idAllocator, dsService.flushChan, - nil, //TODO,=================== call data service save binlog ========= + saveBinlog, ) dsService.fg.AddNode(dmStreamNode) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 0883ca3a0..925345c13 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -64,10 +64,11 @@ type insertBufferNode struct { } type autoFlushUnit struct { + collID UniqueID segID UniqueID - numRows int64 field2Path map[UniqueID]string openSegCheckpoints map[UniqueID]internalpb.MsgPosition + numRows map[UniqueID]int64 flushed bool } @@ -503,15 +504,23 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { log.Debug("segment is empty") continue } - segSta, err := ibNode.replica.getSegmentStatisticsUpdates(fu.segID) - if err != nil { - log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err)) - continue - } - fu.numRows = segSta.NumRows fu.openSegCheckpoints = ibNode.replica.listOpenSegmentCheckPoint() + fu.numRows = make(map[UniqueID]int64) + for k := range fu.openSegCheckpoints { + segStat, err := ibNode.replica.getSegmentStatisticsUpdates(k) + if err != nil { + log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err)) + fu.numRows = nil + break + } + fu.numRows[k] = segStat.NumRows + } + if fu.numRows == nil { + log.Debug("failed on get segment num rows") + break + } fu.flushed = false - if ibNode.dsSaveBinlog(&fu) != nil { + if err := ibNode.dsSaveBinlog(&fu); err != nil { log.Debug("data service save bin log path failed", zap.Error(err)) } } @@ -523,19 +532,30 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { currentSegID := fmsg.segmentID log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID)) - segSta, err := ibNode.replica.getSegmentStatisticsUpdates(currentSegID) - if err != nil { - log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err)) + checkPoints := ibNode.replica.listOpenSegmentCheckPoint() + numRows := make(map[UniqueID]int64) + for k := range checkPoints { + segStat, err := ibNode.replica.getSegmentStatisticsUpdates(k) + if err != nil { + log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err)) + numRows = nil + break + } + numRows[k] = segStat.NumRows + } + if numRows == nil { + log.Debug("failed on get segment num rows") break } if ibNode.insertBuffer.size(currentSegID) <= 0 { log.Debug(".. Buffer empty ...") ibNode.dsSaveBinlog(&autoFlushUnit{ + collID: fmsg.collectionID, segID: currentSegID, - numRows: segSta.NumRows, - field2Path: nil, - openSegCheckpoints: ibNode.replica.listOpenSegmentCheckPoint(), + numRows: numRows, + field2Path: map[UniqueID]string{}, + openSegCheckpoints: checkPoints, flushed: true, }) fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} @@ -581,8 +601,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { fu := <-finishCh close(finishCh) if fu.field2Path != nil { - fu.numRows = segSta.NumRows - fu.openSegCheckpoints = ibNode.replica.listOpenSegmentCheckPoint() + fu.numRows = numRows + fu.openSegCheckpoints = checkPoints fu.flushed = true if ibNode.dsSaveBinlog(&fu) != nil { log.Debug("data service save bin log path failed", zap.Error(err)) @@ -703,7 +723,7 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un } replica.setSegmentCheckPoint(segID) - flushUnit <- autoFlushUnit{segID: segID, field2Path: field2Path} + flushUnit <- autoFlushUnit{collID: collID, segID: segID, field2Path: field2Path} clearFn(true) } -- GitLab