meta_replica.go 27.5 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17
package querynode
18 19

/*
20
#cgo pkg-config: milvus_segcore
21

F
FluorineDog 已提交
22 23
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
24 25 26
*/
import "C"
import (
27
	"errors"
S
sunby 已提交
28
	"fmt"
C
cai.zhang 已提交
29
	"strconv"
G
godchen 已提交
30
	"sync"
C
cai.zhang 已提交
31

B
bigsheeper 已提交
32 33
	"go.uber.org/zap"

34
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
35
	"github.com/milvus-io/milvus/internal/log"
36
	"github.com/milvus-io/milvus/internal/metrics"
37
	"github.com/milvus-io/milvus/internal/proto/datapb"
X
Xiangyu Wang 已提交
38
	"github.com/milvus-io/milvus/internal/proto/internalpb"
39
	"github.com/milvus-io/milvus/internal/proto/querypb"
X
Xiangyu Wang 已提交
40
	"github.com/milvus-io/milvus/internal/proto/schemapb"
41 42
)

43 44 45
// ReplicaInterface specifies all the methods that the Collection object needs to implement in QueryNode.
// In common cases, the system has multiple query nodes. The full data of a collection will be distributed
// across multiple query nodes, and each query node's collectionReplica will maintain its own part.
46
type ReplicaInterface interface {
G
godchen 已提交
47
	// collection
48
	// getCollectionIDs returns all collection ids in the collectionReplica
49
	getCollectionIDs() []UniqueID
50
	// addCollection creates a new collection and add it to collectionReplica
51
	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection
52
	// removeCollection removes the collection from collectionReplica
G
godchen 已提交
53
	removeCollection(collectionID UniqueID) error
54
	// getCollectionByID gets the collection which id is collectionID
G
godchen 已提交
55
	getCollectionByID(collectionID UniqueID) (*Collection, error)
56
	// hasCollection checks if collectionReplica has the collection which id is collectionID
F
FluorineDog 已提交
57
	hasCollection(collectionID UniqueID) bool
58
	// getCollectionNum returns num of collections in collectionReplica
B
bigsheeper 已提交
59
	getCollectionNum() int
60
	// getPartitionIDs returns partition ids of collection
B
bigsheeper 已提交
61
	getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)
62
	// getVecFieldIDsByCollectionID returns vector field ids of collection
63
	getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error)
64
	// getPKFieldIDsByCollectionID returns vector field ids of collection
65
	getPKFieldIDByCollectionID(collectionID UniqueID) (FieldID, error)
66
	// getSegmentInfosByColID return segments info by collectionID
67
	getSegmentInfosByColID(collectionID UniqueID) []*querypb.SegmentInfo
G
godchen 已提交
68 69

	// partition
70
	// addPartition adds a new partition to collection
C
cai.zhang 已提交
71
	addPartition(collectionID UniqueID, partitionID UniqueID) error
72
	// removePartition removes the partition from collectionReplica
B
bigsheeper 已提交
73
	removePartition(partitionID UniqueID) error
74
	// getPartitionByID returns the partition which id is partitionID
B
bigsheeper 已提交
75
	getPartitionByID(partitionID UniqueID) (*Partition, error)
76
	// hasPartition returns true if collectionReplica has the partition, false otherwise
B
bigsheeper 已提交
77
	hasPartition(partitionID UniqueID) bool
78
	// getPartitionNum returns num of partitions
B
bigsheeper 已提交
79
	getPartitionNum() int
80
	// getSegmentIDs returns segment ids
81
	getSegmentIDs(partitionID UniqueID, segType segmentType) ([]UniqueID, error)
82
	// getSegmentIDsByVChannel returns segment ids which virtual channel is vChannel
83
	getSegmentIDsByVChannel(partitionID UniqueID, vChannel Channel) ([]UniqueID, error)
G
godchen 已提交
84 85

	// segment
86
	// addSegment add a new segment to collectionReplica
87
	addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType) error
88
	// setSegment adds a segment to collectionReplica
X
xige-16 已提交
89
	setSegment(segment *Segment) error
90
	// removeSegment removes a segment from collectionReplica
