collection_replica.go 8.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
package reader

/*

#cgo CFLAGS: -I${SRCDIR}/../core/output/include

#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib

#include "collection_c.h"
#include "segment_c.h"

*/
import "C"
import (
G
godchen 已提交
15
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
C
cai.zhang 已提交
16
	"strconv"
G
godchen 已提交
17
	"sync"
C
cai.zhang 已提交
18

19 20 21 22
	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)

D
dragondriver 已提交
23 24 25 26 27 28 29 30 31
/*
 * collectionReplica contains an in-memory local copy of persistent collections.
 * In common cases, the system has multiple query nodes. Data of a collection will be
 * distributed across all the available query nodes, and each query node's collectionReplica
 * will maintain its own share (only part of the collection).
 * Every replica tracks a value called tSafe, which is the maximum timestamp up to which
 * the replica is up-to-date.
 */
type collectionReplica interface {
32
	getTSafe() *tSafe
D
dragondriver 已提交
33

G
godchen 已提交
34 35
	// collection
	getCollectionNum() int
D
dragondriver 已提交
36
	addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error
G
godchen 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
	removeCollection(collectionID UniqueID) error
	getCollectionByID(collectionID UniqueID) (*Collection, error)
	getCollectionByName(collectionName string) (*Collection, error)

	// partition
	// Partition tags in different collections are not unique,
	// so partition api should specify the target collection.
	addPartition(collectionID UniqueID, partitionTag string) error
	removePartition(collectionID UniqueID, partitionTag string) error
	getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error)

	// segment
	getSegmentNum() int
	getSegmentStatistics() *internalpb.QueryNodeSegStats
	addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error
	removeSegment(segmentID UniqueID) error
	getSegmentByID(segmentID UniqueID) (*Segment, error)
	hasSegment(segmentID UniqueID) bool
N
neza2017 已提交
55 56

	freeAll()
G
godchen 已提交
57 58
}

D
dragondriver 已提交
59
type collectionReplicaImpl struct {
G
godchen 已提交
60
	mu          sync.RWMutex
61 62
	collections []*Collection
	segments    map[UniqueID]*Segment
63 64

	tSafe *tSafe
65 66
}

D
dragondriver 已提交
67
//----------------------------------------------------------------------------------------------------- tSafe
68
func (colReplica *collectionReplicaImpl) getTSafe() *tSafe {
D
dragondriver 已提交
69 70 71
	return colReplica.tSafe
}

72
//----------------------------------------------------------------------------------------------------- collection
D
dragondriver 已提交
73 74 75
func (colReplica *collectionReplicaImpl) getCollectionNum() int {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
76

D
dragondriver 已提交
77
	return len(colReplica.collections)
G
godchen 已提交
78 79
}

D
dragondriver 已提交
80 81 82
func (colReplica *collectionReplicaImpl) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
83

D
dragondriver 已提交
84
	var newCollection = newCollection(collMeta, colMetaBlob)
D
dragondriver 已提交
85
	colReplica.collections = append(colReplica.collections, newCollection)
86

G
godchen 已提交
87
	return nil
88 89
}

D
dragondriver 已提交
90 91
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
	collection, err := colReplica.getCollectionByID(collectionID)
G
godchen 已提交
92

D
dragondriver 已提交
93 94
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
95 96 97

	if err != nil {
		return err
98 99 100 101 102
	}

	deleteCollection(collection)

	tmpCollections := make([]*Collection, 0)
D
dragondriver 已提交
103
	for _, col := range colReplica.collections {
104
		if col.ID() == collectionID {
G
godchen 已提交
105
			for _, p := range *col.Partitions() {
106
				for _, s := range *p.Segments() {
D
dragondriver 已提交
107
					delete(colReplica.segments, s.ID())
108 109 110 111 112 113 114
				}
			}
		} else {
			tmpCollections = append(tmpCollections, col)
		}
	}

D
dragondriver 已提交
115
	colReplica.collections = tmpCollections
116 117 118
	return nil
}

D
dragondriver 已提交
119 120 121
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
122

D
dragondriver 已提交
123
	for _, collection := range colReplica.collections {
124 125 126 127 128 129 130 131
		if collection.ID() == collectionID {
			return collection, nil
		}
	}

	return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
}

D
dragondriver 已提交
132 133 134
func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
135

D
dragondriver 已提交
136
	for _, collection := range colReplica.collections {
137 138 139 140 141 142 143 144 145
		if collection.Name() == collectionName {
			return collection, nil
		}
	}

	return nil, errors.New("Cannot found collection: " + collectionName)
}

//----------------------------------------------------------------------------------------------------- partition
D
dragondriver 已提交
146 147
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error {
	collection, err := colReplica.getCollectionByID(collectionID)
G
godchen 已提交
148 149
	if err != nil {
		return err
150 151
	}

D
dragondriver 已提交
152 153
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
154

G
godchen 已提交
155
	var newPartition = newPartition(partitionTag)
Z
zhenshan.cao 已提交
156

G
godchen 已提交
157 158
	*collection.Partitions() = append(*collection.Partitions(), newPartition)
	return nil
159 160
}

