segment.go 12.9 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

B
bigsheeper 已提交
3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
B
bigsheeper 已提交
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
B
bigsheeper 已提交
8

F
FluorineDog 已提交
9 10 11
#include "segcore/collection_c.h"
#include "segcore/plan_c.h"
#include "segcore/reduce_c.h"
B
bigsheeper 已提交
12
*/
B
bigsheeper 已提交
13
import "C"
14
import (
Z
zhenshan.cao 已提交
15
	"strconv"
B
bigsheeper 已提交
16
	"sync"
B
bigsheeper 已提交
17
	"unsafe"
Z
zhenshan.cao 已提交
18

19
	"github.com/stretchr/testify/assert"
B
bigsheeper 已提交
20
	"go.uber.org/zap"
21

22
	"github.com/zilliztech/milvus-distributed/internal/errors"
B
bigsheeper 已提交
23
	"github.com/zilliztech/milvus-distributed/internal/log"
N
neza2017 已提交
24
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
25 26
)

Z
zhenshan.cao 已提交
27
// Segment kinds used by the query node. They are untyped integer constants,
// so they can be assigned to a segmentType (an alias of int) directly.
const (
	segTypeInvalid  = 0 // rejected by newSegment
	segTypeGrowing  = 1 // created with C.Growing by newSegment
	segTypeSealed   = 2 // created with C.Sealed by newSegment
	segTypeIndexing = 3 // assigned by updateSegmentIndex after the index update succeeds
)

// segmentType is an alias of int that holds one of the segType* constants.
type segmentType = int

// indexParam maps an index parameter key to its value.
type indexParam = map[string]string

B
bigsheeper 已提交
37
type Segment struct {
38 39
	segmentPtr C.CSegmentInterface

Q
quicksilver 已提交
40
	segmentID    UniqueID
Z
zhenshan.cao 已提交
41
	partitionID  UniqueID
Q
quicksilver 已提交
42 43 44 45 46
	collectionID UniqueID
	lastMemSize  int64
	lastRowCount int64

	rmMutex          sync.Mutex // guards recentlyModified
B
bigsheeper 已提交
47
	recentlyModified bool
Q
quicksilver 已提交
48

49
	typeMu      sync.Mutex // guards builtIndex
B
bigsheeper 已提交
50
	segmentType int
51

B
bigsheeper 已提交
52
	paramMutex sync.RWMutex // guards index
Q
quicksilver 已提交
53
	indexParam map[int64]indexParam
B
bigsheeper 已提交
54 55
	indexName  string
	indexID    UniqueID
B
bigsheeper 已提交
56 57
}

Z
zhenshan.cao 已提交
58
//-------------------------------------------------------------------------------------- common interfaces
59 60 61 62
// ID returns the segment's unique identifier (the segmentID it was created with).
func (s *Segment) ID() UniqueID {
	return s.segmentID
}

63
func (s *Segment) setRecentlyModified(modify bool) {
Q
quicksilver 已提交
64 65
	s.rmMutex.Lock()
	defer s.rmMutex.Unlock()
B
bigsheeper 已提交
66 67 68
	s.recentlyModified = modify
}

69
func (s *Segment) getRecentlyModified() bool {
Q
quicksilver 已提交
70 71
	s.rmMutex.Lock()
	defer s.rmMutex.Unlock()
B
bigsheeper 已提交
72 73 74
	return s.recentlyModified
}

B
bigsheeper 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
// The four index accessors below all synchronize on paramMutex, the same lock
// setIndexParam and matchIndexParam use for indexParam. They must always share
// one lock with each other; keeping them off rmMutex means index metadata
// updates do not contend with the recentlyModified flag.

// setIndexName stores the name of the index associated with this segment.
func (s *Segment) setIndexName(name string) {
	s.paramMutex.Lock()
	defer s.paramMutex.Unlock()
	s.indexName = name
}

// getIndexName returns the name stored by setIndexName.
func (s *Segment) getIndexName() string {
	s.paramMutex.RLock()
	defer s.paramMutex.RUnlock()
	return s.indexName
}

// setIndexID stores the ID of the index associated with this segment.
func (s *Segment) setIndexID(id UniqueID) {
	s.paramMutex.Lock()
	defer s.paramMutex.Unlock()
	s.indexID = id
}

// getIndexID returns the ID stored by setIndexID.
func (s *Segment) getIndexID() UniqueID {
	s.paramMutex.RLock()
	defer s.paramMutex.RUnlock()
	return s.indexID
}

99 100 101 102 103 104 105 106 107 108
// setType stores segType as the segment's type while holding typeMu.
func (s *Segment) setType(segType segmentType) {
	s.typeMu.Lock()
	s.segmentType = segType
	s.typeMu.Unlock()
}

