segment_allocator.go 8.9 KB
Newer Older
S
sunby 已提交
1 2 3
package dataservice

import (
紫晴 已提交
4
	"context"
S
sunby 已提交
5 6 7 8 9
	"fmt"
	"strconv"
	"sync"
	"time"

X
Xiangyu Wang 已提交
10 11 12
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/log"
紫晴 已提交
13 14
	"github.com/zilliztech/milvus-distributed/internal/util/trace"
	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
S
sunby 已提交
15 16
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"

紫晴 已提交
17
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
S
sunby 已提交
18 19 20 21 22 23
)

// errRemainInSufficient is the error returned when no segment has enough
// free rows left to serve an allocation request.
type errRemainInSufficient struct {
	// requestRows is the number of rows that was requested.
	requestRows int
}

// newErrRemainInSufficient builds an errRemainInSufficient for a request of
// requestRows rows.
func newErrRemainInSufficient(requestRows int) errRemainInSufficient {
	return errRemainInSufficient{requestRows: requestRows}
}

// Error implements the error interface. The requested row count is appended
// to the message, separated from the text by a single space.
func (err errRemainInSufficient) Error() string {
	return "segment remaining is insufficient for " + strconv.Itoa(err.requestRows)
}

// segmentAllocator is used to allocate rows for segments and record the allocations.
33
type segmentAllocatorInterface interface {
S
sunby 已提交
34
	// OpenSegment add the segment to allocator and set it allocatable
紫晴 已提交
35
	OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error
S
sunby 已提交
36
	// AllocSegment allocate rows and record the allocation.
紫晴 已提交
37
	AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
S
sunby 已提交
38
	// GetSealedSegments get all sealed segment.
紫晴 已提交
39
	GetSealedSegments(ctx context.Context) ([]UniqueID, error)
S
sunby 已提交
40
	// SealSegment set segment sealed, the segment will not be allocated anymore.
紫晴 已提交
41
	SealSegment(ctx context.Context, segmentID UniqueID) error
S
sunby 已提交
42
	// DropSegment drop the segment from allocator.
紫晴 已提交
43
	DropSegment(ctx context.Context, segmentID UniqueID)
S
sunby 已提交
44
	// ExpireAllocations check all allocations' expire time and remove the expired allocation.
紫晴 已提交
45
	ExpireAllocations(ctx context.Context, timeTick Timestamp) error
S
sunby 已提交
46
	// SealAllSegments get all opened segment ids of collection. return success and failed segment ids
紫晴 已提交
47
	SealAllSegments(ctx context.Context, collectionID UniqueID)
S
sunby 已提交
48
	// IsAllocationsExpired check all allocations of segment expired.
紫晴 已提交
49
	IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error)
S
sunby 已提交
50 51
}

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// segmentStatus is the allocator's in-memory bookkeeping for one segment.
// OpenSegment creates it from a datapb.SegmentInfo; alloc appends to
// allocations and ExpireAllocations removes expired entries from it.
type segmentStatus struct {
	id             UniqueID      // ID of the segment
	collectionID   UniqueID      // collection ID copied from the segment info
	partitionID    UniqueID      // partition ID copied from the segment info
	total          int           // estimated row capacity, computed by estimateTotalRows
	sealed         bool          // when true, AllocSegment skips this segment
	lastExpireTime Timestamp     // expire time of the most recently created allocation
	allocations    []*allocation // allocations that have not been removed as expired
	insertChannel  string        // insert channel copied from the segment info
}
// allocation records one reservation of rows inside a segment.
// NOTE: alloc builds this struct with an unkeyed literal, so the field order
// must not change.
type allocation struct {
	rowNums    int       // number of rows reserved
	expireTime Timestamp // ExpireAllocations removes the entry once the time tick reaches this value
}
// segmentAllocator hands out row allocations inside segments and tracks them
// in memory. Its methods take mu before touching segments.
type segmentAllocator struct {
	mt                     *meta                       // metadata store used to look up segments and collections
	segments               map[UniqueID]*segmentStatus // segment ID -> status
	segmentExpireDuration  int64                       // lifetime of an allocation; alloc interprets it as milliseconds
	segmentThreshold       float64                     // Params.SegmentSize * 1024 * 1024; divided by the per-record size to estimate row capacity
	segmentThresholdFactor float64                     // fraction of total rows at which checkSegmentSealed reports a segment as sealed
	mu                     sync.RWMutex                // guards segments
	allocator              allocatorInterface          // provides the timestamps used for allocation expiry
}
S
sunby 已提交
75