91
	removeSegment(segmentID UniqueID, segType segmentType)
92
	// getSegmentByID returns the segment which id is segmentID
93
	getSegmentByID(segmentID UniqueID, segType segmentType) (*Segment, error)
94
	// hasSegment returns true if collectionReplica has the segment, false otherwise
95
	hasSegment(segmentID UniqueID, segType segmentType) (bool, error)
96
	// getSegmentNum returns num of segments in collectionReplica
97
	getSegmentNum(segType segmentType) int
98
	//  getSegmentStatistics returns the statistics of segments in collectionReplica
G
godchen 已提交
99
	getSegmentStatistics() []*internalpb.SegmentStats
100

Z
zhenshan.cao 已提交
101
	// excluded segments
102
	//  removeExcludedSegments will remove excludedSegments from collectionReplica
Z
zhenshan.cao 已提交
103
	removeExcludedSegments(collectionID UniqueID)
104
	// addExcludedSegments will add excludedSegments to collectionReplica
105
	addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo)
106
	// getExcludedSegments returns excludedSegments of collectionReplica
107
	getExcludedSegments(collectionID UniqueID) ([]*datapb.SegmentInfo, error)
Z
zhenshan.cao 已提交
108

109
	// getSegmentsMemSize get the memory size in bytes of all the Segments
110
	getSegmentsMemSize() int64
111
	// freeAll will free all meta info from collectionReplica
B
bigsheeper 已提交
112
	freeAll()
113
	// printReplica prints the collections, partitions and segments in the collectionReplica
114
	printReplica()
G
godchen 已提交
115 116
}

117 118
// collectionReplica is the data replication of memory data in query node.
// It implements `ReplicaInterface` interface.
119
type metaReplica struct {
120 121 122 123 124
	mu              sync.RWMutex // guards all
	collections     map[UniqueID]*Collection
	partitions      map[UniqueID]*Partition
	growingSegments map[UniqueID]*Segment
	sealedSegments  map[UniqueID]*Segment
Z
zhenshan.cao 已提交
125

126
	excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentIDs
127 128
}

129
// getSegmentsMemSize get the memory size in bytes of all the Segments
130 131 132
func (replica *metaReplica) getSegmentsMemSize() int64 {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
133 134

	memSize := int64(0)
135 136 137 138
	for _, segment := range replica.growingSegments {
		memSize += segment.getMemSize()
	}
	for _, segment := range replica.sealedSegments {
139 140 141 142 143
		memSize += segment.getMemSize()
	}
	return memSize
}

144
// printReplica prints the collections, partitions and segments in the collectionReplica
145 146 147
func (replica *metaReplica) printReplica() {
	replica.mu.Lock()
	defer replica.mu.Unlock()
148

149 150
	log.Info("collections in collectionReplica", zap.Any("info", replica.collections))
	log.Info("partitions in collectionReplica", zap.Any("info", replica.partitions))
151 152
	log.Info("growingSegments in collectionReplica", zap.Any("info", replica.growingSegments))
	log.Info("sealedSegments in collectionReplica", zap.Any("info", replica.sealedSegments))
153
	log.Info("excludedSegments in collectionReplica", zap.Any("info", replica.excludedSegments))
154 155
}

156
//----------------------------------------------------------------------------------------------------- collection
157
// getCollectionIDs gets all the collection ids in the collectionReplica
158 159 160
func (replica *metaReplica) getCollectionIDs() []UniqueID {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
161
	collectionIDs := make([]UniqueID, 0)
162
	for id := range replica.collections {
163 164 165 166 167
		collectionIDs = append(collectionIDs, id)
	}
	return collectionIDs
}

168
// addCollection creates a new collection and add it to collectionReplica
169 170 171
func (replica *metaReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection {
	replica.mu.Lock()
	defer replica.mu.Unlock()
G
godchen 已提交
172

173
	if col, ok := replica.collections[collectionID]; ok {
174
		return col
B
bigsheeper 已提交
175 176
	}

177 178
	var newC = newCollection(collectionID, schema)
	replica.collections[collectionID] = newC
179
	metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(replica.collections)))
180
	return newC
181 182
}

