diff --git a/internal/datanode/buffer.go b/internal/datanode/buffer.go
index 0a1d2cae22abad54ec4348178cfce3421505d86a..33dd98617471d219418c28062c955f28717cbe04 100644
--- a/internal/datanode/buffer.go
+++ b/internal/datanode/buffer.go
@@ -24,6 +24,7 @@ import (
     "strings"
     "sync"
 
+    "go.uber.org/atomic"
     "go.uber.org/zap"
 
     "github.com/milvus-io/milvus-proto/go-api/schemapb"
@@ -35,160 +36,163 @@ import (
     "github.com/milvus-io/milvus/internal/util/typeutil"
 )
 
-// DelBufferManager is in charge of managing insertBuf and delBuf from an overall prospect
+// DeltaBufferManager is in charge of managing insertBuf and delBuf from an overall perspective,
 // not only controlling buffered data size based on every segment size, but also triggering
 // insert/delete flush when the memory usage of the whole manager reach a certain level.
 // but at the first stage, this struct is only used for delete buff
-type DelBufferManager struct {
-    channel       Channel
-    mu            sync.Mutex // guards delMemorySize and delBufHeap
-    delMemorySize int64
-    delBufHeap    *PriorityQueue
-}
-
-func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 {
-    bm.mu.Lock()
-    defer bm.mu.Unlock()
-    if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
-        return delDataBuf.item.memorySize
-    }
-    return 0
+//
+// DeltaBufferManager manages channel, usedMemory and delBufHeap.
+type DeltaBufferManager struct {
+    channel    Channel
+    usedMemory atomic.Int64
+
+    heapGuard  sync.Mutex // guards delBufHeap
+    delBufHeap *PriorityQueue
 }
 
-func (bm *DelBufferManager) GetEntriesNum(segID UniqueID) int64 {
-    bm.mu.Lock()
-    defer bm.mu.Unlock()
-    if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
-        return delDataBuf.GetEntriesNum()
+func (m *DeltaBufferManager) GetEntriesNum(segID UniqueID) int64 {
+    if buffer, ok := m.Load(segID); ok {
+        return buffer.GetEntriesNum()
     }
+
     return 0
 }
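Note on the new struct: the mutex-guarded delMemorySize counter becomes usedMemory atomic.Int64, so size bookkeeping no longer takes a lock and heapGuard now only protects delBufHeap. A minimal standalone sketch of that counter pattern, assuming nothing beyond the go.uber.org/atomic package itself (the manager type here is illustrative, not the patch's):

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// usedMemory mirrors DeltaBufferManager.usedMemory: the zero value of
// atomic.Int64 is ready to use, no constructor or lock required.
type manager struct {
	usedMemory atomic.Int64
}

func main() {
	m := &manager{}
	m.usedMemory.Add(16) // buffering one int64 PK (8B) plus one timestamp (8B)
	m.usedMemory.Sub(16) // dropping that buffer gives the bytes back
	fmt.Println(m.usedMemory.Load()) // 0
}
```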
 
-// Store :the method only for unit test
-func (bm *DelBufferManager) Store(segID UniqueID, delDataBuf *DelDataBuf) {
-    bm.channel.setCurDeleteBuffer(segID, delDataBuf)
-}
+func (m *DeltaBufferManager) UpdateCompactedSegments() {
+    compactedTo2From := m.channel.listCompactedSegmentIDs()
+    for compactedTo, compactedFrom := range compactedTo2From {
+
+        // if the compactedTo segment has 0 numRows, there'll be no segments
+        // in the channel meta, so remove all compacted-from segments related to it
+        if !m.channel.hasSegment(compactedTo, true) {
+            for _, segID := range compactedFrom {
+                m.Delete(segID)
+            }
+            m.channel.removeSegments(compactedFrom...)
+            continue
+        }
 
-func (bm *DelBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
-    tss []Timestamp, tr TimeRange, startPos, endPos *internalpb.MsgPosition) {
-    //1. load or create delDataBuf
-    var delDataBuf *DelDataBuf
-    buffer, loaded := bm.channel.getCurDeleteBuffer(segID)
-    if loaded {
-        delDataBuf = buffer
-    } else {
-        delDataBuf = newDelDataBuf()
-    }
+        compactToDelBuff, loaded := m.Load(compactedTo)
+        if !loaded {
+            compactToDelBuff = newDelDataBuf(compactedTo)
+        }
 
-    //2. fill in new delta
-    delData := delDataBuf.delData
-    rowCount := len(pks)
-    var bufSize int64
-    for i := 0; i < rowCount; i++ {
-        delData.Pks = append(delData.Pks, pks[i])
-        delData.Tss = append(delData.Tss, tss[i])
-        switch pks[i].Type() {
-        case schemapb.DataType_Int64:
-            bufSize += 8
-        case schemapb.DataType_VarChar:
-            varCharPk := pks[i].(*varCharPrimaryKey)
-            bufSize += int64(len(varCharPk.Value))
+        for _, segID := range compactedFrom {
+            if delDataBuf, loaded := m.Load(segID); loaded {
+                compactToDelBuff.MergeDelDataBuf(delDataBuf)
+                m.Delete(segID)
+            }
         }
-        //accumulate buf size for timestamp, which is 8 bytes
-        bufSize += 8
+
+        // only store delBuf if EntriesNum > 0
+        if compactToDelBuff.EntriesNum > 0 {
+
+            m.pushOrFixHeap(compactedTo, compactToDelBuff)
+            // We need to re-add the memorySize because m.Delete(segID) subtracted it.
+            m.usedMemory.Add(compactToDelBuff.GetMemorySize())
+            m.updateMeta(compactedTo, compactToDelBuff)
+        }
+
+        log.Info("update delBuf for compacted segments",
+            zap.Int64("compactedTo segmentID", compactedTo),
+            zap.Int64s("compactedFrom segmentIDs", compactedFrom),
+            zap.Int64("usedMemory", m.usedMemory.Load()),
+        )
+        m.channel.removeSegments(compactedFrom...)
     }
-    //3. update statistics of del data
-    delDataBuf.accumulateEntriesNum(int64(rowCount))
-    delDataBuf.updateTimeRange(tr)
-    delDataBuf.updateStartAndEndPosition(startPos, endPos)
-
-    //4. update and sync memory size with priority queue
-    bm.mu.Lock()
-    defer bm.mu.Unlock()
-    if !loaded {
-        delDataBuf.item.segmentID = segID
-        delDataBuf.item.memorySize = bufSize
-        heap.Push(bm.delBufHeap, delDataBuf.item)
+}
+
+func (m *DeltaBufferManager) updateMeta(segID UniqueID, delDataBuf *DelDataBuf) {
+    m.channel.setCurDeleteBuffer(segID, delDataBuf)
+}
+
+// pushOrFixHeap updates and syncs the memory size with the priority queue
+func (m *DeltaBufferManager) pushOrFixHeap(segID UniqueID, buffer *DelDataBuf) {
+    m.heapGuard.Lock()
+    defer m.heapGuard.Unlock()
+    if _, loaded := m.Load(segID); loaded {
+        heap.Fix(m.delBufHeap, buffer.item.index)
     } else {
-        bm.delBufHeap.update(delDataBuf.item, delDataBuf.item.memorySize+bufSize)
+        heap.Push(m.delBufHeap, buffer.item)
     }
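pushOrFixHeap leans on the standard container/heap contract: push an item the first time a segment buffers deletes, and call heap.Fix afterwards, which re-sorts a single item in place once its memorySize has changed. That only works if Swap keeps every item's index field current, exactly as the PriorityQueue further down this file does. A self-contained sketch of the pattern with toy types (not the package's PriorityQueue):

```go
package main

import (
	"container/heap"
	"fmt"
)

type item struct {
	id    int64
	size  int64
	index int // maintained by the heap so heap.Fix/heap.Remove can find it
}

type queue []*item

func (q queue) Len() int           { return len(q) }
func (q queue) Less(i, j int) bool { return q[i].size > q[j].size } // max-heap on size
func (q queue) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
	q[i].index = i // keep indices in sync on every reorder
	q[j].index = j
}
func (q *queue) Push(x any) {
	it := x.(*item)
	it.index = len(*q)
	*q = append(*q, it)
}
func (q *queue) Pop() any {
	old := *q
	it := old[len(old)-1]
	*q = old[:len(old)-1]
	return it
}

func main() {
	q := &queue{}
	a := &item{id: 1, size: 10}
	heap.Push(q, a)                     // new buffer: push
	heap.Push(q, &item{id: 2, size: 50})

	a.size += 90         // the same segment buffered more deletes
	heap.Fix(q, a.index) // existing buffer: fix in place instead of re-pushing

	fmt.Println(heap.Pop(q).(*item).id) // 1, since it now outranks id 2
}
```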
-    bm.channel.setCurDeleteBuffer(segID, delDataBuf)
-    bm.delMemorySize += bufSize
-    //4. sync metrics
-    metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(
-        fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.DeleteLabel).Add(float64(rowCount))
 }
 
-func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
-    return bm.channel.getCurDeleteBuffer(segID)
-}
+// deleteFromHeap deletes an item from the heap
+func (m *DeltaBufferManager) deleteFromHeap(buffer *DelDataBuf) {
+    m.heapGuard.Lock()
+    defer m.heapGuard.Unlock()
 
-func (bm *DelBufferManager) Delete(segID UniqueID) {
-    bm.mu.Lock()
-    defer bm.mu.Unlock()
-    if buf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
-        item := buf.item
-        bm.delMemorySize -= item.memorySize
-        heap.Remove(bm.delBufHeap, item.index)
-        bm.channel.rollDeleteBuffer(segID)
+    if itemIdx, ok := buffer.GetItemIndex(); ok {
+        heap.Remove(m.delBufHeap, itemIdx)
     }
 }
 
-func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFromSegIDs []UniqueID) {
-    var compactToDelBuff *DelDataBuf
-    compactToDelBuff, loaded := bm.Load(compactedToSegID)
+func (m *DeltaBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
+    tss []Timestamp, tr TimeRange, startPos, endPos *internalpb.MsgPosition) {
+    buffer, loaded := m.Load(segID)
     if !loaded {
-        compactToDelBuff = newDelDataBuf()
-        compactToDelBuff.item.segmentID = compactedToSegID
+        buffer = newDelDataBuf(segID)
     }
 
-    for _, segID := range compactedFromSegIDs {
-        if delDataBuf, loaded := bm.Load(segID); loaded {
-            compactToDelBuff.mergeDelDataBuf(delDataBuf)
-            bm.Delete(segID)
-        }
-    }
-    bm.mu.Lock()
-    defer bm.mu.Unlock()
-    // only store delBuf if EntriesNum > 0
-    if compactToDelBuff.EntriesNum > 0 {
-        if loaded {
-            bm.delBufHeap.update(compactToDelBuff.item, compactToDelBuff.item.memorySize)
-        } else {
-            heap.Push(bm.delBufHeap, compactToDelBuff.item)
-        }
+    size := buffer.Buffer(pks, tss, tr, startPos, endPos)
+
+    m.pushOrFixHeap(segID, buffer)
+    m.updateMeta(segID, buffer)
+    m.usedMemory.Add(size)
+
+    metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(
+        fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.DeleteLabel).Add(float64(len(pks)))
+}
+
+func (m *DeltaBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
+    return m.channel.getCurDeleteBuffer(segID)
+}
+
+func (m *DeltaBufferManager) Delete(segID UniqueID) {
+    if buffer, loaded := m.Load(segID); loaded {
+        m.usedMemory.Sub(buffer.GetMemorySize())
+        m.deleteFromHeap(buffer)
+        m.channel.rollDeleteBuffer(segID)
 
-        // We need to re-add the memorySize because bm.Delete(segID) sub them all.
-        bm.delMemorySize += compactToDelBuff.item.memorySize
-        bm.channel.setCurDeleteBuffer(compactedToSegID, compactToDelBuff)
     }
 }
 
-func (bm *DelBufferManager) ShouldFlushSegments() []UniqueID {
-    bm.mu.Lock()
-    defer bm.mu.Unlock()
-    var shouldFlushSegments []UniqueID
-    if bm.delMemorySize < Params.DataNodeCfg.FlushDeleteBufferBytes {
-        return shouldFlushSegments
+func (m *DeltaBufferManager) popHeapItem() *Item {
+    m.heapGuard.Lock()
+    defer m.heapGuard.Unlock()
+    return heap.Pop(m.delBufHeap).(*Item)
+}
+
+func (m *DeltaBufferManager) ShouldFlushSegments() []UniqueID {
+    var memUsage = m.usedMemory.Load()
+    if memUsage < Params.DataNodeCfg.FlushDeleteBufferBytes {
+        return nil
     }
-    mmUsage := bm.delMemorySize
-    var poppedSegMem []*Item
+
+    var (
+        poppedSegmentIDs []UniqueID
+        poppedItems      []*Item
+    )
     for {
-        segMem := heap.Pop(bm.delBufHeap).(*Item)
-        poppedSegMem = append(poppedSegMem, segMem)
-        shouldFlushSegments = append(shouldFlushSegments, segMem.segmentID)
-        log.Info("add segment for delete buf flush", zap.Int64("segmentID", segMem.segmentID))
-        mmUsage -= segMem.memorySize
-        if mmUsage < Params.DataNodeCfg.FlushDeleteBufferBytes {
+        segItem := m.popHeapItem()
+        poppedItems = append(poppedItems, segItem)
+        poppedSegmentIDs = append(poppedSegmentIDs, segItem.segmentID)
+        memUsage -= segItem.memorySize
+        if memUsage < Params.DataNodeCfg.FlushDeleteBufferBytes {
            break
        }
     }
+
     //here we push all selected segment back into the heap
     //in order to keep the heap semantically correct
-    for _, segMem := range poppedSegMem {
-        heap.Push(bm.delBufHeap, segMem)
+    m.heapGuard.Lock()
+    for _, segMem := range poppedItems {
+        heap.Push(m.delBufHeap, segMem)
     }
-    return shouldFlushSegments
+    m.heapGuard.Unlock()
+
+    log.Info("Add segments to sync delete buffer for stressful memory", zap.Any("segments", poppedItems))
+    return poppedSegmentIDs
 }
 
 // An Item is something we manage in a memorySize priority queue.
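ShouldFlushSegments pops the largest buffers until the projected usage falls back under FlushDeleteBufferBytes, then pushes every popped item straight back so the heap stays well-formed, returning only the collected segment IDs. A rough standalone model of that selection policy, using a sorted slice in place of the real heap and sizes consistent with the totals asserted in the tests further down (five buffers, 208 bytes):

```go
package main

import (
	"fmt"
	"sort"
)

// pickToFlush mimics the policy: repeatedly take the largest buffer
// until the remaining usage is under the limit.
func pickToFlush(sizes map[int64]int64, limit int64) []int64 {
	var used int64
	ids := make([]int64, 0, len(sizes))
	for id := range sizes {
		used += sizes[id]
		ids = append(ids, id)
	}
	if used < limit {
		return nil // nothing to do while under the limit
	}
	sort.Slice(ids, func(i, j int) bool { return sizes[ids[i]] > sizes[ids[j]] })

	var flush []int64
	for _, id := range ids {
		flush = append(flush, id)
		used -= sizes[id]
		if used < limit {
			break
		}
	}
	return flush
}

func main() {
	sizes := map[int64]int64{1: 48, 2: 48, 3: 48, 4: 32, 5: 32} // 208 bytes total
	fmt.Println(len(pickToFlush(sizes, 200))) // 1 segment, leaving 160
	fmt.Println(len(pickToFlush(sizes, 20)))  // 5 segments, everything drains
}
```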
@@ -312,6 +316,49 @@ type DelDataBuf struct {
     endPos *internalpb.MsgPosition
 }
 
+// Buffer appends the given deletes and returns the memory size successfully buffered
+func (ddb *DelDataBuf) Buffer(pks []primaryKey, tss []Timestamp, tr TimeRange, startPos, endPos *internalpb.MsgPosition) int64 {
+    var (
+        rowCount = len(pks)
+        bufSize  int64
+    )
+    for i := 0; i < rowCount; i++ {
+        ddb.delData.Append(pks[i], tss[i])
+
+        switch pks[i].Type() {
+        case schemapb.DataType_Int64:
+            bufSize += 8
+        case schemapb.DataType_VarChar:
+            varCharPk := pks[i].(*varCharPrimaryKey)
+            bufSize += int64(len(varCharPk.Value))
+        }
+        //accumulate buf size for timestamp, which is 8 bytes
+        bufSize += 8
+    }
+
+    ddb.accumulateEntriesNum(int64(rowCount))
+    ddb.updateTimeRange(tr)
+    ddb.updateStartAndEndPosition(startPos, endPos)
+    // update memory size
+    ddb.item.memorySize += bufSize
+
+    return bufSize
+}
+
+func (ddb *DelDataBuf) GetMemorySize() int64 {
+    if ddb.item != nil {
+        return ddb.item.memorySize
+    }
+    return 0
+}
+
+func (ddb *DelDataBuf) GetItemIndex() (int, bool) {
+    if ddb.item != nil {
+        return ddb.item.index, true
+    }
+    return 0, false
+}
+
 func (ddb *DelDataBuf) accumulateEntriesNum(entryNum int64) {
     ddb.EntriesNum += entryNum
 }
@@ -325,7 +372,7 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
     }
 }
 
-func (ddb *DelDataBuf) mergeDelDataBuf(buf *DelDataBuf) {
+func (ddb *DelDataBuf) MergeDelDataBuf(buf *DelDataBuf) {
     ddb.accumulateEntriesNum(buf.EntriesNum)
 
     tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom}
@@ -388,7 +435,7 @@ func newBufferData(collSchema *schemapb.CollectionSchema) (*BufferData, error) {
         tsTo:   0}, nil
 }
 
-func newDelDataBuf() *DelDataBuf {
+func newDelDataBuf(segmentID UniqueID) *DelDataBuf {
     return &DelDataBuf{
         delData: &DeleteData{},
         Binlog: datapb.Binlog{
@@ -397,7 +444,7 @@ func newDelDataBuf() *DelDataBuf {
             TimestampTo:   0,
         },
         item: &Item{
-            memorySize: 0,
+            segmentID: segmentID,
         },
     }
 }
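DelDataBuf.Buffer now owns the size accounting that previously sat inline in StoreNewDeletes: 8 bytes for an int64 primary key, len(value) bytes for a varchar key, plus 8 bytes for each timestamp. A quick standalone check of that arithmetic (the size model only, not the real types):

```go
package main

import "fmt"

func main() {
	// three int64 PKs: 3 × (8B key + 8B timestamp)
	int64Rows := 3 * (8 + 8)

	// one varchar PK "milvus": len("milvus") bytes + 8B timestamp
	varCharRow := len("milvus") + 8

	fmt.Println(int64Rows, varCharRow, int64Rows+varCharRow) // 48 14 62
}
```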
diff --git a/internal/datanode/buffer_test.go b/internal/datanode/buffer_test.go
index 9d46f93c7723a4022cdcb7034f62ab80437b5b70..e59487b6e8c7369ae033da3bc29ead305f0e194e 100644
--- a/internal/datanode/buffer_test.go
+++ b/internal/datanode/buffer_test.go
@@ -18,16 +18,20 @@ package datanode
 
 import (
     "container/heap"
+    "context"
     "fmt"
     "math"
     "testing"
+    "time"
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
     "github.com/milvus-io/milvus-proto/go-api/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/schemapb"
+    "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/internalpb"
+    "github.com/milvus-io/milvus/internal/storage"
 )
 
 func genTestCollectionSchema(dim int64, vectorType schemapb.DataType) *schemapb.CollectionSchema {
@@ -157,58 +161,68 @@ func TestPriorityQueueString(t *testing.T) {
 
 func Test_CompactSegBuff(t *testing.T) {
     channelSegments := make(map[UniqueID]*Segment)
-    delBufferManager := &DelBufferManager{
+    delBufferManager := &DeltaBufferManager{
         channel: &ChannelMeta{
             segments: channelSegments,
         },
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+        delBufHeap: &PriorityQueue{},
     }
     //1. set compactTo and compactFrom
-    compactedFromSegIDs := make([]UniqueID, 2)
-    var segID1 UniqueID = 1111
-    var segID2 UniqueID = 2222
-    compactedFromSegIDs[0] = segID1
-    compactedFromSegIDs[1] = segID2
-    channelSegments[segID1] = &Segment{}
-    channelSegments[segID2] = &Segment{}
-    var compactedToSegID UniqueID = 3333
-    channelSegments[compactedToSegID] = &Segment{}
+    targetSeg := &Segment{segmentID: 3333}
+    targetSeg.setType(datapb.SegmentType_Flushed)
+
+    seg1 := &Segment{
+        segmentID:   1111,
+        compactedTo: targetSeg.segmentID,
+    }
+    seg1.setType(datapb.SegmentType_Compacted)
+
+    seg2 := &Segment{
+        segmentID:   2222,
+        compactedTo: targetSeg.segmentID,
+    }
+    seg2.setType(datapb.SegmentType_Compacted)
+
+    channelSegments[seg1.segmentID] = seg1
+    channelSegments[seg2.segmentID] = seg2
+    channelSegments[targetSeg.segmentID] = targetSeg
 
     //2. set up deleteDataBuf for seg1 and seg2
-    delDataBuf1 := newDelDataBuf()
+    delDataBuf1 := newDelDataBuf(seg1.segmentID)
     delDataBuf1.EntriesNum++
     delDataBuf1.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50})
-    delBufferManager.Store(segID1, delDataBuf1)
+    delBufferManager.updateMeta(seg1.segmentID, delDataBuf1)
     heap.Push(delBufferManager.delBufHeap, delDataBuf1.item)
-    delDataBuf2 := newDelDataBuf()
+
+    delDataBuf2 := newDelDataBuf(seg2.segmentID)
     delDataBuf2.EntriesNum++
     delDataBuf2.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50})
-    delBufferManager.Store(segID2, delDataBuf2)
+    delBufferManager.updateMeta(seg2.segmentID, delDataBuf2)
     heap.Push(delBufferManager.delBufHeap, delDataBuf2.item)
 
     //3. test compact
-    delBufferManager.CompactSegBuf(compactedToSegID, compactedFromSegIDs)
+    delBufferManager.UpdateCompactedSegments()
 
     //4. expect results in two aspects:
     //4.1 compactedFrom segments are removed from delBufferManager
     //4.2 compactedTo seg is set properly with correct entriesNum
-    _, seg1Exist := delBufferManager.Load(segID1)
-    _, seg2Exist := delBufferManager.Load(segID2)
+    _, seg1Exist := delBufferManager.Load(seg1.segmentID)
+    _, seg2Exist := delBufferManager.Load(seg2.segmentID)
     assert.False(t, seg1Exist)
     assert.False(t, seg2Exist)
-    assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID))
+    assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(targetSeg.segmentID))
 
     // test item of compactedToSegID is correct
-    compactTo, ok := delBufferManager.Load(compactedToSegID)
+    targetSegBuf, ok := delBufferManager.Load(targetSeg.segmentID)
     assert.True(t, ok)
-    assert.Equal(t, compactedToSegID, compactTo.item.segmentID)
+    assert.NotNil(t, targetSegBuf.item)
+    assert.Equal(t, targetSeg.segmentID, targetSegBuf.item.segmentID)
 
     //5. test roll and evict (https://github.com/milvus-io/milvus/issues/20501)
-    delBufferManager.channel.rollDeleteBuffer(compactedToSegID)
-    _, segCompactedToExist := delBufferManager.Load(compactedToSegID)
+    delBufferManager.channel.rollDeleteBuffer(targetSeg.segmentID)
+    _, segCompactedToExist := delBufferManager.Load(targetSeg.segmentID)
     assert.False(t, segCompactedToExist)
-    delBufferManager.channel.evictHistoryDeleteBuffer(compactedToSegID, &internalpb.MsgPosition{
+    delBufferManager.channel.evictHistoryDeleteBuffer(targetSeg.segmentID, &internalpb.MsgPosition{
         Timestamp: 100,
     })
     cp := delBufferManager.channel.getChannelCheckpoint(&internalpb.MsgPosition{
@@ -216,3 +230,94 @@ func Test_CompactSegBuff(t *testing.T) {
     })
     assert.Equal(t, Timestamp(200), cp.Timestamp) // evict all buffer, use ttPos as cp
 }
+
+func TestUpdateCompactedSegments(t *testing.T) {
+    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+    defer cancel()
+    cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
+    defer cm.RemoveWithPrefix(ctx, cm.RootPath())
+
+    fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
+
+    chanName := "datanode-test-FlowGraphDeletenode-showDelBuf"
+    testPath := "/test/datanode/root/meta"
+    assert.NoError(t, clearEtcd(testPath))
+    Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root")
+
+    channel := ChannelMeta{
+        segments: make(map[UniqueID]*Segment),
+    }
+
+    c := &nodeConfig{
+        channel:      &channel,
+        vChannelName: chanName,
+    }
+    delBufManager := &DeltaBufferManager{
+        channel:    &channel,
+        delBufHeap: &PriorityQueue{},
+    }
+    delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
+    require.NoError(t, err)
+
+    tests := []struct {
+        description    string
+        compactToExist bool
+
+        compactedToIDs   []UniqueID
+        compactedFromIDs []UniqueID
+
+        expectedSegsRemain []UniqueID
+    }{
+        {"zero segments", false,
+            []UniqueID{}, []UniqueID{}, []UniqueID{}},
+        {"segment no compaction", false,
+            []UniqueID{}, []UniqueID{}, []UniqueID{100, 101}},
+        {"segment compacted", true,
+            []UniqueID{200}, []UniqueID{103}, []UniqueID{100, 101}},
+        {"segment compacted 100>201", true,
+            []UniqueID{201}, []UniqueID{100}, []UniqueID{101, 201}},
+        {"segment compacted 100+101>201", true,
+            []UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{201}},
+        {"segment compacted 100>201, 101>202", true,
+            []UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{201, 202}},
+        // false
+        {"segment compacted 100>201", false,
+            []UniqueID{201}, []UniqueID{100}, []UniqueID{101}},
+        {"segment compacted 100+101>201", false,
+            []UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{}},
+        {"segment compacted 100>201, 101>202", false,
+            []UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{}},
+    }
+
+    for _, test := range tests {
+        t.Run(test.description, func(t *testing.T) {
+            if test.compactToExist {
+                for _, segID := range test.compactedToIDs {
+                    seg := Segment{
+                        segmentID: segID,
+                        numRows:   10,
+                    }
+                    seg.setType(datapb.SegmentType_Flushed)
+                    channel.segments[segID] = &seg
+                }
+            } else { // clear all segments in channel
+                channel.segments = make(map[UniqueID]*Segment)
+            }
+
+            for i, segID := range test.compactedFromIDs {
+                seg := Segment{
+                    segmentID:   segID,
+                    compactedTo: test.compactedToIDs[i],
+                }
+                seg.setType(datapb.SegmentType_Compacted)
+                channel.segments[segID] = &seg
+            }
+
+            delNode.delBufferManager.UpdateCompactedSegments()
+
+            for _, remain := range test.expectedSegsRemain {
+                assert.True(t, delNode.channel.hasSegment(remain, true))
+            }
+        })
+    }
+}
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index e9b1caaede7da85963fc5c1981902e129fe030e1..4dcb6218b7c2a887339700350c08343c3df2339d 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -941,6 +941,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
         partitionID: partID,
         segmentID:   req.GetCompactedTo(),
         numRows:     req.GetNumOfRows(),
+        lastSyncTs:  tsoutil.GetCurrentTime(),
     }
 
     err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 2d97b2483b255b78c9db3fa211abcdb6ed6ba550..9779b15e3c66e33bb3a20c85be8c4fdcd28e369e 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -54,7 +54,7 @@ type dataSyncService struct {
     dataCoord    types.DataCoord // DataCoord instance to interact with
     clearSignal  chan<- string   // signal channel to notify flowgraph close for collection/partition drop msg consumed
 
-    delBufferManager *DelBufferManager
+    delBufferManager *DeltaBufferManager
     flushingSegCache *Cache       // a guarding cache stores currently flushing segment ids
     flushManager     flushManager // flush manager handles flush process
     chunkManager     storage.ChunkManager
@@ -85,10 +85,9 @@ func newDataSyncService(ctx context.Context,
 
     ctx1, cancel := context.WithCancel(ctx)
 
-    delBufferManager := &DelBufferManager{
-        channel:       channel,
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+    delBufferManager := &DeltaBufferManager{
+        channel:    channel,
+        delBufHeap: &PriorityQueue{},
     }
 
     service := &dataSyncService{
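newDataSyncService and every test below now repeat the same two-field literal. A small constructor would keep the PriorityQueue initialization in one place; the helper below is hypothetical and not part of this patch:

```go
// newDeltaBufferManager is a hypothetical convenience constructor;
// the patch builds the struct literally at each call site instead.
func newDeltaBufferManager(channel Channel) *DeltaBufferManager {
	return &DeltaBufferManager{
		channel:    channel,
		delBufHeap: &PriorityQueue{},
	}
}
```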
diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go
index ecf459264119f2316c1c84079ef63fc65ae2b6f8..3ce44b96a55980d5a209335a7a7464d6db712067 100644
--- a/internal/datanode/flow_graph_delete_node.go
+++ b/internal/datanode/flow_graph_delete_node.go
@@ -39,9 +39,8 @@ type deleteNode struct {
     BaseNode
     ctx              context.Context
     channelName      string
-    delBufferManager *DelBufferManager // manager of delete msg
+    delBufferManager *DeltaBufferManager // manager of delete msg
     channel          Channel
-    idAllocator      allocatorInterface
     flushManager     flushManager
 
     clearSignal chan<- string
@@ -57,12 +56,12 @@ func (dn *deleteNode) Close() {
 
 func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
     for _, segID := range segIDs {
-        if _, ok := dn.delBufferManager.Load(segID); ok {
+        if buffer, ok := dn.delBufferManager.Load(segID); ok {
             log.Debug("delta buffer status",
+                zap.Int64("segmentID", segID),
                 zap.Uint64("timestamp", ts),
-                zap.Int64("segment ID", segID),
-                zap.Int64("entries", dn.delBufferManager.GetEntriesNum(segID)),
-                zap.Int64("memory size", dn.delBufferManager.GetSegDelBufMemSize(segID)),
+                zap.Int64("entriesNum", buffer.GetEntriesNum()),
+                zap.Int64("memorySize", buffer.GetMemorySize()),
                 zap.String("vChannel", dn.channelName))
         }
     }
@@ -92,7 +91,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
     }
 
     // update compacted segment before operation
-    dn.updateCompactedSegments()
+    dn.delBufferManager.UpdateCompactedSegments()
 
     // process delete messages
     segIDs := typeutil.NewUniqueSet()
@@ -154,29 +153,6 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
     return in
 }
 
-// update delBuf for compacted segments
-func (dn *deleteNode) updateCompactedSegments() {
-    compactedTo2From := dn.channel.listCompactedSegmentIDs()
-
-    for compactedTo, compactedFrom := range compactedTo2From {
-        // if the compactedTo segment has 0 numRows, remove all segments related
-        if !dn.channel.hasSegment(compactedTo, true) {
-            for _, segID := range compactedFrom {
-                dn.delBufferManager.Delete(segID)
-            }
-            dn.channel.removeSegments(compactedFrom...)
-            continue
-        }
-
-        dn.delBufferManager.CompactSegBuf(compactedTo, compactedFrom)
-        log.Info("update delBuf for compacted segments",
-            zap.Int64("compactedTo segmentID", compactedTo),
-            zap.Int64s("compactedFrom segmentIDs", compactedFrom),
-        )
-        dn.channel.removeSegments(compactedFrom...)
-    }
-}
-
 func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange, startPos, endPos *internalpb.MsgPosition) ([]UniqueID, error) {
     log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName))
 
@@ -218,7 +194,7 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [
     return segID2Pks, segID2Tss
 }
 
-func newDeleteNode(ctx context.Context, fm flushManager, delBufManager *DelBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
+func newDeleteNode(ctx context.Context, fm flushManager, manager *DeltaBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
     baseNode := BaseNode{}
     baseNode.SetMaxQueueLength(config.maxQueueLength)
     baseNode.SetMaxParallelism(config.maxParallelism)
@@ -226,9 +202,8 @@ func newDeleteNode(ctx context.Context, fm flushManager, delBufManager *DelBuffe
     return &deleteNode{
         ctx:              ctx,
         BaseNode:         baseNode,
-        delBufferManager: delBufManager,
+        delBufferManager: manager,
         channel:          config.channel,
-        idAllocator:      config.allocator,
         channelName:      config.vChannelName,
         flushManager:     fm,
         clearSignal:      sig,
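With updateCompactedSegments moved onto the manager, the delete node's Operate path reduces to: reconcile compacted segments, buffer incoming deletes, then drop the buffers of whatever ShouldFlushSegments selected upstream. The outline below is a simplified sketch of that sequence, not the full method; error handling, metrics and the flush call itself are elided, and the flowGraphMsg field names are taken from this package's tests:

```go
// simplified shape of deleteNode.Operate after this patch
func (dn *deleteNode) operateOutline(fgMsg *flowGraphMsg) {
	// 1. fold delete buffers of compacted segments into their targets
	dn.delBufferManager.UpdateCompactedSegments()

	// 2. buffer every delete message; StoreNewDeletes grows usedMemory
	//    and keeps the per-segment heap item ordered
	for _, msg := range fgMsg.deleteMessages {
		dn.bufferDeleteMsg(msg, fgMsg.timeRange, fgMsg.startPositions[0], fgMsg.endPositions[0])
	}

	// 3. segments picked by ShouldFlushSegments get synced, then their
	//    buffers (and accounted memory) are released via Delete
	//    (the real method goes through dn.flushManager.flushDelData first)
	for _, segID := range fgMsg.segmentsToSync {
		dn.delBufferManager.Delete(segID)
	}
}
```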
diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go
index b6923dae76bb2ab4f6f8f8af2f813edcda820306..21087364b7a6aec781a52cdc3503711c1d2f5a28 100644
--- a/internal/datanode/flow_graph_delete_node_test.go
+++ b/internal/datanode/flow_graph_delete_node_test.go
@@ -184,13 +184,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
         c := &nodeConfig{
             channel:      channel,
-            allocator:    &allocator{},
             vChannelName: chanName,
         }
-        delBufManager := &DelBufferManager{
-            channel:       channel,
-            delMemorySize: 0,
-            delBufHeap:    &PriorityQueue{},
+        delBufManager := &DeltaBufferManager{
+            channel:    channel,
+            delBufHeap: &PriorityQueue{},
         }
 
         dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c)
@@ -218,14 +216,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
     t.Run("Test get segment by int64 primary keys", func(te *testing.T) {
         c := &nodeConfig{
             channel:      channel,
-            allocator:    &allocator{},
             vChannelName: chanName,
         }
 
-        delBufManager := &DelBufferManager{
-            channel:       channel,
-            delMemorySize: 0,
-            delBufHeap:    &PriorityQueue{},
+        delBufManager := &DeltaBufferManager{
+            channel:    channel,
+            delBufHeap: &PriorityQueue{},
         }
 
         dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c)
@@ -260,13 +256,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
 
         c := &nodeConfig{
             channel:      channel,
-            allocator:    NewAllocatorFactory(),
             vChannelName: chanName,
         }
-        delBufManager := &DelBufferManager{
-            channel:       channel,
-            delMemorySize: 0,
-            delBufHeap:    &PriorityQueue{},
+        delBufManager := &DeltaBufferManager{
+            channel:    channel,
+            delBufHeap: &PriorityQueue{},
         }
         delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
         assert.Nil(te, err)
@@ -288,13 +282,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
 
         c := &nodeConfig{
             channel:      channel,
-            allocator:    NewAllocatorFactory(),
             vChannelName: chanName,
         }
-        delBufManager := &DelBufferManager{
-            channel:       channel,
-            delMemorySize: 0,
-            delBufHeap:    &PriorityQueue{},
+        delBufManager := &DeltaBufferManager{
+            channel:    channel,
+            delBufHeap: &PriorityQueue{},
         }
         delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
         assert.Nil(te, err)
@@ -322,13 +314,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
 
         c := &nodeConfig{
             channel:      channel,
-            allocator:    NewAllocatorFactory(),
             vChannelName: chanName,
         }
-        delBufManager := &DelBufferManager{
-            channel:       channel,
-            delMemorySize: 0,
-            delBufHeap:    &PriorityQueue{},
+        delBufManager := &DeltaBufferManager{
+            channel:    channel,
+            delBufHeap: &PriorityQueue{},
         }
         sig := make(chan string, 1)
         delNode, err := newDeleteNode(ctx, fm, delBufManager, sig, c)
@@ -366,13 +356,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
 
         c := &nodeConfig{
             channel:      channel,
-            allocator:    NewAllocatorFactory(),
             vChannelName: chanName,
         }
-        delBufManager := &DelBufferManager{
-            channel:       channel,
-            delMemorySize: 0,
-            delBufHeap:    &PriorityQueue{},
+        delBufManager := &DeltaBufferManager{
+            channel:    channel,
+            delBufHeap: &PriorityQueue{},
         }
         delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
         assert.Nil(t, err)
@@ -390,7 +378,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         msg.segmentsToSync = []UniqueID{compactedSegment}
 
         bufItem := &Item{memorySize: 0}
-        delNode.delBufferManager.Store(compactedSegment,
+        delNode.delBufferManager.updateMeta(compactedSegment,
             &DelDataBuf{delData: &DeleteData{}, item: bufItem})
         heap.Push(delNode.delBufferManager.delBufHeap, bufItem)
 
@@ -421,13 +409,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
 
         c := &nodeConfig{
             channel:      channel,
-            allocator:    NewAllocatorFactory(),
             vChannelName: chanName,
         }
-        delBufManager := &DelBufferManager{
-            channel:       channel,
-            delMemorySize: 0,
-            delBufHeap:    &PriorityQueue{},
+        delBufManager := &DeltaBufferManager{
+            channel:    channel,
+            delBufHeap: &PriorityQueue{},
         }
         mockFlushManager := &mockFlushManager{
             recordFlushedSeg: true,
@@ -453,7 +439,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
         delNode.Operate([]flowgraph.Msg{fgMsg})
         assert.Equal(t, 0, len(mockFlushManager.flushedSegIDs))
-        assert.Equal(t, int64(208), delNode.delBufferManager.delMemorySize)
+        assert.Equal(t, int64(208), delNode.delBufferManager.usedMemory.Load())
         assert.Equal(t, 5, delNode.delBufferManager.delBufHeap.Len())
 
         //3. note that the whole memory size used by 5 segments will be 208
@@ -466,7 +452,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
         delNode.Operate([]flowgraph.Msg{fgMsg})
         assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
-        assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize)
+        assert.Equal(t, int64(160), delNode.delBufferManager.usedMemory.Load())
         assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len())
 
         //4. there is no new delete msg and delBufferSize is still 200
@@ -474,7 +460,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
         delNode.Operate([]flowgraph.Msg{fgMsg})
         assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
-        assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize)
+        assert.Equal(t, int64(160), delNode.delBufferManager.usedMemory.Load())
         assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len())
 
         //5. we reset buffer bytes to 150, then we expect there would be one more
@@ -484,7 +470,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
         delNode.Operate([]flowgraph.Msg{fgMsg})
         assert.Equal(t, 2, len(mockFlushManager.flushedSegIDs))
-        assert.Equal(t, int64(112), delNode.delBufferManager.delMemorySize)
+        assert.Equal(t, int64(112), delNode.delBufferManager.usedMemory.Load())
         assert.Equal(t, 3, delNode.delBufferManager.delBufHeap.Len())
 
         //6. we reset buffer bytes to 60, then most of the segments will be flushed
@@ -493,7 +479,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
         delNode.Operate([]flowgraph.Msg{fgMsg})
         assert.Equal(t, 4, len(mockFlushManager.flushedSegIDs))
-        assert.Equal(t, int64(32), delNode.delBufferManager.delMemorySize)
+        assert.Equal(t, int64(32), delNode.delBufferManager.usedMemory.Load())
         assert.Equal(t, 1, delNode.delBufferManager.delBufHeap.Len())
 
         //7. we reset buffer bytes to 20, then as all segment-memory consumption
@@ -503,7 +489,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
         delNode.Operate([]flowgraph.Msg{fgMsg})
         assert.Equal(t, 5, len(mockFlushManager.flushedSegIDs))
-        assert.Equal(t, int64(0), delNode.delBufferManager.delMemorySize)
+        assert.Equal(t, int64(0), delNode.delBufferManager.usedMemory.Load())
         assert.Equal(t, 0, delNode.delBufferManager.delBufHeap.Len())
     })
 }
@@ -527,13 +513,11 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
     }
     c := &nodeConfig{
         channel:      channel,
-        allocator:    NewAllocatorFactory(),
         vChannelName: chanName,
     }
-    delBufManager := &DelBufferManager{
-        channel:       channel,
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+    delBufManager := &DeltaBufferManager{
+        channel:    channel,
+        delBufHeap: &PriorityQueue{},
    }
     delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
     require.NoError(t, err)
@@ -548,105 +532,11 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
     }
 
     for _, test := range tests {
-        delBuf := newDelDataBuf()
+        delBuf := newDelDataBuf(test.seg)
         delBuf.accumulateEntriesNum(test.numRows)
-        delNode.delBufferManager.Store(test.seg, delBuf)
+        delNode.delBufferManager.updateMeta(test.seg, delBuf)
         heap.Push(delNode.delBufferManager.delBufHeap, delBuf.item)
     }
 
     delNode.showDelBuf([]UniqueID{111, 112, 113}, 100)
 }
"/test/datanode/root/meta" - assert.NoError(t, clearEtcd(testPath)) - Params.EtcdCfg.MetaRootPath = testPath - - channel := ChannelMeta{ - segments: make(map[UniqueID]*Segment), - } - - c := &nodeConfig{ - channel: &channel, - allocator: NewAllocatorFactory(), - vChannelName: chanName, - } - delBufManager := &DelBufferManager{ - channel: &channel, - delMemorySize: 0, - delBufHeap: &PriorityQueue{}, - } - delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) - require.NoError(t, err) - - tests := []struct { - description string - compactToExist bool - - compactedToIDs []UniqueID - compactedFromIDs []UniqueID - - expectedSegsRemain []UniqueID - }{ - {"zero segments", false, - []UniqueID{}, []UniqueID{}, []UniqueID{}}, - {"segment no compaction", false, - []UniqueID{}, []UniqueID{}, []UniqueID{100, 101}}, - {"segment compacted", true, - []UniqueID{200}, []UniqueID{103}, []UniqueID{100, 101}}, - {"segment compacted 100>201", true, - []UniqueID{201}, []UniqueID{100}, []UniqueID{101, 201}}, - {"segment compacted 100+101>201", true, - []UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{201}}, - {"segment compacted 100>201, 101>202", true, - []UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{201, 202}}, - // false - {"segment compacted 100>201", false, - []UniqueID{201}, []UniqueID{100}, []UniqueID{101}}, - {"segment compacted 100+101>201", false, - []UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{}}, - {"segment compacted 100>201, 101>202", false, - []UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{}}, - } - - for _, test := range tests { - t.Run(test.description, func(t *testing.T) { - if test.compactToExist { - for _, segID := range test.compactedToIDs { - seg := Segment{ - segmentID: segID, - numRows: 10, - } - seg.setType(datapb.SegmentType_Flushed) - channel.segments[segID] = &seg - } - } else { // clear all segments in channel - channel.segments = make(map[UniqueID]*Segment) - } - - for i, segID := range test.compactedFromIDs { - seg := Segment{ - segmentID: segID, - compactedTo: test.compactedToIDs[i], - } - seg.setType(datapb.SegmentType_Compacted) - channel.segments[segID] = &seg - } - - delNode.updateCompactedSegments() - - for _, remain := range test.expectedSegsRemain { - delNode.channel.hasSegment(remain, true) - } - }) - } -} diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 930d5a14c327ceaab5ce3c08b86249b01485f1ed..0a18daf7ff7ef5337b6f8b0a67ee14b7651b9e4b 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -48,7 +48,7 @@ type insertBufferNode struct { ctx context.Context channelName string - delBufferManager *DelBufferManager // manager of delete msg + delBufferManager *DeltaBufferManager // manager of delete msg channel Channel idAllocator allocatorInterface @@ -666,7 +666,7 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni return ibNode.channel.getCollectionAndPartitionID(segmentID) } -func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DelBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg, +func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DeltaBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg, fm flushManager, flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) { baseNode := BaseNode{} diff --git 
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 477c963fe8c28230357e880a882ffcdd35129eab..b635f8515c109f528b16e44713e5aa3ca3f4fcc5 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -105,10 +105,9 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
         allocator:    NewAllocatorFactory(),
         vChannelName: "string",
     }
-    delBufManager := &DelBufferManager{
-        channel:       channel,
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+    delBufManager := &DeltaBufferManager{
+        channel:    channel,
+        delBufHeap: &PriorityQueue{},
     }
 
     iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
@@ -208,10 +207,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
         allocator:    NewAllocatorFactory(),
         vChannelName: "string",
     }
-    delBufManager := &DelBufferManager{
-        channel:       channel,
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+    delBufManager := &DeltaBufferManager{
+        channel:    channel,
+        delBufHeap: &PriorityQueue{},
     }
 
     iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
@@ -382,10 +380,9 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
         allocator:    NewAllocatorFactory(),
         vChannelName: "string",
     }
-    delBufManager := &DelBufferManager{
-        channel:       channel,
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+    delBufManager := &DeltaBufferManager{
+        channel:    channel,
+        delBufHeap: &PriorityQueue{},
     }
     iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
     require.NoError(t, err)
@@ -626,10 +623,9 @@ func TestRollBF(t *testing.T) {
         allocator:    NewAllocatorFactory(),
         vChannelName: "string",
     }
-    delBufManager := &DelBufferManager{
-        channel:       channel,
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+    delBufManager := &DeltaBufferManager{
+        channel:    channel,
+        delBufHeap: &PriorityQueue{},
     }
     iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
     require.NoError(t, err)
@@ -711,7 +707,7 @@ type InsertBufferNodeSuit struct {
     suite.Suite
 
     channel       *ChannelMeta
-    delBufManager *DelBufferManager
+    delBufManager *DeltaBufferManager
 
     collID UniqueID
     partID UniqueID
@@ -730,10 +726,9 @@ func (s *InsertBufferNodeSuit) SetupSuite() {
     s.partID = 10
 
     s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
-    s.delBufManager = &DelBufferManager{
-        channel:       s.channel,
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+    s.delBufManager = &DeltaBufferManager{
+        channel:    s.channel,
+        delBufHeap: &PriorityQueue{},
     }
 
     s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir))
@@ -1018,10 +1013,9 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
         allocator:    NewAllocatorFactory(),
         vChannelName: "string",
     }
-    delBufManager := &DelBufferManager{
-        channel:       channel,
-        delMemorySize: 0,
-        delBufHeap:    &PriorityQueue{},
+    delBufManager := &DeltaBufferManager{
+        channel:    channel,
+        delBufHeap: &PriorityQueue{},
     }
     iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
     require.NoError(t, err)
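Taken together, the manager's surface is small: StoreNewDeletes adds memory, Delete gives it back, and usedMemory is the only number ShouldFlushSegments consults. A package-internal test sketch of that invariant for buffer_test.go, mirroring Test_CompactSegBuff's bare ChannelMeta setup and assuming the package's usual test bootstrap for Params (storage.NewInt64PrimaryKey is the exported constructor behind the package's primaryKey alias):

```go
func TestDeltaBufferManagerMemoryAccounting(t *testing.T) {
	channel := &ChannelMeta{segments: make(map[UniqueID]*Segment)}
	m := &DeltaBufferManager{channel: channel, delBufHeap: &PriorityQueue{}}

	pks := []primaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(2)}
	tss := []Timestamp{100, 101}

	// 2 rows × (8B int64 PK + 8B timestamp) = 32 bytes
	m.StoreNewDeletes(1, pks, tss,
		TimeRange{timestampMin: 100, timestampMax: 101},
		nil, &internalpb.MsgPosition{Timestamp: 101})
	assert.Equal(t, int64(32), m.usedMemory.Load())

	// dropping the buffer must return every accounted byte
	m.Delete(1)
	assert.Equal(t, int64(0), m.usedMemory.Load())
}
```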