diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e1b48501428ba05d617346b8438c5a6bfa5be76d..b5e2507aaca24e5ce669d0cde743a1c9afc17f58 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -177,6 +177,7 @@ dataCoord: assignmentExpiration: 2000 # The time of the assignment expiration in ms compaction: + enableAutoCompaction: false retentionDuration: 432000 # 5 days in seconds gc: diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 3b7f570e736b23a15674a5ba1b576d0732beb277..48a7a04d9f50b8995180e5ad4336179ca4d61793 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -107,6 +107,11 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { defer logutil.LogPanic() defer t.wg.Done() + // If AutoCompaction is disabled, the global loop will not start + if !Params.EnableAutoCompaction { + return + } + for { select { case <-t.quit: @@ -150,6 +155,11 @@ func (t *compactionTrigger) triggerCompaction(timetravel *timetravel) error { // triggerSingleCompaction triger a compaction bundled with collection-partiiton-channel-segment func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, timetravel *timetravel) error { + // If AutoCompaction is disabled, a flush request will not trigger compaction + if !Params.EnableAutoCompaction { + return nil + } + id, err := t.allocSignalID() if err != nil { return err @@ -244,7 +254,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { log.Debug("global merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(mergeCompactionPlans))) } - log.Info("handle global compaction cost", zap.Int64("millliseconds", time.Since(t1).Milliseconds())) + log.Info("handle global compaction cost", zap.Int64("milliseconds", time.Since(t1).Milliseconds())) } func (t *compactionTrigger) handleSignal(signal *compactionSignal) { @@ -262,7 +272,7 @@ 
func (t *compactionTrigger) handleSignal(signal *compactionSignal) { if err != nil { log.Warn("failed to do single compaction", zap.Int64("segmentID", segment.ID), zap.Error(err)) } else { - log.Info("time cost of generating single compaction plan", zap.Int64("milllis", time.Since(t1).Milliseconds()), + log.Info("time cost of generating single compaction plan", zap.Int64("millis", time.Since(t1).Milliseconds()), zap.Int64("planID", singleCompactionPlan.GetPlanID()), zap.Int64("signalID", signal.id)) } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 8a33c41cb3095548e5a5d0bd17af03e0c9528aaf..1b21a3224a8411e84de58594e42157ca1da1b075 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -263,6 +263,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) { mergeCompactionPolicy mergeCompactionPolicy compactionHandler compactionPlanContext mergeCompactionSegmentThreshold int + autoCompactionEnabled bool } type args struct { timetravel *timetravel @@ -343,6 +344,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) { (mergeCompactionFunc)(greedyMergeCompaction), &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, 2, + true, }, args{ &timetravel{200}, @@ -396,10 +398,88 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) { }, }, }, + { + "test auto compaction disabled", + fields{ + &meta{ + segments: &SegmentsInfo{ + map[int64]*SegmentInfo{ + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 1, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 10, + MaxRowNum: 100, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{ + {FieldID: 1, Binlogs: []string{"binlog1"}}, + }, + Deltalogs: []*datapb.DeltaLogInfo{ + {RecordEntries: 5, DeltaLogPath: "deltalog1"}, + }, + }, + }, + 2: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 2, + 
PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 300, + MaxRowNum: 1000, + InsertChannel: "ch2", + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{ + {FieldID: 1, Binlogs: []string{"binlog2"}}, + }, + Deltalogs: []*datapb.DeltaLogInfo{ + {RecordEntries: 5, DeltaLogPath: "deltalog2"}, + }, + }, + }, + 3: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 300, + MaxRowNum: 1000, + InsertChannel: "ch2", + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{ + {FieldID: 1, Binlogs: []string{"binlog3"}}, + }, + Deltalogs: []*datapb.DeltaLogInfo{ + {RecordEntries: 5, DeltaLogPath: "deltalog3"}, + }, + }, + }, + }, + }, + }, + newMockAllocator(), + make(chan *compactionSignal, 1), + (singleCompactionFunc)(chooseAllBinlogs), + (mergeCompactionFunc)(greedyMergeCompaction), + &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, + 2, + false, + }, + args{ + &timetravel{200}, + }, + false, + []*datapb.CompactionPlan{}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + Params.EnableAutoCompaction = tt.fields.autoCompactionEnabled tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, @@ -436,6 +516,7 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) { mergeCompactionPolicy mergeCompactionPolicy compactionHandler compactionPlanContext globalTrigger *time.Ticker + enableAutoCompaction bool } type args struct { collectionID int64 @@ -502,7 +583,8 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) { compactionHandler: &spyCompactionHandler{ spyChan: make(chan *datapb.CompactionPlan, 1), }, - globalTrigger: time.NewTicker(time.Hour), + globalTrigger: time.NewTicker(time.Hour), + enableAutoCompaction: true, }, args: args{ collectionID: 1, @@ -585,7 +667,8 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) { compactionHandler: &spyCompactionHandler{ 
spyChan: make(chan *datapb.CompactionPlan, 1), }, - globalTrigger: time.NewTicker(time.Hour), + globalTrigger: time.NewTicker(time.Hour), + enableAutoCompaction: true, }, args: args{ collectionID: 1, @@ -664,7 +747,8 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) { compactionHandler: &spyCompactionHandler{ spyChan: make(chan *datapb.CompactionPlan, 1), }, - globalTrigger: time.NewTicker(time.Hour), + globalTrigger: time.NewTicker(time.Hour), + enableAutoCompaction: true, }, args: args{ collectionID: 1, @@ -693,10 +777,95 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) { }, }, }, + { + name: "auto compaction disabled", + fields: fields{ + meta: &meta{ + segments: &SegmentsInfo{ + map[int64]*SegmentInfo{ + 101: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 101, + CollectionID: 1, + PartitionID: 10, + InsertChannel: "test_chan_01", + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 12000, + LastExpireTime: 100, + StartPosition: &internalpb.MsgPosition{ + ChannelName: "", + MsgID: []byte{}, + MsgGroup: "", + Timestamp: 10, + }, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: "", + MsgID: []byte{}, + MsgGroup: "", + Timestamp: 45, + }, + Binlogs: []*datapb.FieldBinlog{}, + Statslogs: []*datapb.FieldBinlog{}, + Deltalogs: []*datapb.DeltaLogInfo{ + { + RecordEntries: 1000, + TimestampFrom: 10, + TimestampTo: 20, + }, + { + RecordEntries: 1001, + TimestampFrom: 30, + TimestampTo: 45, + }, + }, + CreatedByCompaction: false, + CompactionFrom: []int64{}, + }, + isCompacting: false, + }, + }, + }, + }, + allocator: newMockAllocator(), + signals: make(chan *compactionSignal, 1), + singleCompactionPolicy: (singleCompactionFunc)(chooseAllBinlogs), + mergeCompactionPolicy: (mergeCompactionFunc)(greedyGeneratePlans), + compactionHandler: &spyCompactionHandler{ + spyChan: make(chan *datapb.CompactionPlan, 1), + }, + globalTrigger: time.NewTicker(time.Hour), + enableAutoCompaction: false, + }, + args: args{ + 
collectionID: 1, + partitionID: 10, + segmentID: 101, + channelName: "test_ch_01", + timetravel: &timetravel{ + time: 30, + }, + }, + wantErr: false, + wantPlan: false, + wantBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 101, + FieldBinlogs: []*datapb.FieldBinlog{}, + Field2StatslogPaths: []*datapb.FieldBinlog{}, + Deltalogs: []*datapb.DeltaLogInfo{ + { + RecordEntries: 2001, + }, + }, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + Params.EnableAutoCompaction = tt.fields.enableAutoCompaction tr := &compactionTrigger{ meta: tt.fields.meta, allocator: tt.fields.allocator, diff --git a/internal/datacoord/param_table.go b/internal/datacoord/param_table.go index 329f323f059dfcfb284f34c31cc25c5ba1cb702f..af39206a043d2290c7ba021d0c5e30adeb5acb86 100644 --- a/internal/datacoord/param_table.go +++ b/internal/datacoord/param_table.go @@ -81,6 +81,7 @@ type ParamTable struct { EnableGarbageCollection bool CompactionRetentionDuration int64 + EnableAutoCompaction bool // Garbage Collection GCInterval time.Duration @@ -135,6 +136,7 @@ func (p *ParamTable) Init() { p.initMinioRootPath() p.initCompactionRetentionDuration() + p.initEnableAutoCompaction() p.initEnableGarbageCollection() p.initGCInterval() @@ -378,3 +380,7 @@ func (p *ParamTable) initMinioRootPath() { func (p *ParamTable) initCompactionRetentionDuration() { p.CompactionRetentionDuration = p.ParseInt64WithDefault("dataCoord.compaction.retentionDuration", 432000) } + +func (p *ParamTable) initEnableAutoCompaction() { + p.EnableAutoCompaction = p.ParseBool("dataCoord.compaction.enableAutoCompaction", false) +}