segment.go 5.7 KB
Newer Older
B
bigsheeper 已提交
1 2
package reader

B
bigsheeper 已提交
3 4
/*

F
FluorineDog 已提交
5
#cgo CFLAGS: -I${SRCDIR}/../../core/include
B
bigsheeper 已提交
6

F
FluorineDog 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib
B
bigsheeper 已提交
8

9 10
#include <stdlib.h>
#include "collection_c.h"
#include "partition_c.h"
B
bigsheeper 已提交
11 12 13
#include "segment_c.h"

*/
B
bigsheeper 已提交
14
import "C"
15
import (
B
bigsheeper 已提交
16
	"fmt"
17
	"github.com/czs007/suvlim/errors"
Z
zhenshan.cao 已提交
18
	msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
B
bigsheeper 已提交
19
	"strconv"
B
bigsheeper 已提交
20
	"unsafe"
21 22
)

B
bigsheeper 已提交
23
// SegmentLifetime is a package-level lifetime value for segments.
// NOTE(review): it is not referenced in this file and its unit is not
// established here — confirm against its users.
const SegmentLifetime = 20000

// Segment status values, stored in Segment.SegmentStatus.
// GetStatus only ever yields SegmentOpened or SegmentClosed. CloseSegment
// advances a segment through SegmentClosed, SegmentIndexing and
// SegmentIndexed.
const (
	SegmentOpened   = iota // 0
	SegmentClosed          // 1
	SegmentIndexing        // 2
	SegmentIndexed         // 3
)

B
bigsheeper 已提交
32
type Segment struct {
33 34
	SegmentPtr       C.CSegmentBase
	SegmentId        int64
B
bigsheeper 已提交
35
	SegmentCloseTime uint64
36
	LastMemSize      uint64
B
bigsheeper 已提交
37
	SegmentStatus    int
B
bigsheeper 已提交
38 39
}

B
bigsheeper 已提交
40
func (s *Segment) GetStatus() int {
B
bigsheeper 已提交
41
	/*
B
bigsheeper 已提交
42 43 44 45 46 47 48 49 50
	bool
	IsOpened(CSegmentBase c_segment);
	*/
	var isOpened = C.IsOpened(s.SegmentPtr)
	if isOpened {
		return SegmentOpened
	} else {
		return SegmentClosed
	}
B
bigsheeper 已提交
51 52
}

B
bigsheeper 已提交
53
func (s *Segment) GetRowCount() int64 {
B
bigsheeper 已提交
54
	/*
B
bigsheeper 已提交
55 56 57 58 59
	long int
	GetRowCount(CSegmentBase c_segment);
	*/
	var rowCount = C.GetRowCount(s.SegmentPtr)
	return int64(rowCount)
B
bigsheeper 已提交
60 61
}

B
bigsheeper 已提交
62
func (s *Segment) GetDeletedCount() int64 {
B
bigsheeper 已提交
63
	/*
B
bigsheeper 已提交
64 65 66 67 68 69 70
	long int
	GetDeletedCount(CSegmentBase c_segment);
	*/
	var deletedCount = C.GetDeletedCount(s.SegmentPtr)
	return int64(deletedCount)
}

B
bigsheeper 已提交
71 72
func (s *Segment) CloseSegment(collection* Collection) error {
	/*
B
bigsheeper 已提交
73 74 75 76
	int
	Close(CSegmentBase c_segment);
	*/
	var status = C.Close(s.SegmentPtr)
B
bigsheeper 已提交
77 78
	s.SegmentStatus = SegmentClosed

B
bigsheeper 已提交
79 80 81
	if status != 0 {
		return errors.New("Close segment failed, error code = " + strconv.Itoa(int(status)))
	}
Z
zhenshan.cao 已提交
82 83

	// Build index after closing segment
B
bigsheeper 已提交
84 85 86 87 88 89 90
	s.SegmentStatus = SegmentIndexing
	s.buildIndex(collection)

	// TODO: remove redundant segment indexed status
	// Change segment status to indexed
	s.SegmentStatus = SegmentIndexed

B
bigsheeper 已提交
91
	return nil
B
bigsheeper 已提交
92
}
93

94
func (s *Segment) GetMemSize() uint64 {
B
bigsheeper 已提交
95
	/*
96 97 98 99 100 101
	long int
	GetMemoryUsageInBytes(CSegmentBase c_segment);
	*/
	var memoryUsageInBytes = C.GetMemoryUsageInBytes(s.SegmentPtr)

	return uint64(memoryUsageInBytes)
102 103
}

104
////////////////////////////////////////////////////////////////////////////
105
func (s *Segment) SegmentPreInsert(numOfRecords int) int64 {
B
bigsheeper 已提交
106
	/*
107 108 109
	long int
	PreInsert(CSegmentBase c_segment, long int size);
	*/
Z
zhenshan.cao 已提交
110
	var offset = C.PreInsert(s.SegmentPtr, C.long(int64(numOfRecords)))
111

Z
zhenshan.cao 已提交
112
	return int64(offset)
113 114 115
}

func (s *Segment) SegmentPreDelete(numOfRecords int) int64 {
B
bigsheeper 已提交
116
	/*
117 118 119
	long int
	PreDelete(CSegmentBase c_segment, long int size);
	*/
Z
zhenshan.cao 已提交
120
	var offset = C.PreDelete(s.SegmentPtr, C.long(int64(numOfRecords)))
121

Z
zhenshan.cao 已提交
122
	return int64(offset)
123 124 125
}

