package reader

/*

#cgo CFLAGS: -I${SRCDIR}/../core/output/include

#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib

#include "segcore/collection_c.h"
#include "segcore/segment_c.h"

*/
import "C"
import (
	"log"
	"strconv"
	"sync"

	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)

/*
 * collectionReplica contains an in-memory local copy of persistent collections.
 * In common cases, the system has multiple query nodes. Data of a collection will be
 * distributed across all the available query nodes, and each query node's collectionReplica
 * will maintain its own share (only part of the collection).
 * Every replica tracks a value called tSafe, which is the maximum timestamp up to which
 * the replica is up-to-date.
 */
type collectionReplica interface {
33
	getTSafe() *tSafe
D
dragondriver 已提交
34

G
godchen 已提交
35 36
	// collection
	getCollectionNum() int
D
dragondriver 已提交
37
	addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error
G
godchen 已提交
38 39 40
	removeCollection(collectionID UniqueID) error
	getCollectionByID(collectionID UniqueID) (*Collection, error)
	getCollectionByName(collectionName string) (*Collection, error)
F
FluorineDog 已提交
41
	hasCollection(collectionID UniqueID) bool
G
godchen 已提交
42 43 44 45

	// partition
	// Partition tags in different collections are not unique,
	// so partition api should specify the target collection.
F
FluorineDog 已提交
46
	getPartitionNum(collectionID UniqueID) (int, error)
G
godchen 已提交
47 48
	addPartition(collectionID UniqueID, partitionTag string) error
	removePartition(collectionID UniqueID, partitionTag string) error
F
FluorineDog 已提交
49 50
	addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error
	removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error
G
godchen 已提交
51
	getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error)
F
FluorineDog 已提交
52
	hasPartition(collectionID UniqueID, partitionTag string) bool
G
godchen 已提交
53 54 55 56 57 58 59 60

	// segment
	getSegmentNum() int
	getSegmentStatistics() *internalpb.QueryNodeSegStats
	addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error
	removeSegment(segmentID UniqueID) error
	getSegmentByID(segmentID UniqueID) (*Segment, error)
	hasSegment(segmentID UniqueID) bool
B
bigsheeper 已提交
61 62

	freeAll()
G
godchen 已提交
63 64
}

D
dragondriver 已提交
65
type collectionReplicaImpl struct {
G
godchen 已提交
66
	mu          sync.RWMutex
67 68
	collections []*Collection
	segments    map[UniqueID]*Segment
69 70

	tSafe *tSafe
71 72
}

//----------------------------------------------------------------------------------------------------- tSafe

func (colReplica *collectionReplicaImpl) getTSafe() *tSafe {
D
dragondriver 已提交
75 76 77
	return colReplica.tSafe
}

//----------------------------------------------------------------------------------------------------- collection

func (colReplica *collectionReplicaImpl) getCollectionNum() int {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
82

D
dragondriver 已提交
83
	return len(colReplica.collections)
G
godchen 已提交
84 85
}

D
dragondriver 已提交
86 87 88
func (colReplica *collectionReplicaImpl) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
89

D
dragondriver 已提交
90
	var newCollection = newCollection(collMeta, colMetaBlob)
D
dragondriver 已提交
91
	colReplica.collections = append(colReplica.collections, newCollection)
92

G
godchen 已提交
93
	return nil
94 95
}

D
dragondriver 已提交
96 97
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
	collection, err := colReplica.getCollectionByID(collectionID)
G
godchen 已提交
98

D
dragondriver 已提交
99 100
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
101 102 103

	if err != nil {
		return err
104 105 106 107 108
	}

	deleteCollection(collection)

	tmpCollections := make([]*Collection, 0)
D
dragondriver 已提交
109
	for _, col := range colReplica.collections {
110
		if col.ID() == collectionID {
G
godchen 已提交
111
			for _, p := range *col.Partitions() {
112
				for _, s := range *p.Segments() {
D
dragondriver 已提交
113
					delete(colReplica.segments, s.ID())
114 115 116 117 118 119 120
				}
			}
		} else {
			tmpCollections = append(tmpCollections, col)
		}
	}

D
dragondriver 已提交
121
	colReplica.collections = tmpCollections
122 123 124
	return nil
}

D
dragondriver 已提交
125 126 127
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
128

D
dragondriver 已提交
129
	for _, collection := range colReplica.collections {
130 131 132 133 134 135 136 137
		if collection.ID() == collectionID {
			return collection, nil
		}
	}

	return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
}

D
dragondriver 已提交
138 139 140
func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
141

D
dragondriver 已提交
142
	for _, collection := range colReplica.collections {
143 144 145 146 147 148 149 150
		if collection.Name() == collectionName {
			return collection, nil
		}
	}

	return nil, errors.New("Cannot found collection: " + collectionName)
}

