payload.go 18.4 KB
Newer Older
G
godchen 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

Z
zhenshan.cao 已提交
12 13 14 15 16
package storage

/*
#cgo CFLAGS: -I${SRCDIR}/cwrapper

S
shaoyue 已提交
17
#cgo LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -L${SRCDIR}/cwrapper/output/lib64 -lwrapper -lparquet -larrow -larrow_bundled_dependencies -lstdc++ -lm
Z
zhenshan.cao 已提交
18 19 20 21 22
#include <stdlib.h>
#include "ParquetWrapper.h"
*/
import "C"
import (
S
sunby 已提交
23
	"errors"
24
	"fmt"
25
	"reflect"
C
Cai Yudong 已提交
26
	"unsafe"
S
sunby 已提交
27

28
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
29 30
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
Z
zhenshan.cao 已提交
31 32
)

33
// PayloadWriterInterface abstracts PayloadWriter so that callers can depend on
// the write-side method set instead of the cgo-backed implementation.
type PayloadWriterInterface interface {
	// AddDataToPayload adds msgs to the payload; dim is supplied for vector data.
	AddDataToPayload(msgs interface{}, dim ...int) error

	// Typed scalar writers.
	AddBoolToPayload(msgs []bool) error
	AddByteToPayload(msgs []byte) error
	AddInt8ToPayload(msgs []int8) error
	AddInt16ToPayload(msgs []int16) error
	AddInt32ToPayload(msgs []int32) error
	AddInt64ToPayload(msgs []int64) error
	AddFloatToPayload(msgs []float32) error
	AddDoubleToPayload(msgs []float64) error
	AddOneStringToPayload(msgs string) error

	// Vector writers; dim is the dimension of each vector.
	AddBinaryVectorToPayload(binVec []byte, dim int) error
	AddFloatVectorToPayload(binVec []float32, dim int) error

	// FinishPayloadWriter finalizes the payload.
	FinishPayloadWriter() error
	// GetPayloadBufferFromWriter returns the payload bytes.
	GetPayloadBufferFromWriter() ([]byte, error)
	// GetPayloadLengthFromWriter returns the payload length.
	GetPayloadLengthFromWriter() (int, error)

	// ReleasePayloadWriter and Close release the writer.
	ReleasePayloadWriter()
	Close()
}

54
// PayloadReaderInterface abstracts PayloadReader so that callers can depend on
// the read-side method set instead of the cgo-backed implementation.
type PayloadReaderInterface interface {
	// GetDataFromPayload returns the payload data; idx selects a string by index.
	GetDataFromPayload(idx ...int) (interface{}, int, error)

	// Typed scalar readers.
	GetBoolFromPayload() ([]bool, error)
	GetByteFromPayload() ([]byte, error)
	GetInt8FromPayload() ([]int8, error)
	GetInt16FromPayload() ([]int16, error)
	GetInt32FromPayload() ([]int32, error)
	GetInt64FromPayload() ([]int64, error)
	GetFloatFromPayload() ([]float32, error)
	GetDoubleFromPayload() ([]float64, error)
	GetOneStringFromPayload(idx int) (string, error)

	// Vector readers; the int result is the vector dimension.
	GetBinaryVectorFromPayload() ([]byte, int, error)
	GetFloatVectorFromPayload() ([]float32, int, error)

	// GetPayloadLengthFromReader returns the payload length.
	GetPayloadLengthFromReader() (int, error)

	// ReleasePayloadReader and Close release the reader.
	ReleasePayloadReader()
	Close()
}
C
Cai Yudong 已提交
72

G
godchen 已提交
73 74 75 76
// PayloadWriter pairs a C payload writer handle with the column type it was
// created for.
type PayloadWriter struct {
	// payloadWriterPtr is the handle returned by C.NewPayloadWriter.
	payloadWriterPtr C.CPayloadWriter
	// colType selects the typed writer used by AddDataToPayload.
	colType          schemapb.DataType
}
X
XuanYang-cn 已提交
77

