// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"time"

	"github.com/golang/protobuf/proto"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
)

29
// SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
S
sunby 已提交
30
type SegmentsInfo struct {
S
sunby 已提交
31 32 33
	segments map[UniqueID]*SegmentInfo
}

34
// SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
S
sunby 已提交
35 36
type SegmentInfo struct {
	*datapb.SegmentInfo
S
sunby 已提交
37 38 39
	currRows      int64
	allocations   []*Allocation
	lastFlushTime time.Time
S
sunby 已提交
40
	isCompacting  bool
S
sunby 已提交
41 42
}

43 44 45 46
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
// assign current rows to 0 and pre-allocate `allocations` slice
// Note that the allocation information is not preserved,
// the worst case scenario is to have a segment with twice size we expects
S
sunby 已提交
47 48
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
	return &SegmentInfo{
49 50 51 52
		SegmentInfo:   info,
		currRows:      0,
		allocations:   make([]*Allocation, 0, 16),
		lastFlushTime: time.Now().Add(-1 * flushInterval),
S
sunby 已提交
53
	}
S
sunby 已提交
54 55
}

56 57
// NewSegmentsInfo creates a `SegmentsInfo` instance, which makes sure internal map is initialized
// note that no mutex is wrapped so external concurrent control is needed
S
sunby 已提交
58
func NewSegmentsInfo() *SegmentsInfo {
S
sunby 已提交
59
	return &SegmentsInfo{segments: make(map[UniqueID]*SegmentInfo)}
S
sunby 已提交
60 61
}

62
// GetSegment returns SegmentInfo
S
sunby 已提交
63
func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
S
sunby 已提交
64 65 66 67 68 69 70
	segment, ok := s.segments[segmentID]
	if !ok {
		return nil
	}
	return segment
}

71 72
// GetSegments iterates internal map and returns all SegmentInfo in a slice
// no deep copy applied
S
sunby 已提交
73 74
func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
	segments := make([]*SegmentInfo, 0, len(s.segments))
S
sunby 已提交
75 76 77 78 79 80
	for _, segment := range s.segments {
		segments = append(segments, segment)
	}
	return segments
}

81 82
// DropSegment deletes provided segmentID
// no extra method is taken when segmentID not exists
S
sunby 已提交
83 84 85 86
func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
	delete(s.segments, segmentID)
}

87
// SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists
S
sunby 已提交
88
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
S
sunby 已提交
89 90 91
	s.segments[segmentID] = segment
}

92 93
// SetRowCount sets rowCount info for SegmentInfo with provided segmentID
// if SegmentInfo not found, do nothing
S
sunby 已提交
94 95
func (s *SegmentsInfo) SetRowCount(segmentID UniqueID, rowCount int64) {
	if segment, ok := s.segments[segmentID]; ok {
96
		s.segments[segmentID] = segment.Clone(SetRowCount(rowCount))
S
sunby 已提交
97 98 99
	}
}

100
// SetState sets Segment State info for SegmentInfo with provided segmentID
101
// if SegmentInfo not found, do nothing
S
sunby 已提交
102 103
func (s *SegmentsInfo) SetState(segmentID UniqueID, state commonpb.SegmentState) {
	if segment, ok := s.segments[segmentID]; ok {
104
		s.segments[segmentID] = segment.Clone(SetState(state))
S
sunby 已提交
105 106 107
	}
}

108 109
// SetDmlPosition sets DmlPosition info (checkpoint for recovery) for SegmentInfo with provided segmentID
// if SegmentInfo not found, do nothing
S
sunby 已提交
110
func (s *SegmentsInfo) SetDmlPosition(segmentID UniqueID, pos *internalpb.MsgPosition) {
S
sunby 已提交
111
	if segment, ok := s.segments[segmentID]; ok {
S
sunby 已提交
112
		s.segments[segmentID] = segment.Clone(SetDmlPosition(pos))
S
sunby 已提交
113 114 115
	}
}

116 117
// SetStartPosition sets StartPosition info (recovery info when no checkout point found) for SegmentInfo with provided segmentID
// if SegmentInfo not found, do nothing
S
sunby 已提交
118 119
func (s *SegmentsInfo) SetStartPosition(segmentID UniqueID, pos *internalpb.MsgPosition) {
	if segment, ok := s.segments[segmentID]; ok {
S
sunby 已提交
120
		s.segments[segmentID] = segment.Clone(SetStartPosition(pos))
S
sunby 已提交
121 122 123
	}
}

