collection_replica.go 16.8 KB
Newer Older
1
package querynode
2 3 4 5 6 7 8

/*

#cgo CFLAGS: -I${SRCDIR}/../core/output/include

#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12 13 14

*/
import "C"
import (
C
cai.zhang 已提交
15
	"strconv"
G
godchen 已提交
16
	"sync"
C
cai.zhang 已提交
17

18
	"github.com/zilliztech/milvus-distributed/internal/errors"
19
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
20
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
21 22
)

D
dragondriver 已提交
23 24 25 26 27 28 29 30 31
/*
 * collectionReplica contains a in-memory local copy of persistent collections.
 * In common cases, the system has multiple query nodes. Data of a collection will be
 * distributed across all the available query nodes, and each query node's collectionReplica
 * will maintain its own share (only part of the collection).
 * Every replica tracks a value called tSafe which is the maximum timestamp that the replica
 * is up-to-date.
 */
type collectionReplica interface {
G
godchen 已提交
32
	// collection
33
	getCollectionIDs() []UniqueID
34
	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
G
godchen 已提交
35 36
	removeCollection(collectionID UniqueID) error
	getCollectionByID(collectionID UniqueID) (*Collection, error)
F
FluorineDog 已提交
37
	hasCollection(collectionID UniqueID) bool
B
bigsheeper 已提交
38 39 40
	getCollectionNum() int
	getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)

41 42
	getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
	getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
G
godchen 已提交
43 44

	// partition
C
cai.zhang 已提交
45
	addPartition(collectionID UniqueID, partitionID UniqueID) error
B
bigsheeper 已提交
46 47 48 49 50 51
	removePartition(partitionID UniqueID) error
	getPartitionByID(partitionID UniqueID) (*Partition, error)
	hasPartition(partitionID UniqueID) bool
	getPartitionNum() int
	getSegmentIDs(partitionID UniqueID) ([]UniqueID, error)

52 53
	enablePartition(partitionID UniqueID) error
	disablePartition(partitionID UniqueID) error
G
godchen 已提交
54 55

	// segment
Z
zhenshan.cao 已提交
56
	addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error
G
godchen 已提交
57 58 59
	removeSegment(segmentID UniqueID) error
	getSegmentByID(segmentID UniqueID) (*Segment, error)
	hasSegment(segmentID UniqueID) bool
B
bigsheeper 已提交
60 61 62
	getSegmentNum() int

	getSegmentStatistics() []*internalpb2.SegmentStats
63
	getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
B
bigsheeper 已提交
64
	getSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
65
	replaceGrowingSegmentBySealedSegment(segment *Segment) error
B
bigsheeper 已提交
66

B
bigsheeper 已提交
67
	getTSafe() tSafe
B
bigsheeper 已提交
68
	freeAll()
G
godchen 已提交
69 70
}

D
dragondriver 已提交
71
type collectionReplicaImpl struct {
Q
quicksilver 已提交
72 73
	tSafe tSafe

B
bigsheeper 已提交
74 75 76
	mu          sync.RWMutex // guards all
	collections map[UniqueID]*Collection
	partitions  map[UniqueID]*Partition
77 78 79 80
	segments    map[UniqueID]*Segment
}

//----------------------------------------------------------------------------------------------------- collection
81 82 83 84 85 86 87 88 89 90
func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
	collectionIDs := make([]UniqueID, 0)
	for id := range colReplica.collections {
		collectionIDs = append(collectionIDs, id)
	}
	return collectionIDs
}

91
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
D
dragondriver 已提交
92 93
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
94

B
bigsheeper 已提交
95 96 97 98
	if ok := colReplica.hasCollectionPrivate(collectionID); ok {
		return errors.New("collection has been existed, id = " + strconv.FormatInt(collectionID, 10))
	}

99
	var newCollection = newCollection(collectionID, schema)
B
bigsheeper 已提交
100
	colReplica.collections[collectionID] = newCollection
101

G
godchen 已提交
102
	return nil
103 104
}

D
dragondriver 已提交
105 106 107
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
108 109
	return colReplica.removeCollectionPrivate(collectionID)
}
G
godchen 已提交
110

B
bigsheeper 已提交
111
func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID UniqueID) error {
Q
quicksilver 已提交
112
	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
G
godchen 已提交
113 114
	if err != nil {
		return err
115 116 117
	}

	deleteCollection(collection)
B
bigsheeper 已提交
118
	delete(colReplica.collections, collectionID)
119

B
bigsheeper 已提交
120 121 122 123
	// delete partitions
	for _, partitionID := range collection.partitionIDs {
		// ignore error, try to delete
		_ = colReplica.removePartitionPrivate(partitionID)
124 125 126 127 128
	}

	return nil
}

