Unverified commit aea5a61e, authored by congqixia, committed by GitHub

Add switch for auto compaction (#12974)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Parent: 85efcd85
...@@ -177,6 +177,7 @@ dataCoord:
assignmentExpiration: 2000 # The time of the assignment expiration in ms
compaction:
enableAutoCompaction: false
retentionDuration: 432000 # 5 days in seconds
gc:
...
...@@ -107,6 +107,11 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
defer logutil.LogPanic()
defer t.wg.Done()
// If AutoCompaction is disabled, the global loop will not start
if !Params.EnableAutoCompaction {
return
}
for {
select {
case <-t.quit:
...@@ -150,6 +155,11 @@ func (t *compactionTrigger) triggerCompaction(timetravel *timetravel) error {
// triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, timetravel *timetravel) error {
// If AutoCompaction is disabled, a flush request will not trigger compaction
if !Params.EnableAutoCompaction {
return nil
}
id, err := t.allocSignalID()
if err != nil {
return err
...@@ -244,7 +254,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
log.Debug("global merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(mergeCompactionPlans)))
}
log.Info("handle global compaction cost", zap.Int64("millliseconds", time.Since(t1).Milliseconds())) log.Info("handle global compaction cost", zap.Int64("milliseconds", time.Since(t1).Milliseconds()))
}
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
...@@ -262,7 +272,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
if err != nil {
log.Warn("failed to do single compaction", zap.Int64("segmentID", segment.ID), zap.Error(err))
} else {
log.Info("time cost of generating single compaction plan", zap.Int64("milllis", time.Since(t1).Milliseconds()), log.Info("time cost of generating single compaction plan", zap.Int64("millis", time.Since(t1).Milliseconds()),
zap.Int64("planID", singleCompactionPlan.GetPlanID()), zap.Int64("signalID", signal.id)) zap.Int64("planID", singleCompactionPlan.GetPlanID()), zap.Int64("signalID", signal.id))
} }
......
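The two guards added above gate both compaction entry points on the same flag: the background loop exits before its first tick, and a flush-driven trigger becomes a silent no-op that returns nil, so a flush never fails merely because compaction is switched off. A minimal, runnable sketch of this pattern, using simplified stand-ins (`params`, `trigger`) rather than the actual Milvus types:

```go
package main

import "fmt"

// params stands in for the global ParamTable flag added in this commit.
type params struct {
	EnableAutoCompaction bool
}

type trigger struct {
	p *params
}

// startGlobalLoop returns immediately when auto compaction is disabled,
// mirroring the early return added to startGlobalCompactionLoop.
func (t *trigger) startGlobalLoop() {
	if !t.p.EnableAutoCompaction {
		fmt.Println("auto compaction disabled; global loop not started")
		return
	}
	fmt.Println("global compaction loop running")
}

// triggerSingle is a no-op returning nil when disabled, so a flush request
// never surfaces an error just because the compaction switch is off.
func (t *trigger) triggerSingle(segmentID int64) error {
	if !t.p.EnableAutoCompaction {
		return nil
	}
	fmt.Printf("compaction signal sent for segment %d\n", segmentID)
	return nil
}

func main() {
	t := &trigger{p: &params{EnableAutoCompaction: false}}
	t.startGlobalLoop()
	_ = t.triggerSingle(1) // no error, no plan
}
```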
...@@ -263,6 +263,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
mergeCompactionPolicy mergeCompactionPolicy
compactionHandler compactionPlanContext
mergeCompactionSegmentThreshold int
autoCompactionEnabled bool
}
type args struct {
timetravel *timetravel
...@@ -343,6 +344,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
(mergeCompactionFunc)(greedyMergeCompaction),
&spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)},
2,
true,
},
args{
&timetravel{200},
...@@ -396,10 +398,88 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
},
},
},
{
"test auto compaction diabled",
fields{
&meta{
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 10,
MaxRowNum: 100,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{FieldID: 1, Binlogs: []string{"binlog1"}},
},
Deltalogs: []*datapb.DeltaLogInfo{
{RecordEntries: 5, DeltaLogPath: "deltalog1"},
},
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 300,
MaxRowNum: 1000,
InsertChannel: "ch2",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{FieldID: 1, Binlogs: []string{"binlog2"}},
},
Deltalogs: []*datapb.DeltaLogInfo{
{RecordEntries: 5, DeltaLogPath: "deltalog2"},
},
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 300,
MaxRowNum: 1000,
InsertChannel: "ch2",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{FieldID: 1, Binlogs: []string{"binlog3"}},
},
Deltalogs: []*datapb.DeltaLogInfo{
{RecordEntries: 5, DeltaLogPath: "deltalog3"},
},
},
},
},
},
},
newMockAllocator(),
make(chan *compactionSignal, 1),
(singleCompactionFunc)(chooseAllBinlogs),
(mergeCompactionFunc)(greedyMergeCompaction),
&spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)},
2,
false,
},
args{
&timetravel{200},
},
false,
[]*datapb.CompactionPlan{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Params.EnableAutoCompaction = tt.fields.autoCompactionEnabled
tr := &compactionTrigger{ tr := &compactionTrigger{
meta: tt.fields.meta, meta: tt.fields.meta,
allocator: tt.fields.allocator, allocator: tt.fields.allocator,
...@@ -436,6 +516,7 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) { ...@@ -436,6 +516,7 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) {
mergeCompactionPolicy mergeCompactionPolicy mergeCompactionPolicy mergeCompactionPolicy
compactionHandler compactionPlanContext compactionHandler compactionPlanContext
globalTrigger *time.Ticker globalTrigger *time.Ticker
enableAutoCompaction bool
}
type args struct {
collectionID int64
...@@ -502,7 +583,8 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) {
compactionHandler: &spyCompactionHandler{
spyChan: make(chan *datapb.CompactionPlan, 1),
},
globalTrigger: time.NewTicker(time.Hour),
enableAutoCompaction: true,
},
args: args{
collectionID: 1,
...@@ -585,7 +667,8 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) {
compactionHandler: &spyCompactionHandler{
spyChan: make(chan *datapb.CompactionPlan, 1),
},
globalTrigger: time.NewTicker(time.Hour),
enableAutoCompaction: true,
},
args: args{
collectionID: 1,
...@@ -664,7 +747,8 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) {
compactionHandler: &spyCompactionHandler{
spyChan: make(chan *datapb.CompactionPlan, 1),
},
globalTrigger: time.NewTicker(time.Hour),
enableAutoCompaction: true,
},
args: args{
collectionID: 1,
...@@ -693,10 +777,95 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) {
},
},
},
{
name: "auto compacted disabled",
fields: fields{
meta: &meta{
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
101: {
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 10,
InsertChannel: "test_chan_01",
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 12000,
LastExpireTime: 100,
StartPosition: &internalpb.MsgPosition{
ChannelName: "",
MsgID: []byte{},
MsgGroup: "",
Timestamp: 10,
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "",
MsgID: []byte{},
MsgGroup: "",
Timestamp: 45,
},
Binlogs: []*datapb.FieldBinlog{},
Statslogs: []*datapb.FieldBinlog{},
Deltalogs: []*datapb.DeltaLogInfo{
{
RecordEntries: 1000,
TimestampFrom: 10,
TimestampTo: 20,
},
{
RecordEntries: 1001,
TimestampFrom: 30,
TimestampTo: 45,
},
},
CreatedByCompaction: false,
CompactionFrom: []int64{},
},
isCompacting: false,
},
},
},
},
allocator: newMockAllocator(),
signals: make(chan *compactionSignal, 1),
singleCompactionPolicy: (singleCompactionFunc)(chooseAllBinlogs),
mergeCompactionPolicy: (mergeCompactionFunc)(greedyGeneratePlans),
compactionHandler: &spyCompactionHandler{
spyChan: make(chan *datapb.CompactionPlan, 1),
},
globalTrigger: time.NewTicker(time.Hour),
enableAutoCompaction: false,
},
args: args{
collectionID: 1,
partitionID: 10,
segmentID: 101,
channelName: "test_ch_01",
timetravel: &timetravel{
time: 30,
},
},
wantErr: false,
wantPlan: false,
wantBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 101,
FieldBinlogs: []*datapb.FieldBinlog{},
Field2StatslogPaths: []*datapb.FieldBinlog{},
Deltalogs: []*datapb.DeltaLogInfo{
{
RecordEntries: 2001,
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Params.EnableAutoCompaction = tt.fields.enableAutoCompaction
tr := &compactionTrigger{ tr := &compactionTrigger{
meta: tt.fields.meta, meta: tt.fields.meta,
allocator: tt.fields.allocator, allocator: tt.fields.allocator,
......
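Both test tables set the package-level `Params.EnableAutoCompaction` directly before building the trigger, which leaks the mutated value into whatever test runs next. A hedged sketch of a safer variant using the standard `t.Cleanup` to restore the prior value; the `Params` global here is a simplified stand-in, not the actual ParamTable:

```go
package datacoord_test

import "testing"

// Params is a simplified stand-in for the package-level ParamTable.
var Params struct{ EnableAutoCompaction bool }

// withAutoCompaction flips the global flag for one test and restores the
// previous value once the test and its subtests finish.
func withAutoCompaction(t *testing.T, enabled bool) {
	t.Helper()
	prev := Params.EnableAutoCompaction
	Params.EnableAutoCompaction = enabled
	t.Cleanup(func() { Params.EnableAutoCompaction = prev })
}

func TestTriggerDisabled(t *testing.T) {
	withAutoCompaction(t, false)
	// ... build the compactionTrigger and assert that no plan is produced ...
}
```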
...@@ -81,6 +81,7 @@ type ParamTable struct {
EnableGarbageCollection bool
CompactionRetentionDuration int64
EnableAutoCompaction bool
// Garbage Collection
GCInterval time.Duration
...@@ -135,6 +136,7 @@ func (p *ParamTable) Init() {
p.initMinioRootPath()
p.initCompactionRetentionDuration()
p.initEnableAutoCompaction()
p.initEnableGarbageCollection()
p.initGCInterval()
...@@ -378,3 +380,7 @@ func (p *ParamTable) initMinioRootPath() {
func (p *ParamTable) initCompactionRetentionDuration() {
p.CompactionRetentionDuration = p.ParseInt64WithDefault("dataCoord.compaction.retentionDuration", 432000)
}
func (p *ParamTable) initEnableAutoCompaction() {
p.EnableAutoCompaction = p.ParseBool("dataCoord.compaction.enableAutoCompaction", false)
}
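The diff does not show `ParseBool` itself; assuming it mirrors the `ParseInt64WithDefault` helper above (look up a key, fall back to the given default when the key is missing or malformed), a minimal sketch of the idea over a plain key/value map:

```go
package main

import (
	"fmt"
	"strconv"
)

// config is a stand-in for the key/value source behind ParamTable.
type config map[string]string

// parseBool returns the boolean stored under key, falling back to def
// when the key is absent or does not parse as a bool.
func (c config) parseBool(key string, def bool) bool {
	raw, ok := c[key]
	if !ok {
		return def
	}
	v, err := strconv.ParseBool(raw)
	if err != nil {
		return def
	}
	return v
}

func main() {
	c := config{"dataCoord.compaction.enableAutoCompaction": "true"}
	fmt.Println(c.parseBool("dataCoord.compaction.enableAutoCompaction", false)) // true
	fmt.Println(c.parseBool("dataCoord.gc.missing", false))                      // false (default)
}
```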