提交 0c362a88 编写于 作者: N neza2017 提交者: zhenshan.cao

fix bug and add unit test for insert buffer (#5630)

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
上级 3c706753
...@@ -46,7 +46,7 @@ type Replica interface { ...@@ -46,7 +46,7 @@ type Replica interface {
setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
setSegmentCheckPoint(segID UniqueID) setSegmentCheckPoint(segID UniqueID)
listOpenSegmentCheckPoint() map[UniqueID]internalpb.MsgPosition listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64)
removeSegmentCheckPoint(segID UniqueID) removeSegmentCheckPoint(segID UniqueID)
} }
...@@ -329,10 +329,16 @@ func (replica *CollectionSegmentReplica) setSegmentCheckPoint(segID UniqueID) { ...@@ -329,10 +329,16 @@ func (replica *CollectionSegmentReplica) setSegmentCheckPoint(segID UniqueID) {
} }
replica.openSegmentCheckPoint[segID] = *ep[0] replica.openSegmentCheckPoint[segID] = *ep[0]
} }
func (replica *CollectionSegmentReplica) listOpenSegmentCheckPoint() map[UniqueID]internalpb.MsgPosition { func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows() (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
replica.posMu.Lock() replica.posMu.Lock()
defer replica.posMu.Unlock() defer replica.posMu.Unlock()
return replica.openSegmentCheckPoint r1 := make(map[UniqueID]internalpb.MsgPosition)
r2 := make(map[UniqueID]int64)
for k, v := range replica.openSegmentCheckPoint {
r1[k] = v
r2[k] = replica.segments[k].numRows
}
return r1, r2
} }
func (replica *CollectionSegmentReplica) removeSegmentCheckPoint(segID UniqueID) { func (replica *CollectionSegmentReplica) removeSegmentCheckPoint(segID UniqueID) {
......
...@@ -504,21 +504,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -504,21 +504,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Debug("segment is empty") log.Debug("segment is empty")
continue continue
} }
fu.openSegCheckpoints = ibNode.replica.listOpenSegmentCheckPoint() fu.openSegCheckpoints, fu.numRows = ibNode.replica.listOpenSegmentCheckPointAndNumRows()
fu.numRows = make(map[UniqueID]int64)
for k := range fu.openSegCheckpoints {
segStat, err := ibNode.replica.getSegmentStatisticsUpdates(k)
if err != nil {
log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err))
fu.numRows = nil
break
}
fu.numRows[k] = segStat.NumRows
}
if fu.numRows == nil {
log.Debug("failed on get segment num rows")
break
}
fu.flushed = false fu.flushed = false
if err := ibNode.dsSaveBinlog(&fu); err != nil { if err := ibNode.dsSaveBinlog(&fu); err != nil {
log.Debug("data service save bin log path failed", zap.Error(err)) log.Debug("data service save bin log path failed", zap.Error(err))
...@@ -532,32 +518,18 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -532,32 +518,18 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
currentSegID := fmsg.segmentID currentSegID := fmsg.segmentID
log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID)) log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))
checkPoints := ibNode.replica.listOpenSegmentCheckPoint()
numRows := make(map[UniqueID]int64)
for k := range checkPoints {
segStat, err := ibNode.replica.getSegmentStatisticsUpdates(k)
if err != nil {
log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err))
numRows = nil
break
}
numRows[k] = segStat.NumRows
}
if numRows == nil {
log.Debug("failed on get segment num rows")
break
}
if ibNode.insertBuffer.size(currentSegID) <= 0 { if ibNode.insertBuffer.size(currentSegID) <= 0 {
log.Debug(".. Buffer empty ...") log.Debug(".. Buffer empty ...")
c, n := ibNode.replica.listOpenSegmentCheckPointAndNumRows()
ibNode.dsSaveBinlog(&autoFlushUnit{ ibNode.dsSaveBinlog(&autoFlushUnit{
collID: fmsg.collectionID, collID: fmsg.collectionID,
segID: currentSegID, segID: currentSegID,
numRows: numRows, numRows: n,
field2Path: map[UniqueID]string{}, field2Path: map[UniqueID]string{},
openSegCheckpoints: checkPoints, openSegCheckpoints: c,
flushed: true, flushed: true,
}) })
ibNode.replica.removeSegmentCheckPoint(fmsg.segmentID)
fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
} else { } else {
log.Debug(".. Buffer not empty, flushing ..") log.Debug(".. Buffer not empty, flushing ..")
...@@ -601,8 +573,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ...@@ -601,8 +573,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
fu := <-finishCh fu := <-finishCh
close(finishCh) close(finishCh)
if fu.field2Path != nil { if fu.field2Path != nil {
fu.numRows = numRows fu.openSegCheckpoints, fu.numRows = ibNode.replica.listOpenSegmentCheckPointAndNumRows()
fu.openSegCheckpoints = checkPoints
fu.flushed = true fu.flushed = true
if ibNode.dsSaveBinlog(&fu) != nil { if ibNode.dsSaveBinlog(&fu) != nil {
log.Debug("data service save bin log path failed", zap.Error(err)) log.Debug("data service save bin log path failed", zap.Error(err))
......
...@@ -226,3 +226,179 @@ func genCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.Col ...@@ -226,3 +226,179 @@ func genCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.Col
} }
return &collection return &collection
} }
// TestFlowGraphInsertBufferNode_AutoFlush drives an insertBufferNode through
// three rounds of insert messages followed by two explicit flush messages.
// After every Operate call it checks the replica's end positions and
// open-segment checkpoints, the rows still held in the insert buffer, and the
// autoFlushUnit values that were handed to the save-binlog callback.
//
// NOTE(review): the assert.Equal calls below pass the observed value first and
// the expected value second; testify documents the order as (expected, actual),
// so failure messages will have the two labels swapped — confirm and reorder
// if desired.
func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
// Bound the whole test with a 10 second timeout.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Start from a clean etcd prefix and point the meta root path at it.
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.MetaRootPath = testPath
Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
dataFactory := NewDataFactory()
// Build the replica by hand with empty maps so the test can inspect
// endPositions and openSegmentCheckPoint directly.
colRep := &CollectionSegmentReplica{
segments: make(map[UniqueID]*Segment),
collections: make(map[UniqueID]*Collection),
startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
endPositions: make(map[UniqueID][]*internalpb.MsgPosition),
openSegmentCheckPoint: make(map[UniqueID]internalpb.MsgPosition),
}
err = colRep.addCollection(collMeta.ID, collMeta.Schema)
require.NoError(t, err)
// Only segment 1 is registered explicitly; segments 2 and 3 are never added
// by this test and only appear through the insert messages below.
err = colRep.addSegment(1, collMeta.ID, 0, Params.InsertChannelNames[0])
require.NoError(t, err)
// Message stream factory configured with the Pulsar address from Params.
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
// The save-binlog callback records a copy of every flush unit it receives
// and always reports success, so the assertions can inspect what was saved.
flushUnit := []autoFlushUnit{}
saveBinlog := func(fu *autoFlushUnit) error {
flushUnit = append(flushUnit, *fu)
return nil
}
flushChan := make(chan *flushMsg, 100)
iBNode := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog)
// Round 1: a batch built from GetMsgStreamInsertMsgs(100) plus
// GetMsgStreamInsertMsgs(32000), alternated between segments 1 and 2 by index
// parity. The size assertions below expect 50+16000 buffered rows per segment.
inMsg := genInsertMsg()
inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(100)
inMsg.insertMessages = append(inMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(32000)...)
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = int64(i%2) + 1
}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
var iMsg flowgraph.Msg = &inMsg
iBNode.Operate([]flowgraph.Msg{iMsg})
// Both segments got end position 123, no checkpoint is open yet, and both
// segments are still fully buffered.
assert.Equal(t, len(colRep.endPositions), 2)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(123))
assert.Equal(t, len(colRep.openSegmentCheckPoint), 0)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 2)
assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000))
assert.Equal(t, iBNode.insertBuffer.size(2), int32(50+16000))
// Round 2: the same messages are retargeted to segments 2 and 3. Segment 2
// receives a second half-batch and is expected to be auto-flushed.
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = int64(i%2) + 2
}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
iBNode.Operate([]flowgraph.Msg{iMsg})
// Segment 1 keeps its old end position; segments 2 and 3 move to 234.
assert.Equal(t, len(colRep.endPositions), 3)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
// Exactly one checkpoint is open now, for segment 2.
assert.Equal(t, len(colRep.openSegmentCheckPoint), 1)
assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234))
// First flush unit: auto-flush of segment 2 (flushed == false) carrying its
// total row count, its checkpoint, and at least one binlog path.
assert.Equal(t, len(flushUnit), 1)
assert.Equal(t, flushUnit[0].segID, int64(2))
assert.Equal(t, len(flushUnit[0].numRows), 1)
assert.Equal(t, flushUnit[0].numRows[2], int64(100+32000))
assert.Equal(t, len(flushUnit[0].openSegCheckpoints), 1)
assert.Equal(t, flushUnit[0].openSegCheckpoints[2].Timestamp, Timestamp(234))
assert.Greater(t, len(flushUnit[0].field2Path), 0)
assert.False(t, flushUnit[0].flushed)
// Segment 2 left the buffer; segments 1 and 3 each still hold a half-batch.
assert.Equal(t, len(iBNode.insertBuffer.insertData), 2)
assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000))
assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
// Round 3: every message goes to segment 1, which is expected to be
// auto-flushed in turn.
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = 1
}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
iBNode.Operate([]flowgraph.Msg{iMsg})
// Only segment 1's end position advances.
assert.Equal(t, len(colRep.endPositions), 3)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
// Checkpoints are now open for segments 1 and 2.
assert.Equal(t, len(colRep.openSegmentCheckPoint), 2)
assert.Equal(t, colRep.openSegmentCheckPoint[1].Timestamp, Timestamp(345))
assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234))
// Second flush unit: auto-flush of segment 1. It reports row counts and
// checkpoints for every open segment (1 and 2), not just the flushed one.
assert.Equal(t, len(flushUnit), 2)
assert.Equal(t, flushUnit[1].segID, int64(1))
assert.Equal(t, len(flushUnit[1].numRows), 2)
assert.Equal(t, flushUnit[1].numRows[2], int64(100+32000))
assert.Equal(t, flushUnit[1].numRows[1], int64(50+16000+100+32000))
assert.Equal(t, len(flushUnit[1].openSegCheckpoints), 2)
assert.Equal(t, flushUnit[1].openSegCheckpoints[1].Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[1].openSegCheckpoints[2].Timestamp, Timestamp(234))
assert.False(t, flushUnit[1].flushed)
assert.Greater(t, len(flushUnit[1].field2Path), 0)
// Only segment 3 remains buffered.
assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
// Explicit flush of segment 1, whose buffer is already empty after the
// auto-flush above. The result channel has capacity 1 so the node can send
// before the test receives.
dmlFlushedCh := make(chan []*datapb.ID2PathList, 1)
flushChan <- &flushMsg{
msgID: 3,
timestamp: 456,
segmentID: UniqueID(1),
collectionID: UniqueID(1),
dmlFlushedCh: dmlFlushedCh,
}
// No new inserts accompany the flush message.
inMsg.insertMessages = []*msgstream.InsertMsg{}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}}
iBNode.Operate([]flowgraph.Msg{iMsg})
flushSeg := <-dmlFlushedCh
assert.NotNil(t, flushSeg)
assert.Equal(t, len(flushSeg), 1)
assert.Equal(t, flushSeg[0].ID, int64(1))
assert.NotNil(t, flushSeg[0].Paths)
// With no inserts, end positions are unchanged.
assert.Equal(t, len(colRep.endPositions), 3)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
// Segment 1's checkpoint is gone from the replica after the flush...
assert.Equal(t, len(colRep.openSegmentCheckPoint), 1)
assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234))
// ...but the flush unit that was saved still lists it alongside segment 2,
// with an empty (non-nil) path map and flushed == true.
assert.Equal(t, len(flushUnit), 3)
assert.Equal(t, flushUnit[2].segID, int64(1))
assert.Equal(t, len(flushUnit[2].numRows), 2)
assert.Equal(t, flushUnit[2].numRows[2], int64(100+32000))
assert.Equal(t, flushUnit[2].numRows[1], int64(50+16000+100+32000))
t.Log(flushUnit[2].openSegCheckpoints)
assert.Equal(t, len(flushUnit[2].openSegCheckpoints), 2)
assert.Equal(t, flushUnit[2].openSegCheckpoints[1].Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[2].openSegCheckpoints[2].Timestamp, Timestamp(234))
assert.Equal(t, len(flushUnit[2].field2Path), 0)
assert.NotNil(t, flushUnit[2].field2Path)
assert.True(t, flushUnit[2].flushed)
// Explicit flush of segment 3, which still has buffered rows. iMsg is reused
// unchanged, so it still carries no inserts and end position 456.
flushChan <- &flushMsg{
msgID: 4,
timestamp: 567,
segmentID: UniqueID(3),
collectionID: UniqueID(3),
dmlFlushedCh: dmlFlushedCh,
}
iBNode.Operate([]flowgraph.Msg{iMsg})
flushSeg = <-dmlFlushedCh
assert.NotNil(t, flushSeg)
assert.Equal(t, len(flushSeg), 1)
assert.Equal(t, flushSeg[0].ID, int64(3))
assert.NotNil(t, flushSeg[0].Paths)
// End positions are still unchanged.
assert.Equal(t, len(colRep.endPositions), 3)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
// Only segment 2's checkpoint remains open in the replica.
assert.Equal(t, len(colRep.openSegmentCheckPoint), 1)
assert.Equal(t, colRep.openSegmentCheckPoint[2].Timestamp, Timestamp(234))
// Fourth flush unit: segment 3 flushed with real binlog paths; it reports
// row counts for segments 3 and 2.
assert.Equal(t, len(flushUnit), 4)
assert.Equal(t, flushUnit[3].segID, int64(3))
assert.Equal(t, len(flushUnit[3].numRows), 2)
assert.Equal(t, flushUnit[3].numRows[3], int64(50+16000))
assert.Equal(t, flushUnit[3].numRows[2], int64(100+32000))
assert.Greater(t, len(flushUnit[3].field2Path), 0)
assert.NotNil(t, flushUnit[3].field2Path)
assert.True(t, flushUnit[3].flushed)
// Nothing is left in the insert buffer.
assert.Equal(t, len(iBNode.insertBuffer.insertData), 0)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册