183
// removeCollection removes the collection from collectionReplica
184 185 186 187
func (replica *metaReplica) removeCollection(collectionID UniqueID) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	return replica.removeCollectionPrivate(collectionID)
B
bigsheeper 已提交
188
}
G
godchen 已提交
189

190
// removeCollectionPrivate is the private function in collectionReplica, to remove collection from collectionReplica
191 192
func (replica *metaReplica) removeCollectionPrivate(collectionID UniqueID) error {
	collection, err := replica.getCollectionByIDPrivate(collectionID)
G
godchen 已提交
193 194
	if err != nil {
		return err
195 196
	}

197 198 199 200
	// block incoming search&query
	collection.Lock()
	defer collection.Unlock()

B
bigsheeper 已提交
201 202 203
	// delete partitions
	for _, partitionID := range collection.partitionIDs {
		// ignore error, try to delete
204
		_ = replica.removePartitionPrivate(partitionID, true)
205 206
	}

B
bigsheeper 已提交
207
	deleteCollection(collection)
208
	delete(replica.collections, collectionID)
B
bigsheeper 已提交
209

210
	metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(replica.collections)))
X
Xiaofan 已提交
211
	metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(len(collection.partitionIDs)))
212 213 214
	return nil
}

215
// getCollectionByID gets the collection which id is collectionID
216 217 218 219
func (replica *metaReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	return replica.getCollectionByIDPrivate(collectionID)
Q
quicksilver 已提交
220 221
}

222
// getCollectionByIDPrivate is the private function in collectionReplica, to get collection from collectionReplica
223 224
func (replica *metaReplica) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
	collection, ok := replica.collections[collectionID]
B
bigsheeper 已提交
225
	if !ok {
226
		return nil, fmt.Errorf("collection hasn't been loaded or has been released, collection id = %d", collectionID)
227 228
	}

B
bigsheeper 已提交
229
	return collection, nil
230 231
}

232
// hasCollection checks if collectionReplica has the collection which id is collectionID
233 234 235 236
func (replica *metaReplica) hasCollection(collectionID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	return replica.hasCollectionPrivate(collectionID)
B
bigsheeper 已提交
237
}
F
FluorineDog 已提交
238

239
// hasCollectionPrivate is the private function in collectionReplica, to check collection in collectionReplica
240 241
func (replica *metaReplica) hasCollectionPrivate(collectionID UniqueID) bool {
	_, ok := replica.collections[collectionID]
B
bigsheeper 已提交
242 243 244
	return ok
}

245
// getCollectionNum returns num of collections in collectionReplica
246 247 248 249
func (replica *metaReplica) getCollectionNum() int {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	return len(replica.collections)
B
bigsheeper 已提交
250 251
}

252
// getPartitionIDs returns partition ids of collection
253 254 255
func (replica *metaReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
B
bigsheeper 已提交
256

257
	collection, err := replica.getCollectionByIDPrivate(collectionID)
B
bigsheeper 已提交
258 259
	if err != nil {
		return nil, err
F
FluorineDog 已提交
260
	}
B
bigsheeper 已提交
261

262
	return collection.getPartitionIDs(), nil
F
FluorineDog 已提交
263 264
}

265 266
func (replica *metaReplica) getIndexedFieldIDByCollectionIDPrivate(collectionID UniqueID, segment *Segment) ([]FieldID, error) {
	fields, err := replica.getFieldsByCollectionIDPrivate(collectionID)
267 268 269 270 271 272 273 274 275 276 277 278 279
	if err != nil {
		return nil, err
	}

	fieldIDS := make([]FieldID, 0)
	for _, field := range fields {
		if segment.hasLoadIndexForIndexedField(field.FieldID) {
			fieldIDS = append(fieldIDS, field.FieldID)
		}
	}
	return fieldIDS, nil
}

280 281
func (replica *metaReplica) getVecFieldIDsByCollectionIDPrivate(collectionID UniqueID) ([]FieldID, error) {
	fields, err := replica.getFieldsByCollectionIDPrivate(collectionID)
282 283 284 285
	if err != nil {
		return nil, err
	}

286
	vecFields := make([]FieldID, 0)
287
	for _, field := range fields {
G
godchen 已提交
288
		if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector {
B
bigsheeper 已提交
289
			vecFields = append(vecFields, field.FieldID)
290 291 292 293 294
		}
	}
	return vecFields, nil
}