D
dragondriver 已提交
161 162
func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error {
	collection, err := colReplica.getCollectionByID(collectionID)
G
godchen 已提交
163 164
	if err != nil {
		return err
165 166
	}

D
dragondriver 已提交
167 168
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
Z
zhenshan.cao 已提交
169

G
godchen 已提交
170 171 172 173
	var tmpPartitions = make([]*Partition, 0)
	for _, p := range *collection.Partitions() {
		if p.Tag() == partitionTag {
			for _, s := range *p.Segments() {
D
dragondriver 已提交
174
				delete(colReplica.segments, s.ID())
175
			}
G
godchen 已提交
176 177
		} else {
			tmpPartitions = append(tmpPartitions, p)
178 179 180
		}
	}

G
godchen 已提交
181 182
	*collection.Partitions() = tmpPartitions
	return nil
183 184
}

D
dragondriver 已提交
185 186
func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
	collection, err := colReplica.getCollectionByID(collectionID)
187 188 189
	if err != nil {
		return nil, err
	}
G
godchen 已提交
190

D
dragondriver 已提交
191 192
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
193 194

	for _, p := range *collection.Partitions() {
195 196
		if p.Tag() == partitionTag {
			return p, nil
197 198 199 200 201 202 203
		}
	}

	return nil, errors.New("cannot find partition, tag = " + partitionTag)
}

//----------------------------------------------------------------------------------------------------- segment
D
dragondriver 已提交
204 205 206
func (colReplica *collectionReplicaImpl) getSegmentNum() int {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
207

D
dragondriver 已提交
208
	return len(colReplica.segments)
G
godchen 已提交
209 210
}

D
dragondriver 已提交
211
func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats {
G
godchen 已提交
212 213
	var statisticData = make([]*internalpb.SegmentStats, 0)

D
dragondriver 已提交
214
	for segmentID, segment := range colReplica.segments {
G
godchen 已提交
215 216 217 218 219 220 221 222 223 224 225 226
		currentMemSize := segment.getMemSize()
		segment.lastMemSize = currentMemSize
		segmentNumOfRows := segment.getRowCount()

		stat := internalpb.SegmentStats{
			SegmentID:        segmentID,
			MemorySize:       currentMemSize,
			NumRows:          segmentNumOfRows,
			RecentlyModified: segment.recentlyModified,
		}

		statisticData = append(statisticData, &stat)
D
dragondriver 已提交
227
		segment.recentlyModified = false
228
	}
229

G
godchen 已提交
230 231 232
	return &internalpb.QueryNodeSegStats{
		MsgType:  internalpb.MsgType_kQueryNodeSegStats,
		SegStats: statisticData,
233
	}
G
godchen 已提交
234
}
235

D
dragondriver 已提交
236 237
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error {
	collection, err := colReplica.getCollectionByID(collectionID)
G
godchen 已提交
238 239 240
	if err != nil {
		return err
	}
241

D
dragondriver 已提交
242
	partition, err := colReplica.getPartitionByTag(collectionID, partitionTag)
G
godchen 已提交
243 244
	if err != nil {
		return err
Z
zhenshan.cao 已提交
245
	}
246

D
dragondriver 已提交
247 248
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
249 250 251

	var newSegment = newSegment(collection, segmentID)

D
dragondriver 已提交
252
	colReplica.segments[segmentID] = newSegment
G
godchen 已提交
253 254 255
	*partition.Segments() = append(*partition.Segments(), newSegment)

	return nil
Z
zhenshan.cao 已提交
256
}
257

D
dragondriver 已提交
258 259 260
func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()
G
godchen 已提交
261

262
	var targetPartition *Partition
G
godchen 已提交
263
	var segmentIndex = -1
264

D
dragondriver 已提交
265
	for _, col := range colReplica.collections {
266
		for _, p := range *col.Partitions() {
G
godchen 已提交
267 268
			for i, s := range *p.Segments() {
				if s.ID() == segmentID {
269
					targetPartition = p
G
godchen 已提交
270
					segmentIndex = i
271 272 273 274 275
				}
			}
		}
	}

D
dragondriver 已提交
276
	delete(colReplica.segments, segmentID)
G
godchen 已提交
277 278 279

	if targetPartition != nil && segmentIndex > 0 {
		targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...)
280
	}
C
cai.zhang 已提交
281

G
godchen 已提交
282
	return nil
283 284
}

D
dragondriver 已提交
285 286 287
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
288

D
dragondriver 已提交
289
	targetSegment, ok := colReplica.segments[segmentID]
290 291 292 293 294 295 296 297

	if !ok {
		return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
	}

	return targetSegment, nil
}

D
dragondriver 已提交
298 299 300
func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()
G
godchen 已提交
301

D
dragondriver 已提交
302
	_, ok := colReplica.segments[segmentID]
303 304 305

	return ok
}
N
neza2017 已提交
306 307 308 309 310 311 312 313 314 315

//-----------------------------------------------------------------------------------------------------
// freeAll releases every registered segment through deleteSegment and every
// collection through deleteCollection, segments first, and then empties both
// containers.
//
// The write lock is held throughout, matching the other methods that touch
// these containers. The containers are reset so that later lookups cannot
// return released objects and a repeated freeAll does not release them again.
func (colReplica *collectionReplicaImpl) freeAll() {
	colReplica.mu.Lock()
	defer colReplica.mu.Unlock()

	for _, seg := range colReplica.segments {
		deleteSegment(seg)
	}
	for _, col := range colReplica.collections {
		deleteCollection(col)
	}

	// Keep the map non-nil so a later addSegment can still write to it.
	colReplica.segments = make(map[UniqueID]*Segment)
	colReplica.collections = make([]*Collection, 0)
}