G
godchen 已提交
78 79 80 81
// PayloadReader pairs a C payload reader handle with the column type it was
// created for.
type PayloadReader struct {
	// payloadReaderPtr is the handle returned by C.NewPayloadReader.
	payloadReaderPtr C.CPayloadReader
	// colType is checked by every typed getter before it touches the C reader.
	colType          schemapb.DataType
}
Z
zhenshan.cao 已提交
82

83
// NewPayloadWriter is constructor of PayloadWriter
Z
zhenshan.cao 已提交
84 85 86 87 88
func NewPayloadWriter(colType schemapb.DataType) (*PayloadWriter, error) {
	w := C.NewPayloadWriter(C.int(colType))
	if w == nil {
		return nil, errors.New("create Payload writer failed")
	}
X
XuanYang-cn 已提交
89 90 91
	return &PayloadWriter{payloadWriterPtr: w, colType: colType}, nil
}

92
// AddDataToPayload adds @msgs into payload, if @msgs is vector, dimension should be specified by @dim
X
XuanYang-cn 已提交
93 94 95 96
func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
	switch len(dim) {
	case 0:
		switch w.colType {
G
godchen 已提交
97
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
98 99 100 101 102
			val, ok := msgs.([]bool)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBoolToPayload(val)
G
godchen 已提交
103
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
104 105 106 107 108
			val, ok := msgs.([]int8)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt8ToPayload(val)
G
godchen 已提交
109
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
110 111 112 113 114
			val, ok := msgs.([]int16)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt16ToPayload(val)
G
godchen 已提交
115
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
116 117 118 119 120
			val, ok := msgs.([]int32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt32ToPayload(val)
G
godchen 已提交
121
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
122 123 124 125 126
			val, ok := msgs.([]int64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt64ToPayload(val)
G
godchen 已提交
127
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
128 129 130 131 132
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatToPayload(val)
G
godchen 已提交
133
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
134 135 136 137 138
			val, ok := msgs.([]float64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddDoubleToPayload(val)
G
godchen 已提交
139
		case schemapb.DataType_String:
X
XuanYang-cn 已提交
140 141 142 143 144
			val, ok := msgs.(string)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddOneStringToPayload(val)
G
godchen 已提交
145 146
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
147 148 149
		}
	case 1:
		switch w.colType {
G
godchen 已提交
150
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
151 152 153 154 155
			val, ok := msgs.([]byte)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBinaryVectorToPayload(val, dim[0])
G
godchen 已提交
156
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
157 158 159 160 161
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatVectorToPayload(val, dim[0])
G
godchen 已提交
162 163
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
164 165 166 167 168 169
		}
	default:
		return errors.New("incorrect input numbers")
	}
}

170
// AddBoolToPayload adds @msgs into payload
X
XuanYang-cn 已提交
171 172 173
func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
174
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
175 176 177 178 179 180
	}

	cMsgs := (*C.bool)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddBooleanToPayload(w.payloadWriterPtr, cMsgs, cLength)
181
	return HandleCStatus(&status, "AddBoolToPayload failed")
X
XuanYang-cn 已提交
182 183
}

G
godchen 已提交
184 185 186 187 188 189 190 191 192 193 194 195
// AddByteToPayload adds @msgs into payload through the int8 writer of the C
// wrapper; the bytes are reinterpreted as int8_t without copying.
func (w *PayloadWriter) AddByteToPayload(msgs []byte) error {
	if len(msgs) == 0 {
		return errors.New("can't add empty msgs into payload")
	}

	status := C.AddInt8ToPayload(
		w.payloadWriterPtr,
		(*C.int8_t)(unsafe.Pointer(&msgs[0])),
		C.int(len(msgs)),
	)
	return HandleCStatus(&status, "AddInt8ToPayload failed")
}

X
XuanYang-cn 已提交
196 197 198
func (w *PayloadWriter) AddInt8ToPayload(msgs []int8) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
199
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
200 201 202 203 204
	}
	cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
205
	return HandleCStatus(&status, "AddInt8ToPayload failed")
X
XuanYang-cn 已提交
206 207 208 209 210
}

