segment.go 6.0 KB
Newer Older
B
bigsheeper 已提交
1 2
package reader

B
bigsheeper 已提交
3 4
/*

F
FluorineDog 已提交
5
#cgo CFLAGS: -I${SRCDIR}/../../core/include
B
bigsheeper 已提交
6

F
FluorineDog 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib
B
bigsheeper 已提交
8

9 10
#include <stdlib.h>

#include "collection_c.h"
#include "partition_c.h"
#include "segment_c.h"

*/
B
bigsheeper 已提交
14
import "C"
15
import (
B
bigsheeper 已提交
16
	"fmt"
17
	"github.com/czs007/suvlim/errors"
Z
zhenshan.cao 已提交
18
	msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
B
bigsheeper 已提交
19
	"strconv"
B
bigsheeper 已提交
20
	"unsafe"
21 22
)

B
bigsheeper 已提交
23
// SegmentLifetime is a segment lifetime constant.
// NOTE(review): neither the unit nor the code that consumes this value is
// visible in this file — confirm both against the callers.
const SegmentLifetime = 20000
B
bigsheeper 已提交
24

B
bigsheeper 已提交
25
// Segment status values. GetStatus returns SegmentOpened or SegmentClosed, and
// CloseSegment stores SegmentClosed in Segment.SegmentStatus.
const (
	// SegmentOpened is reported while the C segment says it is still open.
	SegmentOpened = 0
	// SegmentClosed is reported once the C segment is no longer open.
	SegmentClosed = 1
	// SegmentIndexing is only referenced by the disabled index-building path
	// in CloseSegment.
	SegmentIndexing = 2
	// SegmentIndexed is only referenced by the disabled index-building path
	// in CloseSegment.
	SegmentIndexed = 3
)

B
bigsheeper 已提交
32
type Segment struct {
33 34
	SegmentPtr       C.CSegmentBase
	SegmentId        int64
B
bigsheeper 已提交
35
	SegmentCloseTime uint64
36
	LastMemSize      uint64
B
bigsheeper 已提交
37
	SegmentStatus    int
B
bigsheeper 已提交
38 39
}

B
bigsheeper 已提交
40
func (s *Segment) GetStatus() int {
B
bigsheeper 已提交
41
	/*
B
bigsheeper 已提交
42 43 44 45 46 47 48 49 50
	bool
	IsOpened(CSegmentBase c_segment);
	*/
	var isOpened = C.IsOpened(s.SegmentPtr)
	if isOpened {
		return SegmentOpened
	} else {
		return SegmentClosed
	}
B
bigsheeper 已提交
51 52
}

B
bigsheeper 已提交
53
func (s *Segment) GetRowCount() int64 {
B
bigsheeper 已提交
54
	/*
B
bigsheeper 已提交
55 56 57 58 59
	long int
	GetRowCount(CSegmentBase c_segment);
	*/
	var rowCount = C.GetRowCount(s.SegmentPtr)
	return int64(rowCount)
B
bigsheeper 已提交
60 61
}

B
bigsheeper 已提交
62
func (s *Segment) GetDeletedCount() int64 {
B
bigsheeper 已提交
63
	/*
B
bigsheeper 已提交
64 65 66 67 68 69 70
	long int
	GetDeletedCount(CSegmentBase c_segment);
	*/
	var deletedCount = C.GetDeletedCount(s.SegmentPtr)
	return int64(deletedCount)
}

B
bigsheeper 已提交
71 72
func (s *Segment) CloseSegment(collection* Collection) error {
	/*
B
bigsheeper 已提交
73 74 75
	int
	Close(CSegmentBase c_segment);
	*/
X
xige-16 已提交
76 77
	fmt.Println("Closing segment :", s.SegmentId)

B
bigsheeper 已提交
78
	var status = C.Close(s.SegmentPtr)
B
bigsheeper 已提交
79 80
	s.SegmentStatus = SegmentClosed

B
bigsheeper 已提交
81 82 83
	if status != 0 {
		return errors.New("Close segment failed, error code = " + strconv.Itoa(int(status)))
	}
Z
zhenshan.cao 已提交
84 85

	// Build index after closing segment
B
bigsheeper 已提交
86 87 88
	//s.SegmentStatus = SegmentIndexing
	//fmt.Println("Building index...")
	//s.buildIndex(collection)
B
bigsheeper 已提交
89 90 91

	// TODO: remove redundant segment indexed status
	// Change segment status to indexed
B
bigsheeper 已提交
92 93
	//s.SegmentStatus = SegmentIndexed
	//fmt.Println("Segment closed and indexed")
B
bigsheeper 已提交
94

B
bigsheeper 已提交
95
	fmt.Println("Segment closed")
B
bigsheeper 已提交
96
	return nil
B
bigsheeper 已提交
97
}
98

99
func (s *Segment) GetMemSize() uint64 {
B
bigsheeper 已提交
100
	/*
101 102 103 104 105 106
	long int
	GetMemoryUsageInBytes(CSegmentBase c_segment);
	*/
	var memoryUsageInBytes = C.GetMemoryUsageInBytes(s.SegmentPtr)

	return uint64(memoryUsageInBytes)
107 108
}

109
////////////////////////////////////////////////////////////////////////////
110
func (s *Segment) SegmentPreInsert(numOfRecords int) int64 {
B
bigsheeper 已提交
111
	/*
112 113 114
	long int
	PreInsert(CSegmentBase c_segment, long int size);
	*/
Z
zhenshan.cao 已提交
115
	var offset = C.PreInsert(s.SegmentPtr, C.long(int64(numOfRecords)))
116

Z
zhenshan.cao 已提交
117
	return int64(offset)
118 119 120
}

