segment_allocation_policy.go 7.1 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
S
sunby 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
S
sunby 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
S
sunby 已提交
16

17
package datacoord
18 19

import (
C
congqixia 已提交
20
	"errors"
S
sunby 已提交
21
	"sort"
S
sunby 已提交
22
	"time"
S
sunby 已提交
23

S
SimFG 已提交
24 25
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
26
	"github.com/milvus-io/milvus/internal/util/tsoutil"
27 28 29
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

S
sunby 已提交
30
// calUpperLimitPolicy computes an upper limit on the number of rows a segment of
// a collection with the given schema may hold, or reports why it cannot.
type calUpperLimitPolicy func(collSchema *schemapb.CollectionSchema) (int, error)
31

S
sunby 已提交
32
func calBySchemaPolicy(schema *schemapb.CollectionSchema) (int, error) {
C
congqixia 已提交
33 34 35
	if schema == nil {
		return -1, errors.New("nil schema")
	}
36 37 38 39
	sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
	if err != nil {
		return -1, err
	}
C
congqixia 已提交
40 41 42 43
	// check zero value, preventing panicking
	if sizePerRecord == 0 {
		return -1, errors.New("zero size record schema found")
	}
44
	threshold := Params.DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024
45 46 47
	return int(threshold / float64(sizePerRecord)), nil
}

48 49 50 51 52 53 54 55 56 57 58 59
func calBySchemaPolicyWithDiskIndex(schema *schemapb.CollectionSchema) (int, error) {
	if schema == nil {
		return -1, errors.New("nil schema")
	}
	sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
	if err != nil {
		return -1, err
	}
	// check zero value, preventing panicking
	if sizePerRecord == 0 {
		return -1, errors.New("zero size record schema found")
	}
60
	threshold := Params.DataCoordCfg.DiskSegmentMaxSize.GetAsFloat() * 1024 * 1024
61 62 63
	return int(threshold / float64(sizePerRecord)), nil
}

64
// AllocatePolicy helper function definition to allocate Segment space
65 66
type AllocatePolicy func(segments []*SegmentInfo, count int64,
	maxCountPerSegment int64) ([]*Allocation, []*Allocation)
67

68
// AllocatePolicyV1 v1 policy simple allocation policy using Greedy Algorithm
69 70 71 72 73 74
func AllocatePolicyV1(segments []*SegmentInfo, count int64,
	maxCountPerSegment int64) ([]*Allocation, []*Allocation) {
	newSegmentAllocations := make([]*Allocation, 0)
	existedSegmentAllocations := make([]*Allocation, 0)
	// create new segment if count >= max num
	for count >= maxCountPerSegment {
75
		allocation := getAllocation(maxCountPerSegment)
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
		newSegmentAllocations = append(newSegmentAllocations, allocation)
		count -= maxCountPerSegment
	}

	// allocate space for remaining count
	if count == 0 {
		return newSegmentAllocations, existedSegmentAllocations
	}
	for _, segment := range segments {
		var allocSize int64
		for _, allocation := range segment.allocations {
			allocSize += allocation.NumOfRows
		}
		free := segment.GetMaxRowNum() - segment.GetNumOfRows() - allocSize
		if free < count {
			continue
		}
93 94
		allocation := getAllocation(count)
		allocation.SegmentID = segment.GetID()
95 96 97 98 99
		existedSegmentAllocations = append(existedSegmentAllocations, allocation)
		return newSegmentAllocations, existedSegmentAllocations
	}

	// allocate new segment for remaining count
100
	allocation := getAllocation(count)
101 102
	newSegmentAllocations = append(newSegmentAllocations, allocation)
	return newSegmentAllocations, existedSegmentAllocations
103 104
}

S
sunby 已提交
105
// segmentSealPolicy seal policy applies to segment
S
sunby 已提交
106
type segmentSealPolicy func(segment *SegmentInfo, ts Timestamp) bool
S
sunby 已提交
107 108 109

// getSegmentCapacityPolicy get segmentSealPolicy with segment size factor policy
func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
S
sunby 已提交
110
	return func(segment *SegmentInfo, ts Timestamp) bool {
S
sunby 已提交
111
		var allocSize int64
S
sunby 已提交
112
		for _, allocation := range segment.allocations {
113
			allocSize += allocation.NumOfRows
S
sunby 已提交
114
		}
S
sunby 已提交
115
		return float64(segment.currRows) >= sizeFactor*float64(segment.GetMaxRowNum())
S
sunby 已提交
116 117 118
	}
}

