未验证 提交 f16bbe71 编写于 作者: J jaime 提交者: GitHub

Add a segment seal policy by number of binlog files (#21941)

Signed-off-by: Njaime <yun.zhang@zilliz.com>
上级 68156b3d
......@@ -288,6 +288,9 @@ dataCoord:
# `minSizeFromIdleToSealed`, Milvus will automatically seal it.
maxIdleTime: 600 # The max idle time of segment in seconds, 10*60.
minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
# The max number of binlog file for one segment, the segment will be sealed if
# the number of binlog file reaches to max value.
maxBinlogFileNumber: 16
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
# (smallProportion * segment max # of rows).
compactableProportion: 0.85 # A compaction will happen on small segments if the segment after compaction will have
......
......@@ -116,7 +116,7 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
}
}
// getLastExpiresLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
// sealByMaxBinlogSizePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
return func(segment *SegmentInfo, ts Timestamp) bool {
pts, _ := tsoutil.ParseTS(ts)
......@@ -126,6 +126,18 @@ func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
}
}
// sealByMaxBinlogSizePolicy seal segment if binlog file number of segment exceed configured max number
func sealByMaxBinlogFileNumberPolicy(maxBinlogFileNumber int) segmentSealPolicy {
return func(segment *SegmentInfo, ts Timestamp) bool {
logFileCounter := 0
for _, fieldBinlog := range segment.GetStatslogs() {
logFileCounter += len(fieldBinlog.GetBinlogs())
}
return logFileCounter >= maxBinlogFileNumber
}
}
// sealLongTimeIdlePolicy seal segment if the segment has been written with a high frequency before.
// serve for this case:
// If users insert entities into segment continuously within a certain period of time, but they forgot to flush/(seal)
......
......@@ -184,6 +184,7 @@ func defaultAllocatePolicy() AllocatePolicy {
func defaultSegmentSealPolicy() []segmentSealPolicy {
return []segmentSealPolicy{
sealByMaxBinlogFileNumberPolicy(Params.DataCoordCfg.SegmentMaxBinlogFileNumber),
sealByLifetimePolicy(Params.DataCoordCfg.SegmentMaxLifetime),
getSegmentCapacityPolicy(Params.DataCoordCfg.SegmentSealProportion),
sealLongTimeIdlePolicy(Params.DataCoordCfg.SegmentMaxIdleTime, Params.DataCoordCfg.SegmentMinSizeFromIdleToSealed, Params.DataCoordCfg.SegmentMaxSize),
......
......@@ -23,11 +23,13 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/util/metautil"
)
func TestManagerOptions(t *testing.T) {
......@@ -517,6 +519,91 @@ func TestTryToSealSegment(t *testing.T) {
}
})
t.Run("test sealByMaxBinlogFileNumberPolicy", func(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta()
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil)
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
// No seal polices
{
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
segments := segmentManager.meta.segments.segments
assert.Equal(t, 1, len(segments))
for _, seg := range segments {
assert.Equal(t, commonpb.SegmentState_Growing, seg.GetState())
}
}
// Not trigger seal
{
segmentManager.segmentSealPolicies = []segmentSealPolicy{sealByMaxBinlogFileNumberPolicy(2)}
segments := segmentManager.meta.segments.segments
assert.Equal(t, 1, len(segments))
for _, seg := range segments {
seg.Statslogs = []*datapb.FieldBinlog{
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 10,
LogID: 3,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3),
},
},
},
}
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
seg = segmentManager.meta.segments.segments[seg.ID]
assert.Equal(t, commonpb.SegmentState_Growing, seg.GetState())
}
}
// Trigger seal
{
segmentManager.segmentSealPolicies = []segmentSealPolicy{sealByMaxBinlogFileNumberPolicy(2)}
segments := segmentManager.meta.segments.segments
assert.Equal(t, 1, len(segments))
for _, seg := range segments {
seg.Statslogs = []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 10,
LogID: 1,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3),
},
{
EntriesNum: 20,
LogID: 2,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2),
},
},
},
}
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
seg = segmentManager.meta.segments.segments[seg.ID]
assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState())
}
}
})
t.Run("seal with segment policy with kv fails", func(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
......
......@@ -1278,6 +1278,7 @@ type dataCoordConfig struct {
SegmentMaxLifetime time.Duration
SegmentMaxIdleTime time.Duration
SegmentMinSizeFromIdleToSealed float64
SegmentMaxBinlogFileNumber int
CreatedTime time.Time
UpdatedTime time.Time
......@@ -1319,6 +1320,7 @@ func (p *dataCoordConfig) init(base *BaseTable) {
p.initSegmentMaxLifetime()
p.initSegmentMaxIdleTime()
p.initSegmentMinSizeFromIdleToSealed()
p.initSegmentMaxBinlogFileNumber()
p.initEnableCompaction()
p.initEnableAutoCompaction()
......@@ -1375,6 +1377,11 @@ func (p *dataCoordConfig) initSegmentMinSizeFromIdleToSealed() {
log.Info("init segment min size from idle to sealed", zap.Float64("value", p.SegmentMinSizeFromIdleToSealed))
}
func (p *dataCoordConfig) initSegmentMaxBinlogFileNumber() {
p.SegmentMaxBinlogFileNumber = p.Base.ParseIntWithDefault("dataCoord.segment.maxBinlogFileNumber", 16)
log.Info("init segment max binlog file to sealed", zap.Int("value", p.SegmentMaxBinlogFileNumber))
}
func (p *dataCoordConfig) initChannelWatchPrefix() {
// WARN: this value should not be put to milvus.yaml. It's a default value for channel watch path.
// This will be removed after we reconstruct our config module.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册