D
dragondriver 已提交
129 130 131
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
Q
quicksilver 已提交
132 133 134 135
	return colReplica.getCollectionByIDPrivate(collectionID)
}

func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
B
bigsheeper 已提交
136 137 138
	collection, ok := colReplica.collections[collectionID]
	if !ok {
		return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
139 140
	}

B
bigsheeper 已提交
141
	return collection, nil
142 143
}

F
FluorineDog 已提交
144 145 146
func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
B
bigsheeper 已提交
147 148
	return colReplica.hasCollectionPrivate(collectionID)
}
F
FluorineDog 已提交
149

B
bigsheeper 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
func (colReplica *collectionReplicaImpl) hasCollectionPrivate(collectionID UniqueID) bool {
	_, ok := colReplica.collections[collectionID]
	return ok
}

func (colReplica *collectionReplicaImpl) getCollectionNum() int {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
	return len(colReplica.collections)
}

func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return nil, err
F
FluorineDog 已提交
168
	}
B
bigsheeper 已提交
169 170

	return collection.partitionIDs, nil
F
FluorineDog 已提交
171 172
}

173
func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
174 175 176
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

177
	fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
178 179 180 181
	if err != nil {
		return nil, err
	}

B
bigsheeper 已提交
182
	vecFields := make([]int64, 0)
183
	for _, field := range fields {
184
		if field.DataType == schemapb.DataType_VECTOR_BINARY || field.DataType == schemapb.DataType_VECTOR_FLOAT {
B
bigsheeper 已提交
185
			vecFields = append(vecFields, field.FieldID)
186 187 188 189
		}
	}

	if len(vecFields) <= 0 {
190
		return nil, errors.New("no vector field in collection " + strconv.FormatInt(collectionID, 10))
191 192 193 194 195
	}

	return vecFields, nil
}

196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID)
	if err != nil {
		return nil, err
	}

	targetFields := make([]int64, 0)
	for _, field := range fields {
		targetFields = append(targetFields, field.FieldID)
	}

	return targetFields, nil
}

func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return nil, err
	}

	if len(collection.Schema().Fields) <= 0 {
		return nil, errors.New("no field in collection " + strconv.FormatInt(collectionID, 10))
	}

	return collection.Schema().Fields, nil
}

226
//----------------------------------------------------------------------------------------------------- partition
C
cai.zhang 已提交
227 228 229
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
230 231
	return colReplica.addPartitionPrivate(collectionID, partitionID)
}
C
cai.zhang 已提交
232

B
bigsheeper 已提交
233
func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
C
cai.zhang 已提交
234 235 236 237 238
	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return err
	}

B
bigsheeper 已提交
239 240 241
	collection.addPartitionID(partitionID)
	var newPartition = newPartition(collectionID, partitionID)
	colReplica.partitions[partitionID] = newPartition
G
godchen 已提交
242
	return nil
243 244
}

B
bigsheeper 已提交
245
func (colReplica *collectionReplicaImpl) removePartition(partitionID UniqueID) error {
Q
quicksilver 已提交
246 247
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
248
	return colReplica.removePartitionPrivate(partitionID)
Q
quicksilver 已提交
249 250
}

B
bigsheeper 已提交
251 252
func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID UniqueID) error {
	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
G
godchen 已提交
253 254
	if err != nil {
		return err
255 256
	}

B
bigsheeper 已提交
257
	collection, err := colReplica.getCollectionByIDPrivate(partition.collectionID)
F
FluorineDog 已提交
258 259 260 261
	if err != nil {
		return err
	}

B
bigsheeper 已提交
262 263
	collection.removePartitionID(partitionID)
	delete(colReplica.partitions, partitionID)
F
FluorineDog 已提交
264

B
bigsheeper 已提交
265 266 267 268
	// delete segments
	for _, segmentID := range partition.segmentIDs {
		// try to delete, ignore error
		_ = colReplica.removeSegmentPrivate(segmentID)
F
FluorineDog 已提交
269 270 271 272
	}
	return nil
}

B
bigsheeper 已提交
273
func (colReplica *collectionReplicaImpl) getPartitionByID(partitionID UniqueID) (*Partition, error) {
Z
zhenshan.cao 已提交
274 275
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
B
bigsheeper 已提交
276
	return colReplica.getPartitionByIDPrivate(partitionID)
Z
zhenshan.cao 已提交
277 278
}

B
bigsheeper 已提交
279 280 281 282
func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
	partition, ok := colReplica.partitions[partitionID]
	if !ok {
		return nil, errors.New("cannot find partition, id = " + strconv.FormatInt(partitionID, 10))
Z
zhenshan.cao 已提交
283 284
	}

