diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index c2470b30725cb003a6cc94fa6dfb1c8f655f54eb..7bf3a697aa066669e8005feb06bfce514322cc2f 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -45,9 +45,7 @@ type Replica interface { setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) - setSegmentCheckPoint(segID UniqueID) - listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) - removeSegmentCheckPoint(segID UniqueID) + listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) } // Segment is the data structure of segments in data node replica. @@ -69,10 +67,9 @@ type CollectionSegmentReplica struct { segments map[UniqueID]*Segment collections map[UniqueID]*Collection - posMu sync.Mutex - startPositions map[UniqueID][]*internalpb.MsgPosition - endPositions map[UniqueID][]*internalpb.MsgPosition - openSegmentCheckPoint map[UniqueID]internalpb.MsgPosition + posMu sync.Mutex + startPositions map[UniqueID][]*internalpb.MsgPosition + endPositions map[UniqueID][]*internalpb.MsgPosition } var _ Replica = &CollectionSegmentReplica{} @@ -82,11 +79,10 @@ func newReplica() Replica { collections := make(map[UniqueID]*Collection) var replica Replica = &CollectionSegmentReplica{ - segments: segments, - collections: collections, - startPositions: make(map[UniqueID][]*internalpb.MsgPosition), - endPositions: make(map[UniqueID][]*internalpb.MsgPosition), - openSegmentCheckPoint: make(map[UniqueID]internalpb.MsgPosition), + segments: segments, + collections: collections, + startPositions: make(map[UniqueID][]*internalpb.MsgPosition), + endPositions: make(map[UniqueID][]*internalpb.MsgPosition), } return replica } @@ -320,29 +316,15 @@ func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([] endPos := replica.endPositions[segID] return startPos, endPos } -func (replica *CollectionSegmentReplica) setSegmentCheckPoint(segID UniqueID) { - replica.posMu.Lock() - defer replica.posMu.Unlock() - ep := replica.endPositions[segID] - if len(ep) != 1 { - panic("msgstream's position should be 1") - } - replica.openSegmentCheckPoint[segID] = *ep[0] -} -func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) { + +func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) { replica.posMu.Lock() defer replica.posMu.Unlock() r1 := make(map[UniqueID]internalpb.MsgPosition) r2 := make(map[UniqueID]int64) - for k, v := range replica.openSegmentCheckPoint { - r1[k] = v - r2[k] = replica.segments[k].numRows + for _, seg := range segs { + r1[seg] = *replica.endPositions[seg][0] + r2[seg] = replica.segments[seg].numRows } return r1, r2 } - -func (replica *CollectionSegmentReplica) removeSegmentCheckPoint(segID UniqueID) { - replica.posMu.Lock() - defer replica.posMu.Unlock() - delete(replica.openSegmentCheckPoint, segID) -} diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 6fcdac488d819b51006fa83f06915140ea0e5ca1..f66e7196aaa3ce16db8202762ed9c700922151ef 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -61,6 +61,8 @@ type insertBufferNode struct { segmentStatisticsStream msgstream.MsgStream dsSaveBinlog func(fu *autoFlushUnit) error + openSegList map[UniqueID]bool + openSegLock sync.Mutex } type autoFlushUnit struct { @@ -494,7 +496,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { finishCnt.Add(1) go flushSegment(collMeta, segToFlush, partitionID, collID, - &ibNode.flushMap, ibNode.minIOKV, finishCh, &finishCnt, ibNode.replica, ibNode.idAllocator) + &ibNode.flushMap, ibNode.minIOKV, finishCh, &finishCnt, ibNode, ibNode.idAllocator) } } finishCnt.Wait() @@ -504,7 +506,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { log.Debug("segment is empty") continue } - fu.openSegCheckpoints, fu.numRows = ibNode.replica.listOpenSegmentCheckPointAndNumRows() + fu.openSegCheckpoints, fu.numRows = ibNode.listOpenSegmentCheckPointAndNumRows() fu.flushed = false if err := ibNode.dsSaveBinlog(&fu); err != nil { log.Debug("data service save bin log path failed", zap.Error(err)) @@ -520,7 +522,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if ibNode.insertBuffer.size(currentSegID) <= 0 { log.Debug(".. Buffer empty ...") - c, n := ibNode.replica.listOpenSegmentCheckPointAndNumRows() + c, n := ibNode.listOpenSegmentCheckPointAndNumRows() ibNode.dsSaveBinlog(&autoFlushUnit{ collID: fmsg.collectionID, segID: currentSegID, @@ -529,7 +531,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { openSegCheckpoints: c, flushed: true, }) - ibNode.replica.removeSegmentCheckPoint(fmsg.segmentID) + ibNode.removeSegmentCheckPoint(fmsg.segmentID) fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} } else { log.Debug(".. Buffer not empty, flushing ..") @@ -569,17 +571,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } flushSegment(collMeta, currentSegID, seg.partitionID, seg.collectionID, - &ibNode.flushMap, ibNode.minIOKV, finishCh, nil, ibNode.replica, ibNode.idAllocator) + &ibNode.flushMap, ibNode.minIOKV, finishCh, nil, ibNode, ibNode.idAllocator) fu := <-finishCh close(finishCh) if fu.field2Path != nil { - fu.openSegCheckpoints, fu.numRows = ibNode.replica.listOpenSegmentCheckPointAndNumRows() + fu.openSegCheckpoints, fu.numRows = ibNode.listOpenSegmentCheckPointAndNumRows() fu.flushed = true if ibNode.dsSaveBinlog(&fu) != nil { log.Debug("data service save bin log path failed", zap.Error(err)) } else { // this segment has flushed, so it's not `open segment`, so remove from the check point - ibNode.replica.removeSegmentCheckPoint(fu.segID) + ibNode.removeSegmentCheckPoint(fu.segID) } } fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} @@ -602,7 +604,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID, insertData *sync.Map, kv kv.BaseKV, flushUnit chan<- autoFlushUnit, wgFinish *sync.WaitGroup, - replica Replica, idAllocator allocatorInterface) { + ibNode *insertBufferNode, idAllocator allocatorInterface) { if wgFinish != nil { defer wgFinish.Done() } @@ -693,11 +695,31 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un return } - replica.setSegmentCheckPoint(segID) + ibNode.setSegmentCheckPoint(segID) flushUnit <- autoFlushUnit{collID: collID, segID: segID, field2Path: field2Path} clearFn(true) } +func (ibNode *insertBufferNode) setSegmentCheckPoint(segID UniqueID) { + ibNode.openSegLock.Lock() + defer ibNode.openSegLock.Unlock() + ibNode.openSegList[segID] = true +} +func (ibNode *insertBufferNode) removeSegmentCheckPoint(segID UniqueID) { + ibNode.openSegLock.Lock() + defer ibNode.openSegLock.Unlock() + delete(ibNode.openSegList, segID) +} +func (ibNode *insertBufferNode) listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) { + ibNode.openSegLock.Lock() + defer ibNode.openSegLock.Unlock() + segs := make([]UniqueID, 0, len(ibNode.openSegList)) + for k := range ibNode.openSegList { + segs = append(segs, k) + } + return ibNode.replica.listOpenSegmentCheckPointAndNumRows(segs) +} + func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { msgPack := msgstream.MsgPack{} timeTickMsg := msgstream.TimeTickMsg{ @@ -852,5 +874,6 @@ func newInsertBufferNode( flushChan: flushCh, idAllocator: idAllocator, dsSaveBinlog: saveBinlog, + openSegList: make(map[UniqueID]bool), } } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 583bc87d8968ce8d4dd9bc7a4dc606425b168409..ee2f77e2f93bd6fcb87631ae8d31907c26a55044 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -121,6 +121,8 @@ func genInsertMsg() insertMsg { } func TestFlushSegment(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() idAllocMock := NewAllocatorFactory(1) mockMinIO := memkv.NewMemoryKV() @@ -154,6 +156,19 @@ func TestFlushSegment(t *testing.T) { } flushMap.Store(segmentID, insertData) + msFactory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err := msFactory.SetParams(m) + assert.Nil(t, err) + flushChan := make(chan *flushMsg, 100) + saveBinlog := func(*autoFlushUnit) error { + return nil + } + ibNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog) + flushSegment(collMeta, segmentID, partitionID, @@ -162,7 +177,7 @@ func TestFlushSegment(t *testing.T) { mockMinIO, finishCh, nil, - replica, + ibNode, idAllocMock) fu := <-finishCh @@ -241,11 +256,10 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { dataFactory := NewDataFactory() colRep := &CollectionSegmentReplica{ - segments: make(map[UniqueID]*Segment), - collections: make(map[UniqueID]*Collection), - startPositions: make(map[UniqueID][]*internalpb.MsgPosition), - endPositions: make(map[UniqueID][]*internalpb.MsgPosition), - openSegmentCheckPoint: make(map[UniqueID]internalpb.MsgPosition), + segments: make(map[UniqueID]*Segment), + collections: make(map[UniqueID]*Collection), + startPositions: make(map[UniqueID][]*internalpb.MsgPosition), + endPositions: make(map[UniqueID][]*internalpb.MsgPosition), } err = colRep.addCollection(collMeta.ID, collMeta.Schema) require.NoError(t, err) @@ -282,7 +296,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { assert.Equal(t, len(colRep.endPositions), 2) assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(123)) - assert.Equal(t, len(colRep.openSegmentCheckPoint), 0) + assert.Equal(t, len(iBNode.openSegList), 0) assert.Equal(t, len(iBNode.insertBuffer.insertData), 2) assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000)) assert.Equal(t, iBNode.insertBuffer.size(2), int32(50+16000)) @@ -296,8 +310,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) - assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) - assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234)) + assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true}) assert.Equal(t, len(flushUnit), 1) assert.Equal(t, flushUnit[0].segID, int64(2)) assert.Equal(t, len(flushUnit[0].numRows), 1) @@ -319,9 +332,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) - assert.Equal(t, len(colRep.openSegmentCheckPoint), 2) - assert.Equal(t, colRep.openSegmentCheckPoint[1].Timestamp, Timestamp(345)) - assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234)) + assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{1: true, 2: true}) assert.Equal(t, len(flushUnit), 2) assert.Equal(t, flushUnit[1].segID, int64(1)) assert.Equal(t, len(flushUnit[1].numRows), 2) @@ -358,8 +369,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) - assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) - assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234)) + assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true}) assert.Equal(t, len(flushUnit), 3) assert.Equal(t, flushUnit[2].segID, int64(1)) assert.Equal(t, len(flushUnit[2].numRows), 2) @@ -390,8 +400,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) - assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) - assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234)) + assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true}) assert.Equal(t, len(flushUnit), 4) assert.Equal(t, flushUnit[3].segID, int64(3)) assert.Equal(t, len(flushUnit[3].numRows), 2)