295
// getVecFieldIDsByCollectionID returns vector field ids of collection
296 297 298
func (replica *metaReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
299

300
	return replica.getVecFieldIDsByCollectionIDPrivate(collectionID)
301 302
}

303
// getPKFieldIDsByCollectionID returns vector field ids of collection
304 305 306
func (replica *metaReplica) getPKFieldIDByCollectionID(collectionID UniqueID) (FieldID, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
307

308
	fields, err := replica.getFieldsByCollectionIDPrivate(collectionID)
309 310 311 312 313 314 315 316 317 318 319 320
	if err != nil {
		return common.InvalidFieldID, err
	}

	for _, field := range fields {
		if field.IsPrimaryKey {
			return field.FieldID, nil
		}
	}
	return common.InvalidFieldID, nil
}

321
// getFieldsByCollectionIDPrivate is the private function in collectionReplica, to return vector field ids of collection
322 323
func (replica *metaReplica) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
	collection, err := replica.getCollectionByIDPrivate(collectionID)
324 325 326 327 328
	if err != nil {
		return nil, err
	}

	if len(collection.Schema().Fields) <= 0 {
Z
zhenshan.cao 已提交
329
		return nil, errors.New("no field in collection %d" + strconv.FormatInt(collectionID, 10))
330 331 332 333 334
	}

	return collection.Schema().Fields, nil
}

335
// getSegmentInfosByColID return segments info by collectionID
336
func (replica *metaReplica) getSegmentInfosByColID(collectionID UniqueID) []*querypb.SegmentInfo {
337 338
	replica.mu.RLock()
	defer replica.mu.RUnlock()
339 340

	segmentInfos := make([]*querypb.SegmentInfo, 0)
341
	_, ok := replica.collections[collectionID]
342 343
	if !ok {
		// collection not exist, so result segmentInfos is empty
344
		return segmentInfos
345 346
	}

347 348 349 350
	for _, segment := range replica.growingSegments {
		if segment.collectionID == collectionID {
			segmentInfo := replica.getSegmentInfo(segment)
			segmentInfos = append(segmentInfos, segmentInfo)
351
		}
352 353 354
	}
	for _, segment := range replica.sealedSegments {
		if segment.collectionID == collectionID {
355
			segmentInfo := replica.getSegmentInfo(segment)
356 357 358 359
			segmentInfos = append(segmentInfos, segmentInfo)
		}
	}

360
	return segmentInfos
361 362
}

363
//----------------------------------------------------------------------------------------------------- partition
364
// addPartition adds a new partition to collection
365 366 367 368
func (replica *metaReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	return replica.addPartitionPrivate(collectionID, partitionID)
B
bigsheeper 已提交
369
}
C
cai.zhang 已提交
370

371
// addPartitionPrivate is the private function in collectionReplica, to add a new partition to collection
372 373
func (replica *metaReplica) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
	collection, err := replica.getCollectionByIDPrivate(collectionID)
C
cai.zhang 已提交
374 375 376 377
	if err != nil {
		return err
	}

378
	if !replica.hasPartitionPrivate(partitionID) {
379 380
		collection.addPartitionID(partitionID)
		var newPartition = newPartition(collectionID, partitionID)
381
		replica.partitions[partitionID] = newPartition
382
	}
383

384
	metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(replica.partitions)))
G
godchen 已提交
385
	return nil
386 387
}

388
// removePartition removes the partition from collectionReplica
389 390 391 392
func (replica *metaReplica) removePartition(partitionID UniqueID) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	return replica.removePartitionPrivate(partitionID, false)
Q
quicksilver 已提交
393 394
}

