diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 827178f11421027982db684bf7da832400b29eeb..a7bc1fae45f9c4db68e4a5ed43ea1a4e9544a644 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -770,11 +770,14 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen } data[k] = v } - k, v, err := m.marshal(segment) - if err != nil { - return err + + if segment.NumOfRows > 0 { + k, v, err := m.marshal(segment) + if err != nil { + return err + } + data[k] = v } - data[k] = v if err := m.saveKvTxn(data); err != nil { return err @@ -784,7 +787,10 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen m.segments.DropSegment(s.GetID()) } - m.segments.SetSegment(segment.GetID(), segment) + // Handle empty segment generated by merge-compaction + if segment.NumOfRows > 0 { + m.segments.SetSegment(segment.GetID(), segment) + } return nil } @@ -793,6 +799,17 @@ func (m *meta) CompleteInnerCompaction(segmentBinlogs *datapb.CompactionSegmentB defer m.Unlock() if segment := m.segments.GetSegment(segmentBinlogs.SegmentID); segment != nil { + // The compaction deletes the entire segment + if result.NumOfRows <= 0 { + err := m.removeSegmentInfo(segment) + if err != nil { + return err + } + + m.segments.DropSegment(segment.ID) + return nil + } + cloned := segment.Clone() cloned.Binlogs = m.updateBinlogs(cloned.GetBinlogs(), segmentBinlogs.GetFieldBinlogs(), result.GetInsertLogs()) cloned.Statslogs = m.updateBinlogs(cloned.GetStatslogs(), segmentBinlogs.GetField2StatslogPaths(), result.GetField2StatslogPaths()) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 4a015229eaa29c8a1d8b5ca6fdf79ff88981f816..ca67f08d9b561a525ef9e54cd32a2eb00eb08c74 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -350,7 +350,7 @@ func Test_meta_CompleteMergeCompaction(t *testing.T) { wantErr bool }{ { - "test normal merge", + "test normal merge compaction", fields{ memkv.NewMemoryKV(), nil, @@ -389,6 +389,52 @@ func Test_meta_CompleteMergeCompaction(t *testing.T) { InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log5")}, Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog5")}, Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")}, + NumOfRows: 1, + }, + }, + false, + }, + { + "test removing all data merge compaction", + fields{ + memkv.NewMemoryKV(), + nil, + &SegmentsInfo{map[int64]*SegmentInfo{ + 1: {SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")}, + }}, + 2: {SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")}, + }}, + }}, + }, + args{ + []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 1, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")}, + }, + { + SegmentID: 2, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")}, + }, + }, + &datapb.CompactionResult{ + SegmentID: 3, + InsertLogs: nil, + Field2StatslogPaths: nil, + Deltalogs: nil, + NumOfRows: 0, }, }, false, @@ -408,10 +454,12 @@ func Test_meta_CompleteMergeCompaction(t *testing.T) { assert.Nil(t, m.GetSegment(l.GetSegmentID())) } segment := m.GetSegment(tt.args.result.SegmentID) - assert.NotNil(t, segment) - assert.EqualValues(t, tt.args.result.GetInsertLogs(), segment.GetBinlogs()) - assert.EqualValues(t, tt.args.result.GetField2StatslogPaths(), segment.GetStatslogs()) - assert.EqualValues(t, tt.args.result.GetDeltalogs(), segment.GetDeltalogs()) + assert.Equal(t, segment != nil, tt.args.result.NumOfRows > 0) + if segment != nil { + assert.EqualValues(t, tt.args.result.GetInsertLogs(), segment.GetBinlogs()) + assert.EqualValues(t, tt.args.result.GetField2StatslogPaths(), segment.GetStatslogs()) + assert.EqualValues(t, tt.args.result.GetDeltalogs(), segment.GetDeltalogs()) + } } }) } @@ -435,7 +483,7 @@ func Test_meta_CompleteInnerCompaction(t *testing.T) { want *SegmentInfo }{ { - "test normal merge", + "test normal inner compaction", fields{ memkv.NewMemoryKV(), nil, @@ -462,6 +510,7 @@ func Test_meta_CompleteInnerCompaction(t *testing.T) { InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3")}, Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3")}, Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3")}, + NumOfRows: 1, }, }, false, @@ -474,6 +523,40 @@ func Test_meta_CompleteInnerCompaction(t *testing.T) { }, }, }, + { + "test removing all data inner compaction", + fields{ + memkv.NewMemoryKV(), + nil, + &SegmentsInfo{ + map[int64]*SegmentInfo{ + 1: {SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")}, + }}, + }, + }, + }, + args{ + &datapb.CompactionSegmentBinlogs{ + SegmentID: 1, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")}, + }, + &datapb.CompactionResult{ + SegmentID: 1, + InsertLogs: nil, + Field2StatslogPaths: nil, + Deltalogs: nil, + NumOfRows: 0, + }, + }, + false, + nil, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 4348093b360aff602c607042a6e1ea82baa4f072..cd3bf893aac5e32891017ea7ca52437e355dc74a 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -466,17 +466,18 @@ func (t *compactionTask) compact() error { } uploadStart := time.Now() - cpaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta) + segPaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta) if err != nil { log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) return err } + uploadEnd := time.Now() defer func() { log.Debug("upload elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(uploadEnd.Sub(uploadStart)))) }() - for _, fbl := range cpaths.deltaInfo { + for _, fbl := range segPaths.deltaInfo { for _, deltaLogInfo := range fbl.GetBinlogs() { deltaLogInfo.LogSize = deltaBuf.GetLogSize() deltaLogInfo.TimestampFrom = deltaBuf.GetTimestampFrom() @@ -488,9 +489,9 @@ func (t *compactionTask) compact() error { pack := &datapb.CompactionResult{ PlanID: t.plan.GetPlanID(), SegmentID: targetSegID, - InsertLogs: cpaths.inPaths, - Field2StatslogPaths: cpaths.statsPaths, - Deltalogs: cpaths.deltaInfo, + InsertLogs: segPaths.inPaths, + Field2StatslogPaths: segPaths.statsPaths, + Deltalogs: segPaths.deltaInfo, NumOfRows: numRows, } @@ -512,7 +513,11 @@ func (t *compactionTask) compact() error { // Compaction I: update pk range. // Compaction II: remove the segments and add a new flushed segment with pk range. if t.hasSegment(targetSegID, true) { - t.refreshFlushedSegStatistics(targetSegID, numRows) + if numRows <= 0 { + t.removeSegments(targetSegID) + } else { + t.refreshFlushedSegStatistics(targetSegID, numRows) + } // no need to shorten the PK range of a segment, deleting dup PKs is valid } else { t.mergeFlushedSegments(targetSegID, collID, partID, t.plan.GetPlanID(), segIDs, t.plan.GetChannel(), numRows) @@ -527,9 +532,9 @@ func (t *compactionTask) compact() error { log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()), - zap.Int("num of binlog paths", len(cpaths.inPaths)), - zap.Int("num of stats paths", len(cpaths.statsPaths)), - zap.Int("num of delta paths", len(cpaths.deltaInfo)), + zap.Int("num of binlog paths", len(segPaths.inPaths)), + zap.Int("num of stats paths", len(segPaths.statsPaths)), + zap.Int("num of delta paths", len(segPaths.deltaInfo)), ) log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(time.Since(compactStart)))) diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 54d401b162652713925f08a3c3bf42d83bcc8e01..eba9b6faea9e93f78acf19a2409510f8742aa0a8 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -422,7 +422,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { mockbIO := &binlogIO{mockKv, alloc} replica, err := newReplica(context.TODO(), rc, collID) require.NoError(t, err) - replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1}) + replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1, 2}) iData := genInsertData() meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name") @@ -473,6 +473,28 @@ func TestCompactorInterfaceMethods(t *testing.T) { planID := task.getPlanID() assert.Equal(t, plan.GetPlanID(), planID) + // Compact to delete the entire segment + deleteAllData := &DeleteData{ + Pks: []UniqueID{1, 2}, + Tss: []Timestamp{20000, 20001}, + RowCount: 2, + } + + err = mockKv.RemoveWithPrefix("/") + require.NoError(t, err) + cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, deleteAllData, meta) + require.NoError(t, err) + plan.PlanID++ + + err = task.compact() + assert.NoError(t, err) + // The segment should be removed + assert.False(t, replica.hasSegment(segID, true)) + + // re-add the segment + replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1, 2}) + + // Compact empty segment err = mockKv.RemoveWithPrefix("/") require.NoError(t, err) cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta) @@ -515,7 +537,6 @@ func TestCompactorInterfaceMethods(t *testing.T) { mockfm.sleepSeconds = plan.TimeoutInSeconds + int32(1) err = task.compact() assert.Error(t, err) - }) t.Run("Test typeII compact valid", func(t *testing.T) {