func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]uint64, records *[][]byte) error {
B
bigsheeper 已提交
126
	/*
B
bigsheeper 已提交
127 128
	int
	Insert(CSegmentBase c_segment,
129
	           long int reserved_offset,
B
bigsheeper 已提交
130
	           signed long int size,
131
	           const long* primary_keys,
B
bigsheeper 已提交
132 133 134
	           const unsigned long* timestamps,
	           void* raw_data,
	           int sizeof_per_row,
135
	           signed long int count);
B
bigsheeper 已提交
136
	*/
137
	// Blobs to one big blob
B
bigsheeper 已提交
138 139 140
	var numOfRow = len(*entityIDs)
	var sizeofPerRow = len((*records)[0])

141
	var rawData = make([]byte, numOfRow*sizeofPerRow)
142
	for i := 0; i < len(*records); i++ {
143
		copy(rawData, (*records)[i])
144 145
	}

146
	var cOffset = C.long(offset)
B
bigsheeper 已提交
147
	var cNumOfRows = C.long(numOfRow)
Z
zhenshan.cao 已提交
148
	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
149
	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
B
bigsheeper 已提交
150
	var cSizeofPerRow = C.int(sizeofPerRow)
151 152 153
	var cRawDataVoidPtr = unsafe.Pointer(&rawData[0])

	var status = C.Insert(s.SegmentPtr,
154 155 156 157
		cOffset,
		cNumOfRows,
		cEntityIdsPtr,
		cTimestampsPtr,
158
		cRawDataVoidPtr,
159 160
		cSizeofPerRow,
		cNumOfRows)
161 162

	if status != 0 {
163
		return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
164 165
	}

166
	return nil
167 168
}

169
func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]uint64) error {
B
bigsheeper 已提交
170
	/*
B
bigsheeper 已提交
171 172
	int
	Delete(CSegmentBase c_segment,
173
	           long int reserved_offset,
B
bigsheeper 已提交
174
	           long size,
175 176
	           const long* primary_keys,
	           const unsigned long* timestamps);
B
bigsheeper 已提交
177
	*/
178 179
	var cOffset = C.long(offset)
	var cSize = C.long(len(*entityIDs))
Z
zhenshan.cao 已提交
180
	var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
181
	var cTimestampsPtr = (*C.ulong)(&(*timestamps)[0])
B
bigsheeper 已提交
182

183
	var status = C.Delete(s.SegmentPtr, cOffset, cSize, cEntityIdsPtr, cTimestampsPtr)
184 185

	if status != 0 {
186
		return errors.New("Delete failed, error code = " + strconv.Itoa(int(status)))
187
	}
B
bigsheeper 已提交
188

189
	return nil
190 191
}

B
bigsheeper 已提交
192
func (s *Segment) SegmentSearch(query *QueryInfo, timestamp uint64, vectorRecord *msgPb.VectorRowRecord) (*SearchResult, error) {
B
bigsheeper 已提交
193
	/*
B
bigsheeper 已提交
194 195
	int
	Search(CSegmentBase c_segment,
Z
zhenshan.cao 已提交
196 197 198 199 200 201
	       CQueryInfo  c_query_info,
	       unsigned long timestamp,
	       float* query_raw_data,
	       int num_of_query_raw_data,
	       long int* result_ids,
	       float* result_distances);
B
bigsheeper 已提交
202
	*/
B
bigsheeper 已提交
203
	//type CQueryInfo C.CQueryInfo
Z
zhenshan.cao 已提交
204 205 206 207 208 209

	cQuery := C.CQueryInfo{
		num_queries: C.long(query.NumQueries),
		topK:        C.int(query.TopK),
		field_name:  C.CString(query.FieldName),
	}
B
bigsheeper 已提交
210

Z
zhenshan.cao 已提交
211 212
	resultIds := make([]int64, query.TopK)
	resultDistances := make([]float32, query.TopK)
B
bigsheeper 已提交
213

214 215 216
	var cTimestamp = C.ulong(timestamp)
	var cResultIds = (*C.long)(&resultIds[0])
	var cResultDistances = (*C.float)(&resultDistances[0])
217 218
	var cQueryRawData *C.float
	var cQueryRawDataLength C.int
219

220 221 222 223 224 225 226 227 228
	if vectorRecord.BinaryData != nil {
		return nil, errors.New("Data of binary type is not supported yet")
	} else if len(vectorRecord.FloatData) <= 0 {
		return nil, errors.New("Null query vector data")
	} else {
		cQueryRawData = (*C.float)(&vectorRecord.FloatData[0])
		cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData))
	}

Z
zhenshan.cao 已提交
229
	var status = C.Search(s.SegmentPtr, cQuery, cTimestamp, cQueryRawData, cQueryRawDataLength, cResultIds, cResultDistances)
230

231 232
	if status != 0 {
		return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))
B
bigsheeper 已提交
233 234
	}

B
bigsheeper 已提交
235 236
	fmt.Println("Search Result---- Ids =", resultIds, ", Distances =", resultDistances)

237
	return &SearchResult{ResultIds: resultIds, ResultDistances: resultDistances}, nil
238
}