395
// removePartitionPrivate is the private function in collectionReplica, to remove the partition from collectionReplica
396 397 398
// `locked` flag indicates whether corresponding collection lock is accquired before calling this method
func (replica *metaReplica) removePartitionPrivate(partitionID UniqueID, locked bool) error {
	partition, err := replica.getPartitionByIDPrivate(partitionID)
G
godchen 已提交
399 400
	if err != nil {
		return err
401 402
	}

403
	collection, err := replica.getCollectionByIDPrivate(partition.collectionID)
F
FluorineDog 已提交
404 405 406 407
	if err != nil {
		return err
	}

408 409 410 411 412
	if !locked {
		collection.Lock()
		defer collection.Unlock()
	}

B
bigsheeper 已提交
413
	// delete segments
414 415
	ids, _ := partition.getSegmentIDs(segmentTypeGrowing)
	for _, segmentID := range ids {
416
		replica.removeSegmentPrivate(segmentID, segmentTypeGrowing)
417 418 419
	}
	ids, _ = partition.getSegmentIDs(segmentTypeSealed)
	for _, segmentID := range ids {
420
		replica.removeSegmentPrivate(segmentID, segmentTypeSealed)
F
FluorineDog 已提交
421
	}
B
bigsheeper 已提交
422 423

	collection.removePartitionID(partitionID)
424
	delete(replica.partitions, partitionID)
B
bigsheeper 已提交
425

426
	metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(replica.partitions)))
F
FluorineDog 已提交
427 428 429
	return nil
}

430
// getPartitionByID returns the partition which id is partitionID
431 432 433 434
func (replica *metaReplica) getPartitionByID(partitionID UniqueID) (*Partition, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	return replica.getPartitionByIDPrivate(partitionID)
Z
zhenshan.cao 已提交
435 436
}

437
// getPartitionByIDPrivate is the private function in collectionReplica, to get the partition
438 439
func (replica *metaReplica) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
	partition, ok := replica.partitions[partitionID]
B
bigsheeper 已提交
440
	if !ok {
441
		return nil, fmt.Errorf("partition %d hasn't been loaded or has been released", partitionID)
Z
zhenshan.cao 已提交
442 443
	}

B
bigsheeper 已提交
444 445 446
	return partition, nil
}

447
// hasPartition returns true if collectionReplica has the partition, false otherwise
448 449 450 451
func (replica *metaReplica) hasPartition(partitionID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	return replica.hasPartitionPrivate(partitionID)
B
bigsheeper 已提交
452
}
Z
zhenshan.cao 已提交
453

454
// hasPartitionPrivate is the private function in collectionReplica, to check if collectionReplica has the partition
455 456
func (replica *metaReplica) hasPartitionPrivate(partitionID UniqueID) bool {
	_, ok := replica.partitions[partitionID]
B
bigsheeper 已提交
457
	return ok
Z
zhenshan.cao 已提交
458 459
}

460
// getPartitionNum returns num of partitions
461 462 463 464
func (replica *metaReplica) getPartitionNum() int {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	return len(replica.partitions)
B
bigsheeper 已提交
465
}
Q
quicksilver 已提交
466

467
// getSegmentIDs returns segment ids
468
func (replica *metaReplica) getSegmentIDs(partitionID UniqueID, segType segmentType) ([]UniqueID, error) {
469 470
	replica.mu.RLock()
	defer replica.mu.RUnlock()
471 472

	return replica.getSegmentIDsPrivate(partitionID, segType)
473
}
F
FluorineDog 已提交
474

475
// getSegmentIDsByVChannel returns segment ids which virtual channel is vChannel
476 477 478
func (replica *metaReplica) getSegmentIDsByVChannel(partitionID UniqueID, vChannel Channel) ([]UniqueID, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
479
	segmentIDs, err := replica.getSegmentIDsPrivate(partitionID, segmentTypeGrowing)
C
cai.zhang 已提交
480
	if err != nil {
481
		return nil, err
C
cai.zhang 已提交
482
	}
483 484
	segmentIDsTmp := make([]UniqueID, 0)
	for _, segmentID := range segmentIDs {
485
		segment, err := replica.getSegmentByIDPrivate(segmentID, segmentTypeGrowing)
486 487 488 489 490 491
		if err != nil {
			return nil, err
		}
		if segment.vChannelID == vChannel {
			segmentIDsTmp = append(segmentIDsTmp, segment.ID())
		}
C
cai.zhang 已提交
492 493
	}

494
	return segmentIDsTmp, nil
C
cai.zhang 已提交
495 496
}

