collection_replica.go 10.6 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
XuanYang-cn 已提交
12 13 14
package datanode

import (
S
sunby 已提交
15
	"context"
S
sunby 已提交
16
	"fmt"
X
XuanYang-cn 已提交
17
	"sync"
X
XuanYang-cn 已提交
18
	"sync/atomic"
X
XuanYang-cn 已提交
19

X
XuanYang-cn 已提交
20 21
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
22
	"github.com/milvus-io/milvus/internal/log"
23
	"github.com/milvus-io/milvus/internal/proto/datapb"
X
Xiangyu Wang 已提交
24
	"github.com/milvus-io/milvus/internal/proto/internalpb"
S
sunby 已提交
25
	"github.com/milvus-io/milvus/internal/types"
X
XuanYang-cn 已提交
26 27
)

28
type Replica interface {
S
sunby 已提交
29
	init(initTs Timestamp) error
X
XuanYang-cn 已提交
30

S
sunby 已提交
31
	getCollectionByID(collectionID UniqueID, ts Timestamp) (*Collection, error)
X
XuanYang-cn 已提交
32 33
	hasCollection(collectionID UniqueID) bool

34
	// segment
S
sunby 已提交
35
	addSegment(segmentID, collID, partitionID UniqueID, channelName string) error
36 37
	removeSegment(segmentID UniqueID) error
	hasSegment(segmentID UniqueID) bool
38
	updateStatistics(segmentID UniqueID, numRows int64) error
G
godchen 已提交
39
	getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
X
XuanYang-cn 已提交
40
	getSegmentByID(segmentID UniqueID) (*Segment, error)
41 42
	bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
	getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
S
sunby 已提交
43
	getChannelName(segID UniqueID) (string, error)
44 45
	setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
	setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
46
	getAllStartPositions() []*datapb.SegmentStartPosition
47
	getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
N
neza2017 已提交
48
	listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64)
X
XuanYang-cn 已提交
49 50
}

51
// Segment is the data structure of segments in data node replica.
52
type Segment struct {
X
XuanYang-cn 已提交
53 54 55 56 57 58
	segmentID    UniqueID
	collectionID UniqueID
	partitionID  UniqueID
	numRows      int64
	memorySize   int64
	isNew        atomic.Value // bool
59 60
	channelName  string
	field2Paths  map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
61
}
62

63 64
// CollectionSegmentReplica is the data replication of persistent data in datanode.
// It implements `Replica` interface.
65 66
type CollectionSegmentReplica struct {
	mu          sync.RWMutex
S
sunby 已提交
67
	collection  *Collection
X
XuanYang-cn 已提交
68
	segments    map[UniqueID]*Segment
S
sunby 已提交
69
	metaService *metaService
70

N
neza2017 已提交
71 72 73
	posMu          sync.Mutex
	startPositions map[UniqueID][]*internalpb.MsgPosition
	endPositions   map[UniqueID][]*internalpb.MsgPosition
74
}
75

76 77
// Compile-time check that *CollectionSegmentReplica satisfies Replica.
var _ Replica = (*CollectionSegmentReplica)(nil)

S
sunby 已提交
78 79
func newReplica(ms types.MasterService, collectionID UniqueID) Replica {
	metaService := newMetaService(ms, collectionID)
X
XuanYang-cn 已提交
80
	segments := make(map[UniqueID]*Segment)
S
sunby 已提交
81

82
	var replica Replica = &CollectionSegmentReplica{
N
neza2017 已提交
83
		segments:       segments,
S
sunby 已提交
84 85
		collection:     &Collection{id: collectionID},
		metaService:    metaService,
N
neza2017 已提交
86 87
		startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
		endPositions:   make(map[UniqueID][]*internalpb.MsgPosition),
S
sunby 已提交
88 89 90 91
	}
	return replica
}

S
sunby 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104
// init fetches the schema of the replica's collection as of initTs through the
// meta service and stores it on the collection. On failure the error is logged
// and returned, and the collection is left unchanged.
func (replica *CollectionSegmentReplica) init(initTs Timestamp) error {
	log.Debug("Initing replica ...")

	collID := replica.collection.GetID()
	schema, err := replica.metaService.getCollectionSchema(context.Background(), collID, initTs)
	if err != nil {
		log.Error("Replica init fail", zap.Error(err))
		return err
	}

	replica.collection.schema = schema
	return nil
}

S
sunby 已提交
105 106 107 108 109 110 111 112 113 114 115 116
// getChannelName returns the channel name recorded for segment segID, or an
// error if the replica does not know the segment.
func (replica *CollectionSegmentReplica) getChannelName(segID UniqueID) (string, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	if seg, found := replica.segments[segID]; found {
		return seg.channelName, nil
	}
	return "", fmt.Errorf("Cannot find segment, id = %v", segID)
}