76 77
func newSegmentAllocator(meta *meta, allocator allocatorInterface) *segmentAllocator {
	segmentAllocator := &segmentAllocator{
S
sunby 已提交
78
		mt:                     meta,
S
sunby 已提交
79 80 81 82
		segments:               make(map[UniqueID]*segmentStatus),
		segmentExpireDuration:  Params.SegIDAssignExpiration,
		segmentThreshold:       Params.SegmentSize * 1024 * 1024,
		segmentThresholdFactor: Params.SegmentSizeFactor,
S
sunby 已提交
83
		allocator:              allocator,
S
sunby 已提交
84
	}
Y
yukun 已提交
85
	return segmentAllocator
S
sunby 已提交
86 87
}

紫晴 已提交
88 89 90
func (allocator *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error {
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
X
Xiangyu Wang 已提交
91 92
	allocator.mu.Lock()
	defer allocator.mu.Unlock()
S
sunby 已提交
93 94
	if _, ok := allocator.segments[segmentInfo.ID]; ok {
		return fmt.Errorf("segment %d already exist", segmentInfo.ID)
S
sunby 已提交
95
	}
S
sunby 已提交
96
	totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID)
S
sunby 已提交
97 98 99
	if err != nil {
		return err
	}
X
Xiangyu Wang 已提交
100 101
	log.Debug("dataservice: estimateTotalRows: ",
		zap.Int64("CollectionID", segmentInfo.CollectionID),
S
sunby 已提交
102
		zap.Int64("SegmentID", segmentInfo.ID),
X
Xiangyu Wang 已提交
103
		zap.Int("Rows", totalRows))
S
sunby 已提交
104 105
	allocator.segments[segmentInfo.ID] = &segmentStatus{
		id:             segmentInfo.ID,
S
sunby 已提交
106 107
		collectionID:   segmentInfo.CollectionID,
		partitionID:    segmentInfo.PartitionID,
S
sunby 已提交
108 109 110
		total:          totalRows,
		sealed:         false,
		lastExpireTime: 0,
S
sunby 已提交
111
		insertChannel:  segmentInfo.InsertChannel,
S
sunby 已提交
112 113 114 115
	}
	return nil
}

紫晴 已提交
116
func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
S
sunby 已提交
117
	partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
紫晴 已提交
118 119
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
S
sunby 已提交
120 121 122 123 124
	allocator.mu.Lock()
	defer allocator.mu.Unlock()

	for _, segStatus := range allocator.segments {
		if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID ||
S
sunby 已提交
125
			segStatus.insertChannel != channelName {
S
sunby 已提交
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
			continue
		}
		var success bool
		success, err = allocator.alloc(segStatus, requestRows)
		if err != nil {
			return
		}
		if !success {
			continue
		}
		segID = segStatus.id
		retCount = requestRows
		expireTime = segStatus.lastExpireTime
		return
	}

	err = newErrRemainInSufficient(requestRows)
	return
}

146
func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
S
sunby 已提交
147 148 149 150
	totalOfAllocations := 0
	for _, allocation := range segStatus.allocations {
		totalOfAllocations += allocation.rowNums
	}
S
sunby 已提交
151
	segMeta, err := allocator.mt.GetSegment(segStatus.id)
S
sunby 已提交
152 153 154 155
	if err != nil {
		return false, err
	}
	free := segStatus.total - int(segMeta.NumRows) - totalOfAllocations
X
Xiangyu Wang 已提交
156 157 158
	log.Debug("dataservice::alloc: ",
		zap.Any("segMeta.NumRows", int(segMeta.NumRows)),
		zap.Any("totalOfAllocations", totalOfAllocations))
S
sunby 已提交
159 160 161 162
	if numRows > free {
		return false, nil
	}

S
sunby 已提交
163
	ts, err := allocator.allocator.allocTimestamp()
