From 5574a104c9d0eb8c476f9723cab6d572802c940b Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Thu, 11 Nov 2021 15:54:42 +0800 Subject: [PATCH] Add triggerInfo in compactionTask (#11517) issue: #9904 Signed-off-by: sunby Co-authored-by: sunby --- internal/datacoord/compaction.go | 21 +++++----- internal/datacoord/compaction_test.go | 41 ++++++++++++++++--- internal/datacoord/compaction_trigger.go | 34 +++++++-------- internal/datacoord/compaction_trigger_test.go | 2 +- internal/datacoord/mock_test.go | 6 +-- internal/datacoord/services.go | 3 ++ 6 files changed, 71 insertions(+), 36 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 422599cb2..014de0a95 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -24,7 +24,7 @@ type compactionPlanContext interface { start() stop() // execCompactionPlan start to execute plan and return immediately - execCompactionPlan(plan *datapb.CompactionPlan) error + execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error // completeCompaction record the result of a compaction completeCompaction(result *datapb.CompactionResult) error // getCompaction return compaction task. If planId does not exist, return nil. @@ -60,9 +60,10 @@ type compactionTask struct { func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask { task := &compactionTask{ - plan: t.plan, - state: t.state, - dataNodeID: t.dataNodeID, + triggerInfo: t.triggerInfo, + plan: t.plan, + state: t.state, + dataNodeID: t.dataNodeID, } for _, opt := range opts { opt(task) @@ -131,7 +132,7 @@ func (c *compactionPlanHandler) stop() { } // execCompactionPlan start to execute plan and return immediately -func (c *compactionPlanHandler) execCompactionPlan(plan *datapb.CompactionPlan) error { +func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { c.mu.Lock() defer c.mu.Unlock() @@ -146,9 +147,10 @@ func (c *compactionPlanHandler) execCompactionPlan(plan *datapb.CompactionPlan) c.sessions.Compaction(nodeID, plan) task := &compactionTask{ - plan: plan, - state: executing, - dataNodeID: nodeID, + triggerInfo: signal, + plan: plan, + state: executing, + dataNodeID: nodeID, } c.plans[plan.PlanID] = task c.executingTaskNum++ @@ -188,8 +190,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu default: return errors.New("unknown compaction type") } - c.plans[planID] = c.plans[planID].shadowClone(setState(completed)) - c.plans[planID] = c.plans[planID].shadowClone(setResult(result)) + c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result)) c.executingTaskNum-- if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction { c.flushCh <- result.GetSegmentID() diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index f2f731588..b72a78094 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -21,7 +21,8 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { chManager *ChannelManager } type args struct { - plan *datapb.CompactionPlan + signal *compactionSignal + plan *datapb.CompactionPlan } tests := []struct { name string @@ -53,7 +54,8 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { }, }, args{ - plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}, + signal: &compactionSignal{id: 100}, + plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}, }, false, nil, @@ -66,12 +68,13 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { sessions: tt.fields.sessions, chManager: tt.fields.chManager, } - err := c.execCompactionPlan(tt.args.plan) + err := c.execCompactionPlan(tt.args.signal, tt.args.plan) assert.Equal(t, tt.err, err) if err == nil { <-ch task := c.getCompaction(tt.args.plan.PlanID) assert.Equal(t, tt.args.plan, task.plan) + assert.Equal(t, tt.args.signal, task.triggerInfo) } }) } @@ -92,6 +95,7 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) { fields fields args args wantErr bool + want *compactionTask }{ { "test complete non existed compaction task", @@ -102,6 +106,7 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) { result: &datapb.CompactionResult{PlanID: 2}, }, true, + nil, }, { "test complete completed task", @@ -112,13 +117,15 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) { result: &datapb.CompactionResult{PlanID: 1}, }, true, + nil, }, { "test complete inner compaction", fields{ map[int64]*compactionTask{ 1: { - state: executing, + triggerInfo: &compactionSignal{id: 1}, + state: executing, plan: &datapb.CompactionPlan{ PlanID: 1, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ @@ -147,13 +154,25 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) { }, }, false, + &compactionTask{ + triggerInfo: &compactionSignal{id: 1}, + state: completed, + plan: &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 1, FieldBinlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"log1"}}}}, + }, + Type: datapb.CompactionType_InnerCompaction, + }, + }, }, { "test complete merge compaction", fields{ map[int64]*compactionTask{ 1: { - state: executing, + triggerInfo: &compactionSignal{id: 1}, + state: executing, plan: &datapb.CompactionPlan{ PlanID: 1, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ @@ -184,6 +203,18 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) { }, }, false, + &compactionTask{ + triggerInfo: &compactionSignal{id: 1}, + state: completed, + plan: &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 1, FieldBinlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"log1"}}}}, + {SegmentID: 2, FieldBinlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"log2"}}}}, + }, + Type: datapb.CompactionType_MergeCompaction, + }, + }, }, } for _, tt := range tests { diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 28cb40ecc..82da1fbce 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -199,12 +199,12 @@ func (t *compactionTrigger) handleForceSignal(signal *compactionSignal) { t1 := time.Now() segments := t.meta.GetSegmentsOfCollection(signal.collectionID) - singleCompactionPlans := t.globalSingleCompaction(segments, true, signal.timetravel) + singleCompactionPlans := t.globalSingleCompaction(segments, true, signal) if len(singleCompactionPlans) != 0 { log.Debug("force single compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("planIDs", getPlanIDs(singleCompactionPlans))) } - mergeCompactionPlans := t.globalMergeCompaction(signal.timetravel, true, signal.collectionID) + mergeCompactionPlans := t.globalMergeCompaction(signal, true, signal.collectionID) if len(mergeCompactionPlans) != 0 { log.Debug("force merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("planIDs", getPlanIDs(mergeCompactionPlans))) } @@ -230,7 +230,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { return } segments := t.meta.segments.GetSegments() - singleCompactionPlans := t.globalSingleCompaction(segments, false, signal.timetravel) + singleCompactionPlans := t.globalSingleCompaction(segments, false, signal) if len(singleCompactionPlans) != 0 { log.Debug("global single compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(singleCompactionPlans))) } @@ -240,7 +240,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { return } - mergeCompactionPlans := t.globalMergeCompaction(signal.timetravel, false) + mergeCompactionPlans := t.globalMergeCompaction(signal, false) if len(mergeCompactionPlans) != 0 { log.Debug("global merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(mergeCompactionPlans))) } @@ -259,7 +259,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { } segment := t.meta.GetSegment(signal.segmentID) - singleCompactionPlan, err := t.singleCompaction(segment, signal.isForce, signal.timetravel) + singleCompactionPlan, err := t.singleCompaction(segment, signal.isForce, signal) if err != nil { log.Warn("failed to do single compaction", zap.Int64("segmentID", segment.ID), zap.Error(err)) } else { @@ -277,7 +277,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { segments := t.getCandidateSegments(channel, partitionID) - plans := t.mergeCompaction(segments, signal.timetravel, false) + plans := t.mergeCompaction(segments, signal, false) if len(plans) != 0 { log.Debug("merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(plans))) } @@ -286,7 +286,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { // zap.String("channel", channel), zap.Int64("partitionID", partitionID)) } -func (t *compactionTrigger) globalMergeCompaction(timetravel *timetravel, isForce bool, collections ...UniqueID) []*datapb.CompactionPlan { +func (t *compactionTrigger) globalMergeCompaction(signal *compactionSignal, isForce bool, collections ...UniqueID) []*datapb.CompactionPlan { colls := make(map[int64]struct{}) for _, collID := range collections { colls[collID] = struct{}{} @@ -302,19 +302,19 @@ func (t *compactionTrigger) globalMergeCompaction(timetravel *timetravel, isForc if !isForce && t.compactionHandler.isFull() { return plans } - mplans := t.mergeCompaction(segments.segments, timetravel, isForce) + mplans := t.mergeCompaction(segments.segments, signal, isForce) plans = append(plans, mplans...) } return plans } -func (t *compactionTrigger) mergeCompaction(segments []*SegmentInfo, timetravel *timetravel, isForce bool) []*datapb.CompactionPlan { +func (t *compactionTrigger) mergeCompaction(segments []*SegmentInfo, signal *compactionSignal, isForce bool) []*datapb.CompactionPlan { if !isForce && !t.shouldDoMergeCompaction(segments) { return nil } - plans := t.mergeCompactionPolicy.generatePlan(segments, timetravel) + plans := t.mergeCompactionPolicy.generatePlan(segments, signal.timetravel) if len(plans) == 0 { return nil } @@ -331,7 +331,7 @@ func (t *compactionTrigger) mergeCompaction(segments []*SegmentInfo, timetravel } log.Debug("exec merge compaction plan", zap.Any("plan", plan)) - if err := t.compactionHandler.execCompactionPlan(plan); err != nil { + if err := t.compactionHandler.execCompactionPlan(signal, plan); err != nil { log.Warn("failed to execute compaction plan", zap.Error(err)) continue } @@ -396,13 +396,13 @@ func (t *compactionTrigger) shouldDoSingleCompaction(segment *SegmentInfo, timet return float32(totalDeletedRows)/float32(segment.NumOfRows) >= singleCompactionRatioThreshold || totalDeleteLogSize > singleCompactionDeltaLogMaxSize } -func (t *compactionTrigger) globalSingleCompaction(segments []*SegmentInfo, isForce bool, timetravel *timetravel) []*datapb.CompactionPlan { +func (t *compactionTrigger) globalSingleCompaction(segments []*SegmentInfo, isForce bool, signal *compactionSignal) []*datapb.CompactionPlan { plans := make([]*datapb.CompactionPlan, 0) for _, segment := range segments { if !isForce && t.compactionHandler.isFull() { return plans } - plan, err := t.singleCompaction(segment, isForce, timetravel) + plan, err := t.singleCompaction(segment, isForce, signal) if err != nil { log.Warn("failed to exec single compaction", zap.Error(err)) continue @@ -415,16 +415,16 @@ func (t *compactionTrigger) globalSingleCompaction(segments []*SegmentInfo, isFo return plans } -func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool, timetravel *timetravel) (*datapb.CompactionPlan, error) { +func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool, signal *compactionSignal) (*datapb.CompactionPlan, error) { if segment == nil { return nil, nil } - if !isForce && !t.shouldDoSingleCompaction(segment, timetravel) { + if !isForce && !t.shouldDoSingleCompaction(segment, signal.timetravel) { return nil, nil } - plan := t.singleCompactionPolicy.generatePlan(segment, timetravel) + plan := t.singleCompactionPolicy.generatePlan(segment, signal.timetravel) if plan == nil { return nil, nil } @@ -432,5 +432,5 @@ func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool, if err := t.fillOriginPlan(plan); err != nil { return nil, err } - return plan, t.compactionHandler.execCompactionPlan(plan) + return plan, t.compactionHandler.execCompactionPlan(signal, plan) } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index f703ca469..8a33c41cb 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -17,7 +17,7 @@ type spyCompactionHandler struct { } // execCompactionPlan start to execute plan and return immediately -func (h *spyCompactionHandler) execCompactionPlan(plan *datapb.CompactionPlan) error { +func (h *spyCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { h.spyChan <- plan return nil } diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 0993349bd..4acd2bb2d 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -472,10 +472,10 @@ func (h *mockCompactionHandler) stop() { } // execCompactionPlan start to execute plan and return immediately -func (h *mockCompactionHandler) execCompactionPlan(plan *datapb.CompactionPlan) error { +func (h *mockCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { if f, ok := h.methods["execCompactionPlan"]; ok { - if ff, ok := f.(func(plan *datapb.CompactionPlan) error); ok { - return ff(plan) + if ff, ok := f.(func(signal *compactionSignal, plan *datapb.CompactionPlan) error); ok { + return ff(signal, plan) } } panic("not implemented") diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index e19c3799b..0a484b432 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -710,6 +710,8 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac resp.CompletedPlanNo = int64(completedCnt) resp.TimeoutPlanNo = int64(timeoutCnt) resp.Status.ErrorCode = commonpb.ErrorCode_Success + log.Debug("success to get compaction state", zap.Any("state", state), zap.Int("executing", executingCnt), + zap.Int("completed", completedCnt), zap.Int("timeout", timeoutCnt)) return resp, nil } @@ -740,6 +742,7 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb. resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.State = state + log.Debug("success to get state with plans", zap.Any("state", state), zap.Any("merge infos", resp.MergeInfos)) return resp, nil } -- GitLab