func (w *PayloadWriter) AddInt16ToPayload(msgs []int16) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
211
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
212 213 214 215 216 217
	}

	cMsgs := (*C.int16_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt16ToPayload(w.payloadWriterPtr, cMsgs, cLength)
218
	return HandleCStatus(&status, "AddInt16ToPayload failed")
X
XuanYang-cn 已提交
219 220 221 222 223
}

func (w *PayloadWriter) AddInt32ToPayload(msgs []int32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
224
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
225 226 227 228 229 230
	}

	cMsgs := (*C.int32_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt32ToPayload(w.payloadWriterPtr, cMsgs, cLength)
231
	return HandleCStatus(&status, "AddInt32ToPayload failed")
X
XuanYang-cn 已提交
232 233 234 235 236
}

func (w *PayloadWriter) AddInt64ToPayload(msgs []int64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
237
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
238 239 240 241 242 243
	}

	cMsgs := (*C.int64_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt64ToPayload(w.payloadWriterPtr, cMsgs, cLength)
244
	return HandleCStatus(&status, "AddInt64ToPayload failed")
X
XuanYang-cn 已提交
245 246 247 248 249
}

func (w *PayloadWriter) AddFloatToPayload(msgs []float32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
250
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
251 252 253 254 255 256
	}

	cMsgs := (*C.float)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddFloatToPayload(w.payloadWriterPtr, cMsgs, cLength)
257
	return HandleCStatus(&status, "AddFloatToPayload failed")
X
XuanYang-cn 已提交
258 259 260 261 262
}

func (w *PayloadWriter) AddDoubleToPayload(msgs []float64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
263
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
264 265 266 267 268 269
	}

	cMsgs := (*C.double)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddDoubleToPayload(w.payloadWriterPtr, cMsgs, cLength)
270
	return HandleCStatus(&status, "AddDoubleToPayload failed")
Z
zhenshan.cao 已提交
271 272 273
}

func (w *PayloadWriter) AddOneStringToPayload(msg string) error {
X
XuanYang-cn 已提交
274 275
	length := len(msg)
	if length == 0 {
Z
zhenshan.cao 已提交
276 277
		return errors.New("can't add empty string into payload")
	}
X
XuanYang-cn 已提交
278 279 280 281 282

	cmsg := C.CString(msg)
	clength := C.int(length)
	defer C.free(unsafe.Pointer(cmsg))

283 284
	status := C.AddOneStringToPayload(w.payloadWriterPtr, cmsg, clength)
	return HandleCStatus(&status, "AddOneStringToPayload failed")
X
XuanYang-cn 已提交
285 286
}

287
// AddBinaryVectorToPayload dimension > 0 && (%8 == 0)
X
XuanYang-cn 已提交
288 289 290 291 292 293 294 295 296 297 298 299 300
func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error {
	length := len(binVec)
	if length <= 0 {
		return errors.New("can't add empty binVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

	cBinVec := (*C.uint8_t)(&binVec[0])
	cDim := C.int(dim)
	cLength := C.int(length / (dim / 8))

301 302
	status := C.AddBinaryVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength)
	return HandleCStatus(&status, "AddBinaryVectorToPayload failed")
X
XuanYang-cn 已提交
303 304
}

J
jaime 已提交
305
// AddFloatVectorToPayload dimension > 0 && (%8 == 0)
X
XuanYang-cn 已提交
306 307 308 309 310 311 312 313 314
func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) error {
	length := len(floatVec)
	if length <= 0 {
		return errors.New("can't add empty floatVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

C
Cai Yudong 已提交
315
	cVec := (*C.float)(&floatVec[0])
X
XuanYang-cn 已提交
316 317 318
	cDim := C.int(dim)
	cLength := C.int(length / dim)

319 320
	status := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength)
	return HandleCStatus(&status, "AddFloatVectorToPayload failed")
Z
zhenshan.cao 已提交
321 322 323
}

func (w *PayloadWriter) FinishPayloadWriter() error {
324 325
	status := C.FinishPayloadWriter(w.payloadWriterPtr)
	return HandleCStatus(&status, "FinishPayloadWriter failed")
Z
zhenshan.cao 已提交
326 327 328 329
}

func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) {
	cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr)
