collection_replica.go 16.9 KB
Newer Older
1
package querynode
2 3 4 5 6 7 8

/*

#cgo CFLAGS: -I${SRCDIR}/../core/output/include

#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12 13 14

*/
import "C"
import (
C
cai.zhang 已提交
15
	"strconv"
G
godchen 已提交
16
	"sync"
C
cai.zhang 已提交
17

18
	"github.com/zilliztech/milvus-distributed/internal/errors"
19
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
20
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
21 22
)

D
dragondriver 已提交
23 24 25 26 27 28 29 30 31
/*
 * collectionReplica contains an in-memory local copy of persistent collections.
 * In common cases, the system has multiple query nodes. Data of a collection will be
 * distributed across all the available query nodes, and each query node's collectionReplica
 * will maintain its own share (only part of the collection).
 * Every replica tracks a value called tSafe, which is the maximum timestamp up to which
 * the replica is up-to-date.
 */
type collectionReplica interface {
G
godchen 已提交
32
	// collection
33
	getCollectionIDs() []UniqueID
34
	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
G
godchen 已提交
35 36
	removeCollection(collectionID UniqueID) error
	getCollectionByID(collectionID UniqueID) (*Collection, error)
F
FluorineDog 已提交
37
	hasCollection(collectionID UniqueID) bool
B
bigsheeper 已提交
38 39 40
	getCollectionNum() int
	getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)

41 42
	getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
	getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
G
godchen 已提交
43 44

	// partition
C
cai.zhang 已提交
45
	addPartition(collectionID UniqueID, partitionID UniqueID) error
B
bigsheeper 已提交
46 47 48 49 50 51
	removePartition(partitionID UniqueID) error
	getPartitionByID(partitionID UniqueID) (*Partition, error)
	hasPartition(partitionID UniqueID) bool
	getPartitionNum() int
	getSegmentIDs(partitionID UniqueID) ([]UniqueID, error)

52 53
	enablePartition(partitionID UniqueID) error
	disablePartition(partitionID UniqueID) error
G
godchen 已提交
54 55

	// segment
Z
zhenshan.cao 已提交
56
	addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error
G
godchen 已提交
57 58 59
	removeSegment(segmentID UniqueID) error
	getSegmentByID(segmentID UniqueID) (*Segment, error)
	hasSegment(segmentID UniqueID) bool
B
bigsheeper 已提交
60 61 62
	getSegmentNum() int

	getSegmentStatistics() []*internalpb2.SegmentStats
B
bigsheeper 已提交
63 64
	getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
	getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
65
	replaceGrowingSegmentBySealedSegment(segment *Segment) error
B
bigsheeper 已提交
66

B
bigsheeper 已提交
67
	getTSafe() tSafe
B
bigsheeper 已提交
68
	freeAll()
G
godchen 已提交
69 70
}

D
dragondriver 已提交
71
type collectionReplicaImpl struct {
Q
quicksilver 已提交
72 73
	tSafe tSafe

B
bigsheeper 已提交
74 75 76
	mu          sync.RWMutex // guards all
	collections map[UniqueID]*Collection
	partitions  map[UniqueID]*Partition
77 78 79 80
	segments    map[UniqueID]*Segment
}

//----------------------------------------------------------------------------------------------------- collection
81 82 83 84 85 86 87 88 89 90
// getCollectionIDs returns the IDs of all collections in the replica.
// The IDs come from a map iteration, so their order is unspecified.
func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	ids := make([]UniqueID, 0, len(colReplica.collections))
	for collectionID := range colReplica.collections {
		ids = append(ids, collectionID)
	}
	return ids
}

91
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
D
dragondriver 已提交
92 93
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
94

B
bigsheeper 已提交
95 96 97 98
	if ok := colReplica.hasCollectionPrivate(collectionID); ok {
		return errors.New("collection has been existed, id = " + strconv.FormatInt(collectionID, 10))
	}

99
	var newCollection = newCollection(collectionID, schema)
