segment.go 13.1 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

B
bigsheeper 已提交
3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
B
bigsheeper 已提交
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
B
bigsheeper 已提交
8

F
FluorineDog 已提交
9 10 11
#include "segcore/collection_c.h"
#include "segcore/plan_c.h"
#include "segcore/reduce_c.h"
B
bigsheeper 已提交
12
*/
B
bigsheeper 已提交
13
import "C"
14
import (
Z
zhenshan.cao 已提交
15
	"strconv"
B
bigsheeper 已提交
16
	"sync"
B
bigsheeper 已提交
17
	"unsafe"
Z
zhenshan.cao 已提交
18

19
	"github.com/stretchr/testify/assert"
B
bigsheeper 已提交
20
	"go.uber.org/zap"
21

S
sunby 已提交
22 23
	"errors"

B
bigsheeper 已提交
24
	"github.com/zilliztech/milvus-distributed/internal/log"
N
neza2017 已提交
25
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
26 27
)

28 29
// segmentType enumerates the kinds of segment handled by this package.
type segmentType int32

const (
	// segmentTypeInvalid is the zero value; newSegment refuses to create it.
	segmentTypeInvalid segmentType = iota
	// segmentTypeGrowing makes newSegment create the segment with C.Growing.
	segmentTypeGrowing
	// segmentTypeSealed makes newSegment create the segment with C.Sealed.
	segmentTypeSealed
	// segTypeIndexing is assigned to a segment after updateSegmentIndex succeeds.
	segTypeIndexing
)

B
bigsheeper 已提交
37 38
// indexParam holds the index parameters of a single field as string key/value pairs.
type indexParam = map[string]string

B
bigsheeper 已提交
39
type Segment struct {
40 41
	segmentPtr C.CSegmentInterface

Q
quicksilver 已提交
42
	segmentID    UniqueID
Z
zhenshan.cao 已提交
43
	partitionID  UniqueID
Q
quicksilver 已提交
44 45 46 47
	collectionID UniqueID
	lastMemSize  int64
	lastRowCount int64

48 49 50
	once        sync.Once // guards enableIndex
	enableIndex bool

Q
quicksilver 已提交
51
	rmMutex          sync.Mutex // guards recentlyModified
B
bigsheeper 已提交
52
	recentlyModified bool
Q
quicksilver 已提交
53

54
	typeMu      sync.Mutex // guards builtIndex
55
	segmentType segmentType
56

B
bigsheeper 已提交
57
	paramMutex sync.RWMutex // guards index
Q
quicksilver 已提交
58
	indexParam map[int64]indexParam
B
bigsheeper 已提交
59 60
	indexName  string
	indexID    UniqueID
B
bigsheeper 已提交
61 62
}

Z
zhenshan.cao 已提交
63
//-------------------------------------------------------------------------------------- common interfaces
64 65 66 67
// ID reports the identifier stored in the segment.
func (s *Segment) ID() UniqueID {
	id := s.segmentID
	return id
}

68 69 70 71 72 73 74 75 76 77 78 79
// setEnableIndex stores enable the first time it is called; sync.Once turns
// every later call into a no-op.
func (s *Segment) setEnableIndex(enable bool) {
	s.once.Do(func() {
		s.enableIndex = enable
	})
}

// getEnableIndex reports the current value of the enableIndex flag.
func (s *Segment) getEnableIndex() bool {
	enabled := s.enableIndex
	return enabled
}

80
func (s *Segment) setRecentlyModified(modify bool) {
Q
quicksilver 已提交
81 82
	s.rmMutex.Lock()
	defer s.rmMutex.Unlock()
B
bigsheeper 已提交
83 84 85
	s.recentlyModified = modify
}

86
func (s *Segment) getRecentlyModified() bool {
Q
quicksilver 已提交
87 88
	s.rmMutex.Lock()
	defer s.rmMutex.Unlock()
B
bigsheeper 已提交
89 90 91
	return s.recentlyModified
}

B
bigsheeper 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
// setIndexName stores the index name while holding rmMutex.
func (s *Segment) setIndexName(name string) {
	s.rmMutex.Lock()
	s.indexName = name
	s.rmMutex.Unlock()
}

// getIndexName reads the index name while holding rmMutex.
func (s *Segment) getIndexName() string {
	s.rmMutex.Lock()
	name := s.indexName
	s.rmMutex.Unlock()
	return name
}

// setIndexID stores the index identifier while holding rmMutex.
func (s *Segment) setIndexID(id UniqueID) {
	s.rmMutex.Lock()
	s.indexID = id
	s.rmMutex.Unlock()
}

// getIndexID reads the index identifier while holding rmMutex.
func (s *Segment) getIndexID() UniqueID {
	s.rmMutex.Lock()
	id := s.indexID
	s.rmMutex.Unlock()
	return id
}