func (s *Segment) SegmentPreDelete(numOfRecords int) int64 {
B
bigsheeper 已提交
121
	/*
122 123 124
	long int
	PreDelete(CSegmentBase c_segment, long int size);
	*/
Z
zhenshan.cao 已提交
125
	var offset = C.PreDelete(s.SegmentPtr, C.long(int64(numOfRecords)))
126

Z
zhenshan.cao 已提交
127
	return int64(offset)
128 129 130
}

func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]uint64, records *[][]byte) error {
B
bigsheeper 已提交
131
	/*
B
bigsheeper 已提交
132 133
	int
	Insert(CSegmentBase c_segment,
134
	           long int reserved_offset,
B
bigsheeper 已提交
135
	           signed long int size,
136
	           const long* primary_keys,
B
bigsheeper 已提交
137 138 139
	           const unsigned long* timestamps,
	           void* raw_data,
	           int sizeof_per_row,
140
	           signed long int count);
B
bigsheeper 已提交
141
	*/
142
	// Blobs to one big blob
B
bigsheeper 已提交
143 144 145
	var numOfRow = len(*entityIDs)
	var sizeofPerRow = len((*records)[0])

Z
zhenshan.cao 已提交
146
	var rawData = make([]byte, numOfRow*sizeofPerRow)
147
	var copyOffset = 0
148
	for i := 0; i < len(*records); i++ {
149
		copy(rawData[copyOffset:], (*records)[i])
Z
zhenshan.cao 已提交
150
		copyOffset += len((*records)[i])
151 152
	}

153
	var cOffset = C.long(offset)
B
bigsheeper 已提交
154
	var cNumOfRows = C.long(numOfRow)
Z
zhenshan.cao 已提交
155
	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
156
	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
B
bigsheeper 已提交
157
	var cSizeofPerRow = C.int(sizeofPerRow)
158 159 160
	var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])

	var status = C.Insert(s.SegmentPtr,
161 162 163 164
		cOffset,
		cNumOfRows,
		cEntityIdsPtr,
		cTimestampsPtr,
165
		cRawDataVoidPtr,
166 167
		cSizeofPerRow,
		cNumOfRows)
168 169

	if status != 0 {
170
		return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
171 172
	}

173
	return nil
174 175
}

176
func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]uint64) error {
B
bigsheeper 已提交
177
	/*
B
bigsheeper 已提交
178 179
	int
	Delete(CSegmentBase c_segment,
180
	           long int reserved_offset,
B
bigsheeper 已提交
181
	           long size,
182 183
	           const long* primary_keys,
	           const unsigned long* timestamps);
B
bigsheeper 已提交
184
	*/
185 186
	var cOffset = C.long(offset)
	var cSize = C.long(len(*entityIDs))
Z
zhenshan.cao 已提交
187
	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
188
	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
B
bigsheeper 已提交
189

190
	var status = C.Delete(s.SegmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr)
191 192

	if status != 0 {
193
		return errors.New("Delete failed, error code = " + strconv.Itoa(int(status)))
194
	}
B
bigsheeper 已提交
195

196
	return nil
197 198
}

B
bigsheeper 已提交
199
func (s *Segment) SegmentSearch(query *QueryInfo, timestamp uint64, vectorRecord *msgPb.VectorRowRecord) (*SearchResult, error) {
B
bigsheeper 已提交
200
	/*
B
bigsheeper 已提交
201 202
	int
	Search(CSegmentBase c_segment,
Z
zhenshan.cao 已提交
203 204 205 206 207 208
	       CQueryInfo  c_query_info,
	       unsigned long timestamp,
	       float* query_raw_data,
	       int num_of_query_raw_data,
	       long int* result_ids,
	       float* result_distances);
B
bigsheeper 已提交
209
	*/
B
bigsheeper 已提交
210
	//type CQueryInfo C.CQueryInfo
Z
zhenshan.cao 已提交
211 212 213 214 215 216

	cQuery := C.CQueryInfo{
		num_queries: C.long(query.NumQueries),
		topK:        C.int(query.TopK),
		field_name:  C.CString(query.FieldName),
	}
B
bigsheeper 已提交
217

Z
zhenshan.cao 已提交
218 219
	resultIds := make([]int64, query.TopK)
	resultDistances := make([]float32, query.TopK)
B
bigsheeper 已提交
220

221 222 223
	var cTimestamp = C.ulong(timestamp)
	var cResultIds = (*C.long)(&resultIds[0])
	var cResultDistances = (*C.float)(&resultDistances[0])
224 225
	var cQueryRawData *C.float
	var cQueryRawDataLength C.int
226

227 228 229 230 231 232 233 234 235
	if vectorRecord.BinaryData != nil {
		return nil, errors.New("Data of binary type is not supported yet")
	} else if len(vectorRecord.FloatData) <= 0 {
		return nil, errors.New("Null query vector data")
	} else {
		cQueryRawData = (*C.float)(&vectorRecord.FloatData[0])
		cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData))
	}

Z
zhenshan.cao 已提交
236
	var status = C.Search(s.SegmentPtr, cQuery, cTimestamp, cQueryRawData, cQueryRawDataLength, cResultIds, cResultDistances)
237

238 239
	if status != 0 {
		return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))
B
bigsheeper 已提交
240 241
	}

B
bigsheeper 已提交
242 243
	fmt.Println("Search Result---- Ids =", resultIds, ", Distances =", resultDistances)

244
	return &SearchResult{ResultIds: resultIds, ResultDistances: resultDistances}, nil
245
}