F
FluorineDog 已提交
151 152 153 154 155 156 157 158 159 160 161 162
// hasCollection reports whether a collection with the given ID is registered
// in the replica. The scan stops at the first match.
func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	found := false
	for i := 0; i < len(colReplica.collections) && !found; i++ {
		found = colReplica.collections[i].ID() == collectionID
	}
	return found
}

//----------------------------------------------------------------------------------------------------- partition

// getPartitionNum returns how many partitions the collection with the given
// ID holds. When the collection cannot be found it returns -1 together with
// the lookup error.
func (colReplica *collectionReplicaImpl) getPartitionNum(collectionID UniqueID) (int, error) {
	target, lookupErr := colReplica.getCollectionByID(collectionID)
	if lookupErr != nil {
		return -1, lookupErr
	}

	// The lookup above has already released its lock; take the read lock
	// again while the partition list is inspected.
	colReplica.mu.RLock()
	partitionCount := len(target.partitions)
	colReplica.mu.RUnlock()

	return partitionCount, nil
}

D
dragondriver 已提交
176 177
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error {
	collection, err := colReplica.getCollectionByID(collectionID)
G
godchen 已提交
178 179
	if err != nil {
		return err
180 181
	}

D
dragondriver 已提交
182 183
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
184

G
godchen 已提交
185
	var newPartition = newPartition(partitionTag)
Z
zhenshan.cao 已提交
186

G
godchen 已提交
187 188
	*collection.Partitions() = append(*collection.Partitions(), newPartition)
	return nil
189 190
}

D
dragondriver 已提交
191 192
func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error {
	collection, err := colReplica.getCollectionByID(collectionID)
G
godchen 已提交
193 194
	if err != nil {
		return err
195 196
	}

D
dragondriver 已提交
197 198
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
Z
zhenshan.cao 已提交
199

G
godchen 已提交
200 201 202 203
	var tmpPartitions = make([]*Partition, 0)
	for _, p := range *collection.Partitions() {
		if p.Tag() == partitionTag {
			for _, s := range *p.Segments() {
D
dragondriver 已提交
204
				delete(colReplica.segments, s.ID())
205
			}
G
godchen 已提交
206 207
		} else {
			tmpPartitions = append(tmpPartitions, p)
208 209 210
		}
	}

G
godchen 已提交
211 212
	*collection.Partitions() = tmpPartitions
	return nil
213 214
}

F
FluorineDog 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
// addPartitionsByCollectionMeta makes sure the collection identified by
// colMeta.ID has a partition for every tag in colMeta.PartitionTags.
//
// The missing tags are collected first and added afterwards. A failure to add
// an individual partition is logged and does not abort the remaining ones.
// The only error returned is for an unknown collection.
func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error {
	collectionID := colMeta.ID
	if !colReplica.hasCollection(collectionID) {
		return errors.New("Cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
	}

	// Phase 1: work out which tags are absent before anything is added.
	var missingTags []string
	for i := range colMeta.PartitionTags {
		candidate := colMeta.PartitionTags[i]
		if colReplica.hasPartition(collectionID, candidate) {
			continue
		}
		missingTags = append(missingTags, candidate)
	}

	// Phase 2: add them, logging failures instead of returning them.
	for i := range missingTags {
		if addErr := colReplica.addPartition(collectionID, missingTags[i]); addErr != nil {
			log.Println(addErr)
		}
	}

	return nil
}

// removePartitionsByCollectionMeta removes every partition of the collection
// identified by colMeta.ID whose tag is not listed in colMeta.PartitionTags.
//
// The stale tags are gathered while the replica lock is held. The lock is
// released before removePartition is called, because removePartition takes
// the lock itself. Failures to remove an individual partition are logged and
// not returned. The only error returned is for an unknown collection.
func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error {
	col, err := colReplica.getCollectionByID(colMeta.ID)
	if err != nil {
		return err
	}

	var staleTags []string

	colReplica.mu.Lock()
	for _, partition := range col.partitions {
		listed := false
		for i := range colMeta.PartitionTags {
			if colMeta.PartitionTags[i] == partition.partitionTag {
				listed = true
				break
			}
		}
		if listed {
			continue
		}
		staleTags = append(staleTags, partition.partitionTag)
	}
	colReplica.mu.Unlock()

	for _, staleTag := range staleTags {
		if rmErr := colReplica.removePartition(col.ID(), staleTag); rmErr != nil {
			log.Println(rmErr)
		}
	}

	return nil
}

D
dragondriver 已提交
270 271
func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
	collection, err := colReplica.getCollectionByID(collectionID)
272 273 274
	if err != nil {
		return nil, err
	}
G
godchen 已提交
275

D
dragondriver 已提交
276 277
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
278 279

	for _, p := range *collection.Partitions() {
280 281
		if p.Tag() == partitionTag {
			return p, nil
282 283 284 285 286 287
		}
	}

	return nil, errors.New("cannot find partition, tag = " + partitionTag)
}