116 117 118 119 120 121 122 123 124 125
// setType replaces the segment type while holding typeMu.
func (s *Segment) setType(segType segmentType) {
	s.typeMu.Lock()
	s.segmentType = segType
	s.typeMu.Unlock()
}

func (s *Segment) getType() segmentType {
	s.typeMu.Lock()
	defer s.typeMu.Unlock()
	return s.segmentType
126 127
}

Z
zhenshan.cao 已提交
128 129 130 131 132 133
func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, collectionID UniqueID, segType segmentType) *Segment {
	/*
		CSegmentInterface
		NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
	*/
	initIndexParam := make(map[int64]indexParam)
B
bigsheeper 已提交
134 135
	var segmentPtr C.CSegmentInterface
	switch segType {
136
	case segmentTypeInvalid:
B
bigsheeper 已提交
137
		log.Error("illegal segment type when create segment")
B
bigsheeper 已提交
138
		return nil
139
	case segmentTypeSealed:
B
bigsheeper 已提交
140
		segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Sealed)
141
	case segmentTypeGrowing:
B
bigsheeper 已提交
142 143
		segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Growing)
	default:
B
bigsheeper 已提交
144
		log.Error("illegal segment type when create segment")
B
bigsheeper 已提交
145 146 147
		return nil
	}

B
bigsheeper 已提交
148 149
	log.Debug("create segment", zap.Int64("segmentID", segmentID))

Z
zhenshan.cao 已提交
150 151
	var newSegment = &Segment{
		segmentPtr:   segmentPtr,
C
cai.zhang 已提交
152
		segmentType:  segType,
Z
zhenshan.cao 已提交
153 154 155 156 157 158 159 160 161
		segmentID:    segmentID,
		partitionID:  partitionID,
		collectionID: collectionID,
		indexParam:   initIndexParam,
	}

	return newSegment
}

162 163 164
func deleteSegment(segment *Segment) {
	/*
		void
X
XuanYang-cn 已提交
165
		deleteSegment(CSegmentInterface segment);
166 167 168
	*/
	cPtr := segment.segmentPtr
	C.DeleteSegment(cPtr)
169
	segment.segmentPtr = nil
B
bigsheeper 已提交
170 171 172

	log.Debug("delete segment", zap.Int64("segmentID", segment.ID()))

173
	segment = nil
174 175
}

B
bigsheeper 已提交
176
func (s *Segment) getRowCount() int64 {
B
bigsheeper 已提交
177
	/*
B
bigsheeper 已提交
178
		long int
X
XuanYang-cn 已提交
179
		getRowCount(CSegmentInterface c_segment);
B
bigsheeper 已提交
180
	*/
181 182 183
	if s.segmentPtr == nil {
		return -1
	}
184
	var rowCount = C.GetRowCount(s.segmentPtr)
B
bigsheeper 已提交
185
	return int64(rowCount)
B
bigsheeper 已提交
186 187
}

B
bigsheeper 已提交
188
func (s *Segment) getDeletedCount() int64 {
B
bigsheeper 已提交
189
	/*
B
bigsheeper 已提交
190
		long int
X
XuanYang-cn 已提交
191
		getDeletedCount(CSegmentInterface c_segment);
B
bigsheeper 已提交
192
	*/
193 194 195
	if s.segmentPtr == nil {
		return -1
	}
196
	var deletedCount = C.GetDeletedCount(s.segmentPtr)
B
bigsheeper 已提交
197 198 199
	return int64(deletedCount)
}

B
bigsheeper 已提交
200
func (s *Segment) getMemSize() int64 {
B
bigsheeper 已提交
201
	/*
B
bigsheeper 已提交
202
		long int
X
XuanYang-cn 已提交
203
		GetMemoryUsageInBytes(CSegmentInterface c_segment);
204
	*/
205 206 207
	if s.segmentPtr == nil {
		return -1
	}
208
	var memoryUsageInBytes = C.GetMemoryUsageInBytes(s.segmentPtr)
209

Z
zhenshan.cao 已提交
210
	return int64(memoryUsageInBytes)
211 212
}