119
// sealByMaxBinlogSizePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
120
func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
S
sunby 已提交
121
	return func(segment *SegmentInfo, ts Timestamp) bool {
122 123 124 125
		pts, _ := tsoutil.ParseTS(ts)
		epts, _ := tsoutil.ParseTS(segment.GetLastExpireTime())
		d := pts.Sub(epts)
		return d >= lifetime
S
sunby 已提交
126 127 128
	}
}

129 130 131 132 133 134 135 136 137 138 139 140
// sealByMaxBinlogFileNumberPolicy returns a segmentSealPolicy that seals a
// segment once the total number of binlog files across all of its field binlogs
// reaches maxBinlogFileNumber.
func sealByMaxBinlogFileNumberPolicy(maxBinlogFileNumber int) segmentSealPolicy {
	return func(segment *SegmentInfo, ts Timestamp) bool {
		total := 0
		for i := range segment.Binlogs {
			total += len(segment.Binlogs[i].GetBinlogs())
		}
		return total >= maxBinlogFileNumber
	}
}

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
// sealLongTimeIdlePolicy returns a segmentSealPolicy that seals segments which
// were written to but have since gone idle.
//
// It targets this case: a user inserts entities into a segment continuously for
// a while but never flushes (seals) it, and the segment never reaches the seal
// proportion. Milvus would then wait for such segments to expire, and search
// latency may be somewhat higher in the meantime. A segment is sealed when two
// conditions hold:
//   - it has not been written to for longer than idleTimeTolerance;
//   - it holds more rows than (minSizeToSealIdleSegment / maxSizeOfSegment) of
//     its max row number.
//
// We then assume no more entities are coming, and sealing it triggers the
// handoff to the query cluster.
//
// Q: Why not simply decrease the expiry time?
// A: That would also affect segments that are receiving frequent small batches.
func sealLongTimeIdlePolicy(idleTimeTolerance time.Duration, minSizeToSealIdleSegment float64, maxSizeOfSegment float64) segmentSealPolicy {
	// fraction of a segment's max row number it must exceed to be sealed when idle
	fraction := minSizeToSealIdleSegment / maxSizeOfSegment
	return func(segment *SegmentInfo, ts Timestamp) bool {
		if time.Since(segment.lastWrittenTime) <= idleTimeTolerance {
			return false
		}
		return float64(segment.currRows) > fraction*float64(segment.GetMaxRowNum())
	}
}

157 158 159
// channelSealPolicy selects, from the segments of one channel, the segments that
// should be sealed at the given timestamp.
type channelSealPolicy func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo

S
sunby 已提交
160 161
// getChannelCapacityPolicy get channelSealPolicy with channel segment capacity policy
func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
S
sunby 已提交
162
	return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo {
S
sunby 已提交
163
		if len(segs) <= limit {
S
sunby 已提交
164
			return []*SegmentInfo{}
S
sunby 已提交
165
		}
166
		sortSegmentsByLastExpires(segs)
S
sunby 已提交
167
		offLen := len(segs) - limit
168 169 170
		if offLen > len(segs) {
			offLen = len(segs)
		}
S
sunby 已提交
171 172 173 174 175
		return segs[0:offLen]
	}
}

// sortSegStatusByLastExpires sort segmentStatus with lastExpireTime ascending order
S
sunby 已提交
176
func sortSegmentsByLastExpires(segs []*SegmentInfo) {
S
sunby 已提交
177
	sort.Slice(segs, func(i, j int) bool {
178
		return segs[i].LastExpireTime < segs[j].LastExpireTime
S
sunby 已提交
179 180 181
	})
}

S
sunby 已提交
182
// flushPolicy reports whether a segment should be flushed at timestamp t.
type flushPolicy func(seg *SegmentInfo, t Timestamp) bool
183

S
sunby 已提交
184 185
// flushInterval is the minimum time since a segment's last flush before
// flushPolicyV1 considers flushing it again.
const flushInterval = time.Second * 2

S
sunby 已提交
186
func flushPolicyV1(segment *SegmentInfo, t Timestamp) bool {
S
sunby 已提交
187
	return segment.GetState() == commonpb.SegmentState_Sealed &&
188
		time.Since(segment.lastFlushTime) >= flushInterval &&
189
		(segment.GetLastExpireTime() <= t && segment.currRows != 0 || (segment.IsImporting))
190
}