B
bigsheeper 已提交
100
	colReplica.collections[collectionID] = newCollection
101

G
godchen 已提交
102
	return nil
103 104
}

D
dragondriver 已提交
105 106 107
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
108 109
	return colReplica.removeCollectionPrivate(collectionID)
}
G
godchen 已提交
110

B
bigsheeper 已提交
111
func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID UniqueID) error {
Q
quicksilver 已提交
112
	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
G
godchen 已提交
113 114
	if err != nil {
		return err
115 116 117
	}

	deleteCollection(collection)
B
bigsheeper 已提交
118
	delete(colReplica.collections, collectionID)
119

B
bigsheeper 已提交
120 121 122 123
	// delete partitions
	for _, partitionID := range collection.partitionIDs {
		// ignore error, try to delete
		_ = colReplica.removePartitionPrivate(partitionID)
124 125 126 127 128
	}

	return nil
}

D
dragondriver 已提交
129 130 131
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
Q
quicksilver 已提交
132 133 134 135
	return colReplica.getCollectionByIDPrivate(collectionID)
}

func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
B
bigsheeper 已提交
136 137 138
	collection, ok := colReplica.collections[collectionID]
	if !ok {
		return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
139 140
	}

B
bigsheeper 已提交
141
	return collection, nil
142 143
}

F
FluorineDog 已提交
144 145 146
func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
B
bigsheeper 已提交
147 148
	return colReplica.hasCollectionPrivate(collectionID)
}
F
FluorineDog 已提交
149

B
bigsheeper 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
// hasCollectionPrivate reports whether collectionID is a key of the
// collections map. It does not lock.
func (colReplica *collectionReplicaImpl) hasCollectionPrivate(collectionID UniqueID) bool {
	if _, found := colReplica.collections[collectionID]; found {
		return true
	}
	return false
}

// getCollectionNum returns the number of entries in the collections map.
func (colReplica *collectionReplicaImpl) getCollectionNum() int {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	count := len(colReplica.collections)
	return count
}

func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return nil, err
F
FluorineDog 已提交
168
	}
B
bigsheeper 已提交
169 170

	return collection.partitionIDs, nil
F
FluorineDog 已提交
171 172
}

173
func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
174 175 176
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

177
	fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
178 179 180 181
	if err != nil {
		return nil, err
	}

B
bigsheeper 已提交
182
	vecFields := make([]int64, 0)
183
	for _, field := range fields {
184
		if field.DataType == schemapb.DataType_VECTOR_BINARY || field.DataType == schemapb.DataType_VECTOR_FLOAT {
B
bigsheeper 已提交
185
			vecFields = append(vecFields, field.FieldID)
186 187 188 189
		}
	}

	if len(vecFields) <= 0 {
190
		return nil, errors.New("no vector field in collection " + strconv.FormatInt(collectionID, 10))
191 192 193 194 195
	}

	return vecFields, nil
}

196 197 198 199 200 201 202 203 204 205 206 207 208 209
func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
	if err != nil {
		return nil, err
	}

	targetFields := make([]int64, 0)
	for _, field := range fields {
		targetFields = append(targetFields, field.FieldID)
	}

B
bigsheeper 已提交
210 211 212
	// add row id field
	targetFields = append(targetFields, rowIDFieldID)

213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
	return targetFields, nil
}

// getFieldsByCollectionIDPrivate returns the field list of the collection's
// schema. It does not lock. It returns an error when the collection is not
// registered or its schema has no fields.
func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return nil, err
	}

	fields := collection.Schema().Fields
	if len(fields) == 0 {
		return nil, errors.New("no field in collection " + strconv.FormatInt(collectionID, 10))
	}
	return fields, nil
}

229
//----------------------------------------------------------------------------------------------------- partition
C
cai.zhang 已提交
230 231 232
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
233 234
	return colReplica.addPartitionPrivate(collectionID, partitionID)
}
C
cai.zhang 已提交
235

B
bigsheeper 已提交
236
func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
C
cai.zhang 已提交
237 238 239 240 241
	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return err
	}

