From 6c02ae4eb1906a57893dfd92ba188585883f443d Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Sat, 29 May 2021 18:04:30 +0800 Subject: [PATCH] Remove Old msgposition logic (#5487) Signed-off-by: yangxuan --- internal/datanode/collection_replica.go | 87 ++----------------- internal/datanode/collection_replica_test.go | 22 ----- internal/datanode/data_node.go | 82 +++++++++++++---- internal/datanode/data_node_test.go | 9 +- .../datanode/flow_graph_insert_buffer_node.go | 51 +---------- internal/datanode/mock_test.go | 1 + 6 files changed, 84 insertions(+), 168 deletions(-) diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index 84638bfc5..04512f942 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -36,16 +36,12 @@ type Replica interface { addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error removeSegment(segmentID UniqueID) error hasSegment(segmentID UniqueID) bool - setIsFlushed(segmentID UniqueID) error - setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error - setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error updateStatistics(segmentID UniqueID, numRows int64) error getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) getSegmentByID(segmentID UniqueID) (*Segment, error) bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) getChannelName(segID UniqueID) (string, error) - //new msg postions setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) @@ -59,14 +55,8 @@ type Segment struct { numRows int64 memorySize int64 isNew atomic.Value // bool - isFlushed bool - - 
createTime Timestamp // not using - endTime Timestamp // not using - startPosition *internalpb.MsgPosition - endPosition *internalpb.MsgPosition // not using - channelName string - field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered. + channelName string + field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered. } // CollectionSegmentReplica is the data replication of persistent data in datanode. @@ -157,8 +147,6 @@ func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Se } // `addSegment` add a new segment into replica when data node see the segment -// for the first time in insert channels. It sets the startPosition of a segment, and -// flags `isNew=true` func (replica *CollectionSegmentReplica) addSegment( segmentID UniqueID, collID UniqueID, @@ -169,20 +157,12 @@ func (replica *CollectionSegmentReplica) addSegment( defer replica.mu.Unlock() log.Debug("Add Segment", zap.Int64("Segment ID", segmentID)) - position := &internalpb.MsgPosition{ - ChannelName: channelName, - } - seg := &Segment{ - segmentID: segmentID, - collectionID: collID, - partitionID: partitionID, - isFlushed: false, - createTime: 0, - startPosition: position, - endPosition: new(internalpb.MsgPosition), - channelName: channelName, - field2Paths: make(map[UniqueID][]string), + segmentID: segmentID, + collectionID: collID, + partitionID: partitionID, + channelName: channelName, + field2Paths: make(map[UniqueID][]string), } seg.isNew.Store(true) @@ -208,48 +188,6 @@ func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool { return ok } -func (replica *CollectionSegmentReplica) setIsFlushed(segmentID UniqueID) error { - replica.mu.RLock() - defer replica.mu.RUnlock() - - if seg, ok := replica.segments[segmentID]; ok { - seg.isFlushed = true - return nil - } - - return fmt.Errorf("There's no segment %v", segmentID) -} - -func (replica 
*CollectionSegmentReplica) setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error { - replica.mu.RLock() - defer replica.mu.RUnlock() - - if startPos == nil { - return fmt.Errorf("Nil MsgPosition") - } - - if seg, ok := replica.segments[segmentID]; ok { - seg.startPosition = startPos - return nil - } - return fmt.Errorf("There's no segment %v", segmentID) -} - -func (replica *CollectionSegmentReplica) setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error { - replica.mu.RLock() - defer replica.mu.RUnlock() - - if endPos == nil { - return fmt.Errorf("Nil MsgPosition") - } - - if seg, ok := replica.segments[segmentID]; ok { - seg.endPosition = endPos - return nil - } - return fmt.Errorf("There's no segment %v", segmentID) -} - // `updateStatistics` updates the number of rows of a segment in replica. func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error { replica.mu.Lock() @@ -266,8 +204,6 @@ func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, nu } // `getSegmentStatisticsUpdates` gives current segment's statistics updates. -// if the segment's flag `isNew` is true, updates will contain a valid start position. -// if the segment's flag `isFlushed` is true, updates will contain a valid end position. 
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) { replica.mu.Lock() defer replica.mu.Unlock() @@ -279,15 +215,6 @@ func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID U NumRows: seg.numRows, } - if seg.isNew.Load() == true { - updates.StartPosition = seg.startPosition - seg.isNew.Store(false) - } - - if seg.isFlushed { - updates.EndPosition = seg.endPosition - } - return updates, nil } return nil, fmt.Errorf("Error, there's no segment %v", segmentID) diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go index dc7474047..e94697d5b 100644 --- a/internal/datanode/collection_replica_test.go +++ b/internal/datanode/collection_replica_test.go @@ -14,7 +14,6 @@ package datanode import ( "testing" - "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -135,8 +134,6 @@ func TestReplica_Segment(t *testing.T) { assert.NoError(t, err) assert.Equal(t, UniqueID(0), update.SegmentID) assert.Equal(t, int64(100), update.NumRows) - assert.NotNil(t, update.StartPosition) - assert.Nil(t, update.EndPosition) f2p := map[UniqueID]string{ 1: "a", @@ -156,16 +153,8 @@ func TestReplica_Segment(t *testing.T) { assert.ElementsMatch(t, []string{"a", "a"}, r[1]) assert.ElementsMatch(t, []string{"b", "b"}, r[2]) - err = replica.setIsFlushed(0) - assert.NoError(t, err) - err = replica.setStartPosition(0, &internalpb.MsgPosition{}) - assert.NoError(t, err) - err = replica.setEndPosition(0, &internalpb.MsgPosition{}) - assert.NoError(t, err) update, err = replica.getSegmentStatisticsUpdates(0) assert.NoError(t, err) - assert.Nil(t, update.StartPosition) - assert.NotNil(t, update.EndPosition) err = replica.removeSegment(0) assert.NoError(t, err) @@ -180,17 +169,6 @@ func TestReplica_Segment(t *testing.T) { assert.Error(t, err) assert.Nil(t, seg) - err = 
replica.setIsFlushed(0) - assert.Error(t, err) - err = replica.setStartPosition(0, &internalpb.MsgPosition{}) - assert.Error(t, err) - err = replica.setStartPosition(0, nil) - assert.Error(t, err) - err = replica.setEndPosition(0, &internalpb.MsgPosition{}) - assert.Error(t, err) - err = replica.setEndPosition(0, nil) - assert.Error(t, err) - err = replica.updateStatistics(0, 0) assert.Error(t, err) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 539e0b2f9..0aae869d9 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -141,7 +141,6 @@ func (node *DataNode) Init() error { node.session = sessionutil.NewSession(ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) - // TODO find DataService & MasterService req := &datapb.RegisterNodeRequest{ Base: &commonpb.MsgBase{ SourceID: node.NodeID, @@ -195,12 +194,13 @@ func (node *DataNode) NewDataSyncService(vchanPair *datapb.VchannelPair) error { } replica := newReplica() + var alloc allocatorInterface = newAllocator(node.masterService) + metaService := newMetaService(node.ctx, replica, node.masterService) flushChan := make(chan *flushMsg, 100) dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchanPair) // TODO metaService using timestamp in DescribeCollection - metaService := newMetaService(node.ctx, replica, node.masterService) node.vchan2SyncService[vchanPair.GetDmlVchannelName()] = dataSyncService node.vchan2FlushCh[vchanPair.GetDmlVchannelName()] = flushChan @@ -246,6 +246,7 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha } } +// GetComponentStates will return current state of DataNode func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { log.Debug("DataNode current state", zap.Any("State", node.State.Load())) states := 
&internalpb.ComponentStates{ @@ -271,29 +272,73 @@ func (node *DataNode) getChannelName(segID UniqueID) string { return "" } +// ReadyToFlush tells whether DataNode is ready for flushing +func (node *DataNode) ReadyToFlush() error { + if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy { + return errors.New("DataNode not in HEALTHY state") + } + + node.chanMut.RLock() + defer node.chanMut.RUnlock() + if len(node.vchan2SyncService) == 0 && len(node.vchan2FlushCh) == 0 { + // Healthy but Idle + msg := "DataNode HEALTHY but IDLE, please try WatchDmChannels to make it work" + log.Info(msg) + return errors.New(msg) + } + + if len(node.vchan2SyncService) != len(node.vchan2FlushCh) { + // TODO restart + msg := "DataNode HEALTHY but abnormal inside, restarting..." + log.Info(msg) + return errors.New(msg) + } + return nil +} + +func (node *DataNode) getSegmentPositionPair(segmentID UniqueID, chanName string) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) { + node.chanMut.Lock() + defer node.chanMut.Unlock() + sync, ok := node.vchan2SyncService[chanName] + if !ok { + return nil, nil + } + + starts, ends := sync.replica.getSegmentPositions(segmentID) + return starts, ends +} + // FlushSegments packs flush messages into flowgraph through flushChan. // If DataNode receives a valid segment to flush, new flush message for the segment should be ignored. // So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed. +// +// There is 1 precondition: The segmentID in req is in ascending order. 
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { - log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs))) status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, } + if err := node.ReadyToFlush(); err != nil { + status.Reason = err.Error() + return status, nil + } + + log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs))) for _, id := range req.SegmentIDs { chanName := node.getChannelName(id) log.Info("vchannel", zap.String("name", chanName)) - if chanName == "" { + if len(chanName) == 0 { status.Reason = fmt.Sprintf("DataNode not find segment %d!", id) return status, errors.New(status.GetReason()) } + node.chanMut.RLock() flushCh, ok := node.vchan2FlushCh[chanName] node.chanMut.RUnlock() if !ok { // TODO restart DataNode or reshape vchan2FlushCh and vchan2SyncService - status.Reason = "DataNode abnormal!" - return status, errors.New(status.GetReason()) + status.Reason = "DataNode abnormal, restarting" + return status, nil } ddlFlushedCh := make(chan []*datapb.DDLBinlogMeta) @@ -310,7 +355,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen waitReceive := func(wg *sync.WaitGroup, flushedCh interface{}, req *datapb.SaveBinlogPathsRequest) { defer wg.Done() - log.Info("Inside waitReceive") + log.Debug("Inside waitReceive") switch Ch := flushedCh.(type) { case chan []*datapb.ID2PathList: select { @@ -324,6 +369,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen } // Modify req with valid dml binlog paths + req.Field2BinlogPaths = meta log.Info("Insert messeges flush done!", zap.Any("Binlog paths", meta)) } @@ -345,6 +391,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen } // Modify req with valid ddl binlog paths + req.DdlBinlogPaths = meta log.Info("Ddl messages flush done!", zap.Any("Binlog paths", meta)) } default: @@ -352,18 +399,19 @@ func (node 
*DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen } } - // TODO make a queue for this func - currentSegID := id + req := &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{}, + SegmentID: id, + CollectionID: req.CollectionID, + } + + // TODO Set start_positions and end_positions + + log.Info("Waiting for flush completed", zap.Int64("segmentID", id)) + go func() { flushCh <- flushmsg - log.Info("Waiting for flush completed", zap.Int64("segmentID", currentSegID)) - req := &datapb.SaveBinlogPathsRequest{ - Base: &commonpb.MsgBase{}, - SegmentID: currentSegID, - CollectionID: req.CollectionID, - } - var wg sync.WaitGroup wg.Add(1) go waitReceive(&wg, ddlFlushedCh, req) @@ -371,6 +419,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen go waitReceive(&wg, dmlFlushedCh, req) wg.Wait() + log.Info("Notify DataService BinlogPaths and Positions") status, err := node.dataService.SaveBinlogPaths(node.ctx, req) if err != nil { log.Error("DataNode or DataService abnormal, restarting DataNode") @@ -385,7 +434,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen return } - log.Info("Flush Completed", zap.Int64("segmentID", currentSegID)) }() } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 9c74d9965..3e9159934 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -135,7 +135,7 @@ func TestDataNode(t *testing.T) { SegmentIDs: []int64{0}, } - status, err := node1.FlushSegments(node.ctx, req) + status, err := node1.FlushSegments(node1.ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) @@ -183,11 +183,16 @@ func TestDataNode(t *testing.T) { err = ddMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) + err = insertMsgStream.Broadcast(&timeTickMsgPack) + assert.NoError(t, err) + err = ddMsgStream.Broadcast(&timeTickMsgPack) + assert.NoError(t, err) + _, err 
= sync.replica.getSegmentByID(0) assert.NoError(t, err) defer func() { - node1.ctx.Done() + <-node1.ctx.Done() node1.Stop() }() }) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 402e250c6..d8514fb27 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -139,28 +139,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { log.Error("add segment wrong", zap.Error(err)) } - switch { - case iMsg.startPositions == nil || len(iMsg.startPositions) <= 0: - log.Error("insert Msg StartPosition empty") - default: - segment, err := ibNode.replica.getSegmentByID(currentSegID) - if err != nil { - log.Error("get segment wrong", zap.Error(err)) - } - var startPosition *internalpb.MsgPosition = nil - for _, pos := range iMsg.startPositions { - if pos.ChannelName == segment.channelName { - startPosition = pos - break - } - } - if startPosition == nil { - log.Error("get position wrong", zap.Error(err)) - } else { - ibNode.replica.setStartPosition(currentSegID, startPosition) - } - } - // set msg pack start positions, new design + // set msg pack start positions ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions) } @@ -461,27 +440,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // 1.3 store in buffer ibNode.insertBuffer.insertData[currentSegID] = idata - switch { - case iMsg.endPositions == nil || len(iMsg.endPositions) <= 0: - log.Error("insert Msg EndPosition empty") - default: - segment, err := ibNode.replica.getSegmentByID(currentSegID) - if err != nil { - log.Error("get segment wrong", zap.Error(err)) - } - var endPosition *internalpb.MsgPosition = nil - for _, pos := range iMsg.endPositions { - if pos.ChannelName == segment.channelName { - endPosition = pos - } - } - if endPosition == nil { - log.Error("get position wrong", zap.Error(err)) - } - 
ibNode.replica.setEndPosition(currentSegID, endPosition) - } - - // store current startPositions as Segment->EndPostion + // store current endPositions as Segment->EndPosition ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions) } @@ -717,7 +676,6 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[Un dmlFlushedCh <- binlogPaths log.Debug(".. Segment flush completed ..") - ibNode.replica.setIsFlushed(segID) ibNode.updateSegStatistics([]UniqueID{segID}) } @@ -870,8 +828,7 @@ func newInsertBufferNode(ctx context.Context, replica Replica, factory msgstream segmentStatisticsStream: segStatisticsMsgStream, completeFlushStream: completeFlushStream, replica: replica, - // flushMeta: flushMeta, - flushMap: sync.Map{}, - idAllocator: idAllocator, + flushMap: sync.Map{}, + idAllocator: idAllocator, } } diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index d2642b2c6..22ee22998 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -110,6 +110,7 @@ func newHEALTHDataNodeMock(dmChannelName, ddChannelName string) *DataNode { DdlPosition: &datapb.PositionPair{}, DmlPosition: &datapb.PositionPair{}, } + node.Start() _ = node.NewDataSyncService(vpair) -- GitLab