S
sunby 已提交
164 165 166 167 168 169 170 171 172
	if err != nil {
		return false, err
	}
	physicalTs, logicalTs := tsoutil.ParseTS(ts)
	expirePhysicalTs := physicalTs.Add(time.Duration(allocator.segmentExpireDuration) * time.Millisecond)
	expireTs := tsoutil.ComposeTS(expirePhysicalTs.UnixNano()/int64(time.Millisecond), int64(logicalTs))
	segStatus.lastExpireTime = expireTs
	segStatus.allocations = append(segStatus.allocations, &allocation{
		numRows,
S
sunby 已提交
173
		expireTs,
S
sunby 已提交
174 175 176 177 178
	})

	return true, nil
}

179
func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
S
sunby 已提交
180 181 182 183 184 185 186 187 188 189 190
	collMeta, err := allocator.mt.GetCollection(collectionID)
	if err != nil {
		return -1, err
	}
	sizePerRecord, err := typeutil.EstimateSizePerRecord(collMeta.Schema)
	if err != nil {
		return -1, err
	}
	return int(allocator.segmentThreshold / float64(sizePerRecord)), nil
}

紫晴 已提交
191 192 193
func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) {
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
S
sunby 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
	allocator.mu.Lock()
	defer allocator.mu.Unlock()
	keys := make([]UniqueID, 0)
	for _, segStatus := range allocator.segments {
		if !segStatus.sealed {
			sealed, err := allocator.checkSegmentSealed(segStatus)
			if err != nil {
				return nil, err
			}
			segStatus.sealed = sealed
		}
		if segStatus.sealed {
			keys = append(keys, segStatus.id)
		}
	}
	return keys, nil
}

212
func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
S
sunby 已提交
213
	segMeta, err := allocator.mt.GetSegment(segStatus.id)
S
sunby 已提交
214 215 216 217 218 219
	if err != nil {
		return false, err
	}
	return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil
}

紫晴 已提交
220 221 222
func (allocator *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
S
sunby 已提交
223 224 225 226
	allocator.mu.Lock()
	defer allocator.mu.Unlock()
	status, ok := allocator.segments[segmentID]
	if !ok {
S
sunby 已提交
227 228
		return nil
	}
S
sunby 已提交
229
	status.sealed = true
S
sunby 已提交
230
	return nil
S
sunby 已提交
231 232
}

紫晴 已提交
233 234 235
func (allocator *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
S
sunby 已提交
236 237 238 239 240
	allocator.mu.Lock()
	defer allocator.mu.Unlock()
	delete(allocator.segments, segmentID)
}

紫晴 已提交
241 242 243
func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error {
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
S
sunby 已提交
244 245 246 247 248 249 250
	allocator.mu.Lock()
	defer allocator.mu.Unlock()
	for _, segStatus := range allocator.segments {
		for i := 0; i < len(segStatus.allocations); i++ {
			if timeTick < segStatus.allocations[i].expireTime {
				continue
			}
X
Xiangyu Wang 已提交
251 252 253
			log.Debug("dataservice::ExpireAllocations: ",
				zap.Any("segStatus.id", segStatus.id),
				zap.Any("segStatus.allocations.rowNums", segStatus.allocations[i].rowNums))
S
sunby 已提交
254 255 256 257 258 259 260
			segStatus.allocations = append(segStatus.allocations[:i], segStatus.allocations[i+1:]...)
			i--
		}
	}
	return nil
}

紫晴 已提交
261 262 263
func (allocator *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) {
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
S
sunby 已提交
264 265 266 267 268 269 270 271
	allocator.mu.RLock()
	defer allocator.mu.RUnlock()
	status, ok := allocator.segments[segmentID]
	if !ok {
		return false, fmt.Errorf("segment %d not found", segmentID)
	}
	return status.lastExpireTime <= ts, nil
}
S
sunby 已提交
272

紫晴 已提交
273 274 275
func (allocator *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) {
	sp, _ := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
S
sunby 已提交
276 277 278 279 280 281 282 283 284 285 286
	allocator.mu.Lock()
	defer allocator.mu.Unlock()
	for _, status := range allocator.segments {
		if status.collectionID == collectionID {
			if status.sealed {
				continue
			}
			status.sealed = true
		}
	}
}