func (s *Segment) getType() segmentType {
	s.typeMu.Lock()
	defer s.typeMu.Unlock()
	return s.segmentType
109 110
}

Z
zhenshan.cao 已提交
111 112 113 114 115 116
func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, collectionID UniqueID, segType segmentType) *Segment {
	/*
		CSegmentInterface
		NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
	*/
	initIndexParam := make(map[int64]indexParam)
B
bigsheeper 已提交
117 118 119
	var segmentPtr C.CSegmentInterface
	switch segType {
	case segTypeInvalid:
B
bigsheeper 已提交
120
		log.Error("illegal segment type when create segment")
B
bigsheeper 已提交
121 122 123 124 125 126
		return nil
	case segTypeSealed:
		segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Sealed)
	case segTypeGrowing:
		segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Growing)
	default:
B
bigsheeper 已提交
127
		log.Error("illegal segment type when create segment")
B
bigsheeper 已提交
128 129 130
		return nil
	}

B
bigsheeper 已提交
131 132
	log.Debug("create segment", zap.Int64("segmentID", segmentID))

Z
zhenshan.cao 已提交
133 134
	var newSegment = &Segment{
		segmentPtr:   segmentPtr,
C
cai.zhang 已提交
135
		segmentType:  segType,
Z
zhenshan.cao 已提交
136 137 138 139 140 141 142 143 144
		segmentID:    segmentID,
		partitionID:  partitionID,
		collectionID: collectionID,
		indexParam:   initIndexParam,
	}

	return newSegment
}

145 146 147
func deleteSegment(segment *Segment) {
	/*
		void
X
XuanYang-cn 已提交
148
		deleteSegment(CSegmentInterface segment);
149 150 151
	*/
	cPtr := segment.segmentPtr
	C.DeleteSegment(cPtr)
152
	segment.segmentPtr = nil
B
bigsheeper 已提交
153 154 155

	log.Debug("delete segment", zap.Int64("segmentID", segment.ID()))

156
	segment = nil
157 158
}

B
bigsheeper 已提交
159
func (s *Segment) getRowCount() int64 {
B
bigsheeper 已提交
160
	/*
B
bigsheeper 已提交
161
		long int
X
XuanYang-cn 已提交
162
		getRowCount(CSegmentInterface c_segment);
B
bigsheeper 已提交
163
	*/
164 165 166
	if s.segmentPtr == nil {
		return -1
	}
167
	var rowCount = C.GetRowCount(s.segmentPtr)
B
bigsheeper 已提交
168
	return int64(rowCount)
B
bigsheeper 已提交
169 170
}

B
bigsheeper 已提交
171
func (s *Segment) getDeletedCount() int64 {
B
bigsheeper 已提交
172
	/*
B
bigsheeper 已提交
173
		long int
X
XuanYang-cn 已提交
174
		getDeletedCount(CSegmentInterface c_segment);
B
bigsheeper 已提交
175
	*/
176 177 178
	if s.segmentPtr == nil {
		return -1
	}
179
	var deletedCount = C.GetDeletedCount(s.segmentPtr)
B
bigsheeper 已提交
180 181 182
	return int64(deletedCount)
}

B
bigsheeper 已提交
183
func (s *Segment) getMemSize() int64 {
B
bigsheeper 已提交
184
	/*
B
bigsheeper 已提交
185
		long int
X
XuanYang-cn 已提交
186
		GetMemoryUsageInBytes(CSegmentInterface c_segment);
187
	*/
188 189 190
	if s.segmentPtr == nil {
		return -1
	}
191
	var memoryUsageInBytes = C.GetMemoryUsageInBytes(s.segmentPtr)
192

Z
zhenshan.cao 已提交
193
	return int64(memoryUsageInBytes)
194 195
}

Z
zhenshan.cao 已提交
196 197 198 199 200 201 202 203 204 205 206 207
func (s *Segment) segmentSearch(plan *Plan,
	placeHolderGroups []*PlaceholderGroup,
	timestamp []Timestamp) (*SearchResult, error) {
	/*
		CStatus
		Search(void* plan,
			void* placeholder_groups,
			uint64_t* timestamps,
			int num_groups,
			long int* result_ids,
			float* result_distances);
	*/
208 209 210
	if s.segmentPtr == nil {
		return nil, errors.New("null seg core pointer")
	}
Z
zhenshan.cao 已提交
211 212 213 214 215 216 217 218 219 220
	cPlaceholderGroups := make([]C.CPlaceholderGroup, 0)
	for _, pg := range placeHolderGroups {
		cPlaceholderGroups = append(cPlaceholderGroups, (*pg).cPlaceholderGroup)
	}

	var searchResult SearchResult
	var cTimestamp = (*C.ulong)(&timestamp[0])
	var cPlaceHolder = (*C.CPlaceholderGroup)(&cPlaceholderGroups[0])
	var cNumGroups = C.int(len(placeHolderGroups))

B
bigsheeper 已提交
221
	log.Debug("do search on segment", zap.Int64("segmentID", s.segmentID), zap.Int("segType", s.segmentType))
X
xige-16 已提交
222
	var status = C.Search(s.segmentPtr, plan.cPlan, cPlaceHolder, cTimestamp, cNumGroups, &searchResult.cQueryResult)
Z
zhenshan.cao 已提交
223 224 225 226 227 228 229 230 231 232 233 234 235
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return nil, errors.New("Search failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
	}

	return &searchResult, nil
}