117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
// bufferAutoFlushBinlogPaths buffers binlog paths generated by auto-flush.
//
// For every (fieldID, path) pair in field2Path the path is appended to the list
// already buffered for that field on segment segID. It returns an error if the
// replica does not know the segment.
func (replica *CollectionSegmentReplica) bufferAutoFlushBinlogPaths(segID UniqueID, field2Path map[UniqueID]string) error {
	// The segment's field2Paths map is written below, so the write lock is
	// required: under a read lock several callers could write the map at once.
	replica.mu.Lock()
	defer replica.mu.Unlock()

	seg, ok := replica.segments[segID]
	if !ok {
		return fmt.Errorf("Cannot find segment, id = %v", segID)
	}

	// Writing to a nil map panics; tolerate segments built without the map.
	if seg.field2Paths == nil {
		seg.field2Paths = make(map[UniqueID][]string, len(field2Path))
	}

	for fieldID, path := range field2Path {
		// append handles the missing-key case: a nil slice grows to one element.
		seg.field2Paths[fieldID] = append(seg.field2Paths[fieldID], path)
	}
	log.Info("Buffer auto flush binlog paths", zap.Int64("segment ID", segID))

	return nil
}

// getBufferPaths returns the buffered binlog paths of segment segID, keyed by
// field ID, or an error if the replica does not know the segment.
//
// NOTE(review): the segment's own map is returned, not a copy, so the caller
// keeps a reference to it after the read lock is released.
func (replica *CollectionSegmentReplica) getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	seg, found := replica.segments[segID]
	if !found {
		return nil, fmt.Errorf("Cannot find segment, id = %v", segID)
	}
	return seg.field2Paths, nil
}

154
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
155 156
	replica.mu.RLock()
	defer replica.mu.RUnlock()
X
XuanYang-cn 已提交
157

X
XuanYang-cn 已提交
158 159
	if seg, ok := replica.segments[segmentID]; ok {
		return seg, nil
X
XuanYang-cn 已提交
160
	}
S
sunby 已提交
161
	return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
X
XuanYang-cn 已提交
162

X
XuanYang-cn 已提交
163 164
}

165
// `addSegment` add a new segment into replica when data node see the segment
166
func (replica *CollectionSegmentReplica) addSegment(
X
XuanYang-cn 已提交
167 168 169 170
	segmentID UniqueID,
	collID UniqueID,
	partitionID UniqueID,
	channelName string) error {
X
XuanYang-cn 已提交
171

172 173
	replica.mu.Lock()
	defer replica.mu.Unlock()
X
XuanYang-cn 已提交
174
	log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
X
XuanYang-cn 已提交
175

176
	seg := &Segment{
177 178 179 180 181
		segmentID:    segmentID,
		collectionID: collID,
		partitionID:  partitionID,
		channelName:  channelName,
		field2Paths:  make(map[UniqueID][]string),
182
	}
X
XuanYang-cn 已提交
183 184 185 186

	seg.isNew.Store(true)

	replica.segments[segmentID] = seg
187 188 189
	return nil
}

190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
// getAllStartPositions returns the first start position of every segment that
// is still marked as new, and clears the new mark on each segment it reports.
//
// Segments that are new but have no start position recorded yet are skipped
// with a warning and stay marked as new.
func (replica *CollectionSegmentReplica) getAllStartPositions() []*datapb.SegmentStartPosition {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	// startPositions is written under posMu (setStartPositions, removeSegment),
	// so it must be read under posMu as well. mu is acquired before posMu.
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	result := make([]*datapb.SegmentStartPosition, 0, len(replica.segments))
	for id, seg := range replica.segments {
		// Comma-ok form: a segment whose flag was never stored counts as not new
		// instead of panicking on the type assertion.
		if isNew, _ := seg.isNew.Load().(bool); !isNew {
			continue
		}

		// An entry holding an empty slice has no first position to report either.
		pos, ok := replica.startPositions[id]
		if !ok || len(pos) == 0 {
			log.Warn("Segment has no start positions")
			continue
		}

		result = append(result, &datapb.SegmentStartPosition{
			SegmentID:     id,
			StartPosition: pos[0],
		})
		seg.isNew.Store(false)
	}
	return result
}

216
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
217
	replica.mu.Lock()
X
XuanYang-cn 已提交
218
	delete(replica.segments, segmentID)
219 220 221 222 223 224
	replica.mu.Unlock()

	replica.posMu.Lock()
	delete(replica.startPositions, segmentID)
	delete(replica.endPositions, segmentID)
	replica.posMu.Unlock()