B
bigsheeper 已提交
242 243 244
	collection.addPartitionID(partitionID)
	var newPartition = newPartition(collectionID, partitionID)
	colReplica.partitions[partitionID] = newPartition
G
godchen 已提交
245
	return nil
246 247
}

B
bigsheeper 已提交
248
func (colReplica *collectionReplicaImpl) removePartition(partitionID UniqueID) error {
Q
quicksilver 已提交
249 250
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
251
	return colReplica.removePartitionPrivate(partitionID)
Q
quicksilver 已提交
252 253
}

B
bigsheeper 已提交
254 255
func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID UniqueID) error {
	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
G
godchen 已提交
256 257
	if err != nil {
		return err
258 259
	}

B
bigsheeper 已提交
260
	collection, err := colReplica.getCollectionByIDPrivate(partition.collectionID)
F
FluorineDog 已提交
261 262 263 264
	if err != nil {
		return err
	}

B
bigsheeper 已提交
265 266
	collection.removePartitionID(partitionID)
	delete(colReplica.partitions, partitionID)
F
FluorineDog 已提交
267

B
bigsheeper 已提交
268 269 270 271
	// delete segments
	for _, segmentID := range partition.segmentIDs {
		// try to delete, ignore error
		_ = colReplica.removeSegmentPrivate(segmentID)
F
FluorineDog 已提交
272 273 274 275
	}
	return nil
}

B
bigsheeper 已提交
276
func (colReplica *collectionReplicaImpl) getPartitionByID(partitionID UniqueID) (*Partition, error) {
Z
zhenshan.cao 已提交
277 278
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
B
bigsheeper 已提交
279
	return colReplica.getPartitionByIDPrivate(partitionID)
Z
zhenshan.cao 已提交
280 281
}

B
bigsheeper 已提交
282 283 284 285
func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
	partition, ok := colReplica.partitions[partitionID]
	if !ok {
		return nil, errors.New("cannot find partition, id = " + strconv.FormatInt(partitionID, 10))
Z
zhenshan.cao 已提交
286 287
	}

B
bigsheeper 已提交
288 289 290 291 292 293 294 295
	return partition, nil
}

// hasPartition reports, under the read lock, whether partitionID is registered.
func (colReplica *collectionReplicaImpl) hasPartition(partitionID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	exists := colReplica.hasPartitionPrivate(partitionID)
	return exists
}
Z
zhenshan.cao 已提交
296

B
bigsheeper 已提交
297 298 299
func (colReplica *collectionReplicaImpl) hasPartitionPrivate(partitionID UniqueID) bool {
	_, ok := colReplica.partitions[partitionID]
	return ok
Z
zhenshan.cao 已提交
300 301
}

B
bigsheeper 已提交
302
func (colReplica *collectionReplicaImpl) getPartitionNum() int {
Q
quicksilver 已提交
303 304
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
B
bigsheeper 已提交
305 306
	return len(colReplica.partitions)
}
Q
quicksilver 已提交
307

B
bigsheeper 已提交
308 309 310
func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
311 312
	return colReplica.getSegmentIDsPrivate(partitionID)
}
F
FluorineDog 已提交
313

314
func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
B
bigsheeper 已提交
315 316 317
	partition, err2 := colReplica.getPartitionByIDPrivate(partitionID)
	if err2 != nil {
		return nil, err2
F
FluorineDog 已提交
318
	}
B
bigsheeper 已提交
319
	return partition.segmentIDs, nil
F
FluorineDog 已提交
320 321
}

322
func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) error {
C
cai.zhang 已提交
323 324 325
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()

B
bigsheeper 已提交
326
	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
C
cai.zhang 已提交
327 328 329 330
	if err != nil {
		return err
	}

331
	partition.enable = true
C
cai.zhang 已提交
332 333 334
	return nil
}

