未验证 提交 5751759c 编写于 作者: C congqixia 提交者: GitHub

Implement mix compaction logic (#15542)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
上级 daf25bcb
......@@ -199,7 +199,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
if err := c.handleInnerCompactionResult(plan, result); err != nil {
return err
case datapb.CompactionType_MergeCompaction:
case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
if err := c.handleMergeCompactionResult(plan, result); err != nil {
return err
......@@ -208,7 +208,8 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result))
if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction {
if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction ||
c.plans[planID].plan.GetType() == datapb.CompactionType_MixCompaction {
c.flushCh <- result.GetSegmentID()
// TODO: when to clean task list
......@@ -17,6 +17,7 @@
package datacoord
import (
......@@ -198,6 +199,7 @@ func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, s
// forceTriggerCompaction force to start a compaction
// invoked by user `ManualCompaction` operation
func (t *compactionTrigger) forceTriggerCompaction(collectionID int64, timetravel *timetravel) (UniqueID, error) {
id, err := t.allocSignalID()
if err != nil {
......@@ -206,11 +208,11 @@ func (t *compactionTrigger) forceTriggerCompaction(collectionID int64, timetrave
signal := &compactionSignal{
id: id,
isForce: true,
isGlobal: false,
isGlobal: true,
collectionID: collectionID,
timetravel: timetravel,
return id, nil
......@@ -220,26 +222,6 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
return t.allocator.allocID(ctx)
func (t *compactionTrigger) handleForceSignal(signal *compactionSignal) {
defer t.forceMu.Unlock()
t1 := time.Now()
segments := t.meta.GetSegmentsOfCollection(signal.collectionID)
singleCompactionPlans := t.globalSingleCompaction(segments, true, signal)
if len(singleCompactionPlans) != 0 {
log.Debug("force single compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("planIDs", getPlanIDs(singleCompactionPlans)))
mergeCompactionPlans := t.globalMergeCompaction(signal, true, signal.collectionID)
if len(mergeCompactionPlans) != 0 {
log.Debug("force merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("planIDs", getPlanIDs(mergeCompactionPlans)))
log.Info("handle force signal cost", zap.Int64("milliseconds", time.Since(t1).Milliseconds()),
zap.Int64("collectionID", signal.collectionID), zap.Int64("signalID", signal.id))
func getPlanIDs(plans []*datapb.CompactionPlan) []int64 {
ids := make([]int64, 0, len(plans))
for _, p := range plans {
......@@ -252,37 +234,42 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
defer t.forceMu.Unlock()
// 1. try global single compaction
t1 := time.Now()
if t.compactionHandler.isFull() {
// only flushed or flushing(flushed but not notified) segments
segments := t.meta.SelectSegments(isFlush)
singleCompactionPlans := t.globalSingleCompaction(segments, false, signal)
if len(singleCompactionPlans) != 0 {
log.Debug("global single compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(singleCompactionPlans)))
m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
isSegmentHealthy(segment) &&
isFlush(segment) &&
!segment.isCompacting // not compacting now
}) // m is list of chanPartSegments, which is channel-partition organized segments
for _, group := range m {
if !signal.isForce && t.compactionHandler.isFull() {
// 2. try global merge compaction
if t.compactionHandler.isFull() {
plans := t.generatePlans(group.segments, signal.isForce, signal.timetravel)
log.Info("global generated plans", zap.Int64("collection", signal.collectionID), zap.Int("plan count", len(plans)))
for _, plan := range plans {
if !signal.isForce && t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID))
start := time.Now()
if err := t.fillOriginPlan(plan); err != nil {
log.Warn("failed to fill plan", zap.Error(err))
t.compactionHandler.execCompactionPlan(signal, plan)
mergeCompactionPlans := t.globalMergeCompaction(signal, false)
if len(mergeCompactionPlans) != 0 {
log.Debug("global merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(mergeCompactionPlans)))
if time.Since(t1).Milliseconds() > 500 {
log.Info("handle global compaction cost too long", zap.Int64("milliseconds", time.Since(t1).Milliseconds()))
log.Info("time cost of generating global compaction", zap.Int64("planID", plan.PlanID), zap.Any("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID), zap.String("channel", group.channelName), zap.Int64("partitionID", group.partitionID))
// handleSignal processes segment flush caused partition-chan level compaction signal
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
defer t.forceMu.Unlock()
t1 := time.Now()
// 1. check whether segment's binlogs should be compacted or not
if t.compactionHandler.isFull() {
......@@ -293,33 +280,30 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
log.Warn("segment in compaction signal not found in meta", zap.Int64("segmentID", signal.segmentID))
singleCompactionPlan, err := t.singleCompaction(segment, signal.isForce, signal)
if err != nil {
log.Warn("failed to do single compaction", zap.Int64("segmentID", segment.ID), zap.Error(err))
} else {
log.Info("time cost of generating single compaction plan", zap.Int64("millis", time.Since(t1).Milliseconds()),
zap.Int64("planID", singleCompactionPlan.GetPlanID()), zap.Int64("signalID", signal.id))
// 2. check whether segments of partition&channel level should be compacted or not
if t.compactionHandler.isFull() {
channel := segment.GetInsertChannel()
partitionID := segment.GetPartitionID()
segments := t.getCandidateSegments(channel, partitionID)
plans := t.generatePlans(segments, signal.isForce, signal.timetravel)
log.Info("single generated plans", zap.Int64("collection", signal.collectionID), zap.Int("plan count", len(plans)))
for _, plan := range plans {
if t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID))
start := time.Now()
if err := t.fillOriginPlan(plan); err != nil {
log.Warn("failed to fill plan", zap.Error(err))
t.compactionHandler.execCompactionPlan(signal, plan)
plans := t.mergeCompaction(segments, signal, false)
if len(plans) != 0 {
log.Debug("merge compaction plans", zap.Int64("signalID", signal.id), zap.Int64s("plans", getPlanIDs(plans)))
log.Info("time cost of generating compaction", zap.Int64("planID", plan.PlanID), zap.Any("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID), zap.String("channel", channel), zap.Int64("partitionID", partitionID))
// log.Info("time cost of generating merge compaction", zap.Int64("planID", plan.PlanID), zap.Any("time cost", time.Since(t1).Milliseconds()),
// zap.String("channel", channel), zap.Int64("partitionID", partitionID))
func (t *compactionTrigger) globalMergeCompaction(signal *compactionSignal, isForce bool, collections ...UniqueID) []*datapb.CompactionPlan {
colls := make(map[int64]struct{})
for _, collID := range collections {
......@@ -373,11 +357,112 @@ func (t *compactionTrigger) mergeCompaction(segments []*SegmentInfo, signal *com
res = append(res, plan)
return res
// SegmentHeap is a slice of *SegmentInfo that carries the Len/Less/Swap/Push/Pop
// method set; this file hands values of this type to heap.Push and heap.Pop
// (presumably container/heap — the import is not visible here).
type SegmentHeap []*SegmentInfo
// Len returns the number of segments currently held.
func (h *SegmentHeap) Len() int {
return len(*h)
// Less orders segments by ascending row count: element i sorts before element j
// when it has strictly fewer rows.
func (h *SegmentHeap) Less(i, j int) bool {
return (*h)[i].GetNumOfRows() < (*h)[j].GetNumOfRows()
// Swap exchanges the segments at positions i and j.
func (h *SegmentHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
// Push appends x to the end of the slice. x must be a *SegmentInfo; the
// unchecked type assertion panics for any other dynamic type.
func (h *SegmentHeap) Push(x interface{}) {
*h = append(*h, x.(*SegmentInfo))
// Pop removes the last element of the slice and returns it.
func (h *SegmentHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
// generatePlans sorts the given segments into two heaps and builds compaction
// plans from them.
//
// Each segment is shadow-cloned first. A clone is pushed onto internalCandidates
// when (force && hasValidDeltaLogs) || shouldDoSingleCompaction holds, and onto
// mergeCandidates when isSmallSegment holds.
// NOTE(review): block delimiters are not visible in this view, so it cannot be
// told from here whether the two checks are mutually exclusive — confirm
// against the full file.
//
// The returned slice is nil when no plan was appended.
func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, timetravel *timetravel) []*datapb.CompactionPlan {
// collect the segments that need internal (single-segment) compaction
internalCandidates := &SegmentHeap{}
mergeCandidates := &SegmentHeap{}
for _, segment := range segments {
segment := segment.ShadowClone()
if (force && t.hasValidDeltaLogs(segment, timetravel)) || t.shouldDoSingleCompaction(segment, timetravel) {
heap.Push(internalCandidates, segment)
if t.isSmallSegment(segment) {
heap.Push(mergeCandidates, segment)
var plans []*datapb.CompactionPlan
// generatePlan starts a bucket with the given segment, greedily adds further
// segments from internalCandidates and then from mergeCandidates, and appends
// the resulting plan to plans.
generatePlan := func(segment *SegmentInfo) {
var bucket []*SegmentInfo
bucket = append(bucket, segment)
// NOTE(review): the budget is seeded with the full max row count; the seed
// segment's own rows are not subtracted on this line — confirm this is intended.
free := segment.GetMaxRowNum()
result, free := greedySelect(internalCandidates, free)
bucket = append(bucket, result...)
result, _ = greedySelect(mergeCandidates, free)
bucket = append(bucket, result...)
plans = append(plans, segmentsToPlan(bucket, timetravel))
var segment *SegmentInfo
// NOTE(review): no call to generatePlan is visible inside the two loops below
// in this view — verify against the full file.
for internalCandidates.Len() > 0 {
segment = heap.Pop(internalCandidates).(*SegmentInfo)
// merge compaction needs at least 2 candidates; unless forced, it additionally
// requires the candidate count to reach t.mergeCompactionSegmentThreshold
for mergeCandidates.Len() > 1 &&
(mergeCandidates.Len() >= t.mergeCompactionSegmentThreshold || force) {
segment = heap.Pop(mergeCandidates).(*SegmentInfo)
return plans
// segmentsToPlan builds a CompactionPlan of type MixCompaction from the given
// segments.
//
// The plan's Timetravel is taken from timeTravel.time and its Channel from the
// first segment's insert channel. For every segment one CompactionSegmentBinlogs
// entry is appended, carrying the segment ID, binlogs, statslogs and deltalogs.
//
// segments[0] is indexed and timeTravel is dereferenced without checks, so an
// empty slice or a nil timeTravel panics.
func segmentsToPlan(segments []*SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan {
plan := &datapb.CompactionPlan{
Timetravel: timeTravel.time,
Type: datapb.CompactionType_MixCompaction,
Channel: segments[0].GetInsertChannel(),
for _, s := range segments {
segmentBinlogs := &datapb.CompactionSegmentBinlogs{
SegmentID: s.GetID(),
FieldBinlogs: s.GetBinlogs(),
Field2StatslogPaths: s.GetStatslogs(),
Deltalogs: s.GetDeltalogs(),
plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinlogs)
return plan
// greedySelect pops segments from candidates for as long as the element at
// index 0 has strictly fewer rows than the remaining budget free, subtracting
// each popped segment's row count from the budget.
//
// It returns the popped segments (nil when none qualified) and the budget that
// is left. A segment whose row count equals free exactly is not taken, because
// the comparison is strict. candidates is mutated: selected segments are removed.
func greedySelect(candidates *SegmentHeap, free int64) ([]*SegmentInfo, int64) {
var result []*SegmentInfo
// index 0 is inspected before popping — presumably the heap minimum (fewest
// rows, per SegmentHeap.Less) when heap is container/heap
for candidates.Len() > 0 && (*candidates)[0].GetNumOfRows() < free {
segment := heap.Pop(candidates).(*SegmentInfo)
result = append(result, segment)
free -= segment.GetNumOfRows()
return result, free
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
segments := t.meta.GetSegmentsByChannel(channel)
res := make([]*SegmentInfo, 0)
var res []*SegmentInfo
for _, s := range segments {
if !isFlush(s) || s.GetInsertChannel() != channel ||
s.GetPartitionID() != partitionID || s.isCompacting {
......@@ -388,10 +473,14 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni
return res
// isSmallSegment reports whether the segment holds strictly fewer rows than
// half of its maximum row number.
func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo) bool {
return segment.GetNumOfRows() < segment.GetMaxRowNum()/2
func (t *compactionTrigger) shouldDoMergeCompaction(segments []*SegmentInfo) bool {
littleSegmentNum := 0
for _, s := range segments {
if s.GetNumOfRows() < s.GetMaxRowNum()/2 {
if t.isSmallSegment(s) {
......@@ -438,6 +527,23 @@ func (t *compactionTrigger) shouldDoSingleCompaction(segment *SegmentInfo, timet
return float32(totalDeletedRows)/float32(segment.NumOfRows) >= singleCompactionRatioThreshold || totalDeleteLogSize > singleCompactionDeltaLogMaxSize
// hasValidDeltaLogs reports whether the segment has at least one delta binlog
// whose TimestampTo lies strictly before timetravel.time.
//
// It returns false immediately when the segment's LastExpireTime is at or after
// timetravel.time, and false when no delta binlog satisfies the condition.
// timetravel is dereferenced without a nil check.
func (t *compactionTrigger) hasValidDeltaLogs(segment *SegmentInfo, timetravel *timetravel) bool {
if segment.LastExpireTime >= timetravel.time {
return false
// scan every binlog of every delta field-binlog group; one hit is enough
for _, fbl := range segment.GetDeltalogs() {
for _, l := range fbl.GetBinlogs() {
if l.TimestampTo < timetravel.time {
return true
return false
func (t *compactionTrigger) globalSingleCompaction(segments []*SegmentInfo, isForce bool, signal *compactionSignal) []*datapb.CompactionPlan {
plans := make([]*datapb.CompactionPlan, 0)
for _, segment := range segments {
......@@ -475,7 +581,7 @@ func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool,
return nil, err
return plan, t.compactionHandler.execCompactionPlan(signal, plan)
// isFlush reports whether the segment's state is either Flushed or Flushing.
func isFlush(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
......@@ -22,6 +22,7 @@ import (
......@@ -194,7 +195,7 @@ func Test_compactionTrigger_forceTriggerCompaction(t *testing.T) {
StartTime: 3,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_MergeCompaction,
Type: datapb.CompactionType_MixCompaction,
Timetravel: 200,
Channel: "ch1",
......@@ -262,7 +263,7 @@ func Test_compactionTrigger_forceTriggerCompaction(t *testing.T) {
StartTime: 3,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_InnerCompaction,
Type: datapb.CompactionType_MixCompaction,
Timetravel: 200,
Channel: "ch1",
......@@ -373,7 +374,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 300,
NumOfRows: 301,
MaxRowNum: 1000,
InsertChannel: "ch2",
State: commonpb.SegmentState_Flushed,
......@@ -424,7 +425,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
StartTime: 3,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_InnerCompaction,
Type: datapb.CompactionType_MixCompaction,
Timetravel: 200,
Channel: "ch1",
......@@ -460,7 +461,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
StartTime: 5,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_MergeCompaction,
Type: datapb.CompactionType_MixCompaction,
Timetravel: 200,
Channel: "ch2",
......@@ -522,7 +523,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 300,
NumOfRows: 301,
MaxRowNum: 1000,
InsertChannel: "ch2",
State: commonpb.SegmentState_Flushed,
......@@ -653,7 +654,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
StartTime: 3,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_MergeCompaction,
Type: datapb.CompactionType_MixCompaction,
Timetravel: 200,
Channel: "ch2",
......@@ -686,11 +687,37 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
for _, plan := range gotPlans {
assert.EqualValues(t, tt.wantPlans, gotPlans)
for _, wantPlan := range tt.wantPlans {
foundMatch := false
for _, gotPlan := range gotPlans {
if sameSegmentInfos(gotPlan.SegmentBinlogs, wantPlan.SegmentBinlogs) {
assert.Equal(t, wantPlan.Channel, gotPlan.Channel)
assert.Equal(t, wantPlan.Timetravel, gotPlan.Timetravel)
assert.Equal(t, wantPlan.TimeoutInSeconds, gotPlan.TimeoutInSeconds)
assert.Equal(t, wantPlan.Type, gotPlan.Type)
foundMatch = true
assert.True(t, foundMatch)
// sameSegmentInfos reports whether s1 and s2 have the same length and every
// pair of entries at the same index is equal according to proto.Equal.
// The comparison is positional, so the same entries in a different order do
// not count as equal.
func sameSegmentInfos(s1, s2 []*datapb.CompactionSegmentBinlogs) bool {
if len(s1) != len(s2) {
return false
for idx, seg := range s1 {
if !proto.Equal(seg, s2[idx]) {
return false
return true
func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) {
type fields struct {
meta *meta
......@@ -1079,10 +1106,12 @@ func Test_compactionTrigger_singleTriggerCompaction(t *testing.T) {
if tt.wantPlan {
assert.EqualValues(t, tt.wantBinlogs, plan.GetSegmentBinlogs())
} else {
t.Error("case not want plan but got one")
case <-ctx.Done():
if tt.wantPlan {
t.Error("case want plan but got none")
......@@ -319,7 +319,7 @@ func (t *compactionTask) compact() error {
log.Error("compact wrong, there's no segments in segment binlogs")
return errIllegalCompactionPlan
case t.plan.GetType() == datapb.CompactionType_MergeCompaction:
case t.plan.GetType() == datapb.CompactionType_MergeCompaction || t.plan.GetType() == datapb.CompactionType_MixCompaction:
targetSegID, err = t.allocID()
if err != nil {
log.Error("compact wrong", zap.Error(err))
......@@ -338,6 +338,7 @@ enum CompactionType {
UndefinedCompaction = 0;
InnerCompaction = 1;
MergeCompaction = 2;
MixCompaction = 3;
message CompactionSegmentBinlogs {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册