X
XuanYang-cn 已提交
225 226

	return nil
227 228
}

229
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
230 231
	replica.mu.RLock()
	defer replica.mu.RUnlock()
232

X
XuanYang-cn 已提交
233 234 235 236
	_, ok := replica.segments[segmentID]
	return ok
}

237
// `updateStatistics` updates the number of rows of a segment in replica.
238
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
239 240
	replica.mu.Lock()
	defer replica.mu.Unlock()
241

X
XuanYang-cn 已提交
242 243 244 245 246
	if seg, ok := replica.segments[segmentID]; ok {
		log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
		seg.memorySize = 0
		seg.numRows += numRows
		return nil
247
	}
X
XuanYang-cn 已提交
248 249

	return fmt.Errorf("There's no segment %v", segmentID)
250 251
}

252
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
G
godchen 已提交
253
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
254 255
	replica.mu.Lock()
	defer replica.mu.Unlock()
256

X
XuanYang-cn 已提交
257 258 259 260 261
	if seg, ok := replica.segments[segmentID]; ok {
		updates := &internalpb.SegmentStatisticsUpdates{
			SegmentID:  segmentID,
			MemorySize: seg.memorySize,
			NumRows:    seg.numRows,
262
		}
X
XuanYang-cn 已提交
263 264

		return updates, nil
265
	}
S
sunby 已提交
266
	return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
X
XuanYang-cn 已提交
267 268
}

269
// --- collection ---
S
sunby 已提交
270 271 272
// getCollectionByID will get collection schema from masterservice if not exist.
// If you want the latest collection schema, ts should be 0
func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID, ts Timestamp) (*Collection, error) {
273 274
	replica.mu.Lock()
	defer replica.mu.Unlock()
X
XuanYang-cn 已提交
275

S
sunby 已提交
276 277
	if collectionID != replica.collection.GetID() {
		return nil, fmt.Errorf("Not supported collection %v", collectionID)
X
XuanYang-cn 已提交
278
	}
X
XuanYang-cn 已提交
279

S
sunby 已提交
280 281 282 283 284 285
	if replica.collection.GetSchema() == nil {
		sch, err := replica.metaService.getCollectionSchema(context.Background(), collectionID, ts)
		if err != nil {
			return nil, err
		}
		replica.collection.schema = sch
X
XuanYang-cn 已提交
286
	}
S
sunby 已提交
287

S
sunby 已提交
288
	return replica.collection, nil
X
XuanYang-cn 已提交
289 290
}

S
sunby 已提交
291
func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
292 293
	replica.mu.RLock()
	defer replica.mu.RUnlock()
X
XuanYang-cn 已提交
294

S
sunby 已提交
295 296 297 298
	if replica.collection != nil &&
		collectionID == replica.collection.GetID() &&
		replica.collection.schema != nil {
		return true
X
XuanYang-cn 已提交
299 300
	}

S
sunby 已提交
301
	return false
X
XuanYang-cn 已提交
302
}
303

304 305 306 307 308 309 310 311 312
// getSegmentsCheckpoints is intended to get the checkpoints of the currently
// open segments. It is still a stub: it only takes and releases the read lock.
func (replica *CollectionSegmentReplica) getSegmentsCheckpoints() {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	// TODO: walk replica.segments and collect the checkpoints.
}

313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
// setStartPositions records the `Start Position` of segment segID, i.e. the
// `startPositions` of the MsgPack in which the segment was first found.
// Any previously recorded value is overwritten. It always returns nil.
func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	replica.startPositions[segID] = startPositions
	return nil
}

// setEndPositions records the `End Position` of segment segID, i.e. the
// `endPositions` of the MsgPack at which the segment needs to be flushed.
// Any previously recorded value is overwritten. It always returns nil.
func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	replica.endPositions[segID] = endPositions
	return nil
}

// getSegmentPositions returns the stored start and end positions of segment
// segID, in that order. A side that was never recorded is returned as nil.
//
// Note: the start and end positions do NOT come from one single MsgPack; they
// are taken from different MsgPacks. See setStartPositions and setEndPositions.
func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	return replica.startPositions[segID], replica.endPositions[segID]
}
N
neza2017 已提交
339 340

func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
N
neza2017 已提交
341 342
	replica.posMu.Lock()
	defer replica.posMu.Unlock()
343 344
	r1 := make(map[UniqueID]internalpb.MsgPosition)
	r2 := make(map[UniqueID]int64)
N
neza2017 已提交
345 346 347
	for _, seg := range segs {
		r1[seg] = *replica.endPositions[seg][0]
		r2[seg] = replica.segments[seg].numRows
348 349
	}
	return r1, r2
N
neza2017 已提交
350
}