func (s *Segment) fillTargetEntry(plan *Plan,
	result *SearchResult) error {
236 237 238
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
Z
zhenshan.cao 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251

	var status = C.FillTargetEntry(s.segmentPtr, plan.cPlan, result.cQueryResult)
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return errors.New("FillTargetEntry failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
	}

	return nil
}

252
// segment, err := loadService.replica.getSegmentByID(segmentID)
Z
zhenshan.cao 已提交
253
func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error {
254 255 256
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
C
cai.zhang 已提交
257 258 259 260 261 262 263 264 265 266
	var status C.CStatus

	if s.segmentType == segTypeGrowing {
		status = C.UpdateSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo)
	} else if s.segmentType == segTypeSealed {
		status = C.UpdateSealedSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo)
	} else {
		return errors.New("illegal segment type")
	}

Z
zhenshan.cao 已提交
267 268 269 270 271 272 273 274
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return errors.New("updateSegmentIndex failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
	}

275 276
	s.setType(segTypeIndexing)

Z
zhenshan.cao 已提交
277 278 279 280 281 282 283 284
	return nil
}

func (s *Segment) setIndexParam(fieldID int64, indexParamKv []*commonpb.KeyValuePair) error {
	s.paramMutex.Lock()
	defer s.paramMutex.Unlock()
	indexParamMap := make(indexParam)
	if indexParamKv == nil {
285
		return errors.New("empty loadIndexMsg's indexParam")
Z
zhenshan.cao 已提交
286 287 288 289 290 291 292 293
	}
	for _, param := range indexParamKv {
		indexParamMap[param.Key] = param.Value
	}
	s.indexParam[fieldID] = indexParamMap
	return nil
}

294
func (s *Segment) matchIndexParam(fieldID int64, indexParams indexParam) bool {
Z
zhenshan.cao 已提交
295 296 297 298 299 300 301 302
	s.paramMutex.RLock()
	defer s.paramMutex.RUnlock()
	fieldIndexParam := s.indexParam[fieldID]
	if fieldIndexParam == nil {
		return false
	}
	paramSize := len(s.indexParam)
	matchCount := 0
303 304
	for k, v := range indexParams {
		value, ok := fieldIndexParam[k]
Z
zhenshan.cao 已提交
305 306 307
		if !ok {
			return false
		}
308
		if v != value {
Z
zhenshan.cao 已提交
309 310 311 312 313 314 315 316
			return false
		}
		matchCount++
	}
	return paramSize == matchCount
}

//-------------------------------------------------------------------------------------- interfaces for growing segment
B
bigsheeper 已提交
317
func (s *Segment) segmentPreInsert(numOfRecords int) int64 {
B
bigsheeper 已提交
318
	/*
B
bigsheeper 已提交
319
		long int
X
XuanYang-cn 已提交
320
		PreInsert(CSegmentInterface c_segment, long int size);
321
	*/
322
	var offset = C.PreInsert(s.segmentPtr, C.long(int64(numOfRecords)))
323

Z
zhenshan.cao 已提交
324
	return int64(offset)
325 326
}

B
bigsheeper 已提交
327
func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
B
bigsheeper 已提交
328
	/*
B
bigsheeper 已提交
329
		long int
X
XuanYang-cn 已提交
330
		PreDelete(CSegmentInterface c_segment, long int size);
331
	*/
332
	var offset = C.PreDelete(s.segmentPtr, C.long(int64(numOfRecords)))
333

Z
zhenshan.cao 已提交
334
	return int64(offset)
335 336
}