335
func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID) error {
C
cai.zhang 已提交
336 337 338
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()

B
bigsheeper 已提交
339
	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
C
cai.zhang 已提交
340 341 342 343
	if err != nil {
		return err
	}

344
	partition.enable = false
C
cai.zhang 已提交
345 346 347
	return nil
}

348 349 350 351 352 353
func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []UniqueID {
	partitionIDs := make([]UniqueID, 0)
	for _, partition := range colReplica.partitions {
		if partition.enable {
			partitionIDs = append(partitionIDs, partition.partitionID)
		}
C
cai.zhang 已提交
354
	}
355
	return partitionIDs
C
cai.zhang 已提交
356 357
}

358
//----------------------------------------------------------------------------------------------------- segment
Z
zhenshan.cao 已提交
359 360 361
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
362 363
	return colReplica.addSegmentPrivate(segmentID, partitionID, collectionID, segType)
}
Z
zhenshan.cao 已提交
364

B
bigsheeper 已提交
365
func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
Z
zhenshan.cao 已提交
366 367 368 369 370
	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return err
	}

B
bigsheeper 已提交
371 372 373
	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
	if err != nil {
		return err
Z
zhenshan.cao 已提交
374 375
	}

B
bigsheeper 已提交
376
	partition.addSegmentID(segmentID)
Z
zhenshan.cao 已提交
377
	var newSegment = newSegment(collection, segmentID, partitionID, collectionID, segType)
D
dragondriver 已提交
378
	colReplica.segments[segmentID] = newSegment
G
godchen 已提交
379 380

	return nil
Z
zhenshan.cao 已提交
381
}
382

D
dragondriver 已提交
383 384 385
func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
386 387 388 389
	return colReplica.removeSegmentPrivate(segmentID)
}

func (colReplica *collectionReplicaImpl) removeSegmentPrivate(segmentID UniqueID) error {
B
bigsheeper 已提交
390 391 392
	segment, err := colReplica.getSegmentByIDPrivate(segmentID)
	if err != nil {
		return err
393 394
	}

B
bigsheeper 已提交
395 396 397
	partition, err := colReplica.getPartitionByIDPrivate(segment.partitionID)
	if err != nil {
		return err
398
	}
C
cai.zhang 已提交
399

B
bigsheeper 已提交
400 401 402 403
	partition.removeSegmentID(segmentID)
	delete(colReplica.segments, segmentID)
	deleteSegment(segment)

G
godchen 已提交
404
	return nil
405 406
}

D
dragondriver 已提交
407 408 409
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
410 411 412 413
	return colReplica.getSegmentByIDPrivate(segmentID)
}

func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) {
B
bigsheeper 已提交
414
	segment, ok := colReplica.segments[segmentID]
415
	if !ok {
B
bigsheeper 已提交
416
		return nil, errors.New("cannot find segment, id = " + strconv.FormatInt(segmentID, 10))
417 418
	}

B
bigsheeper 已提交
419
	return segment, nil
420 421
}

D
dragondriver 已提交
422 423 424
func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
B
bigsheeper 已提交
425 426
	return colReplica.hasSegmentPrivate(segmentID)
}
G
godchen 已提交
427

B
bigsheeper 已提交
428
func (colReplica *collectionReplicaImpl) hasSegmentPrivate(segmentID UniqueID) bool {
D
dragondriver 已提交
429
	_, ok := colReplica.segments[segmentID]
430 431
	return ok
}
B
bigsheeper 已提交
432

B
bigsheeper 已提交
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
// getSegmentNum returns the number of entries in the segments map.
func (colReplica *collectionReplicaImpl) getSegmentNum() int {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	count := len(colReplica.segments)
	return count
}

