未验证 提交 8791d1c4 编写于 作者: M MrPresent-Han 提交者: GitHub

Fix panic error due to rollDeleteOp ahead by load-and-delete (#20563) (#20603)

issue:#20501
Signed-off-by: NMrPresent-Han <jamesharden11122@gmail.com>
Signed-off-by: NMrPresent-Han <jamesharden11122@gmail.com>
上级 bb43497f
......@@ -114,7 +114,7 @@ func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok boo
if ok {
return buffer, ok
}
return nil, ok
return nil, false
}
func (bm *DelBufferManager) Delete(segID UniqueID) {
......@@ -126,14 +126,6 @@ func (bm *DelBufferManager) Delete(segID UniqueID) {
}
}
func (bm *DelBufferManager) LoadAndDelete(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
if buf, ok := bm.Load(segID); ok {
bm.Delete(segID)
return buf, ok
}
return nil, ok
}
func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFromSegIDs []UniqueID) {
var compactToDelBuff *DelDataBuf
compactToDelBuff, loaded := bm.Load(compactedToSegID)
......@@ -142,8 +134,9 @@ func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFr
}
for _, segID := range compactedFromSegIDs {
if delDataBuf, loaded := bm.LoadAndDelete(segID); loaded {
if delDataBuf, loaded := bm.Load(segID); loaded {
compactToDelBuff.mergeDelDataBuf(delDataBuf)
bm.Delete(segID)
}
}
// only store delBuf if EntriesNum > 0
......
......@@ -17,6 +17,7 @@
package datanode
import (
"container/heap"
"fmt"
"math"
"testing"
......@@ -137,3 +138,46 @@ func TestBufferData_updateTimeRange(t *testing.T) {
})
}
}
func Test_CompactSegBuff(t *testing.T) {
channelSegments := make(map[UniqueID]*Segment)
delBufferManager := &DelBufferManager{
channel: &ChannelMeta{
segments: channelSegments,
},
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
//1. set compactTo and compactFrom
compactedFromSegIDs := make([]UniqueID, 2)
var segID1 UniqueID = 1111
var segID2 UniqueID = 2222
compactedFromSegIDs[0] = segID1
compactedFromSegIDs[1] = segID2
channelSegments[segID1] = &Segment{}
channelSegments[segID2] = &Segment{}
var compactedToSegID UniqueID = 3333
channelSegments[compactedToSegID] = &Segment{}
//2. set up deleteDataBuf for seg1 and seg2
delDataBuf1 := newDelDataBuf()
delDataBuf1.EntriesNum++
delBufferManager.Store(segID1, delDataBuf1)
heap.Push(delBufferManager.delBufHeap, delDataBuf1.item)
delDataBuf2 := newDelDataBuf()
delDataBuf2.EntriesNum++
delBufferManager.Store(segID2, delDataBuf2)
heap.Push(delBufferManager.delBufHeap, delDataBuf2.item)
//3. test compact
delBufferManager.CompactSegBuf(compactedToSegID, compactedFromSegIDs)
//4. expect results in two aspects:
//4.1 compactedFrom segments are removed from delBufferManager
//4.2 compactedTo seg is set properly with correct entriesNum
_, seg1Exist := delBufferManager.Load(segID1)
_, seg2Exist := delBufferManager.Load(segID2)
assert.False(t, seg1Exist)
assert.False(t, seg2Exist)
assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID))
}
......@@ -123,7 +123,6 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
//then we will add all segments in the fgMsg.segmentsToFlush into the toFlushSeg and remove duplicate segments
//the aim for taking all these actions is to guarantee that the memory consumed by delBuf will not exceed a limit
segmentsToFlush := dn.delBufferManager.ShouldFlushSegments()
log.Info("should flush segments, ", zap.Int("seg_count", len(segmentsToFlush)))
for _, msgSegmentID := range fgMsg.segmentsToSync {
existed := false
for _, autoFlushSegment := range segmentsToFlush {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册