From 0c362a8831b389e0d96b4f8d8e394c12334e1824 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Sat, 5 Jun 2021 18:18:34 +0800 Subject: [PATCH] fix bug and add unit test for insert buffer (#5630) Signed-off-by: yefu.chen --- internal/datanode/collection_replica.go | 12 +- .../datanode/flow_graph_insert_buffer_node.go | 41 +--- .../flow_graph_insert_buffer_node_test.go | 176 ++++++++++++++++++ 3 files changed, 191 insertions(+), 38 deletions(-) diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index 4f9acf383..c2470b307 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -46,7 +46,7 @@ type Replica interface { setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) setSegmentCheckPoint(segID UniqueID) - listOpenSegmentCheckPoint() map[UniqueID]internalpb.MsgPosition + listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) removeSegmentCheckPoint(segID UniqueID) } @@ -329,10 +329,16 @@ func (replica *CollectionSegmentReplica) setSegmentCheckPoint(segID UniqueID) { } replica.openSegmentCheckPoint[segID] = *ep[0] } -func (replica *CollectionSegmentReplica) listOpenSegmentCheckPoint() map[UniqueID]internalpb.MsgPosition { +func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) { replica.posMu.Lock() defer replica.posMu.Unlock() - return replica.openSegmentCheckPoint + r1 := make(map[UniqueID]internalpb.MsgPosition) + r2 := make(map[UniqueID]int64) + for k, v := range replica.openSegmentCheckPoint { + r1[k] = v + r2[k] = replica.segments[k].numRows + } + return r1, r2 } func (replica *CollectionSegmentReplica) removeSegmentCheckPoint(segID UniqueID) { diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 925345c13..6fcdac488 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -504,21 +504,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { log.Debug("segment is empty") continue } - fu.openSegCheckpoints = ibNode.replica.listOpenSegmentCheckPoint() - fu.numRows = make(map[UniqueID]int64) - for k := range fu.openSegCheckpoints { - segStat, err := ibNode.replica.getSegmentStatisticsUpdates(k) - if err != nil { - log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err)) - fu.numRows = nil - break - } - fu.numRows[k] = segStat.NumRows - } - if fu.numRows == nil { - log.Debug("failed on get segment num rows") - break - } + fu.openSegCheckpoints, fu.numRows = ibNode.replica.listOpenSegmentCheckPointAndNumRows() fu.flushed = false if err := ibNode.dsSaveBinlog(&fu); err != nil { log.Debug("data service save bin log path failed", zap.Error(err)) @@ -532,32 +518,18 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { currentSegID := fmsg.segmentID log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID)) - checkPoints := ibNode.replica.listOpenSegmentCheckPoint() - numRows := make(map[UniqueID]int64) - for k := range checkPoints { - segStat, err := ibNode.replica.getSegmentStatisticsUpdates(k) - if err != nil { - log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err)) - numRows = nil - break - } - numRows[k] = segStat.NumRows - } - if numRows == nil { - log.Debug("failed on get segment num rows") - break - } - if ibNode.insertBuffer.size(currentSegID) <= 0 { log.Debug(".. Buffer empty ...") + c, n := ibNode.replica.listOpenSegmentCheckPointAndNumRows() ibNode.dsSaveBinlog(&autoFlushUnit{ collID: fmsg.collectionID, segID: currentSegID, - numRows: numRows, + numRows: n, field2Path: map[UniqueID]string{}, - openSegCheckpoints: checkPoints, + openSegCheckpoints: c, flushed: true, }) + ibNode.replica.removeSegmentCheckPoint(fmsg.segmentID) fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} } else { log.Debug(".. Buffer not empty, flushing ..") @@ -601,8 +573,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { fu := <-finishCh close(finishCh) if fu.field2Path != nil { - fu.numRows = numRows - fu.openSegCheckpoints = checkPoints + fu.openSegCheckpoints, fu.numRows = ibNode.replica.listOpenSegmentCheckPointAndNumRows() fu.flushed = true if ibNode.dsSaveBinlog(&fu) != nil { log.Debug("data service save bin log path failed", zap.Error(err)) diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 2b9632a36..583bc87d8 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -226,3 +226,179 @@ func genCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.Col } return &collection } + +func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testPath := "/test/datanode/root/meta" + err := clearEtcd(testPath) + require.NoError(t, err) + Params.MetaRootPath = testPath + + Factory := &MetaFactory{} + collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") + dataFactory := NewDataFactory() + + colRep := &CollectionSegmentReplica{ + segments: make(map[UniqueID]*Segment), + collections: make(map[UniqueID]*Collection), + startPositions: make(map[UniqueID][]*internalpb.MsgPosition), + endPositions: make(map[UniqueID][]*internalpb.MsgPosition), + openSegmentCheckPoint: make(map[UniqueID]internalpb.MsgPosition), + } + err = colRep.addCollection(collMeta.ID, collMeta.Schema) + require.NoError(t, err) + err = colRep.addSegment(1, collMeta.ID, 0, Params.InsertChannelNames[0]) + require.NoError(t, err) + + msFactory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(t, err) + + flushUnit := []autoFlushUnit{} + saveBinlog := func(fu *autoFlushUnit) error { + flushUnit = append(flushUnit, *fu) + return nil + } + + flushChan := make(chan *flushMsg, 100) + iBNode := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog) + + inMsg := genInsertMsg() + inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(100) + inMsg.insertMessages = append(inMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(32000)...) + for i := range inMsg.insertMessages { + inMsg.insertMessages[i].SegmentID = int64(i%2) + 1 + } + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}} + + var iMsg flowgraph.Msg = &inMsg + iBNode.Operate([]flowgraph.Msg{iMsg}) + assert.Equal(t, len(colRep.endPositions), 2) + assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123)) + assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(123)) + assert.Equal(t, len(colRep.openSegmentCheckPoint), 0) + assert.Equal(t, len(iBNode.insertBuffer.insertData), 2) + assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000)) + assert.Equal(t, iBNode.insertBuffer.size(2), int32(50+16000)) + + for i := range inMsg.insertMessages { + inMsg.insertMessages[i].SegmentID = int64(i%2) + 2 + } + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}} + iBNode.Operate([]flowgraph.Msg{iMsg}) + assert.Equal(t, len(colRep.endPositions), 3) + assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123)) + assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) + assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) + assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) + assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234)) + assert.Equal(t, len(flushUnit), 1) + assert.Equal(t, flushUnit[0].segID, int64(2)) + assert.Equal(t, len(flushUnit[0].numRows), 1) + assert.Equal(t, flushUnit[0].numRows[2], int64(100+32000)) + assert.Equal(t, len(flushUnit[0].openSegCheckpoints), 1) + assert.Equal(t, flushUnit[0].openSegCheckpoints[2].Timestamp, Timestamp(234)) + assert.Greater(t, len(flushUnit[0].field2Path), 0) + assert.False(t, flushUnit[0].flushed) + assert.Equal(t, len(iBNode.insertBuffer.insertData), 2) + assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000)) + assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + + for i := range inMsg.insertMessages { + inMsg.insertMessages[i].SegmentID = 1 + } + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}} + iBNode.Operate([]flowgraph.Msg{iMsg}) + assert.Equal(t, len(colRep.endPositions), 3) + assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) + assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) + assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) + assert.Equal(t, len(colRep.openSegmentCheckPoint), 2) + assert.Equal(t, colRep.openSegmentCheckPoint[1].Timestamp, Timestamp(345)) + assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234)) + assert.Equal(t, len(flushUnit), 2) + assert.Equal(t, flushUnit[1].segID, int64(1)) + assert.Equal(t, len(flushUnit[1].numRows), 2) + assert.Equal(t, flushUnit[1].numRows[2], int64(100+32000)) + assert.Equal(t, flushUnit[1].numRows[1], int64(50+16000+100+32000)) + assert.Equal(t, len(flushUnit[1].openSegCheckpoints), 2) + assert.Equal(t, flushUnit[1].openSegCheckpoints[1].Timestamp, Timestamp(345)) + assert.Equal(t, flushUnit[1].openSegCheckpoints[2].Timestamp, Timestamp(234)) + assert.False(t, flushUnit[1].flushed) + assert.Greater(t, len(flushUnit[1].field2Path), 0) + assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) + assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + + dmlFlushedCh := make(chan []*datapb.ID2PathList, 1) + + flushChan <- &flushMsg{ + msgID: 3, + timestamp: 456, + segmentID: UniqueID(1), + collectionID: UniqueID(1), + dmlFlushedCh: dmlFlushedCh, + } + + inMsg.insertMessages = []*msgstream.InsertMsg{} + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}} + iBNode.Operate([]flowgraph.Msg{iMsg}) + + flushSeg := <-dmlFlushedCh + assert.NotNil(t, flushSeg) + assert.Equal(t, len(flushSeg), 1) + assert.Equal(t, flushSeg[0].ID, int64(1)) + assert.NotNil(t, flushSeg[0].Paths) + assert.Equal(t, len(colRep.endPositions), 3) + assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) + assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) + assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) + assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) + assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234)) + assert.Equal(t, len(flushUnit), 3) + assert.Equal(t, flushUnit[2].segID, int64(1)) + assert.Equal(t, len(flushUnit[2].numRows), 2) + assert.Equal(t, flushUnit[2].numRows[2], int64(100+32000)) + assert.Equal(t, flushUnit[2].numRows[1], int64(50+16000+100+32000)) + t.Log(flushUnit[2].openSegCheckpoints) + assert.Equal(t, len(flushUnit[2].openSegCheckpoints), 2) + assert.Equal(t, flushUnit[2].openSegCheckpoints[1].Timestamp, Timestamp(345)) + assert.Equal(t, flushUnit[2].openSegCheckpoints[2].Timestamp, Timestamp(234)) + assert.Equal(t, len(flushUnit[2].field2Path), 0) + assert.NotNil(t, flushUnit[2].field2Path) + assert.True(t, flushUnit[2].flushed) + + flushChan <- &flushMsg{ + msgID: 4, + timestamp: 567, + segmentID: UniqueID(3), + collectionID: UniqueID(3), + dmlFlushedCh: dmlFlushedCh, + } + iBNode.Operate([]flowgraph.Msg{iMsg}) + flushSeg = <-dmlFlushedCh + assert.NotNil(t, flushSeg) + assert.Equal(t, len(flushSeg), 1) + assert.Equal(t, flushSeg[0].ID, int64(3)) + assert.NotNil(t, flushSeg[0].Paths) + assert.Equal(t, len(colRep.endPositions), 3) + assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) + assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) + assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) + assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) + assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234)) + assert.Equal(t, len(flushUnit), 4) + assert.Equal(t, flushUnit[3].segID, int64(3)) + assert.Equal(t, len(flushUnit[3].numRows), 2) + assert.Equal(t, flushUnit[3].numRows[3], int64(50+16000)) + assert.Equal(t, flushUnit[3].numRows[2], int64(100+32000)) + assert.Greater(t, len(flushUnit[3].field2Path), 0) + assert.NotNil(t, flushUnit[3].field2Path) + assert.True(t, flushUnit[3].flushed) + assert.Equal(t, len(iBNode.insertBuffer.insertData), 0) +} -- GitLab