F
FluorineDog 已提交
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
// hasPartition reports whether the collection with the given ID owns a
// partition tagged partitionTag. An unknown collection is logged and
// reported as false.
func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionTag string) bool {
	owner, lookupErr := colReplica.getCollectionByID(collectionID)
	if lookupErr != nil {
		log.Println(lookupErr)
		return false
	}

	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	partitions := *owner.Partitions()
	for i := range partitions {
		if partitions[i].Tag() == partitionTag {
			return true
		}
	}

	return false
}

//----------------------------------------------------------------------------------------------------- segment

func (colReplica *collectionReplicaImpl) getSegmentNum() int {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
311

D
dragondriver 已提交
312
	return len(colReplica.segments)
G
godchen 已提交
313 314
}

D
dragondriver 已提交
315
func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats {
F
FluorineDog 已提交
316 317 318
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

G
godchen 已提交
319 320
	var statisticData = make([]*internalpb.SegmentStats, 0)

D
dragondriver 已提交
321
	for segmentID, segment := range colReplica.segments {
G
godchen 已提交
322 323 324 325 326 327 328 329 330 331 332 333
		currentMemSize := segment.getMemSize()
		segment.lastMemSize = currentMemSize
		segmentNumOfRows := segment.getRowCount()

		stat := internalpb.SegmentStats{
			SegmentID:        segmentID,
			MemorySize:       currentMemSize,
			NumRows:          segmentNumOfRows,
			RecentlyModified: segment.recentlyModified,
		}

		statisticData = append(statisticData, &stat)
D
dragondriver 已提交
334
		segment.recentlyModified = false
335
	}
336

G
godchen 已提交
337 338 339
	return &internalpb.QueryNodeSegStats{
		MsgType:  internalpb.MsgType_kQueryNodeSegStats,
		SegStats: statisticData,
340
	}
G
godchen 已提交
341
}
342

D
dragondriver 已提交
343 344
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error {
	collection, err := colReplica.getCollectionByID(collectionID)
G
godchen 已提交
345 346 347
	if err != nil {
		return err
	}
348

D
dragondriver 已提交
349
	partition, err := colReplica.getPartitionByTag(collectionID, partitionTag)
G
godchen 已提交
350 351
	if err != nil {
		return err
Z
zhenshan.cao 已提交
352
	}
353

D
dragondriver 已提交
354 355
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
356 357 358

	var newSegment = newSegment(collection, segmentID)

D
dragondriver 已提交
359
	colReplica.segments[segmentID] = newSegment
G
godchen 已提交
360 361 362
	*partition.Segments() = append(*partition.Segments(), newSegment)

	return nil
Z
zhenshan.cao 已提交
363
}
364

D
dragondriver 已提交
365 366 367
func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
368

369
	var targetPartition *Partition
G
godchen 已提交
370
	var segmentIndex = -1
371

D
dragondriver 已提交
372
	for _, col := range colReplica.collections {
373
		for _, p := range *col.Partitions() {
G
godchen 已提交
374 375
			for i, s := range *p.Segments() {
				if s.ID() == segmentID {
376
					targetPartition = p
G
godchen 已提交
377
					segmentIndex = i
378 379 380 381 382
				}
			}
		}
	}

D
dragondriver 已提交
383
	delete(colReplica.segments, segmentID)
G
godchen 已提交
384 385 386

	if targetPartition != nil && segmentIndex > 0 {
		targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...)
387
	}
C
cai.zhang 已提交
388

G
godchen 已提交
389
	return nil
390 391
}

D
dragondriver 已提交
392 393 394
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
395

D
dragondriver 已提交
396
	targetSegment, ok := colReplica.segments[segmentID]
397 398 399 400 401 402 403 404

	if !ok {
		return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
	}

	return targetSegment, nil
}

D
dragondriver 已提交
405 406 407
func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
408

D
dragondriver 已提交
409
	_, ok := colReplica.segments[segmentID]
410 411 412

	return ok
}

//-----------------------------------------------------------------------------------------------------
func (colReplica *collectionReplicaImpl) freeAll() {
F
FluorineDog 已提交
416 417 418
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()

B
bigsheeper 已提交
419 420 421 422 423 424 425
	for _, seg := range colReplica.segments {
		deleteSegment(seg)
	}
	for _, col := range colReplica.collections {
		deleteCollection(col)
	}
}