B
bigsheeper 已提交
285 286 287 288 289 290 291 292
	return partition, nil
}

func (colReplica *collectionReplicaImpl) hasPartition(partitionID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
	return colReplica.hasPartitionPrivate(partitionID)
}
Z
zhenshan.cao 已提交
293

B
bigsheeper 已提交
294 295 296
func (colReplica *collectionReplicaImpl) hasPartitionPrivate(partitionID UniqueID) bool {
	_, ok := colReplica.partitions[partitionID]
	return ok
Z
zhenshan.cao 已提交
297 298
}

B
bigsheeper 已提交
299
func (colReplica *collectionReplicaImpl) getPartitionNum() int {
Q
quicksilver 已提交
300 301
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
B
bigsheeper 已提交
302 303
	return len(colReplica.partitions)
}
Q
quicksilver 已提交
304

B
bigsheeper 已提交
305 306 307
func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
308 309
	return colReplica.getSegmentIDsPrivate(partitionID)
}
F
FluorineDog 已提交
310

311
func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
B
bigsheeper 已提交
312 313 314
	partition, err2 := colReplica.getPartitionByIDPrivate(partitionID)
	if err2 != nil {
		return nil, err2
F
FluorineDog 已提交
315
	}
B
bigsheeper 已提交
316
	return partition.segmentIDs, nil
F
FluorineDog 已提交
317 318
}

319
func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) error {
C
cai.zhang 已提交
320 321 322
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()

B
bigsheeper 已提交
323
	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
C
cai.zhang 已提交
324 325 326 327
	if err != nil {
		return err
	}

328
	partition.enable = true
C
cai.zhang 已提交
329 330 331
	return nil
}

332
func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID) error {
C
cai.zhang 已提交
333 334 335
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()

B
bigsheeper 已提交
336
	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
C
cai.zhang 已提交
337 338 339 340
	if err != nil {
		return err
	}

341
	partition.enable = false
C
cai.zhang 已提交
342 343 344
	return nil
}

345 346 347 348 349 350
func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []UniqueID {
	partitionIDs := make([]UniqueID, 0)
	for _, partition := range colReplica.partitions {
		if partition.enable {
			partitionIDs = append(partitionIDs, partition.partitionID)
		}
C
cai.zhang 已提交
351
	}
352
	return partitionIDs
C
cai.zhang 已提交
353 354
}

355
//----------------------------------------------------------------------------------------------------- segment
Z
zhenshan.cao 已提交
356 357 358
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
359 360
	return colReplica.addSegmentPrivate(segmentID, partitionID, collectionID, segType)
}
Z
zhenshan.cao 已提交
361

B
bigsheeper 已提交
362
func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
Z
zhenshan.cao 已提交
363 364 365 366 367
	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return err
	}

B
bigsheeper 已提交
368 369 370
	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
	if err != nil {
		return err
Z
zhenshan.cao 已提交
371 372
	}

B
bigsheeper 已提交
373
	partition.addSegmentID(segmentID)
Z
zhenshan.cao 已提交
374
	var newSegment = newSegment(collection, segmentID, partitionID, collectionID, segType)
D
dragondriver 已提交
375
	colReplica.segments[segmentID] = newSegment
G
godchen 已提交
376 377

	return nil
Z
zhenshan.cao 已提交
378
}
379

D
dragondriver 已提交
380 381 382
func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
383 384 385 386
	return colReplica.removeSegmentPrivate(segmentID)
}

func (colReplica *collectionReplicaImpl) removeSegmentPrivate(segmentID UniqueID) error {
B
bigsheeper 已提交
387 388 389
	segment, err := colReplica.getSegmentByIDPrivate(segmentID)
	if err != nil {
		return err
390 391
	}

B
bigsheeper 已提交
392 393 394
	partition, err := colReplica.getPartitionByIDPrivate(segment.partitionID)
	if err != nil {
		return err
395
	}
C
cai.zhang 已提交
396

B
bigsheeper 已提交
397 398 399 400
	partition.removeSegmentID(segmentID)
	delete(colReplica.segments, segmentID)
	deleteSegment(segment)

G
godchen 已提交
401
	return nil
402 403
}

D
dragondriver 已提交
404 405 406
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
407 408 409 410
	return colReplica.getSegmentByIDPrivate(segmentID)
}

func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) {
B
bigsheeper 已提交
411
	segment, ok := colReplica.segments[segmentID]
412
	if !ok {
B
bigsheeper 已提交
413
		return nil, errors.New("cannot find segment, id = " + strconv.FormatInt(segmentID, 10))
414 415
	}

B
bigsheeper 已提交
416
	return segment, nil
417 418
}