497
// getSegmentIDsPrivate is private function in collectionReplica, it returns segment ids
498
func (replica *metaReplica) getSegmentIDsPrivate(partitionID UniqueID, segType segmentType) ([]UniqueID, error) {
499
	partition, err2 := replica.getPartitionByIDPrivate(partitionID)
500 501
	if err2 != nil {
		return nil, err2
C
cai.zhang 已提交
502
	}
503

504
	return partition.getSegmentIDs(segType)
C
cai.zhang 已提交
505 506
}

507
//----------------------------------------------------------------------------------------------------- segment
508
// addSegment add a new segment to collectionReplica
509
func (replica *metaReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType) error {
510 511
	replica.mu.Lock()
	defer replica.mu.Unlock()
512

513
	collection, err := replica.getCollectionByIDPrivate(collectionID)
Z
zhenshan.cao 已提交
514 515 516
	if err != nil {
		return err
	}
517
	seg, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segType)
518 519 520
	if err != nil {
		return err
	}
521
	return replica.addSegmentPrivate(segmentID, partitionID, seg)
X
xige-16 已提交
522
}
Z
zhenshan.cao 已提交
523

524
// addSegmentPrivate is private function in collectionReplica, to add a new segment to collectionReplica
525 526
func (replica *metaReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, segment *Segment) error {
	partition, err := replica.getPartitionByIDPrivate(partitionID)
B
bigsheeper 已提交
527 528
	if err != nil {
		return err
Z
zhenshan.cao 已提交
529 530
	}

531 532 533 534 535 536
	segType := segment.getType()
	ok, err := replica.hasSegmentPrivate(segmentID, segType)
	if err != nil {
		return err
	}
	if ok {
537 538
		return nil
	}
539 540 541 542 543 544 545 546 547 548
	partition.addSegmentID(segmentID, segType)

	switch segType {
	case segmentTypeGrowing:
		replica.growingSegments[segmentID] = segment
	case segmentTypeSealed:
		replica.sealedSegments[segmentID] = segment
	default:
		return fmt.Errorf("unexpected segment type, segmentID = %d, segmentType = %s", segmentID, segType.String())
	}
G
godchen 已提交
549

X
Xiaofan 已提交
550
	metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
551 552 553 554
	rowCount := segment.getRowCount()
	if rowCount > 0 {
		metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Add(float64(rowCount))
	}
G
godchen 已提交
555
	return nil
Z
zhenshan.cao 已提交
556
}
557

558
// setSegment adds a segment to collectionReplica
559 560 561
func (replica *metaReplica) setSegment(segment *Segment) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
562 563 564 565 566

	if segment == nil {
		return fmt.Errorf("nil segment when setSegment")
	}

567
	_, err := replica.getCollectionByIDPrivate(segment.collectionID)
X
xige-16 已提交
568 569 570
	if err != nil {
		return err
	}
571

572
	return replica.addSegmentPrivate(segment.segmentID, segment.partitionID, segment)
X
xige-16 已提交
573 574
}

575
// removeSegment removes a segment from collectionReplica
576
func (replica *metaReplica) removeSegment(segmentID UniqueID, segType segmentType) {
577 578
	replica.mu.Lock()
	defer replica.mu.Unlock()
579
	replica.removeSegmentPrivate(segmentID, segType)
580 581
}

582
// removeSegmentPrivate is private function in collectionReplica, to remove a segment from collectionReplica
583
func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType segmentType) {
584
	var rowCount int64
585 586
	switch segType {
	case segmentTypeGrowing:
587 588 589 590
		if segment, ok := replica.growingSegments[segmentID]; ok {
			if partition, ok := replica.partitions[segment.partitionID]; ok {
				partition.removeSegmentID(segmentID, segType)
			}
591
			rowCount = segment.getRowCount()
592 593 594
			delete(replica.growingSegments, segmentID)
			deleteSegment(segment)
		}
595
	case segmentTypeSealed:
596 597 598 599 600
		if segment, ok := replica.sealedSegments[segmentID]; ok {
			if partition, ok := replica.partitions[segment.partitionID]; ok {
				partition.removeSegmentID(segmentID, segType)
			}

601
			rowCount = segment.getRowCount()
602 603 604
			delete(replica.sealedSegments, segmentID)
			deleteSegment(segment)
		}
605
	default:
606
		panic(fmt.Sprintf("unsupported segment type %s", segType.String()))
607
	}
608

X
Xiaofan 已提交
609
	metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Dec()
610 611 612
	if rowCount > 0 {
		metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(rowCount))
	}