Z
zhenshan.cao 已提交
213 214 215 216 217 218 219 220 221 222 223 224
func (s *Segment) segmentSearch(plan *Plan,
	placeHolderGroups []*PlaceholderGroup,
	timestamp []Timestamp) (*SearchResult, error) {
	/*
		CStatus
		Search(void* plan,
			void* placeholder_groups,
			uint64_t* timestamps,
			int num_groups,
			long int* result_ids,
			float* result_distances);
	*/
225 226 227
	if s.segmentPtr == nil {
		return nil, errors.New("null seg core pointer")
	}
Z
zhenshan.cao 已提交
228 229 230 231 232 233 234 235 236 237
	cPlaceholderGroups := make([]C.CPlaceholderGroup, 0)
	for _, pg := range placeHolderGroups {
		cPlaceholderGroups = append(cPlaceholderGroups, (*pg).cPlaceholderGroup)
	}

	var searchResult SearchResult
	var cTimestamp = (*C.ulong)(&timestamp[0])
	var cPlaceHolder = (*C.CPlaceholderGroup)(&cPlaceholderGroups[0])
	var cNumGroups = C.int(len(placeHolderGroups))

238
	log.Debug("do search on segment", zap.Int64("segmentID", s.segmentID), zap.Int32("segmentType", int32(s.segmentType)))
X
xige-16 已提交
239
	var status = C.Search(s.segmentPtr, plan.cPlan, cPlaceHolder, cTimestamp, cNumGroups, &searchResult.cQueryResult)
Z
zhenshan.cao 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return nil, errors.New("Search failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
	}

	return &searchResult, nil
}

func (s *Segment) fillTargetEntry(plan *Plan,
	result *SearchResult) error {
253 254 255
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
Z
zhenshan.cao 已提交
256 257 258 259 260 261 262 263 264 265 266 267 268

	var status = C.FillTargetEntry(s.segmentPtr, plan.cPlan, result.cQueryResult)
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return errors.New("FillTargetEntry failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
	}

	return nil
}

269
// segment, err := loadService.replica.getSegmentByID(segmentID)
Z
zhenshan.cao 已提交
270
func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error {
271 272 273
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
C
cai.zhang 已提交
274 275
	var status C.CStatus

276
	if s.segmentType == segmentTypeGrowing {
C
cai.zhang 已提交
277
		status = C.UpdateSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo)
278
	} else if s.segmentType == segmentTypeSealed {
C
cai.zhang 已提交
279 280 281 282 283
		status = C.UpdateSealedSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo)
	} else {
		return errors.New("illegal segment type")
	}

Z
zhenshan.cao 已提交
284 285 286 287 288 289 290 291
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return errors.New("updateSegmentIndex failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
	}

292 293
	s.setType(segTypeIndexing)

Z
zhenshan.cao 已提交
294 295 296 297 298 299 300 301
	return nil
}

func (s *Segment) setIndexParam(fieldID int64, indexParamKv []*commonpb.KeyValuePair) error {
	s.paramMutex.Lock()
	defer s.paramMutex.Unlock()
	indexParamMap := make(indexParam)
	if indexParamKv == nil {
302
		return errors.New("empty loadIndexMsg's indexParam")
Z
zhenshan.cao 已提交
303 304 305 306 307 308 309 310
	}
	for _, param := range indexParamKv {
		indexParamMap[param.Key] = param.Value
	}
	s.indexParam[fieldID] = indexParamMap
	return nil
}

311
func (s *Segment) matchIndexParam(fieldID int64, indexParams indexParam) bool {
Z
zhenshan.cao 已提交
312 313 314 315 316 317 318 319
	s.paramMutex.RLock()
	defer s.paramMutex.RUnlock()
	fieldIndexParam := s.indexParam[fieldID]
	if fieldIndexParam == nil {
		return false
	}
	paramSize := len(s.indexParam)
	matchCount := 0
320 321
	for k, v := range indexParams {
		value, ok := fieldIndexParam[k]
Z
zhenshan.cao 已提交
322 323 324
		if !ok {
			return false
		}
325
		if v != value {
Z
zhenshan.cao 已提交
326 327 328 329 330 331 332 333
			return false
		}
		matchCount++
	}
	return paramSize == matchCount
}

//-------------------------------------------------------------------------------------- interfaces for growing segment
B
bigsheeper 已提交
334
func (s *Segment) segmentPreInsert(numOfRecords int) int64 {
B
bigsheeper 已提交
335
	/*
B
bigsheeper 已提交
336
		long int
X
XuanYang-cn 已提交
337
		PreInsert(CSegmentInterface c_segment, long int size);
338
	*/
339
	var offset = C.PreInsert(s.segmentPtr, C.long(int64(numOfRecords)))
340

Z
zhenshan.cao 已提交
341
	return int64(offset)
342 343
}

B
bigsheeper 已提交
344
func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
B
bigsheeper 已提交
345
	/*
B
bigsheeper 已提交
346
		long int
X
XuanYang-cn 已提交
347
		PreDelete(CSegmentInterface c_segment, long int size);
348
	*/
349
	var offset = C.PreDelete(s.segmentPtr, C.long(int64(numOfRecords)))
350

Z
zhenshan.cao 已提交
351
	return int64(offset)
352 353
}