// getSegmentStatistics builds one SegmentStats entry per registered segment,
// carrying its ID, memory size, row count and recently-modified flag.
//
// As a side effect it stores the sampled memory size in segment.lastMemSize
// and resets the segment's recently-modified flag to false.
//
// NOTE(review): these per-segment writes happen while only the read lock is
// held; confirm they are safe against concurrent callers.
func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.SegmentStats {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	stats := make([]*internalpb2.SegmentStats, 0, len(colReplica.segments))
	for segmentID, segment := range colReplica.segments {
		memSize := segment.getMemSize()
		segment.lastMemSize = memSize
		rowCount := segment.getRowCount()

		stats = append(stats, &internalpb2.SegmentStats{
			SegmentID:        segmentID,
			MemorySize:       memSize,
			NumRows:          rowCount,
			RecentlyModified: segment.getRecentlyModified(),
		})

		// The flag has been reported; clear it for the next sampling round.
		segment.setRecentlyModified(false)
	}
	return stats
}

B
bigsheeper 已提交
464
func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
465 466 467
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
	targetCollectionIDs := make([]UniqueID, 0)
	targetPartitionIDs := make([]UniqueID, 0)
	targetSegmentIDs := make([]UniqueID, 0)

	for _, partitionID := range colReplica.getEnabledPartitionIDsPrivate() {
		segmentIDs, err := colReplica.getSegmentIDsPrivate(partitionID)
		if err != nil {
			continue
		}
		for _, segmentID := range segmentIDs {
			segment, err := colReplica.getSegmentByIDPrivate(segmentID)
			if err != nil {
				continue
			}
			if segment.getType() == segType {
				targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
B
bigsheeper 已提交
484
				targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
485 486
				targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
			}
487 488 489
		}
	}

490
	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
491 492
}

B
bigsheeper 已提交
493
func (colReplica *collectionReplicaImpl) getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
B
bigsheeper 已提交
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	targetCollectionIDs := make([]UniqueID, 0)
	targetPartitionIDs := make([]UniqueID, 0)
	targetSegmentIDs := make([]UniqueID, 0)

	for _, segment := range colReplica.segments {
		if segment.getType() == segType {
			targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
			targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
			targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
		}
	}

	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
}

512 513 514
func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
515 516 517 518
	if segment.segmentType != segTypeSealed && segment.segmentType != segTypeIndexing {
		return errors.New("unexpected segment type")
	}
	targetSegment, err := colReplica.getSegmentByIDPrivate(segment.ID())
B
bigsheeper 已提交
519
	if err == nil && targetSegment != nil {
520
		if targetSegment.segmentType != segTypeGrowing {
B
bigsheeper 已提交
521
			// target segment has been a sealed segment
522 523 524 525
			return nil
		}
		deleteSegment(targetSegment)
	}
B
bigsheeper 已提交
526

B
bigsheeper 已提交
527
	colReplica.segments[segment.ID()] = segment
528 529 530
	return nil
}

B
bigsheeper 已提交
531
//-----------------------------------------------------------------------------------------------------
B
bigsheeper 已提交
532 533 534 535
// getTSafe returns the tSafe stored on the replica. The field is read without
// taking colReplica.mu.
func (colReplica *collectionReplicaImpl) getTSafe() tSafe {
	safe := colReplica.tSafe
	return safe
}

B
bigsheeper 已提交
536
func (colReplica *collectionReplicaImpl) freeAll() {
F
FluorineDog 已提交
537 538 539
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()

B
bigsheeper 已提交
540 541
	for id := range colReplica.collections {
		_ = colReplica.removeCollectionPrivate(id)
B
bigsheeper 已提交
542
	}
543

B
bigsheeper 已提交
544 545
	colReplica.collections = make(map[UniqueID]*Collection)
	colReplica.partitions = make(map[UniqueID]*Partition)
546
	colReplica.segments = make(map[UniqueID]*Segment)
B
bigsheeper 已提交
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
}

func newCollectionReplicaImpl() collectionReplica {
	collections := make(map[int64]*Collection)
	partitions := make(map[int64]*Partition)
	segments := make(map[int64]*Segment)

	tSafe := newTSafe()

	var replica collectionReplica = &collectionReplicaImpl{
		collections: collections,
		partitions:  partitions,
		segments:    segments,

		tSafe: tSafe,
	}

	return replica
B
bigsheeper 已提交
565
}