613 614
}

615
// getSegmentByID returns the segment which id is segmentID
616
func (replica *metaReplica) getSegmentByID(segmentID UniqueID, segType segmentType) (*Segment, error) {
617 618
	replica.mu.RLock()
	defer replica.mu.RUnlock()
619
	return replica.getSegmentByIDPrivate(segmentID, segType)
620 621
}

622
// getSegmentByIDPrivate is private function in collectionReplica, it returns the segment which id is segmentID
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638
func (replica *metaReplica) getSegmentByIDPrivate(segmentID UniqueID, segType segmentType) (*Segment, error) {
	switch segType {
	case segmentTypeGrowing:
		segment, ok := replica.growingSegments[segmentID]
		if !ok {
			return nil, fmt.Errorf("cannot find growing segment %d in QueryNode", segmentID)
		}
		return segment, nil
	case segmentTypeSealed:
		segment, ok := replica.sealedSegments[segmentID]
		if !ok {
			return nil, fmt.Errorf("cannot find sealed segment %d in QueryNode", segmentID)
		}
		return segment, nil
	default:
		return nil, fmt.Errorf("unexpected segment type, segmentID = %d, segmentType = %s", segmentID, segType.String())
639 640 641
	}
}

642
// hasSegment returns true if collectionReplica has the segment, false otherwise
643
func (replica *metaReplica) hasSegment(segmentID UniqueID, segType segmentType) (bool, error) {
644 645
	replica.mu.RLock()
	defer replica.mu.RUnlock()
646
	return replica.hasSegmentPrivate(segmentID, segType)
B
bigsheeper 已提交
647
}
G
godchen 已提交
648

649
// hasSegmentPrivate is private function in collectionReplica, to check if collectionReplica has the segment
650 651 652 653 654 655 656 657 658 659 660
func (replica *metaReplica) hasSegmentPrivate(segmentID UniqueID, segType segmentType) (bool, error) {
	switch segType {
	case segmentTypeGrowing:
		_, ok := replica.growingSegments[segmentID]
		return ok, nil
	case segmentTypeSealed:
		_, ok := replica.sealedSegments[segmentID]
		return ok, nil
	default:
		return false, fmt.Errorf("unexpected segment type, segmentID = %d, segmentType = %s", segmentID, segType.String())
	}
661
}
B
bigsheeper 已提交
662

663
// getSegmentNum returns num of segments in collectionReplica
664
func (replica *metaReplica) getSegmentNum(segType segmentType) int {
665 666
	replica.mu.RLock()
	defer replica.mu.RUnlock()
667 668 669 670 671 672 673 674 675 676

	switch segType {
	case segmentTypeGrowing:
		return len(replica.growingSegments)
	case segmentTypeSealed:
		return len(replica.sealedSegments)
	default:
		log.Error("unexpected segment type", zap.String("segmentType", segType.String()))
		return 0
	}
B
bigsheeper 已提交
677 678
}

679
//  getSegmentStatistics returns the statistics of segments in collectionReplica
680
func (replica *metaReplica) getSegmentStatistics() []*internalpb.SegmentStats {
681 682
	// TODO: deprecated
	return nil
B
bigsheeper 已提交
683 684
}

685
//  removeExcludedSegments will remove excludedSegments from collectionReplica
686 687 688
func (replica *metaReplica) removeExcludedSegments(collectionID UniqueID) {
	replica.mu.Lock()
	defer replica.mu.Unlock()
F
FluorineDog 已提交
689

690
	delete(replica.excludedSegments, collectionID)
Z
zhenshan.cao 已提交
691 692
}