124 125 126
// SetAllocations sets allocations for segment with specified id
// if the segment id is not found, do nothing
// uses `ShadowClone` since internal SegmentInfo is not changed
S
sunby 已提交
127 128
func (s *SegmentsInfo) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
	if segment, ok := s.segments[segmentID]; ok {
S
sunby 已提交
129
		s.segments[segmentID] = segment.ShadowClone(SetAllocations(allocations))
S
sunby 已提交
130 131 132
	}
}

133 134 135
// AddAllocation adds a new allocation to specified segment
// if the segment is not found, do nothing
// uses `Clone` since internal SegmentInfo's LastExpireTime is changed
S
sunby 已提交
136 137
func (s *SegmentsInfo) AddAllocation(segmentID UniqueID, allocation *Allocation) {
	if segment, ok := s.segments[segmentID]; ok {
S
sunby 已提交
138
		s.segments[segmentID] = segment.Clone(AddAllocation(allocation))
S
sunby 已提交
139 140 141
	}
}

142
// SetCurrentRows sets rows count for segment
143 144
// if the segment is not found, do nothing
// uses `ShadowClone` since internal SegmentInfo is not changed
S
sunby 已提交
145 146
func (s *SegmentsInfo) SetCurrentRows(segmentID UniqueID, rows int64) {
	if segment, ok := s.segments[segmentID]; ok {
S
sunby 已提交
147
		s.segments[segmentID] = segment.ShadowClone(SetCurrentRows(rows))
S
sunby 已提交
148 149 150
	}
}

151
// SetBinlogs sets binlog paths for segment
152 153
// if the segment is not found, do nothing
// uses `Clone` since internal SegmentInfo's Binlogs is changed
S
sunby 已提交
154 155 156 157 158 159
func (s *SegmentsInfo) SetBinlogs(segmentID UniqueID, binlogs []*datapb.FieldBinlog) {
	if segment, ok := s.segments[segmentID]; ok {
		s.segments[segmentID] = segment.Clone(SetBinlogs(binlogs))
	}
}

160
// SetFlushTime sets flush time for segment
161 162
// if the segment is not found, do nothing
// uses `ShadowClone` since internal SegmentInfo is not changed
S
sunby 已提交
163 164 165 166 167 168
func (s *SegmentsInfo) SetFlushTime(segmentID UniqueID, t time.Time) {
	if segment, ok := s.segments[segmentID]; ok {
		s.segments[segmentID] = segment.ShadowClone(SetFlushTime(t))
	}
}

169
// AddSegmentBinlogs adds binlogs for segment
170 171
// if the segment is not found, do nothing
// uses `Clone` since internal SegmentInfo's Binlogs is changed
172
func (s *SegmentsInfo) AddSegmentBinlogs(segmentID UniqueID, field2Binlogs map[UniqueID][]*datapb.Binlog) {
173 174 175 176 177
	if segment, ok := s.segments[segmentID]; ok {
		s.segments[segmentID] = segment.Clone(addSegmentBinlogs(field2Binlogs))
	}
}

S
sunby 已提交
178 179 180 181 182 183 184
// SetIsCompacting sets compactino status for segment
func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) {
	if segment, ok := s.segments[segmentID]; ok {
		s.segments[segmentID] = segment.ShadowClone(SetIsCompacting(isCompacting))
	}
}

S
sunby 已提交
185
// Clone deep clone the segment info and return a new instance
S
sunby 已提交
186 187
func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
	info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo)
S
sunby 已提交
188
	cloned := &SegmentInfo{
S
sunby 已提交
189 190 191 192
		SegmentInfo:   info,
		currRows:      s.currRows,
		allocations:   s.allocations,
		lastFlushTime: s.lastFlushTime,
S
sunby 已提交
193 194 195 196 197 198 199
	}
	for _, opt := range opts {
		opt(cloned)
	}
	return cloned
}

200
// ShadowClone shadow clone the segment and return a new instance
S
sunby 已提交
201
func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
S
sunby 已提交
202
	cloned := &SegmentInfo{
S
sunby 已提交
203 204 205 206
		SegmentInfo:   s.SegmentInfo,
		currRows:      s.currRows,
		allocations:   s.allocations,
		lastFlushTime: s.lastFlushTime,
S
sunby 已提交
207 208 209 210 211 212 213 214
	}

	for _, opt := range opts {
		opt(cloned)
	}
	return cloned
}

215
// SegmentInfoOption is the option to set fields in segment info
S
sunby 已提交
216
type SegmentInfoOption func(segment *SegmentInfo)
S
sunby 已提交
217

