segment_allocation_policy.go 4.5 KB
Newer Older
S
sunby 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package datacoord
13 14

import (
	"fmt"
	"sort"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

S
sunby 已提交
22
// calUpperLimitPolicy derives an integer upper limit from a collection schema,
// or reports why it cannot. calBySchemaPolicy, for instance, returns how many
// records of the schema fit into Params.SegmentMaxSize.
type calUpperLimitPolicy func(*schemapb.CollectionSchema) (int, error)
23

S
sunby 已提交
24
func calBySchemaPolicy(schema *schemapb.CollectionSchema) (int, error) {
25 26 27 28
	sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
	if err != nil {
		return -1, err
	}
S
sunby 已提交
29
	threshold := Params.SegmentMaxSize * 1024 * 1024
30 31 32
	return int(threshold / float64(sizePerRecord)), nil
}

33 34
// AllocatePolicy decides where count rows should go, given the candidate
// segments and the maximum number of rows a single segment may take.
// AllocatePolicyV1, for example, returns the allocations that require new
// segments first and the allocations placed into existing segments second.
type AllocatePolicy func(segments []*SegmentInfo, count, maxCountPerSegment int64) ([]*Allocation, []*Allocation)
35

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
func AllocatePolicyV1(segments []*SegmentInfo, count int64,
	maxCountPerSegment int64) ([]*Allocation, []*Allocation) {
	newSegmentAllocations := make([]*Allocation, 0)
	existedSegmentAllocations := make([]*Allocation, 0)
	// create new segment if count >= max num
	for count >= maxCountPerSegment {
		allocation := &Allocation{
			NumOfRows: maxCountPerSegment,
		}
		newSegmentAllocations = append(newSegmentAllocations, allocation)
		count -= maxCountPerSegment
	}

	// allocate space for remaining count
	if count == 0 {
		return newSegmentAllocations, existedSegmentAllocations
	}
	for _, segment := range segments {
		var allocSize int64
		for _, allocation := range segment.allocations {
			allocSize += allocation.NumOfRows
		}
		free := segment.GetMaxRowNum() - segment.GetNumOfRows() - allocSize
		if free < count {
			continue
		}
		allocation := &Allocation{
			SegmentID: segment.GetID(),
			NumOfRows: count,
		}
		existedSegmentAllocations = append(existedSegmentAllocations, allocation)
		return newSegmentAllocations, existedSegmentAllocations
	}

	// allocate new segment for remaining count
	allocation := &Allocation{
		NumOfRows: count,
S
sunby 已提交
73
	}
74 75
	newSegmentAllocations = append(newSegmentAllocations, allocation)
	return newSegmentAllocations, existedSegmentAllocations
76 77
}

S
sunby 已提交
78
// sealPolicy decides from a segment's row counters (its maximum, the rows
// already written and the rows allocated) whether the segment should be
// sealed. sealPolicyV1 is one implementation.
type sealPolicy func(maxCount int64, writtenCount int64, allocatedCount int64) bool
79

S
sunby 已提交
80
// segmentSealPolicy seal policy applies to segment
S
sunby 已提交
81
type segmentSealPolicy func(segment *SegmentInfo, ts Timestamp) bool
S
sunby 已提交
82 83

// channelSealPolicy seal policy applies to channel
S
sunby 已提交
84
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo
S
sunby 已提交
85 86 87

// getSegmentCapacityPolicy get segmentSealPolicy with segment size factor policy
func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
S
sunby 已提交
88
	return func(segment *SegmentInfo, ts Timestamp) bool {
S
sunby 已提交
89
		var allocSize int64
S
sunby 已提交
90
		for _, allocation := range segment.allocations {
91
			allocSize += allocation.NumOfRows
S
sunby 已提交
92
		}
S
sunby 已提交
93
		return float64(segment.currRows) >= sizeFactor*float64(segment.GetMaxRowNum())
S
sunby 已提交
94 95 96 97 98
	}
}

// getLastExpiresLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
func getLastExpiresLifetimePolicy(lifetime uint64) segmentSealPolicy {
S
sunby 已提交
99 100
	return func(segment *SegmentInfo, ts Timestamp) bool {
		return (ts - segment.GetLastExpireTime()) > lifetime
S
sunby 已提交
101 102 103 104 105
	}
}

// getChannelCapacityPolicy get channelSealPolicy with channel segment capacity policy
func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
S
sunby 已提交
106
	return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo {
S
sunby 已提交
107
		if len(segs) <= limit {
S
sunby 已提交
108
			return []*SegmentInfo{}
S
sunby 已提交
109
		}
110
		sortSegmentsByLastExpires(segs)
S
sunby 已提交
111 112 113 114 115 116
		offLen := len(segs) - limit
		return segs[0:offLen]
	}
}

// sortSegStatusByLastExpires sort segmentStatus with lastExpireTime ascending order
S
sunby 已提交
117
func sortSegmentsByLastExpires(segs []*SegmentInfo) {
S
sunby 已提交
118
	sort.Slice(segs, func(i, j int) bool {
119
		return segs[i].LastExpireTime < segs[j].LastExpireTime
S
sunby 已提交
120 121 122
	})
}

S
sunby 已提交
123
func sealPolicyV1(maxCount, writtenCount, allocatedCount int64) bool {
S
sunby 已提交
124
	return float64(writtenCount) >= Params.SegmentSealProportion*float64(maxCount)
125 126
}

S
sunby 已提交
127
// flushPolicy reports whether the given segment may be flushed at timestamp t.
// flushPolicyV1 is one implementation.
type flushPolicy func(*SegmentInfo, Timestamp) bool
128

S
sunby 已提交
129 130
// flushPolicyV1 reports whether segment is ready to be flushed at time t. The
// segment must already be in the Sealed state, and its last expire time must
// not be later than t.
func flushPolicyV1(segment *SegmentInfo, t Timestamp) bool {
	if segment.GetState() != commonpb.SegmentState_Sealed {
		return false
	}
	return segment.GetLastExpireTime() <= t
}