693
// addExcludedSegments will add excludedSegments to collectionReplica
694 695 696
func (replica *metaReplica) addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo) {
	replica.mu.Lock()
	defer replica.mu.Unlock()
Z
zhenshan.cao 已提交
697

698 699
	if _, ok := replica.excludedSegments[collectionID]; !ok {
		replica.excludedSegments[collectionID] = make([]*datapb.SegmentInfo, 0)
B
bigsheeper 已提交
700
	}
701

702
	replica.excludedSegments[collectionID] = append(replica.excludedSegments[collectionID], segmentInfos...)
B
bigsheeper 已提交
703 704
}

705
// getExcludedSegments returns excludedSegments of collectionReplica
706 707 708
func (replica *metaReplica) getExcludedSegments(collectionID UniqueID) ([]*datapb.SegmentInfo, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
B
bigsheeper 已提交
709

710
	if _, ok := replica.excludedSegments[collectionID]; !ok {
Z
zhenshan.cao 已提交
711 712
		return nil, errors.New("getExcludedSegments failed, cannot found collection, id =" + fmt.Sprintln(collectionID))
	}
B
bigsheeper 已提交
713

714
	return replica.excludedSegments[collectionID], nil
Z
zhenshan.cao 已提交
715 716
}

717
// freeAll will free all meta info from collectionReplica
718 719 720
func (replica *metaReplica) freeAll() {
	replica.mu.Lock()
	defer replica.mu.Unlock()
Z
zhenshan.cao 已提交
721

722 723
	for id := range replica.collections {
		_ = replica.removeCollectionPrivate(id)
B
bigsheeper 已提交
724 725
	}

726 727
	replica.collections = make(map[UniqueID]*Collection)
	replica.partitions = make(map[UniqueID]*Partition)
728 729
	replica.growingSegments = make(map[UniqueID]*Segment)
	replica.sealedSegments = make(map[UniqueID]*Segment)
B
bigsheeper 已提交
730
}
731

732
// newCollectionReplica returns a new ReplicaInterface
733
func newCollectionReplica() ReplicaInterface {
734
	var replica ReplicaInterface = &metaReplica{
735 736 737 738
		collections:     make(map[UniqueID]*Collection),
		partitions:      make(map[UniqueID]*Partition),
		growingSegments: make(map[UniqueID]*Segment),
		sealedSegments:  make(map[UniqueID]*Segment),
Z
zhenshan.cao 已提交
739

740
		excludedSegments: make(map[UniqueID][]*datapb.SegmentInfo),
Z
zhenshan.cao 已提交
741 742 743 744
	}

	return replica
}
745 746

// trans segment to queryPb.segmentInfo
747
func (replica *metaReplica) getSegmentInfo(segment *Segment) *querypb.SegmentInfo {
748 749
	var indexName string
	var indexID int64
750
	var indexInfos []*querypb.FieldIndexInfo
751
	// TODO:: segment has multi vec column
752
	indexedFieldIDs, _ := replica.getIndexedFieldIDByCollectionIDPrivate(segment.collectionID, segment)
753 754 755 756 757 758
	for _, fieldID := range indexedFieldIDs {
		fieldInfo, err := segment.getIndexedFieldInfo(fieldID)
		if err == nil {
			indexName = fieldInfo.indexInfo.IndexName
			indexID = fieldInfo.indexInfo.IndexID
			indexInfos = append(indexInfos, fieldInfo.indexInfo)
759
		}
760 761 762 763 764
	}
	info := &querypb.SegmentInfo{
		SegmentID:    segment.ID(),
		CollectionID: segment.collectionID,
		PartitionID:  segment.partitionID,
X
Xiaofan 已提交
765
		NodeID:       Params.QueryNodeCfg.GetNodeID(),
766 767 768 769
		MemSize:      segment.getMemSize(),
		NumRows:      segment.getRowCount(),
		IndexName:    indexName,
		IndexID:      indexID,
X
xige-16 已提交
770
		DmChannel:    segment.vChannelID,
771
		SegmentState: segment.segmentType,
772
		IndexInfos:   indexInfos,
773
		NodeIds:      []UniqueID{Params.QueryNodeCfg.GetNodeID()},
774 775 776
	}
	return info
}