B
bigsheeper 已提交
337
func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp, records *[]*commonpb.Blob) error {
B
bigsheeper 已提交
338
	/*
N
neza2017 已提交
339
		CStatus
X
XuanYang-cn 已提交
340
		Insert(CSegmentInterface c_segment,
B
bigsheeper 已提交
341 342 343 344 345 346 347
		           long int reserved_offset,
		           signed long int size,
		           const long* primary_keys,
		           const unsigned long* timestamps,
		           void* raw_data,
		           int sizeof_per_row,
		           signed long int count);
B
bigsheeper 已提交
348
	*/
349 350 351
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
352
	// Blobs to one big blob
B
bigsheeper 已提交
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
	var numOfRow = len(*entityIDs)
	var sizeofPerRow = len((*records)[0].Value)

	assert.Equal(nil, numOfRow, len(*records))

	var rawData = make([]byte, numOfRow*sizeofPerRow)
	var copyOffset = 0
	for i := 0; i < len(*records); i++ {
		copy(rawData[copyOffset:], (*records)[i].Value)
		copyOffset += sizeofPerRow
	}

	var cOffset = C.long(offset)
	var cNumOfRows = C.long(numOfRow)
	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
	var cSizeofPerRow = C.int(sizeofPerRow)
	var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])

372
	var status = C.Insert(s.segmentPtr,
B
bigsheeper 已提交
373 374 375 376 377 378 379 380
		cOffset,
		cNumOfRows,
		cEntityIdsPtr,
		cTimestampsPtr,
		cRawDataVoidPtr,
		cSizeofPerRow,
		cNumOfRows)

N
neza2017 已提交
381 382 383 384 385 386
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
B
bigsheeper 已提交
387
	}
388

389
	s.setRecentlyModified(true)
390
	return nil
391 392
}

B
bigsheeper 已提交
393
func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp) error {
B
bigsheeper 已提交
394
	/*
N
neza2017 已提交
395
		CStatus
X
XuanYang-cn 已提交
396
		Delete(CSegmentInterface c_segment,
B
bigsheeper 已提交
397 398 399 400
		           long int reserved_offset,
		           long size,
		           const long* primary_keys,
		           const unsigned long* timestamps);
B
bigsheeper 已提交
401
	*/
402 403 404
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
405 406
	var cOffset = C.long(offset)
	var cSize = C.long(len(*entityIDs))
Z
zhenshan.cao 已提交
407
	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
408
	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
B
bigsheeper 已提交
409

410
	var status = C.Delete(s.segmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr)
411

N
neza2017 已提交
412 413 414 415 416 417
	errorCode := status.error_code

	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
		return errors.New("Delete failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
418
	}
B
bigsheeper 已提交
419

420
	return nil
421 422
}

Z
zhenshan.cao 已提交
423
//-------------------------------------------------------------------------------------- interfaces for sealed segment
C
cai.zhang 已提交
424
func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data interface{}) error {
425
	/*
N
neza2017 已提交
426
		CStatus
Z
zhenshan.cao 已提交
427
		LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info);
428
	*/
429 430 431
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
Z
zhenshan.cao 已提交
432 433
	if s.segmentType != segTypeSealed {
		return errors.New("illegal segment type when loading field data")
N
neza2017 已提交
434 435
	}

C
cai.zhang 已提交
436 437 438 439 440 441 442 443 444
	// data interface check
	var dataPointer unsafe.Pointer
	emptyErr := errors.New("null field data to be loaded")
	switch d := data.(type) {
	case []bool:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
X
xige-16 已提交
445 446 447 448 449
	case []byte:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
C
cai.zhang 已提交
450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
	case []int8:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []int16:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []int32:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []int64:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []float32:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []float64:
		if len(d) <= 0 {
			return emptyErr
		}
		dataPointer = unsafe.Pointer(&d[0])
	case []string:
		// TODO: support string type
		return errors.New("we cannot support string type now")
	default:
		return errors.New("illegal field data type")
	}

Z
zhenshan.cao 已提交
487
	/*
C
cai.zhang 已提交
488
		typedef struct CLoadFieldDataInfo {
Z
zhenshan.cao 已提交
489 490 491
		    int64_t field_id;
		    void* blob;
		    int64_t row_count;
C
cai.zhang 已提交
492
		} CLoadFieldDataInfo;
Z
zhenshan.cao 已提交
493 494 495
	*/
	loadInfo := C.CLoadFieldDataInfo{
		field_id:  C.int64_t(fieldID),
C
cai.zhang 已提交
496
		blob:      dataPointer,
Z
zhenshan.cao 已提交
497
		row_count: C.int64_t(rowCount),
498 499
	}

Z
zhenshan.cao 已提交
500
	var status = C.LoadFieldData(s.segmentPtr, loadInfo)
501 502 503 504
	errorCode := status.error_code
	if errorCode != 0 {
		errorMsg := C.GoString(status.error_msg)
		defer C.free(unsafe.Pointer(status.error_msg))
Z
zhenshan.cao 已提交
505
		return errors.New("LoadFieldData failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
506 507
	}

B
bigsheeper 已提交
508 509
	log.Debug("load field done", zap.Int64("fieldID", fieldID), zap.Int("row count", rowCount))

510 511
	return nil
}