From 3a2aace6f03adc3b05792b2bb93a65421e07b8ec Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Tue, 14 Dec 2021 13:55:07 +0800 Subject: [PATCH] Fix ci issue caused by only compact flushed segments (#13317) Signed-off-by: sunby Co-authored-by: sunby --- internal/datacoord/compaction_trigger.go | 15 ++- internal/datacoord/compaction_trigger_test.go | 94 ++++++++++++++++++- internal/datacoord/mock_test.go | 4 +- 3 files changed, 106 insertions(+), 7 deletions(-) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 48a7a04d9..edf38776e 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -304,7 +304,7 @@ func (t *compactionTrigger) globalMergeCompaction(signal *compactionSignal, isFo _, has := colls[segment.GetCollectionID()] return (has || len(collections) == 0) && // if filters collection isSegmentHealthy(segment) && - segment.State == commonpb.SegmentState_Flushed && // flushed only + isFlush(segment) && !segment.isCompacting // not compacting now }) // m is list of chanPartSegments, which is channel-partition organized segments plans := make([]*datapb.CompactionPlan, 0) @@ -354,7 +354,7 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni segments := t.meta.GetSegmentsByChannel(channel) res := make([]*SegmentInfo, 0) for _, s := range segments { - if s.GetState() != commonpb.SegmentState_Flushed || s.GetInsertChannel() != channel || + if !isFlush(s) || s.GetInsertChannel() != channel || s.GetPartitionID() != partitionID || s.isCompacting { continue } @@ -375,11 +375,16 @@ func (t *compactionTrigger) shouldDoMergeCompaction(segments []*SegmentInfo) boo func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error { // TODO context - id, err := t.allocator.allocID(context.Background()) + id, err := t.allocator.allocID(context.TODO()) + if err != nil { + return err + } + ts, err := t.allocator.allocTimestamp(context.TODO()) if err != nil { return err } plan.PlanID = id + plan.StartTime = ts plan.TimeoutInSeconds = maxCompactionTimeoutInSeconds return nil } @@ -444,3 +449,7 @@ func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool, } return plan, t.compactionHandler.execCompactionPlan(signal, plan) } + +func isFlush(segment *SegmentInfo) bool { + return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing +} diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 1b21a3224..b7b786e3c 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -160,6 +160,7 @@ func Test_compactionTrigger_forceTriggerCompaction(t *testing.T) { }, }, }, + StartTime: 3, TimeoutInSeconds: maxCompactionTimeoutInSeconds, Type: datapb.CompactionType_MergeCompaction, Timetravel: 200, @@ -219,6 +220,7 @@ func Test_compactionTrigger_forceTriggerCompaction(t *testing.T) { }, }, }, + StartTime: 3, TimeoutInSeconds: maxCompactionTimeoutInSeconds, Type: datapb.CompactionType_InnerCompaction, Timetravel: 200, @@ -364,13 +366,14 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) { }, }, }, + StartTime: 3, TimeoutInSeconds: maxCompactionTimeoutInSeconds, Type: datapb.CompactionType_InnerCompaction, Timetravel: 200, Channel: "ch1", }, { - PlanID: 3, + PlanID: 4, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { SegmentID: 2, @@ -391,6 +394,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) { }, }, }, + StartTime: 5, TimeoutInSeconds: maxCompactionTimeoutInSeconds, Type: datapb.CompactionType_MergeCompaction, Timetravel: 200, @@ -475,6 +479,94 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) { false, []*datapb.CompactionPlan{}, }, + { + "test merge flushing segment", + fields{ + &meta{ + segments: &SegmentsInfo{ + map[int64]*SegmentInfo{ + 2: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 300, + MaxRowNum: 1000, + InsertChannel: "ch2", + State: commonpb.SegmentState_Flushing, + Binlogs: []*datapb.FieldBinlog{ + {FieldID: 1, Binlogs: []string{"binlog2"}}, + }, + Deltalogs: []*datapb.DeltaLogInfo{ + {RecordEntries: 5, DeltaLogPath: "deltalog2"}, + }, + }, + }, + 3: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 300, + MaxRowNum: 1000, + InsertChannel: "ch2", + State: commonpb.SegmentState_Flushing, + Binlogs: []*datapb.FieldBinlog{ + {FieldID: 1, Binlogs: []string{"binlog3"}}, + }, + Deltalogs: []*datapb.DeltaLogInfo{ + {RecordEntries: 5, DeltaLogPath: "deltalog3"}, + }, + }, + }, + }, + }, + }, + newMockAllocator(), + make(chan *compactionSignal, 1), + (singleCompactionFunc)(chooseAllBinlogs), + (mergeCompactionFunc)(greedyMergeCompaction), + &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, + 2, + true, + }, + args{ + &timetravel{200}, + }, + false, + []*datapb.CompactionPlan{ + { + PlanID: 2, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 2, + FieldBinlogs: []*datapb.FieldBinlog{ + {FieldID: 1, Binlogs: []string{"binlog2"}}, + }, + Deltalogs: []*datapb.DeltaLogInfo{ + {RecordEntries: 5, DeltaLogPath: "deltalog2"}, + }, + }, + { + SegmentID: 3, + FieldBinlogs: []*datapb.FieldBinlog{ + {FieldID: 1, Binlogs: []string{"binlog3"}}, + }, + Deltalogs: []*datapb.DeltaLogInfo{ + {RecordEntries: 5, DeltaLogPath: "deltalog3"}, + }, + }, + }, + StartTime: 3, + TimeoutInSeconds: maxCompactionTimeoutInSeconds, + Type: datapb.CompactionType_MergeCompaction, + Timetravel: 200, + Channel: "ch2", + }, + }, + }, } for _, tt := range tests { diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index ed50c8504..207ceaeff 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -51,9 +51,7 @@ type MockAllocator struct { func (m *MockAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) { val := atomic.AddInt64(&m.cnt, 1) - phy := time.Now().UnixNano() / int64(time.Millisecond) - ts := tsoutil.ComposeTS(phy, val) - return ts, nil + return Timestamp(val), nil } func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) { -- GitLab