B
bigsheeper 已提交
354
func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp, records *[]*commonpb.Blob) error {
B
bigsheeper 已提交
355
	/*
N
neza2017 已提交
356
		CStatus
X
XuanYang-cn 已提交
357
		Insert(CSegmentInterface c_segment,
B
bigsheeper 已提交
358 359 360 361 362 363 364
		           long int reserved_offset,
		           signed long int size,
		           const long* primary_keys,
		           const unsigned long* timestamps,
		           void* raw_data,
		           int sizeof_per_row,
		           signed long int count);
B
bigsheeper 已提交
365
	*/
366 367 368
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
369
	// Blobs to one big blob
B
bigsheeper 已提交
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
	var numOfRow = len(*entityIDs)
	var sizeofPerRow = len((*records)[0].Value)

	assert.Equal(nil, numOfRow, len(*records))

	var rawData = make([]byte, numOfRow*sizeofPerRow)
	var copyOffset = 0
	for i := 0; i < len(*records); i++ {
		copy(rawData[copyOffset:], (*records)[i].Value)
		copyOffset += sizeofPerRow
	}

	var cOffset = C.long(offset)
	var cNumOfRows = C.long(numOfRow)
	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
	var cSizeofPerRow = C.int(sizeofPerRow)
	var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])

389
	var status = C.Insert(s.segmentPtr,
B
bigsheeper 已提交
390 391 392 393 394 395 396 397
		cOffset,
		cNumOfRows,
		cEntityIdsPtr,
		cTimestampsPtr,
		cRawDataVoidPtr,
		cSizeofPerRow,
		cNumOfRows)

N
neza2017 已提交
398 399 400 401 402 403
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
B
bigsheeper 已提交
404
	}
405

406
	s.setRecentlyModified(true)
407
	return nil
408 409
}

B
bigsheeper 已提交
410
func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp) error {
B
bigsheeper 已提交
411
	/*
N
neza2017 已提交
412
		CStatus
X
XuanYang-cn 已提交
413
		Delete(CSegmentInterface c_segment,
B
bigsheeper 已提交
414 415 416 417
		           long int reserved_offset,
		           long size,
		           const long* primary_keys,
		           const unsigned long* timestamps);
B
bigsheeper 已提交
418
	*/
419 420 421
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
422 423
	var cOffset = C.long(offset)
	var cSize = C.long(len(*entityIDs))
Z
zhenshan.cao 已提交
424
	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
425
	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
B
bigsheeper 已提交
426

427
	var status = C.Delete(s.segmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr)
428

N
neza2017 已提交
429 430 431 432 433 434
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return errors.New("Delete failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
435
	}
B
bigsheeper 已提交
436

437
	return nil
438 439
}

Z
zhenshan.cao 已提交
440
//-------------------------------------------------------------------------------------- interfaces for sealed segment
C
cai.zhang 已提交
441
func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data interface{}) error {
442
	/*
N
neza2017 已提交
443
		CStatus
Z
zhenshan.cao 已提交
444
		LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info);
445
	*/
446 447 448
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
449
	if s.segmentType != segmentTypeSealed {
Z
zhenshan.cao 已提交
450
		return errors.New("illegal segment type when loading field data")
N
neza2017 已提交
451 452
	}

C
cai.zhang 已提交
453 454 455 456 457 458 459 460 461
	// data interface check
	var dataPointer unsafe.Pointer
	emptyErr := errors.New("null field data to be loaded")
	switch d := data.(type) {
	case []bool:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
X
xige-16 已提交
462 463 464 465 466
	case []byte:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
C
cai.zhang 已提交
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
	case []int8:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []int16:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []int32:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []int64:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []float32:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []float64:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []string:
		// TODO: support string type
		return errors.New("we cannot support string type now")
	default:
		return errors.New("illegal field data type")
	}

Z
zhenshan.cao 已提交
504
	/*
C
cai.zhang 已提交
505
		typedef struct CLoadFieldDataInfo {
Z
zhenshan.cao 已提交
506 507 508
		    int64_t field_id;
		    void* blob;
		    int64_t row_count;
C
cai.zhang 已提交
509
		} CLoadFieldDataInfo;
Z
zhenshan.cao 已提交
510 511 512
	*/
	loadInfo := C.CLoadFieldDataInfo{
		field_id:  C.int64_t(fieldID),
C
cai.zhang 已提交
513
		blob:      dataPointer,
Z
zhenshan.cao 已提交
514
		row_count: C.int64_t(rowCount),
515 516
	}

Z
zhenshan.cao 已提交
517
	var status = C.LoadFieldData(s.segmentPtr, loadInfo)
518 519 520 521
	errorCode := status.error_code
	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
Z
zhenshan.cao 已提交
522
		return errors.New("LoadFieldData failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
523 524
	}

B
bigsheeper 已提交
525 526
	log.Debug("load field done", zap.Int64("fieldID", fieldID), zap.Int("row count", rowCount))

527 528
	return nil
}