D
dragondriver 已提交
419 420 421
func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
B
bigsheeper 已提交
422 423
	return colReplica.hasSegmentPrivate(segmentID)
}
G
godchen 已提交
424

B
bigsheeper 已提交
425
func (colReplica *collectionReplicaImpl) hasSegmentPrivate(segmentID UniqueID) bool {
D
dragondriver 已提交
426
	_, ok := colReplica.segments[segmentID]
427 428
	return ok
}
B
bigsheeper 已提交
429

B
bigsheeper 已提交
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
func (colReplica *collectionReplicaImpl) getSegmentNum() int {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
	return len(colReplica.segments)
}

func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.SegmentStats {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	var statisticData = make([]*internalpb2.SegmentStats, 0)

	for segmentID, segment := range colReplica.segments {
		currentMemSize := segment.getMemSize()
		segment.lastMemSize = currentMemSize
		segmentNumOfRows := segment.getRowCount()

		stat := internalpb2.SegmentStats{
			SegmentID:        segmentID,
			MemorySize:       currentMemSize,
			NumRows:          segmentNumOfRows,
			RecentlyModified: segment.getRecentlyModified(),
		}

		statisticData = append(statisticData, &stat)
		segment.setRecentlyModified(false)
	}

	return statisticData
}

461
func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
462 463 464
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
	targetCollectionIDs := make([]UniqueID, 0)
	targetPartitionIDs := make([]UniqueID, 0)
	targetSegmentIDs := make([]UniqueID, 0)

	for _, partitionID := range colReplica.getEnabledPartitionIDsPrivate() {
		segmentIDs, err := colReplica.getSegmentIDsPrivate(partitionID)
		if err != nil {
			continue
		}
		for _, segmentID := range segmentIDs {
			segment, err := colReplica.getSegmentByIDPrivate(segmentID)
			if err != nil {
				continue
			}
			if segment.getType() == segType {
				targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
B
bigsheeper 已提交
481
				targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
482 483
				targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
			}
484 485 486
		}
	}

487
	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
488 489
}

B
bigsheeper 已提交
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
func (colReplica *collectionReplicaImpl) getSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	targetCollectionIDs := make([]UniqueID, 0)
	targetPartitionIDs := make([]UniqueID, 0)
	targetSegmentIDs := make([]UniqueID, 0)

	for _, segment := range colReplica.segments {
		if segment.getType() == segType {
			targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
			targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
			targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
		}
	}

	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
}

509 510 511
func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
B
bigsheeper 已提交
512 513 514 515
	if segment.segmentType != segTypeSealed && segment.segmentType != segTypeIndexing {
		return errors.New("unexpected segment type")
	}
	targetSegment, err := colReplica.getSegmentByIDPrivate(segment.ID())
Z
zhenshan.cao 已提交
516
	if err != nil && targetSegment != nil {
517
		if targetSegment.segmentType != segTypeGrowing {
B
bigsheeper 已提交
518
			// target segment has been a sealed segment
519 520 521 522
			return nil
		}
		deleteSegment(targetSegment)
	}
B
bigsheeper 已提交
523

Z
zhenshan.cao 已提交
524
	targetSegment = segment
525 526 527
	return nil
}

B
bigsheeper 已提交
528
//-----------------------------------------------------------------------------------------------------
B
bigsheeper 已提交
529 530 531 532
func (colReplica *collectionReplicaImpl) getTSafe() tSafe {
	return colReplica.tSafe
}

B
bigsheeper 已提交
533
func (colReplica *collectionReplicaImpl) freeAll() {
F
FluorineDog 已提交
534 535 536
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()

B
bigsheeper 已提交
537 538
	for id := range colReplica.collections {
		_ = colReplica.removeCollectionPrivate(id)
B
bigsheeper 已提交
539
	}
540

B
bigsheeper 已提交
541 542
	colReplica.collections = make(map[UniqueID]*Collection)
	colReplica.partitions = make(map[UniqueID]*Partition)
543
	colReplica.segments = make(map[UniqueID]*Segment)
B
bigsheeper 已提交
544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561
}

func newCollectionReplicaImpl() collectionReplica {
	collections := make(map[int64]*Collection)
	partitions := make(map[int64]*Partition)
	segments := make(map[int64]*Segment)

	tSafe := newTSafe()

	var replica collectionReplica = &collectionReplicaImpl{
		collections: collections,
		partitions:  partitions,
		segments:    segments,

		tSafe: tSafe,
	}

	return replica
B
bigsheeper 已提交
562
}