330
	pointer := uintptr(unsafe.Pointer(cb.data))
Z
zhenshan.cao 已提交
331 332 333 334
	length := int(cb.length)
	if length <= 0 {
		return nil, errors.New("empty buffer")
	}
335 336 337 338 339 340 341 342

	var data []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
	sh.Data = pointer
	sh.Len = length
	sh.Cap = length

	return data, nil
Z
zhenshan.cao 已提交
343 344 345 346 347 348 349
}

// GetPayloadLengthFromWriter returns the length reported by the C writer.
// The error result is always nil.
func (w *PayloadWriter) GetPayloadLengthFromWriter() (int, error) {
	return int(C.GetPayloadLengthFromWriter(w.payloadWriterPtr)), nil
}

350 351
func (w *PayloadWriter) ReleasePayloadWriter() {
	C.ReleasePayloadWriter(w.payloadWriterPtr)
Z
zhenshan.cao 已提交
352 353
}

354 355
func (w *PayloadWriter) Close() {
	w.ReleasePayloadWriter()
Z
zhenshan.cao 已提交
356 357 358 359 360 361
}

func NewPayloadReader(colType schemapb.DataType, buf []byte) (*PayloadReader, error) {
	if len(buf) == 0 {
		return nil, errors.New("create Payload reader failed, buffer is empty")
	}
N
neza2017 已提交
362 363 364 365
	r := C.NewPayloadReader(C.int(colType), (*C.uint8_t)(unsafe.Pointer(&buf[0])), C.long(len(buf)))
	if r == nil {
		return nil, errors.New("failed to read parquet from buffer")
	}
X
XuanYang-cn 已提交
366 367 368
	return &PayloadReader{payloadReaderPtr: r, colType: colType}, nil
}

369
// GetDataFromPayload returns data,length from payload, returns err if failed
X
XuanYang-cn 已提交
370 371 372 373 374 375 376 377 378 379
// Params:
//      `idx`: String index
// Return:
//      `interface{}`: all types.
//      `int`: length, only meaningful to FLOAT/BINARY VECTOR type.
//      `error`: error.
func (r *PayloadReader) GetDataFromPayload(idx ...int) (interface{}, int, error) {
	switch len(idx) {
	case 1:
		switch r.colType {
G
godchen 已提交
380
		case schemapb.DataType_String:
X
XuanYang-cn 已提交
381 382
			val, err := r.GetOneStringFromPayload(idx[0])
			return val, 0, err
G
godchen 已提交
383
		default:
C
Cai Yudong 已提交
384
			return nil, 0, errors.New("unknown type")
X
XuanYang-cn 已提交
385 386 387
		}
	case 0:
		switch r.colType {
G
godchen 已提交
388
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
389 390
			val, err := r.GetBoolFromPayload()
			return val, 0, err
G
godchen 已提交
391
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
392 393
			val, err := r.GetInt8FromPayload()
			return val, 0, err
G
godchen 已提交
394
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
395 396
			val, err := r.GetInt16FromPayload()
			return val, 0, err
G
godchen 已提交
397
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
398 399
			val, err := r.GetInt32FromPayload()
			return val, 0, err
G
godchen 已提交
400
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
401 402
			val, err := r.GetInt64FromPayload()
			return val, 0, err
G
godchen 已提交
403
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
404 405
			val, err := r.GetFloatFromPayload()
			return val, 0, err
G
godchen 已提交
406
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
407 408
			val, err := r.GetDoubleFromPayload()
			return val, 0, err
G
godchen 已提交
409
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
410
			return r.GetBinaryVectorFromPayload()
G
godchen 已提交
411
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
412 413
			return r.GetFloatVectorFromPayload()
		default:
C
Cai Yudong 已提交
414
			return nil, 0, errors.New("unknown type")
X
XuanYang-cn 已提交
415 416 417 418
		}
	default:
		return nil, 0, errors.New("incorrect number of index")
	}
Z
zhenshan.cao 已提交
419 420
}

