提交 7064937d 编写于 作者: N neza2017 提交者: zhenshan.cao

fix insertBuffer node (#5632)

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
上级 0c362a88
...@@ -45,9 +45,7 @@ type Replica interface { ...@@ -45,9 +45,7 @@ type Replica interface {
setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
setSegmentCheckPoint(segID UniqueID) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64)
listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64)
removeSegmentCheckPoint(segID UniqueID)
} }
// Segment is the data structure of segments in data node replica. // Segment is the data structure of segments in data node replica.
...@@ -69,10 +67,9 @@ type CollectionSegmentReplica struct { ...@@ -69,10 +67,9 @@ type CollectionSegmentReplica struct {
segments map[UniqueID]*Segment segments map[UniqueID]*Segment
collections map[UniqueID]*Collection collections map[UniqueID]*Collection
posMu sync.Mutex posMu sync.Mutex
startPositions map[UniqueID][]*internalpb.MsgPosition startPositions map[UniqueID][]*internalpb.MsgPosition
endPositions map[UniqueID][]*internalpb.MsgPosition endPositions map[UniqueID][]*internalpb.MsgPosition
openSegmentCheckPoint map[UniqueID]internalpb.MsgPosition
} }
var _ Replica = &CollectionSegmentReplica{} var _ Replica = &CollectionSegmentReplica{}
...@@ -82,11 +79,10 @@ func newReplica() Replica { ...@@ -82,11 +79,10 @@ func newReplica() Replica {
collections := make(map[UniqueID]*Collection) collections := make(map[UniqueID]*Collection)
var replica Replica = &CollectionSegmentReplica{ var replica Replica = &CollectionSegmentReplica{
segments: segments, segments: segments,
collections: collections, collections: collections,
startPositions: make(map[UniqueID][]*internalpb.MsgPosition), startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
endPositions: make(map[UniqueID][]*internalpb.MsgPosition), endPositions: make(map[UniqueID][]*internalpb.MsgPosition),
openSegmentCheckPoint: make(map[UniqueID]internalpb.MsgPosition),
} }
return replica return replica
} }
...@@ -320,29 +316,15 @@ func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([] ...@@ -320,29 +316,15 @@ func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]
endPos := replica.endPositions[segID] endPos := replica.endPositions[segID]
return startPos, endPos return startPos, endPos
} }
func (replica *CollectionSegmentReplica) setSegmentCheckPoint(segID UniqueID) {
replica.posMu.Lock() func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
defer replica.posMu.Unlock()
ep := replica.endPositions[segID]
if len(ep) != 1 {
panic("msgstream's position should be 1")
}
replica.openSegmentCheckPoint[segID] = *ep[0]
}
func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
replica.posMu.Lock() replica.posMu.Lock()
defer replica.posMu.Unlock() defer replica.posMu.Unlock()
r1 := make(map[UniqueID]internalpb.MsgPosition) r1 := make(map[UniqueID]internalpb.MsgPosition)
r2 := make(map[UniqueID]int64) r2 := make(map[UniqueID]int64)
for k, v := range replica.openSegmentCheckPoint { for _, seg := range segs {
r1[k] = v r1[seg] = *replica.endPositions[seg][0]
r2[k] = replica.segments[k].numRows r2[seg] = replica.segments[seg].numRows
} }
return r1, r2 return r1, r2
} }
// removeSegmentCheckPoint drops the checkpoint recorded for segID from the
// replica's open-segment checkpoint map. The map is modified while posMu is
// held. Removing an ID that has no entry is a no-op.
func (replica *CollectionSegmentReplica) removeSegmentCheckPoint(segID UniqueID) {
	mu := &replica.posMu
	mu.Lock()
	// delete never panics (even on a nil map), so a plain Unlock is enough.
	delete(replica.openSegmentCheckPoint, segID)
	mu.Unlock()
}
...@@ -61,6 +61,8 @@ type insertBufferNode struct { ...@@ -61,6 +61,8 @@ type insertBufferNode struct {
segmentStatisticsStream msgstream.MsgStream segmentStatisticsStream msgstream.MsgStream
dsSaveBinlog func(fu *autoFlushUnit) error dsSaveBinlog func(fu *autoFlushUnit) error
openSegList map[UniqueID]bool
openSegLock sync.Mutex
} }
type autoFlushUnit struct { type autoFlushUnit struct {
...@@ -494,7 +496,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -494,7 +496,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
finishCnt.Add(1) finishCnt.Add(1)
go flushSegment(collMeta, segToFlush, partitionID, collID, go flushSegment(collMeta, segToFlush, partitionID, collID,
&ibNode.flushMap, ibNode.minIOKV, finishCh, &finishCnt, ibNode.replica, ibNode.idAllocator) &ibNode.flushMap, ibNode.minIOKV, finishCh, &finishCnt, ibNode, ibNode.idAllocator)
} }
} }
finishCnt.Wait() finishCnt.Wait()
...@@ -504,7 +506,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -504,7 +506,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Debug("segment is empty") log.Debug("segment is empty")
continue continue
} }
fu.openSegCheckpoints, fu.numRows = ibNode.replica.listOpenSegmentCheckPointAndNumRows() fu.openSegCheckpoints, fu.numRows = ibNode.listOpenSegmentCheckPointAndNumRows()
fu.flushed = false fu.flushed = false
if err := ibNode.dsSaveBinlog(&fu); err != nil { if err := ibNode.dsSaveBinlog(&fu); err != nil {
log.Debug("data service save bin log path failed", zap.Error(err)) log.Debug("data service save bin log path failed", zap.Error(err))
...@@ -520,7 +522,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -520,7 +522,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if ibNode.insertBuffer.size(currentSegID) <= 0 { if ibNode.insertBuffer.size(currentSegID) <= 0 {
log.Debug(".. Buffer empty ...") log.Debug(".. Buffer empty ...")
c, n := ibNode.replica.listOpenSegmentCheckPointAndNumRows() c, n := ibNode.listOpenSegmentCheckPointAndNumRows()
ibNode.dsSaveBinlog(&autoFlushUnit{ ibNode.dsSaveBinlog(&autoFlushUnit{
collID: fmsg.collectionID, collID: fmsg.collectionID,
segID: currentSegID, segID: currentSegID,
...@@ -529,7 +531,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -529,7 +531,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
openSegCheckpoints: c, openSegCheckpoints: c,
flushed: true, flushed: true,
}) })
ibNode.replica.removeSegmentCheckPoint(fmsg.segmentID) ibNode.removeSegmentCheckPoint(fmsg.segmentID)
fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
} else { } else {
log.Debug(".. Buffer not empty, flushing ..") log.Debug(".. Buffer not empty, flushing ..")
...@@ -569,17 +571,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -569,17 +571,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
} }
flushSegment(collMeta, currentSegID, seg.partitionID, seg.collectionID, flushSegment(collMeta, currentSegID, seg.partitionID, seg.collectionID,
&ibNode.flushMap, ibNode.minIOKV, finishCh, nil, ibNode.replica, ibNode.idAllocator) &ibNode.flushMap, ibNode.minIOKV, finishCh, nil, ibNode, ibNode.idAllocator)
fu := <-finishCh fu := <-finishCh
close(finishCh) close(finishCh)
if fu.field2Path != nil { if fu.field2Path != nil {
fu.openSegCheckpoints, fu.numRows = ibNode.replica.listOpenSegmentCheckPointAndNumRows() fu.openSegCheckpoints, fu.numRows = ibNode.listOpenSegmentCheckPointAndNumRows()
fu.flushed = true fu.flushed = true
if ibNode.dsSaveBinlog(&fu) != nil { if ibNode.dsSaveBinlog(&fu) != nil {
log.Debug("data service save bin log path failed", zap.Error(err)) log.Debug("data service save bin log path failed", zap.Error(err))
} else { } else {
// this segment has flushed, so it's not `open segment`, so remove from the check point // this segment has flushed, so it's not `open segment`, so remove from the check point
ibNode.replica.removeSegmentCheckPoint(fu.segID) ibNode.removeSegmentCheckPoint(fu.segID)
} }
} }
fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
...@@ -602,7 +604,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -602,7 +604,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID, func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
insertData *sync.Map, kv kv.BaseKV, flushUnit chan<- autoFlushUnit, wgFinish *sync.WaitGroup, insertData *sync.Map, kv kv.BaseKV, flushUnit chan<- autoFlushUnit, wgFinish *sync.WaitGroup,
replica Replica, idAllocator allocatorInterface) { ibNode *insertBufferNode, idAllocator allocatorInterface) {
if wgFinish != nil { if wgFinish != nil {
defer wgFinish.Done() defer wgFinish.Done()
} }
...@@ -693,11 +695,31 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un ...@@ -693,11 +695,31 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un
return return
} }
replica.setSegmentCheckPoint(segID) ibNode.setSegmentCheckPoint(segID)
flushUnit <- autoFlushUnit{collID: collID, segID: segID, field2Path: field2Path} flushUnit <- autoFlushUnit{collID: collID, segID: segID, field2Path: field2Path}
clearFn(true) clearFn(true)
} }
// setSegmentCheckPoint records segID in the node's open-segment list.
// The list is updated while openSegLock is held.
func (ibNode *insertBufferNode) setSegmentCheckPoint(segID UniqueID) {
	ibNode.openSegLock.Lock()
	defer ibNode.openSegLock.Unlock()
	// Writing to a nil map panics. Allocate lazily so a node that was not
	// built through newInsertBufferNode (e.g. a struct literal in a test)
	// still works.
	if ibNode.openSegList == nil {
		ibNode.openSegList = make(map[UniqueID]bool)
	}
	ibNode.openSegList[segID] = true
}
// removeSegmentCheckPoint removes segID from the node's open-segment list.
// The list is modified while openSegLock is held. Removing an ID that is
// not in the list is a no-op.
func (ibNode *insertBufferNode) removeSegmentCheckPoint(segID UniqueID) {
	lock := &ibNode.openSegLock
	lock.Lock()
	// delete never panics (even on a nil map), so a plain Unlock is enough.
	delete(ibNode.openSegList, segID)
	lock.Unlock()
}
// listOpenSegmentCheckPointAndNumRows snapshots the IDs currently in the
// node's open-segment list and asks the replica for their checkpoints and
// row counts.
//
// It returns whatever the replica's listOpenSegmentCheckPointAndNumRows
// returns for that ID snapshot: a map of segment ID to message position and
// a map of segment ID to row count.
func (ibNode *insertBufferNode) listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
	ibNode.openSegLock.Lock()
	segs := make([]UniqueID, 0, len(ibNode.openSegList))
	for id := range ibNode.openSegList {
		segs = append(segs, id)
	}
	// segs is a private copy, so release openSegLock before calling into the
	// replica. The replica takes its own lock, and holding both would nest
	// locks and block concurrent set/remove calls for no benefit.
	ibNode.openSegLock.Unlock()

	return ibNode.replica.listOpenSegmentCheckPointAndNumRows(segs)
}
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
msgPack := msgstream.MsgPack{} msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.TimeTickMsg{ timeTickMsg := msgstream.TimeTickMsg{
...@@ -852,5 +874,6 @@ func newInsertBufferNode( ...@@ -852,5 +874,6 @@ func newInsertBufferNode(
flushChan: flushCh, flushChan: flushCh,
idAllocator: idAllocator, idAllocator: idAllocator,
dsSaveBinlog: saveBinlog, dsSaveBinlog: saveBinlog,
openSegList: make(map[UniqueID]bool),
} }
} }
...@@ -121,6 +121,8 @@ func genInsertMsg() insertMsg { ...@@ -121,6 +121,8 @@ func genInsertMsg() insertMsg {
} }
func TestFlushSegment(t *testing.T) { func TestFlushSegment(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
idAllocMock := NewAllocatorFactory(1) idAllocMock := NewAllocatorFactory(1)
mockMinIO := memkv.NewMemoryKV() mockMinIO := memkv.NewMemoryKV()
...@@ -154,6 +156,19 @@ func TestFlushSegment(t *testing.T) { ...@@ -154,6 +156,19 @@ func TestFlushSegment(t *testing.T) {
} }
flushMap.Store(segmentID, insertData) flushMap.Store(segmentID, insertData)
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
"pulsarBufSize": 1024}
err := msFactory.SetParams(m)
assert.Nil(t, err)
flushChan := make(chan *flushMsg, 100)
saveBinlog := func(*autoFlushUnit) error {
return nil
}
ibNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog)
flushSegment(collMeta, flushSegment(collMeta,
segmentID, segmentID,
partitionID, partitionID,
...@@ -162,7 +177,7 @@ func TestFlushSegment(t *testing.T) { ...@@ -162,7 +177,7 @@ func TestFlushSegment(t *testing.T) {
mockMinIO, mockMinIO,
finishCh, finishCh,
nil, nil,
replica, ibNode,
idAllocMock) idAllocMock)
fu := <-finishCh fu := <-finishCh
...@@ -241,11 +256,10 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { ...@@ -241,11 +256,10 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
dataFactory := NewDataFactory() dataFactory := NewDataFactory()
colRep := &CollectionSegmentReplica{ colRep := &CollectionSegmentReplica{
segments: make(map[UniqueID]*Segment), segments: make(map[UniqueID]*Segment),
collections: make(map[UniqueID]*Collection), collections: make(map[UniqueID]*Collection),
startPositions: make(map[UniqueID][]*internalpb.MsgPosition), startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
endPositions: make(map[UniqueID][]*internalpb.MsgPosition), endPositions: make(map[UniqueID][]*internalpb.MsgPosition),
openSegmentCheckPoint: make(map[UniqueID]internalpb.MsgPosition),
} }
err = colRep.addCollection(collMeta.ID, collMeta.Schema) err = colRep.addCollection(collMeta.ID, collMeta.Schema)
require.NoError(t, err) require.NoError(t, err)
...@@ -282,7 +296,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { ...@@ -282,7 +296,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
assert.Equal(t, len(colRep.endPositions), 2) assert.Equal(t, len(colRep.endPositions), 2)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123)) assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(123)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(123))
assert.Equal(t, len(colRep.openSegmentCheckPoint), 0) assert.Equal(t, len(iBNode.openSegList), 0)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 2) assert.Equal(t, len(iBNode.insertBuffer.insertData), 2)
assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000)) assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000))
assert.Equal(t, iBNode.insertBuffer.size(2), int32(50+16000)) assert.Equal(t, iBNode.insertBuffer.size(2), int32(50+16000))
...@@ -296,8 +310,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { ...@@ -296,8 +310,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123)) assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true})
assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234))
assert.Equal(t, len(flushUnit), 1) assert.Equal(t, len(flushUnit), 1)
assert.Equal(t, flushUnit[0].segID, int64(2)) assert.Equal(t, flushUnit[0].segID, int64(2))
assert.Equal(t, len(flushUnit[0].numRows), 1) assert.Equal(t, len(flushUnit[0].numRows), 1)
...@@ -319,9 +332,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { ...@@ -319,9 +332,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
assert.Equal(t, len(colRep.openSegmentCheckPoint), 2) assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{1: true, 2: true})
assert.Equal(t, colRep.openSegmentCheckPoint[1].Timestamp, Timestamp(345))
assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234))
assert.Equal(t, len(flushUnit), 2) assert.Equal(t, len(flushUnit), 2)
assert.Equal(t, flushUnit[1].segID, int64(1)) assert.Equal(t, flushUnit[1].segID, int64(1))
assert.Equal(t, len(flushUnit[1].numRows), 2) assert.Equal(t, len(flushUnit[1].numRows), 2)
...@@ -358,8 +369,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { ...@@ -358,8 +369,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true})
assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234))
assert.Equal(t, len(flushUnit), 3) assert.Equal(t, len(flushUnit), 3)
assert.Equal(t, flushUnit[2].segID, int64(1)) assert.Equal(t, flushUnit[2].segID, int64(1))
assert.Equal(t, len(flushUnit[2].numRows), 2) assert.Equal(t, len(flushUnit[2].numRows), 2)
...@@ -390,8 +400,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { ...@@ -390,8 +400,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
assert.Equal(t, len(colRep.openSegmentCheckPoint), 1) assert.Equal(t, iBNode.openSegList, map[UniqueID]bool{2: true})
assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234))
assert.Equal(t, len(flushUnit), 4) assert.Equal(t, len(flushUnit), 4)
assert.Equal(t, flushUnit[3].segID, int64(3)) assert.Equal(t, flushUnit[3].segID, int64(3))
assert.Equal(t, len(flushUnit[3].numRows), 2) assert.Equal(t, len(flushUnit[3].numRows), 2)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册