Unverified commit 51f5a128 authored by XuanYang-cn, committed by GitHub

Refine codes of datanode buffer (#23168) (#23332)

This PR refines the delta buffer of datanode:
- Add a last sync time for compacted segments, see also: #23210
- Ensure DeltaBufferManager handles all delete-related operations
- Change usedMemory to atomic.Int64
- Remove the allocator from the delete buffer
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
Parent c64e8809
......@@ -24,6 +24,7 @@ import (
"strings"
"sync"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
......@@ -35,160 +36,163 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// DelBufferManager is in charge of managing insertBuf and delBuf from an overall prospect
// DeltaBufferManager is in charge of managing insertBuf and delBuf from an overall perspective,
// not only controlling buffered data size based on every segment size, but also triggering
// insert/delete flush when the memory usage of the whole manager reaches a certain level.
// At the first stage, this struct is only used for the delete buffer.
type DelBufferManager struct {
channel Channel
mu sync.Mutex // guards delMemorySize and delBufHeap
delMemorySize int64
delBufHeap *PriorityQueue
}
func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 {
bm.mu.Lock()
defer bm.mu.Unlock()
if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
return delDataBuf.item.memorySize
}
return 0
//
// DeltaBufferManager manages channel, usedMemory and delBufHeap.
type DeltaBufferManager struct {
channel Channel
usedMemory atomic.Int64
heapGuard sync.Mutex // guards delBufHeap
delBufHeap *PriorityQueue
}
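The struct above splits its synchronization: usedMemory is an atomic.Int64 that can be read and adjusted without holding a lock, while heapGuard only protects the priority queue. A minimal standalone sketch of that pattern, assuming go.uber.org/atomic (the names below are illustrative, not the Milvus code):

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type manager struct {
	usedMemory atomic.Int64 // lock-free running total, as in DeltaBufferManager
	heapGuard  sync.Mutex   // would guard a non-atomic structure such as a heap
}

func main() {
	m := &manager{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.usedMemory.Add(64) // e.g. 64 bytes buffered by one caller
		}()
	}
	wg.Wait()
	fmt.Println(m.usedMemory.Load()) // 512, no data race on the counter
}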
func (bm *DelBufferManager) GetEntriesNum(segID UniqueID) int64 {
bm.mu.Lock()
defer bm.mu.Unlock()
if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
return delDataBuf.GetEntriesNum()
func (m *DeltaBufferManager) GetEntriesNum(segID UniqueID) int64 {
if buffer, ok := m.Load(segID); ok {
return buffer.GetEntriesNum()
}
return 0
}
// Store :the method only for unit test
func (bm *DelBufferManager) Store(segID UniqueID, delDataBuf *DelDataBuf) {
bm.channel.setCurDeleteBuffer(segID, delDataBuf)
}
func (m *DeltaBufferManager) UpdateCompactedSegments() {
compactedTo2From := m.channel.listCompactedSegmentIDs()
for compactedTo, compactedFrom := range compactedTo2From {
// if the compactedTo segment has 0 numRows, there will be no such segment
// in the channel meta, so remove all the related compacted-from segments
if !m.channel.hasSegment(compactedTo, true) {
for _, segID := range compactedFrom {
m.Delete(segID)
}
m.channel.removeSegments(compactedFrom...)
continue
}
func (bm *DelBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
tss []Timestamp, tr TimeRange, startPos, endPos *internalpb.MsgPosition) {
//1. load or create delDataBuf
var delDataBuf *DelDataBuf
buffer, loaded := bm.channel.getCurDeleteBuffer(segID)
if loaded {
delDataBuf = buffer
} else {
delDataBuf = newDelDataBuf()
}
compactToDelBuff, loaded := m.Load(compactedTo)
if !loaded {
compactToDelBuff = newDelDataBuf(compactedTo)
}
//2. fill in new delta
delData := delDataBuf.delData
rowCount := len(pks)
var bufSize int64
for i := 0; i < rowCount; i++ {
delData.Pks = append(delData.Pks, pks[i])
delData.Tss = append(delData.Tss, tss[i])
switch pks[i].Type() {
case schemapb.DataType_Int64:
bufSize += 8
case schemapb.DataType_VarChar:
varCharPk := pks[i].(*varCharPrimaryKey)
bufSize += int64(len(varCharPk.Value))
for _, segID := range compactedFrom {
if delDataBuf, loaded := m.Load(segID); loaded {
compactToDelBuff.MergeDelDataBuf(delDataBuf)
m.Delete(segID)
}
}
//accumulate buf size for timestamp, which is 8 bytes
bufSize += 8
// only store delBuf if EntriesNum > 0
if compactToDelBuff.EntriesNum > 0 {
m.pushOrFixHeap(compactedTo, compactToDelBuff)
// We need to re-add the memorySize because m.Delete(segID) subtracted it.
m.usedMemory.Add(compactToDelBuff.GetMemorySize())
m.updateMeta(compactedTo, compactToDelBuff)
}
log.Info("update delBuf for compacted segments",
zap.Int64("compactedTo segmentID", compactedTo),
zap.Int64s("compactedFrom segmentIDs", compactedFrom),
zap.Int64("usedMemory", m.usedMemory.Load()),
)
m.channel.removeSegments(compactedFrom...)
}
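As a concrete illustration of the loop above, with made-up values: segments 100 and 101 were compacted into 201, so their buffered delete entries fold into 201's buffer and the source entries disappear.

package main

import "fmt"

func main() {
	// hypothetical compaction mapping: compactedTo -> compactedFrom
	compactedTo2From := map[int64][]int64{201: {100, 101}}
	// hypothetical buffered delete rows per segment
	entries := map[int64]int64{100: 3, 101: 2}

	for to, froms := range compactedTo2From {
		for _, from := range froms {
			entries[to] += entries[from] // merge, like MergeDelDataBuf
			delete(entries, from)        // drop the source buffer, like Delete(segID)
		}
	}
	fmt.Println(entries) // map[201:5]
}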
//3. update statistics of del data
delDataBuf.accumulateEntriesNum(int64(rowCount))
delDataBuf.updateTimeRange(tr)
delDataBuf.updateStartAndEndPosition(startPos, endPos)
//4. update and sync memory size with priority queue
bm.mu.Lock()
defer bm.mu.Unlock()
if !loaded {
delDataBuf.item.segmentID = segID
delDataBuf.item.memorySize = bufSize
heap.Push(bm.delBufHeap, delDataBuf.item)
}
func (m *DeltaBufferManager) updateMeta(segID UniqueID, delDataBuf *DelDataBuf) {
m.channel.setCurDeleteBuffer(segID, delDataBuf)
}
// pushOrFixHeap pushes the buffer item into the priority queue, or fixes its position if it is already there
func (m *DeltaBufferManager) pushOrFixHeap(segID UniqueID, buffer *DelDataBuf) {
m.heapGuard.Lock()
defer m.heapGuard.Unlock()
if _, loaded := m.Load(segID); loaded {
heap.Fix(m.delBufHeap, buffer.item.index)
} else {
bm.delBufHeap.update(delDataBuf.item, delDataBuf.item.memorySize+bufSize)
heap.Push(m.delBufHeap, buffer.item)
}
bm.channel.setCurDeleteBuffer(segID, delDataBuf)
bm.delMemorySize += bufSize
//4. sync metrics
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(
fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.DeleteLabel).Add(float64(rowCount))
}
func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
return bm.channel.getCurDeleteBuffer(segID)
}
// deleteFromHeap deletes an item from the heap
func (m *DeltaBufferManager) deleteFromHeap(buffer *DelDataBuf) {
m.heapGuard.Lock()
defer m.heapGuard.Unlock()
func (bm *DelBufferManager) Delete(segID UniqueID) {
bm.mu.Lock()
defer bm.mu.Unlock()
if buf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
item := buf.item
bm.delMemorySize -= item.memorySize
heap.Remove(bm.delBufHeap, item.index)
bm.channel.rollDeleteBuffer(segID)
if itemIdx, ok := buffer.GetItemIndex(); ok {
heap.Remove(m.delBufHeap, itemIdx)
}
}
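pushOrFixHeap and deleteFromHeap above lean on the standard container/heap contract: Push inserts a new item, Fix restores ordering after an existing item's priority changed in place, and Remove deletes by index; all three require the item to track its own index, which Swap must maintain. A self-contained sketch under those assumptions (the types here are illustrative, not the PriorityQueue from this PR):

package main

import (
	"container/heap"
	"fmt"
)

type entry struct {
	segmentID  int64
	memorySize int64
	index      int // kept in sync by Swap so Fix/Remove can be called by index
}

type pq []*entry

func (p pq) Len() int           { return len(p) }
func (p pq) Less(i, j int) bool { return p[i].memorySize > p[j].memorySize } // largest first
func (p pq) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p[i].index, p[j].index = i, j
}
func (p *pq) Push(x interface{}) {
	e := x.(*entry)
	e.index = len(*p)
	*p = append(*p, e)
}
func (p *pq) Pop() interface{} {
	old := *p
	n := len(old)
	e := old[n-1]
	*p = old[:n-1]
	return e
}

func main() {
	h := &pq{}
	a := &entry{segmentID: 1, memorySize: 10}
	heap.Push(h, a)         // new item: Push
	a.memorySize += 90      // priority changed in place
	heap.Fix(h, a.index)    // existing item: Fix re-establishes heap order
	heap.Remove(h, a.index) // deleteFromHeap-style removal by index
	fmt.Println(h.Len())    // 0
}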
func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFromSegIDs []UniqueID) {
var compactToDelBuff *DelDataBuf
compactToDelBuff, loaded := bm.Load(compactedToSegID)
func (m *DeltaBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
tss []Timestamp, tr TimeRange, startPos, endPos *internalpb.MsgPosition) {
buffer, loaded := m.Load(segID)
if !loaded {
compactToDelBuff = newDelDataBuf()
compactToDelBuff.item.segmentID = compactedToSegID
buffer = newDelDataBuf(segID)
}
for _, segID := range compactedFromSegIDs {
if delDataBuf, loaded := bm.Load(segID); loaded {
compactToDelBuff.mergeDelDataBuf(delDataBuf)
bm.Delete(segID)
}
}
bm.mu.Lock()
defer bm.mu.Unlock()
// only store delBuf if EntriesNum > 0
if compactToDelBuff.EntriesNum > 0 {
if loaded {
bm.delBufHeap.update(compactToDelBuff.item, compactToDelBuff.item.memorySize)
} else {
heap.Push(bm.delBufHeap, compactToDelBuff.item)
}
size := buffer.Buffer(pks, tss, tr, startPos, endPos)
m.pushOrFixHeap(segID, buffer)
m.updateMeta(segID, buffer)
m.usedMemory.Add(size)
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(
fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.DeleteLabel).Add(float64(len(pks)))
}
func (m *DeltaBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
return m.channel.getCurDeleteBuffer(segID)
}
func (m *DeltaBufferManager) Delete(segID UniqueID) {
if buffer, loaded := m.Load(segID); loaded {
m.usedMemory.Sub(buffer.GetMemorySize())
m.deleteFromHeap(buffer)
m.channel.rollDeleteBuffer(segID)
// We need to re-add the memorySize because bm.Delete(segID) sub them all.
bm.delMemorySize += compactToDelBuff.item.memorySize
bm.channel.setCurDeleteBuffer(compactedToSegID, compactToDelBuff)
}
}
func (bm *DelBufferManager) ShouldFlushSegments() []UniqueID {
bm.mu.Lock()
defer bm.mu.Unlock()
var shouldFlushSegments []UniqueID
if bm.delMemorySize < Params.DataNodeCfg.FlushDeleteBufferBytes {
return shouldFlushSegments
func (m *DeltaBufferManager) popHeapItem() *Item {
m.heapGuard.Lock()
defer m.heapGuard.Unlock()
return heap.Pop(m.delBufHeap).(*Item)
}
func (m *DeltaBufferManager) ShouldFlushSegments() []UniqueID {
var memUsage = m.usedMemory.Load()
if memUsage < Params.DataNodeCfg.FlushDeleteBufferBytes {
return nil
}
mmUsage := bm.delMemorySize
var poppedSegMem []*Item
var (
poppedSegmentIDs []UniqueID
poppedItems []*Item
)
for {
segMem := heap.Pop(bm.delBufHeap).(*Item)
poppedSegMem = append(poppedSegMem, segMem)
shouldFlushSegments = append(shouldFlushSegments, segMem.segmentID)
log.Info("add segment for delete buf flush", zap.Int64("segmentID", segMem.segmentID))
mmUsage -= segMem.memorySize
if mmUsage < Params.DataNodeCfg.FlushDeleteBufferBytes {
segItem := m.popHeapItem()
poppedItems = append(poppedItems, segItem)
poppedSegmentIDs = append(poppedSegmentIDs, segItem.segmentID)
memUsage -= segItem.memorySize
if memUsage < Params.DataNodeCfg.FlushDeleteBufferBytes {
break
}
}
// here we push all selected segments back into the heap
// in order to keep the heap semantically correct
for _, segMem := range poppedSegMem {
heap.Push(bm.delBufHeap, segMem)
m.heapGuard.Lock()
for _, segMem := range poppedItems {
heap.Push(m.delBufHeap, segMem)
}
return shouldFlushSegments
m.heapGuard.Unlock()
log.Info("Add segments to sync delete buffer for stressfull memory", zap.Any("segments", poppedItems))
return poppedSegmentIDs
}
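The method above pops the heaviest buffers until the projected usage drops below Params.DataNodeCfg.FlushDeleteBufferBytes, then pushes every popped item back so the heap still reflects what is buffered; only the segment IDs are handed to the flush path. A standalone sketch of that pop-then-restore pattern with plain integers (the threshold of 100 is made up):

package main

import (
	"container/heap"
	"fmt"
)

// sizeHeap is a max-heap of buffered memory sizes.
type sizeHeap []int64

func (h sizeHeap) Len() int            { return len(h) }
func (h sizeHeap) Less(i, j int) bool  { return h[i] > h[j] }
func (h sizeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *sizeHeap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *sizeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	v := old[n-1]
	*h = old[:n-1]
	return v
}

func main() {
	const threshold = int64(100) // stands in for FlushDeleteBufferBytes
	sizes := []int64{48, 32, 40, 16, 72}

	h := &sizeHeap{}
	used := int64(0)
	for _, s := range sizes {
		heap.Push(h, s)
		used += s
	}

	var popped []int64
	for used >= threshold {
		v := heap.Pop(h).(int64) // heaviest buffer first
		popped = append(popped, v)
		used -= v
	}
	// push everything back so the heap keeps describing the buffered state
	for _, v := range popped {
		heap.Push(h, v)
	}
	fmt.Println(popped, used, h.Len()) // [72 48] 88 5
}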
// An Item is something we manage in a memorySize priority queue.
......@@ -312,6 +316,49 @@ type DelDataBuf struct {
endPos *internalpb.MsgPosition
}
// Buffer appends the given deletes to the buffer and returns the memory size buffered
func (ddb *DelDataBuf) Buffer(pks []primaryKey, tss []Timestamp, tr TimeRange, startPos, endPos *internalpb.MsgPosition) int64 {
var (
rowCount = len(pks)
bufSize int64
)
for i := 0; i < rowCount; i++ {
ddb.delData.Append(pks[i], tss[i])
switch pks[i].Type() {
case schemapb.DataType_Int64:
bufSize += 8
case schemapb.DataType_VarChar:
varCharPk := pks[i].(*varCharPrimaryKey)
bufSize += int64(len(varCharPk.Value))
}
//accumulate buf size for timestamp, which is 8 bytes
bufSize += 8
}
ddb.accumulateEntriesNum(int64(rowCount))
ddb.updateTimeRange(tr)
ddb.updateStartAndEndPosition(startPos, endPos)
// update memorySize
ddb.item.memorySize += bufSize
return bufSize
}
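Per the accounting in Buffer above, every delete entry costs 8 bytes for its timestamp plus 8 bytes for an int64 primary key, or the byte length of a varchar primary key. A tiny worked example with hypothetical keys:

package main

import "fmt"

func main() {
	// two int64 primary keys and one varchar primary key "user-42"
	intPKCount := int64(2)
	varcharPKs := []string{"user-42"}

	var size int64
	size += intPKCount * (8 + 8) // 8-byte PK + 8-byte timestamp each
	for _, pk := range varcharPKs {
		size += int64(len(pk)) + 8 // PK bytes + 8-byte timestamp
	}
	fmt.Println(size) // 2*16 + (7+8) = 47
}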
func (ddb *DelDataBuf) GetMemorySize() int64 {
if ddb.item != nil {
return ddb.item.memorySize
}
return 0
}
func (ddb *DelDataBuf) GetItemIndex() (int, bool) {
if ddb.item != nil {
return ddb.item.index, true
}
return 0, false
}
func (ddb *DelDataBuf) accumulateEntriesNum(entryNum int64) {
ddb.EntriesNum += entryNum
}
......@@ -325,7 +372,7 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
}
}
func (ddb *DelDataBuf) mergeDelDataBuf(buf *DelDataBuf) {
func (ddb *DelDataBuf) MergeDelDataBuf(buf *DelDataBuf) {
ddb.accumulateEntriesNum(buf.EntriesNum)
tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom}
......@@ -388,7 +435,7 @@ func newBufferData(collSchema *schemapb.CollectionSchema) (*BufferData, error) {
tsTo: 0}, nil
}
func newDelDataBuf() *DelDataBuf {
func newDelDataBuf(segmentID UniqueID) *DelDataBuf {
return &DelDataBuf{
delData: &DeleteData{},
Binlog: datapb.Binlog{
......@@ -397,7 +444,7 @@ func newDelDataBuf() *DelDataBuf {
TimestampTo: 0,
},
item: &Item{
memorySize: 0,
segmentID: segmentID,
},
}
}
......@@ -18,16 +18,20 @@ package datanode
import (
"container/heap"
"context"
"fmt"
"math"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
)
func genTestCollectionSchema(dim int64, vectorType schemapb.DataType) *schemapb.CollectionSchema {
......@@ -157,58 +161,68 @@ func TestPriorityQueueString(t *testing.T) {
func Test_CompactSegBuff(t *testing.T) {
channelSegments := make(map[UniqueID]*Segment)
delBufferManager := &DelBufferManager{
delBufferManager := &DeltaBufferManager{
channel: &ChannelMeta{
segments: channelSegments,
},
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
}
//1. set compactTo and compactFrom
compactedFromSegIDs := make([]UniqueID, 2)
var segID1 UniqueID = 1111
var segID2 UniqueID = 2222
compactedFromSegIDs[0] = segID1
compactedFromSegIDs[1] = segID2
channelSegments[segID1] = &Segment{}
channelSegments[segID2] = &Segment{}
var compactedToSegID UniqueID = 3333
channelSegments[compactedToSegID] = &Segment{}
targetSeg := &Segment{segmentID: 3333}
targetSeg.setType(datapb.SegmentType_Flushed)
seg1 := &Segment{
segmentID: 1111,
compactedTo: targetSeg.segmentID,
}
seg1.setType(datapb.SegmentType_Compacted)
seg2 := &Segment{
segmentID: 2222,
compactedTo: targetSeg.segmentID,
}
seg2.setType(datapb.SegmentType_Compacted)
channelSegments[seg1.segmentID] = seg1
channelSegments[seg2.segmentID] = seg2
channelSegments[targetSeg.segmentID] = targetSeg
//2. set up deleteDataBuf for seg1 and seg2
delDataBuf1 := newDelDataBuf()
delDataBuf1 := newDelDataBuf(seg1.segmentID)
delDataBuf1.EntriesNum++
delDataBuf1.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50})
delBufferManager.Store(segID1, delDataBuf1)
delBufferManager.updateMeta(seg1.segmentID, delDataBuf1)
heap.Push(delBufferManager.delBufHeap, delDataBuf1.item)
delDataBuf2 := newDelDataBuf()
delDataBuf2 := newDelDataBuf(seg2.segmentID)
delDataBuf2.EntriesNum++
delDataBuf2.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50})
delBufferManager.Store(segID2, delDataBuf2)
delBufferManager.updateMeta(seg2.segmentID, delDataBuf2)
heap.Push(delBufferManager.delBufHeap, delDataBuf2.item)
//3. test compact
delBufferManager.CompactSegBuf(compactedToSegID, compactedFromSegIDs)
delBufferManager.UpdateCompactedSegments()
//4. expect results in two aspects:
//4.1 compactedFrom segments are removed from delBufferManager
//4.2 compactedTo seg is set properly with correct entriesNum
_, seg1Exist := delBufferManager.Load(segID1)
_, seg2Exist := delBufferManager.Load(segID2)
_, seg1Exist := delBufferManager.Load(seg1.segmentID)
_, seg2Exist := delBufferManager.Load(seg2.segmentID)
assert.False(t, seg1Exist)
assert.False(t, seg2Exist)
assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID))
assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(targetSeg.segmentID))
// test that the heap item of the compacted-to segment is correct
compactTo, ok := delBufferManager.Load(compactedToSegID)
targetSegBuf, ok := delBufferManager.Load(targetSeg.segmentID)
assert.True(t, ok)
assert.Equal(t, compactedToSegID, compactTo.item.segmentID)
assert.NotNil(t, targetSegBuf.item)
assert.Equal(t, targetSeg.segmentID, targetSegBuf.item.segmentID)
//5. test roll and evict (https://github.com/milvus-io/milvus/issues/20501)
delBufferManager.channel.rollDeleteBuffer(compactedToSegID)
_, segCompactedToExist := delBufferManager.Load(compactedToSegID)
delBufferManager.channel.rollDeleteBuffer(targetSeg.segmentID)
_, segCompactedToExist := delBufferManager.Load(targetSeg.segmentID)
assert.False(t, segCompactedToExist)
delBufferManager.channel.evictHistoryDeleteBuffer(compactedToSegID, &internalpb.MsgPosition{
delBufferManager.channel.evictHistoryDeleteBuffer(targetSeg.segmentID, &internalpb.MsgPosition{
Timestamp: 100,
})
cp := delBufferManager.channel.getChannelCheckpoint(&internalpb.MsgPosition{
......@@ -216,3 +230,94 @@ func Test_CompactSegBuff(t *testing.T) {
})
assert.Equal(t, Timestamp(200), cp.Timestamp) // evict all buffer, use ttPos as cp
}
func TestUpdateCompactedSegments(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
chanName := "datanode-test-FlowGraphDeletenode-showDelBuf"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root")
channel := ChannelMeta{
segments: make(map[UniqueID]*Segment),
}
c := &nodeConfig{
channel: &channel,
vChannelName: chanName,
}
delBufManager := &DeltaBufferManager{
channel: &channel,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
require.NoError(t, err)
tests := []struct {
description string
compactToExist bool
compactedToIDs []UniqueID
compactedFromIDs []UniqueID
expectedSegsRemain []UniqueID
}{
{"zero segments", false,
[]UniqueID{}, []UniqueID{}, []UniqueID{}},
{"segment no compaction", false,
[]UniqueID{}, []UniqueID{}, []UniqueID{100, 101}},
{"segment compacted", true,
[]UniqueID{200}, []UniqueID{103}, []UniqueID{100, 101}},
{"segment compacted 100>201", true,
[]UniqueID{201}, []UniqueID{100}, []UniqueID{101, 201}},
{"segment compacted 100+101>201", true,
[]UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{201}},
{"segment compacted 100>201, 101>202", true,
[]UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{201, 202}},
// false
{"segment compacted 100>201", false,
[]UniqueID{201}, []UniqueID{100}, []UniqueID{101}},
{"segment compacted 100+101>201", false,
[]UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{}},
{"segment compacted 100>201, 101>202", false,
[]UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{}},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.compactToExist {
for _, segID := range test.compactedToIDs {
seg := Segment{
segmentID: segID,
numRows: 10,
}
seg.setType(datapb.SegmentType_Flushed)
channel.segments[segID] = &seg
}
} else { // clear all segments in channel
channel.segments = make(map[UniqueID]*Segment)
}
for i, segID := range test.compactedFromIDs {
seg := Segment{
segmentID: segID,
compactedTo: test.compactedToIDs[i],
}
seg.setType(datapb.SegmentType_Compacted)
channel.segments[segID] = &seg
}
delNode.delBufferManager.UpdateCompactedSegments()
for _, remain := range test.expectedSegsRemain {
delNode.channel.hasSegment(remain, true)
}
})
}
}
......@@ -941,6 +941,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
partitionID: partID,
segmentID: req.GetCompactedTo(),
numRows: req.GetNumOfRows(),
lastSyncTs: tsoutil.GetCurrentTime(),
}
err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
......
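The lastSyncTs set above gives a freshly compacted segment a current sync time so it is not immediately picked up as stale (the "last sync time for compacted segment" item from the PR description, #23210). tsoutil.GetCurrentTime is the helper the diff already uses; the sketch below only illustrates the hybrid-timestamp layout it is assumed to follow, with physical milliseconds in the high bits and an 18-bit logical counter in the low bits:

package main

import (
	"fmt"
	"time"
)

const logicalBits = 18 // assumed logical-counter width of the hybrid timestamp

// composeTS packs a wall-clock time and a logical counter into one uint64,
// roughly what a TSO-style GetCurrentTime would return.
func composeTS(physical time.Time, logical int64) uint64 {
	ms := physical.UnixNano() / int64(time.Millisecond)
	return uint64(ms<<logicalBits | logical)
}

func main() {
	lastSyncTs := composeTS(time.Now(), 0)
	fmt.Println(lastSyncTs)
}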
......@@ -54,7 +54,7 @@ type dataSyncService struct {
dataCoord types.DataCoord // DataCoord instance to interact with
clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed
delBufferManager *DelBufferManager
delBufferManager *DeltaBufferManager
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
flushManager flushManager // flush manager handles flush process
chunkManager storage.ChunkManager
......@@ -85,10 +85,9 @@ func newDataSyncService(ctx context.Context,
ctx1, cancel := context.WithCancel(ctx)
delBufferManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufferManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
service := &dataSyncService{
......
......@@ -39,9 +39,8 @@ type deleteNode struct {
BaseNode
ctx context.Context
channelName string
delBufferManager *DelBufferManager // manager of delete msg
delBufferManager *DeltaBufferManager // manager of delete msg
channel Channel
idAllocator allocatorInterface
flushManager flushManager
clearSignal chan<- string
......@@ -57,12 +56,12 @@ func (dn *deleteNode) Close() {
func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
for _, segID := range segIDs {
if _, ok := dn.delBufferManager.Load(segID); ok {
if buffer, ok := dn.delBufferManager.Load(segID); ok {
log.Debug("delta buffer status",
zap.Int64("segmentID", segID),
zap.Uint64("timestamp", ts),
zap.Int64("segment ID", segID),
zap.Int64("entries", dn.delBufferManager.GetEntriesNum(segID)),
zap.Int64("memory size", dn.delBufferManager.GetSegDelBufMemSize(segID)),
zap.Int64("entriesNum", buffer.GetEntriesNum()),
zap.Int64("memorySize", buffer.GetMemorySize()),
zap.String("vChannel", dn.channelName))
}
}
......@@ -92,7 +91,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
}
// update compacted segment before operation
dn.updateCompactedSegments()
dn.delBufferManager.UpdateCompactedSegments()
// process delete messages
segIDs := typeutil.NewUniqueSet()
......@@ -154,29 +153,6 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
return in
}
// update delBuf for compacted segments
func (dn *deleteNode) updateCompactedSegments() {
compactedTo2From := dn.channel.listCompactedSegmentIDs()
for compactedTo, compactedFrom := range compactedTo2From {
// if the compactedTo segment has 0 numRows, remove all segments related
if !dn.channel.hasSegment(compactedTo, true) {
for _, segID := range compactedFrom {
dn.delBufferManager.Delete(segID)
}
dn.channel.removeSegments(compactedFrom...)
continue
}
dn.delBufferManager.CompactSegBuf(compactedTo, compactedFrom)
log.Info("update delBuf for compacted segments",
zap.Int64("compactedTo segmentID", compactedTo),
zap.Int64s("compactedFrom segmentIDs", compactedFrom),
)
dn.channel.removeSegments(compactedFrom...)
}
}
func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange, startPos, endPos *internalpb.MsgPosition) ([]UniqueID, error) {
log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName))
......@@ -218,7 +194,7 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [
return segID2Pks, segID2Tss
}
func newDeleteNode(ctx context.Context, fm flushManager, delBufManager *DelBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
func newDeleteNode(ctx context.Context, fm flushManager, manager *DeltaBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
......@@ -226,9 +202,8 @@ func newDeleteNode(ctx context.Context, fm flushManager, delBufManager *DelBuffe
return &deleteNode{
ctx: ctx,
BaseNode: baseNode,
delBufferManager: delBufManager,
delBufferManager: manager,
channel: config.channel,
idAllocator: config.allocator,
channelName: config.vChannelName,
flushManager: fm,
clearSignal: sig,
......
......@@ -184,13 +184,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
c := &nodeConfig{
channel: channel,
allocator: &allocator{},
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c)
......@@ -218,14 +216,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
t.Run("Test get segment by int64 primary keys", func(te *testing.T) {
c := &nodeConfig{
channel: channel,
allocator: &allocator{},
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c)
......@@ -260,13 +256,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{
channel: channel,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(te, err)
......@@ -288,13 +282,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{
channel: channel,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(te, err)
......@@ -322,13 +314,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{
channel: channel,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
sig := make(chan string, 1)
delNode, err := newDeleteNode(ctx, fm, delBufManager, sig, c)
......@@ -366,13 +356,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{
channel: channel,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(t, err)
......@@ -390,7 +378,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
msg.segmentsToSync = []UniqueID{compactedSegment}
bufItem := &Item{memorySize: 0}
delNode.delBufferManager.Store(compactedSegment,
delNode.delBufferManager.updateMeta(compactedSegment,
&DelDataBuf{delData: &DeleteData{}, item: bufItem})
heap.Push(delNode.delBufferManager.delBufHeap, bufItem)
......@@ -421,13 +409,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{
channel: channel,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
mockFlushManager := &mockFlushManager{
recordFlushedSeg: true,
......@@ -453,7 +439,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 0, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(208), delNode.delBufferManager.delMemorySize)
assert.Equal(t, int64(208), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 5, delNode.delBufferManager.delBufHeap.Len())
//3. note that the whole memory size used by 5 segments will be 208
......@@ -466,7 +452,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize)
assert.Equal(t, int64(160), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len())
//4. there is no new delete msg and delBufferSize is still 200
......@@ -474,7 +460,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize)
assert.Equal(t, int64(160), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len())
//5. we reset buffer bytes to 150, then we expect there would be one more
......@@ -484,7 +470,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 2, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(112), delNode.delBufferManager.delMemorySize)
assert.Equal(t, int64(112), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 3, delNode.delBufferManager.delBufHeap.Len())
//6. we reset buffer bytes to 60, then most of the segments will be flushed
......@@ -493,7 +479,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 4, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(32), delNode.delBufferManager.delMemorySize)
assert.Equal(t, int64(32), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 1, delNode.delBufferManager.delBufHeap.Len())
//7. we reset buffer bytes to 20, then as all segment-memory consumption
......@@ -503,7 +489,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 5, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(0), delNode.delBufferManager.delMemorySize)
assert.Equal(t, int64(0), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 0, delNode.delBufferManager.delBufHeap.Len())
})
}
......@@ -527,13 +513,11 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
}
c := &nodeConfig{
channel: channel,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
require.NoError(t, err)
......@@ -548,105 +532,11 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
}
for _, test := range tests {
delBuf := newDelDataBuf()
delBuf := newDelDataBuf(test.seg)
delBuf.accumulateEntriesNum(test.numRows)
delNode.delBufferManager.Store(test.seg, delBuf)
delNode.delBufferManager.updateMeta(test.seg, delBuf)
heap.Push(delNode.delBufferManager.delBufHeap, delBuf.item)
}
delNode.showDelBuf([]UniqueID{111, 112, 113}, 100)
}
func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
chanName := "datanode-test-FlowGraphDeletenode-showDelBuf"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.EtcdCfg.MetaRootPath = testPath
channel := ChannelMeta{
segments: make(map[UniqueID]*Segment),
}
c := &nodeConfig{
channel: &channel,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: &channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
require.NoError(t, err)
tests := []struct {
description string
compactToExist bool
compactedToIDs []UniqueID
compactedFromIDs []UniqueID
expectedSegsRemain []UniqueID
}{
{"zero segments", false,
[]UniqueID{}, []UniqueID{}, []UniqueID{}},
{"segment no compaction", false,
[]UniqueID{}, []UniqueID{}, []UniqueID{100, 101}},
{"segment compacted", true,
[]UniqueID{200}, []UniqueID{103}, []UniqueID{100, 101}},
{"segment compacted 100>201", true,
[]UniqueID{201}, []UniqueID{100}, []UniqueID{101, 201}},
{"segment compacted 100+101>201", true,
[]UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{201}},
{"segment compacted 100>201, 101>202", true,
[]UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{201, 202}},
// false
{"segment compacted 100>201", false,
[]UniqueID{201}, []UniqueID{100}, []UniqueID{101}},
{"segment compacted 100+101>201", false,
[]UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{}},
{"segment compacted 100>201, 101>202", false,
[]UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{}},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.compactToExist {
for _, segID := range test.compactedToIDs {
seg := Segment{
segmentID: segID,
numRows: 10,
}
seg.setType(datapb.SegmentType_Flushed)
channel.segments[segID] = &seg
}
} else { // clear all segments in channel
channel.segments = make(map[UniqueID]*Segment)
}
for i, segID := range test.compactedFromIDs {
seg := Segment{
segmentID: segID,
compactedTo: test.compactedToIDs[i],
}
seg.setType(datapb.SegmentType_Compacted)
channel.segments[segID] = &seg
}
delNode.updateCompactedSegments()
for _, remain := range test.expectedSegsRemain {
delNode.channel.hasSegment(remain, true)
}
})
}
}
......@@ -48,7 +48,7 @@ type insertBufferNode struct {
ctx context.Context
channelName string
delBufferManager *DelBufferManager // manager of delete msg
delBufferManager *DeltaBufferManager // manager of delete msg
channel Channel
idAllocator allocatorInterface
......@@ -666,7 +666,7 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
return ibNode.channel.getCollectionAndPartitionID(segmentID)
}
func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DelBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg,
func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DeltaBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg,
fm flushManager, flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) {
baseNode := BaseNode{}
......
......@@ -105,10 +105,9 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
......@@ -208,10 +207,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
......@@ -382,10 +380,9 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
......@@ -626,10 +623,9 @@ func TestRollBF(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
......@@ -711,7 +707,7 @@ type InsertBufferNodeSuit struct {
suite.Suite
channel *ChannelMeta
delBufManager *DelBufferManager
delBufManager *DeltaBufferManager
collID UniqueID
partID UniqueID
......@@ -730,10 +726,9 @@ func (s *InsertBufferNodeSuit) SetupSuite() {
s.partID = 10
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.delBufManager = &DelBufferManager{
channel: s.channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
s.delBufManager = &DeltaBufferManager{
channel: s.channel,
delBufHeap: &PriorityQueue{},
}
s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir))
......@@ -1018,10 +1013,9 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
delBufManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
......