218
// SetRowCount is the option to set row count for segment info
S
sunby 已提交
219
func SetRowCount(rowCount int64) SegmentInfoOption {
S
sunby 已提交
220
	return func(segment *SegmentInfo) {
S
sunby 已提交
221 222 223 224
		segment.NumOfRows = rowCount
	}
}

225
// SetExpireTime is the option to set expire time for segment info
S
sunby 已提交
226
func SetExpireTime(expireTs Timestamp) SegmentInfoOption {
S
sunby 已提交
227
	return func(segment *SegmentInfo) {
S
sunby 已提交
228 229 230 231
		segment.LastExpireTime = expireTs
	}
}

S
sunby 已提交
232
// SetState is the option to set state for segment info
S
sunby 已提交
233
func SetState(state commonpb.SegmentState) SegmentInfoOption {
S
sunby 已提交
234
	return func(segment *SegmentInfo) {
S
sunby 已提交
235 236 237 238
		segment.State = state
	}
}

239
// SetDmlPosition is the option to set dml position for segment info
S
sunby 已提交
240
func SetDmlPosition(pos *internalpb.MsgPosition) SegmentInfoOption {
S
sunby 已提交
241
	return func(segment *SegmentInfo) {
S
sunby 已提交
242 243 244 245
		segment.DmlPosition = pos
	}
}

246
// SetStartPosition is the option to set start position for segment info
S
sunby 已提交
247
func SetStartPosition(pos *internalpb.MsgPosition) SegmentInfoOption {
S
sunby 已提交
248
	return func(segment *SegmentInfo) {
S
sunby 已提交
249 250 251
		segment.StartPosition = pos
	}
}
S
sunby 已提交
252

253
// SetAllocations is the option to set allocations for segment info
S
sunby 已提交
254 255 256 257 258 259
func SetAllocations(allocations []*Allocation) SegmentInfoOption {
	return func(segment *SegmentInfo) {
		segment.allocations = allocations
	}
}

260
// AddAllocation is the option to add allocation info for segment info
S
sunby 已提交
261 262 263
func AddAllocation(allocation *Allocation) SegmentInfoOption {
	return func(segment *SegmentInfo) {
		segment.allocations = append(segment.allocations, allocation)
264
		segment.LastExpireTime = allocation.ExpireTime
S
sunby 已提交
265 266 267
	}
}

268
// SetCurrentRows is the option to set current row count for segment info
S
sunby 已提交
269 270 271 272 273
func SetCurrentRows(rows int64) SegmentInfoOption {
	return func(segment *SegmentInfo) {
		segment.currRows = rows
	}
}
S
sunby 已提交
274

275
// SetBinlogs is the option to set binlogs for segment info
S
sunby 已提交
276 277 278 279 280
func SetBinlogs(binlogs []*datapb.FieldBinlog) SegmentInfoOption {
	return func(segment *SegmentInfo) {
		segment.Binlogs = binlogs
	}
}
S
sunby 已提交
281

282
// SetFlushTime is the option to set flush time for segment info
S
sunby 已提交
283 284 285 286 287
func SetFlushTime(t time.Time) SegmentInfoOption {
	return func(segment *SegmentInfo) {
		segment.lastFlushTime = t
	}
}
288

G
godchen 已提交
289
// SetIsCompacting is the option to set compaction state for segment info
S
sunby 已提交
290 291 292 293 294 295
func SetIsCompacting(isCompacting bool) SegmentInfoOption {
	return func(segment *SegmentInfo) {
		segment.isCompacting = isCompacting
	}
}

296
func addSegmentBinlogs(field2Binlogs map[UniqueID][]*datapb.Binlog) SegmentInfoOption {
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
	return func(segment *SegmentInfo) {
		for fieldID, binlogPaths := range field2Binlogs {
			found := false
			for _, binlog := range segment.Binlogs {
				if binlog.FieldID != fieldID {
					continue
				}
				binlog.Binlogs = append(binlog.Binlogs, binlogPaths...)
				found = true
				break
			}
			if !found {
				// if no field matched
				segment.Binlogs = append(segment.Binlogs, &datapb.FieldBinlog{
					FieldID: fieldID,
					Binlogs: binlogPaths,
				})
			}
		}
	}
}
318 319 320

// SegmentInfoSelector is a predicate used to pick segments out of meta. It
// returns true for each SegmentInfo that should be selected.
type SegmentInfoSelector func(segment *SegmentInfo) bool