421
// ReleasePayloadReader release payload reader.
422 423
func (r *PayloadReader) ReleasePayloadReader() {
	C.ReleasePayloadReader(r.payloadReaderPtr)
Z
zhenshan.cao 已提交
424 425
}

426
// GetBoolFromPayload returns bool slice from payload.
X
XuanYang-cn 已提交
427
func (r *PayloadReader) GetBoolFromPayload() ([]bool, error) {
G
godchen 已提交
428
	if r.colType != schemapb.DataType_Bool {
N
neza2017 已提交
429 430 431
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
432 433 434
	var cMsg *C.bool
	var cSize C.int

435 436 437
	status := C.GetBoolFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetBoolFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
438 439 440 441 442 443
	}

	slice := (*[1 << 28]bool)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

G
godchen 已提交
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
// GetByteFromPayload returns byte slice from payload
// The reader's column type must be Int8; the int8 data reported by the C
// wrapper is viewed as bytes without copying.
func (r *PayloadReader) GetByteFromPayload() ([]byte, error) {
	if r.colType != schemapb.DataType_Int8 {
		return nil, errors.New("incorrect data type")
	}

	var (
		data *C.int8_t
		size C.int
	)
	status := C.GetInt8FromPayload(r.payloadReaderPtr, &data, &size)
	err := HandleCStatus(&status, "GetInt8FromPayload failed")
	if err != nil {
		return nil, err
	}

	view := (*[1 << 28]byte)(unsafe.Pointer(data))
	return view[:size:size], nil
}

462
// GetInt8FromPayload returns int8 slice from payload
X
XuanYang-cn 已提交
463
func (r *PayloadReader) GetInt8FromPayload() ([]int8, error) {
G
godchen 已提交
464
	if r.colType != schemapb.DataType_Int8 {
N
neza2017 已提交
465 466 467
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
468 469 470
	var cMsg *C.int8_t
	var cSize C.int

471 472 473
	status := C.GetInt8FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt8FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
474 475 476 477 478 479 480
	}

	slice := (*[1 << 28]int8)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt16FromPayload() ([]int16, error) {
G
godchen 已提交
481
	if r.colType != schemapb.DataType_Int16 {
N
neza2017 已提交
482 483 484
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
485 486 487
	var cMsg *C.int16_t
	var cSize C.int

488 489 490
	status := C.GetInt16FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt16FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
491 492 493 494 495 496 497
	}

	slice := (*[1 << 28]int16)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt32FromPayload() ([]int32, error) {
G
godchen 已提交
498
	if r.colType != schemapb.DataType_Int32 {
N
neza2017 已提交
499 500 501
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
502 503 504
	var cMsg *C.int32_t
	var cSize C.int

505 506 507
	status := C.GetInt32FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt32FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
508 509 510 511 512 513 514
	}

	slice := (*[1 << 28]int32)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt64FromPayload() ([]int64, error) {
G
godchen 已提交
515
	if r.colType != schemapb.DataType_Int64 {
N
neza2017 已提交
516 517 518
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
519 520 521
	var cMsg *C.int64_t
	var cSize C.int

522 523 524
	status := C.GetInt64FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt64FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
525 526 527 528 529 530 531
	}

	slice := (*[1 << 28]int64)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetFloatFromPayload() ([]float32, error) {
G
godchen 已提交
532
	if r.colType != schemapb.DataType_Float {
N
neza2017 已提交
533 534 535
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
536 537 538
	var cMsg *C.float
	var cSize C.int

539 540 541
	status := C.GetFloatFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetFloatFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
542 543 544 545 546 547 548
	}

	slice := (*[1 << 28]float32)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetDoubleFromPayload() ([]float64, error) {
G
godchen 已提交
549
	if r.colType != schemapb.DataType_Double {
N
neza2017 已提交
550 551 552
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
553 554 555
	var cMsg *C.double
	var cSize C.int

556 557 558
	status := C.GetDoubleFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetDoubleFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
559 560 561 562 563 564
	}

	slice := (*[1 << 28]float64)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

Z
zhenshan.cao 已提交
565
func (r *PayloadReader) GetOneStringFromPayload(idx int) (string, error) {
G
godchen 已提交
566
	if r.colType != schemapb.DataType_String {
N
neza2017 已提交
567 568 569
		return "", errors.New("incorrect data type")
	}

Z
zhenshan.cao 已提交
570
	var cStr *C.char
X
XuanYang-cn 已提交
571 572
	var cSize C.int

573 574 575
	status := C.GetOneStringFromPayload(r.payloadReaderPtr, C.int(idx), &cStr, &cSize)
	if err := HandleCStatus(&status, "GetOneStringFromPayload failed"); err != nil {
		return "", err
Z
zhenshan.cao 已提交
576
	}
X
XuanYang-cn 已提交
577 578 579
	return C.GoStringN(cStr, cSize), nil
}

580
// GetBinaryVectorFromPayload returns vector, dimension, error
X
XuanYang-cn 已提交
581
func (r *PayloadReader) GetBinaryVectorFromPayload() ([]byte, int, error) {
G
godchen 已提交
582
	if r.colType != schemapb.DataType_BinaryVector {
N
neza2017 已提交
583 584 585
		return nil, 0, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
586 587 588 589
	var cMsg *C.uint8_t
	var cDim C.int
	var cLen C.int

590 591 592
	status := C.GetBinaryVectorFromPayload(r.payloadReaderPtr, &cMsg, &cDim, &cLen)
	if err := HandleCStatus(&status, "GetBinaryVectorFromPayload failed"); err != nil {
		return nil, 0, err
X
XuanYang-cn 已提交
593 594 595 596 597 598 599
	}
	length := (cDim / 8) * cLen

	slice := (*[1 << 28]byte)(unsafe.Pointer(cMsg))[:length:length]
	return slice, int(cDim), nil
}

600
// GetFloatVectorFromPayload returns vector, dimension, error
X
XuanYang-cn 已提交
601
func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error) {
G
godchen 已提交
602
	if r.colType != schemapb.DataType_FloatVector {
N
neza2017 已提交
603 604 605
		return nil, 0, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
606 607 608 609
	var cMsg *C.float
	var cDim C.int
	var cLen C.int

610 611 612
	status := C.GetFloatVectorFromPayload(r.payloadReaderPtr, &cMsg, &cDim, &cLen)
	if err := HandleCStatus(&status, "GetFloatVectorFromPayload failed"); err != nil {
		return nil, 0, err
X
XuanYang-cn 已提交
613 614 615 616 617
	}
	length := cDim * cLen

	slice := (*[1 << 28]float32)(unsafe.Pointer(cMsg))[:length:length]
	return slice, int(cDim), nil
Z
zhenshan.cao 已提交
618 619 620 621 622 623 624
}

// GetPayloadLengthFromReader returns the length reported by the C reader.
// The error result is always nil.
func (r *PayloadReader) GetPayloadLengthFromReader() (int, error) {
	return int(C.GetPayloadLengthFromReader(r.payloadReaderPtr)), nil
}

625
// Close closes the payload reader
626 627
func (r *PayloadReader) Close() {
	r.ReleasePayloadReader()
Z
zhenshan.cao 已提交
628
}
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647

// HandleCStatus converts a status returned from CGO into a Go error.
// A zero error code yields nil. Otherwise the C message is copied into a Go
// string and freed, a warning containing @extraInfo is logged, and an error of
// the form "[<ErrorCode name>] <message>" is returned; codes missing from
// commonpb.ErrorCode_name are reported as "UnknownError".
func HandleCStatus(status *C.CStatus, extraInfo string) error {
	code := status.error_code
	if code == 0 {
		return nil
	}

	// The C message is released on return, after it has been copied below.
	defer C.free(unsafe.Pointer(status.error_msg))
	msg := C.GoString(status.error_msg)

	name, known := commonpb.ErrorCode_name[int32(code)]
	if !known {
		name = "UnknownError"
	}

	finalMsg := fmt.Sprintf("[%s] %s", name, msg)
	log.Warn(fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg))
	return errors.New(finalMsg)
}