未验证 提交 5574a104 编写于 作者: B Bingyi Sun 提交者: GitHub

Add triggerInfo in compactionTask (#11517)

issue: #9904
Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
Co-authored-by: Nsunby <bingyi.sun@zilliz.com>
上级 7970c341
......@@ -24,7 +24,7 @@ type compactionPlanContext interface {
start()
stop()
// execCompactionPlan start to execute plan and return immediately
execCompactionPlan(plan *datapb.CompactionPlan) error
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
// completeCompaction record the result of a compaction
completeCompaction(result *datapb.CompactionResult) error
// getCompaction return compaction task. If planId does not exist, return nil.
......@@ -60,9 +60,10 @@ type compactionTask struct {
func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask {
task := &compactionTask{
plan: t.plan,
state: t.state,
dataNodeID: t.dataNodeID,
triggerInfo: t.triggerInfo,
plan: t.plan,
state: t.state,
dataNodeID: t.dataNodeID,
}
for _, opt := range opts {
opt(task)
......@@ -131,7 +132,7 @@ func (c *compactionPlanHandler) stop() {
}
// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(plan *datapb.CompactionPlan) error {
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
c.mu.Lock()
defer c.mu.Unlock()
......@@ -146,9 +147,10 @@ func (c *compactionPlanHandler) execCompactionPlan(plan *datapb.CompactionPlan)
c.sessions.Compaction(nodeID, plan)
task := &compactionTask{
plan: plan,
state: executing,
dataNodeID: nodeID,
triggerInfo: signal,
plan: plan,
state: executing,
dataNodeID: nodeID,
}
c.plans[plan.PlanID] = task
c.executingTaskNum++
......@@ -188,8 +190,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
default:
return errors.New("unknown compaction type")
}
c.plans[planID] = c.plans[planID].shadowClone(setState(completed))
c.plans[planID] = c.plans[planID].shadowClone(setResult(result))
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result))
c.executingTaskNum--
if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction {
c.flushCh <- result.GetSegmentID()
......
......@@ -21,7 +21,8 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
chManager *ChannelManager
}
type args struct {
plan *datapb.CompactionPlan
signal *compactionSignal
plan *datapb.CompactionPlan
}
tests := []struct {
name string
......@@ -53,7 +54,8 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
},
},
args{
plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction},
signal: &compactionSignal{id: 100},
plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction},
},
false,
nil,
......@@ -66,12 +68,13 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
sessions: tt.fields.sessions,
chManager: tt.fields.chManager,
}
err := c.execCompactionPlan(tt.args.plan)
err := c.execCompactionPlan(tt.args.signal, tt.args.plan)
assert.Equal(t, tt.err, err)
if err == nil {
<-ch
task := c.getCompaction(tt.args.plan.PlanID)
assert.Equal(t, tt.args.plan, task.plan)
assert.Equal(t, tt.args.signal, task.triggerInfo)
}
})
}
......@@ -92,6 +95,7 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) {
fields fields
args args
wantErr bool
want *compactionTask
}{
{
"test complete non existed compaction task",
......@@ -102,6 +106,7 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) {
result: &datapb.CompactionResult{PlanID: 2},
},
true,
nil,
},
{
"test complete completed task",
......@@ -112,13 +117,15 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) {
result: &datapb.CompactionResult{PlanID: 1},
},
true,
nil,
},
{
"test complete inner compaction",
fields{
map[int64]*compactionTask{
1: {
state: executing,
triggerInfo: &compactionSignal{id: 1},
state: executing,
plan: &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
......@@ -147,13 +154,25 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) {
},
},
false,
&compactionTask{
triggerInfo: &compactionSignal{id: 1},
state: completed,
plan: &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1, FieldBinlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"log1"}}}},
},
Type: datapb.CompactionType_InnerCompaction,
},
},
},
{
"test complete merge compaction",
fields{
map[int64]*compactionTask{
1: {
state: executing,
triggerInfo: &compactionSignal{id: 1},
state: executing,
plan: &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
......@@ -184,6 +203,18 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) {
},
},
false,
&compactionTask{
triggerInfo: &compactionSignal{id: 1},
state: completed,
plan: &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1, FieldBinlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"log1"}}}},
{SegmentID: 2, FieldBinlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"log2"}}}},
},
Type: datapb.CompactionType_MergeCompaction,
},
},
},
}
for _, tt := range tests {
......
......@@ -199,12 +199,12 @@ func (t *compactionTrigger) handleForceSignal(signal *compactionSignal) {
t1 := time.Now()
segments := t.meta.GetSegmentsOfCollection(signal.collectionID)
singleCompactionPlans := t.globalSingleCompaction(segments, true, signal.timetravel)
singleCompactionPlans := t.globalSingleCompaction(segments, true, signal)
if len(singleCompactionPlans) != 0 {
log.Debug("force single compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("planIDs", getPlanIDs(singleCompactionPlans)))
}
mergeCompactionPlans := t.globalMergeCompaction(signal.timetravel, true, signal.collectionID)
mergeCompactionPlans := t.globalMergeCompaction(signal, true, signal.collectionID)
if len(mergeCompactionPlans) != 0 {
log.Debug("force merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("planIDs", getPlanIDs(mergeCompactionPlans)))
}
......@@ -230,7 +230,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
return
}
segments := t.meta.segments.GetSegments()
singleCompactionPlans := t.globalSingleCompaction(segments, false, signal.timetravel)
singleCompactionPlans := t.globalSingleCompaction(segments, false, signal)
if len(singleCompactionPlans) != 0 {
log.Debug("global single compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(singleCompactionPlans)))
}
......@@ -240,7 +240,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
return
}
mergeCompactionPlans := t.globalMergeCompaction(signal.timetravel, false)
mergeCompactionPlans := t.globalMergeCompaction(signal, false)
if len(mergeCompactionPlans) != 0 {
log.Debug("global merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(mergeCompactionPlans)))
}
......@@ -259,7 +259,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
}
segment := t.meta.GetSegment(signal.segmentID)
singleCompactionPlan, err := t.singleCompaction(segment, signal.isForce, signal.timetravel)
singleCompactionPlan, err := t.singleCompaction(segment, signal.isForce, signal)
if err != nil {
log.Warn("failed to do single compaction", zap.Int64("segmentID", segment.ID), zap.Error(err))
} else {
......@@ -277,7 +277,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
segments := t.getCandidateSegments(channel, partitionID)
plans := t.mergeCompaction(segments, signal.timetravel, false)
plans := t.mergeCompaction(segments, signal, false)
if len(plans) != 0 {
log.Debug("merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(plans)))
}
......@@ -286,7 +286,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
// zap.String("channel", channel), zap.Int64("partitionID", partitionID))
}
func (t *compactionTrigger) globalMergeCompaction(timetravel *timetravel, isForce bool, collections ...UniqueID) []*datapb.CompactionPlan {
func (t *compactionTrigger) globalMergeCompaction(signal *compactionSignal, isForce bool, collections ...UniqueID) []*datapb.CompactionPlan {
colls := make(map[int64]struct{})
for _, collID := range collections {
colls[collID] = struct{}{}
......@@ -302,19 +302,19 @@ func (t *compactionTrigger) globalMergeCompaction(timetravel *timetravel, isForc
if !isForce && t.compactionHandler.isFull() {
return plans
}
mplans := t.mergeCompaction(segments.segments, timetravel, isForce)
mplans := t.mergeCompaction(segments.segments, signal, isForce)
plans = append(plans, mplans...)
}
return plans
}
func (t *compactionTrigger) mergeCompaction(segments []*SegmentInfo, timetravel *timetravel, isForce bool) []*datapb.CompactionPlan {
func (t *compactionTrigger) mergeCompaction(segments []*SegmentInfo, signal *compactionSignal, isForce bool) []*datapb.CompactionPlan {
if !isForce && !t.shouldDoMergeCompaction(segments) {
return nil
}
plans := t.mergeCompactionPolicy.generatePlan(segments, timetravel)
plans := t.mergeCompactionPolicy.generatePlan(segments, signal.timetravel)
if len(plans) == 0 {
return nil
}
......@@ -331,7 +331,7 @@ func (t *compactionTrigger) mergeCompaction(segments []*SegmentInfo, timetravel
}
log.Debug("exec merge compaction plan", zap.Any("plan", plan))
if err := t.compactionHandler.execCompactionPlan(plan); err != nil {
if err := t.compactionHandler.execCompactionPlan(signal, plan); err != nil {
log.Warn("failed to execute compaction plan", zap.Error(err))
continue
}
......@@ -396,13 +396,13 @@ func (t *compactionTrigger) shouldDoSingleCompaction(segment *SegmentInfo, timet
return float32(totalDeletedRows)/float32(segment.NumOfRows) >= singleCompactionRatioThreshold || totalDeleteLogSize > singleCompactionDeltaLogMaxSize
}
func (t *compactionTrigger) globalSingleCompaction(segments []*SegmentInfo, isForce bool, timetravel *timetravel) []*datapb.CompactionPlan {
func (t *compactionTrigger) globalSingleCompaction(segments []*SegmentInfo, isForce bool, signal *compactionSignal) []*datapb.CompactionPlan {
plans := make([]*datapb.CompactionPlan, 0)
for _, segment := range segments {
if !isForce && t.compactionHandler.isFull() {
return plans
}
plan, err := t.singleCompaction(segment, isForce, timetravel)
plan, err := t.singleCompaction(segment, isForce, signal)
if err != nil {
log.Warn("failed to exec single compaction", zap.Error(err))
continue
......@@ -415,16 +415,16 @@ func (t *compactionTrigger) globalSingleCompaction(segments []*SegmentInfo, isFo
return plans
}
func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool, timetravel *timetravel) (*datapb.CompactionPlan, error) {
func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool, signal *compactionSignal) (*datapb.CompactionPlan, error) {
if segment == nil {
return nil, nil
}
if !isForce && !t.shouldDoSingleCompaction(segment, timetravel) {
if !isForce && !t.shouldDoSingleCompaction(segment, signal.timetravel) {
return nil, nil
}
plan := t.singleCompactionPolicy.generatePlan(segment, timetravel)
plan := t.singleCompactionPolicy.generatePlan(segment, signal.timetravel)
if plan == nil {
return nil, nil
}
......@@ -432,5 +432,5 @@ func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool,
if err := t.fillOriginPlan(plan); err != nil {
return nil, err
}
return plan, t.compactionHandler.execCompactionPlan(plan)
return plan, t.compactionHandler.execCompactionPlan(signal, plan)
}
......@@ -17,7 +17,7 @@ type spyCompactionHandler struct {
}
// execCompactionPlan start to execute plan and return immediately
func (h *spyCompactionHandler) execCompactionPlan(plan *datapb.CompactionPlan) error {
func (h *spyCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
h.spyChan <- plan
return nil
}
......
......@@ -472,10 +472,10 @@ func (h *mockCompactionHandler) stop() {
}
// execCompactionPlan start to execute plan and return immediately
func (h *mockCompactionHandler) execCompactionPlan(plan *datapb.CompactionPlan) error {
func (h *mockCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
if f, ok := h.methods["execCompactionPlan"]; ok {
if ff, ok := f.(func(plan *datapb.CompactionPlan) error); ok {
return ff(plan)
if ff, ok := f.(func(signal *compactionSignal, plan *datapb.CompactionPlan) error); ok {
return ff(signal, plan)
}
}
panic("not implemented")
......
......@@ -710,6 +710,8 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac
resp.CompletedPlanNo = int64(completedCnt)
resp.TimeoutPlanNo = int64(timeoutCnt)
resp.Status.ErrorCode = commonpb.ErrorCode_Success
log.Debug("success to get compaction state", zap.Any("state", state), zap.Int("executing", executingCnt),
zap.Int("completed", completedCnt), zap.Int("timeout", timeoutCnt))
return resp, nil
}
......@@ -740,6 +742,7 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.State = state
log.Debug("success to get state with plans", zap.Any("state", state), zap.Any("merge infos", resp.MergeInfos))
return resp, nil
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册