payload.go 18.2 KB
Newer Older
G
godchen 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

Z
zhenshan.cao 已提交
12 13 14 15 16
package storage

/*
#cgo CFLAGS: -I${SRCDIR}/cwrapper

17
#cgo LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -lwrapper -lparquet -larrow -larrow_bundled_dependencies -lstdc++ -lm
Z
zhenshan.cao 已提交
18 19 20 21 22
#include <stdlib.h>
#include "ParquetWrapper.h"
*/
import "C"
import (
S
sunby 已提交
23
	"errors"
24
	"fmt"
25
	"reflect"
C
Cai Yudong 已提交
26
	"unsafe"
S
sunby 已提交
27

28
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
29 30
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
Z
zhenshan.cao 已提交
31 32
)

33
// PayloadWriterInterface is the set of operations offered by a payload
// writer. PayloadWriter in this file provides methods with these signatures.
type PayloadWriterInterface interface {
	// AddDataToPayload adds msgs using the typed method that matches the
	// writer's column type; dim carries the dimension for vector columns.
	AddDataToPayload(msgs interface{}, dim ...int) error

	// Typed batch writers for scalar columns.
	AddBoolToPayload(msgs []bool) error
	AddByteToPayload(msgs []byte) error
	AddInt8ToPayload(msgs []int8) error
	AddInt16ToPayload(msgs []int16) error
	AddInt32ToPayload(msgs []int32) error
	AddInt64ToPayload(msgs []int64) error
	AddFloatToPayload(msgs []float32) error
	AddDoubleToPayload(msgs []float64) error

	// AddOneStringToPayload adds a single string value.
	AddOneStringToPayload(msgs string) error

	// Vector writers; the vectors are passed flattened together with dim.
	AddBinaryVectorToPayload(binVec []byte, dim int) error
	AddFloatVectorToPayload(binVec []float32, dim int) error

	// FinishPayloadWriter finishes the writer.
	FinishPayloadWriter() error
	// GetPayloadBufferFromWriter returns the writer's payload buffer.
	GetPayloadBufferFromWriter() ([]byte, error)
	// GetPayloadLengthFromWriter returns the payload length reported by the writer.
	GetPayloadLengthFromWriter() (int, error)

	// ReleasePayloadWriter releases the writer.
	ReleasePayloadWriter()
	// Close closes the writer.
	Close()
}

54
// PayloadReaderInterface is the set of operations offered by a payload
// reader. PayloadReader in this file provides methods with these signatures.
type PayloadReaderInterface interface {
	// GetDataFromPayload returns the payload data using the typed getter
	// that matches the reader's column type; idx selects a string by index.
	GetDataFromPayload(idx ...int) (interface{}, int, error)

	// Typed getters for scalar columns.
	GetBoolFromPayload() ([]bool, error)
	GetByteFromPayload() ([]byte, error)
	GetInt8FromPayload() ([]int8, error)
	GetInt16FromPayload() ([]int16, error)
	GetInt32FromPayload() ([]int32, error)
	GetInt64FromPayload() ([]int64, error)
	GetFloatFromPayload() ([]float32, error)
	GetDoubleFromPayload() ([]float64, error)

	// GetOneStringFromPayload returns the string stored at idx.
	GetOneStringFromPayload(idx int) (string, error)

	// Vector getters return the flattened vectors and their dimension.
	GetBinaryVectorFromPayload() ([]byte, int, error)
	GetFloatVectorFromPayload() ([]float32, int, error)

	// GetPayloadLengthFromReader returns the payload length reported by the reader.
	GetPayloadLengthFromReader() (int, error)

	// ReleasePayloadReader releases the reader.
	ReleasePayloadReader()
	// Close closes the reader.
	Close()
}
C
Cai Yudong 已提交
72

G
godchen 已提交
73 74 75 76
// PayloadWriter pairs a C payload writer handle with the column data type
// it was created for.
type PayloadWriter struct {
	// payloadWriterPtr is the handle obtained from C.NewPayloadWriter.
	payloadWriterPtr C.CPayloadWriter
	// colType is the column type; AddDataToPayload uses it to choose the
	// typed Add*ToPayload method.
	colType          schemapb.DataType
}
X
XuanYang-cn 已提交
77

G
godchen 已提交
78 79 80 81
// PayloadReader pairs a C payload reader handle with the column data type
// it was created for.
type PayloadReader struct {
	// payloadReaderPtr is the handle obtained from C.NewPayloadReader.
	payloadReaderPtr C.CPayloadReader
	// colType is the column type; the typed getters reject calls whose type
	// does not match it.
	colType          schemapb.DataType
}
Z
zhenshan.cao 已提交
82

83
// NewPayloadWriter is constructor of PayloadWriter
Z
zhenshan.cao 已提交
84 85 86 87 88
func NewPayloadWriter(colType schemapb.DataType) (*PayloadWriter, error) {
	w := C.NewPayloadWriter(C.int(colType))
	if w == nil {
		return nil, errors.New("create Payload writer failed")
	}
X
XuanYang-cn 已提交
89 90 91 92 93 94 95
	return &PayloadWriter{payloadWriterPtr: w, colType: colType}, nil
}

func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
	switch len(dim) {
	case 0:
		switch w.colType {
G
godchen 已提交
96
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
97 98 99 100 101
			val, ok := msgs.([]bool)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBoolToPayload(val)
G
godchen 已提交
102
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
103 104 105 106 107
			val, ok := msgs.([]int8)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt8ToPayload(val)
G
godchen 已提交
108
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
109 110 111 112 113
			val, ok := msgs.([]int16)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt16ToPayload(val)
G
godchen 已提交
114
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
115 116 117 118 119
			val, ok := msgs.([]int32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt32ToPayload(val)
G
godchen 已提交
120
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
121 122 123 124 125
			val, ok := msgs.([]int64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt64ToPayload(val)
G
godchen 已提交
126
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
127 128 129 130 131
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatToPayload(val)
G
godchen 已提交
132
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
133 134 135 136 137
			val, ok := msgs.([]float64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddDoubleToPayload(val)
G
godchen 已提交
138
		case schemapb.DataType_String:
X
XuanYang-cn 已提交
139 140 141 142 143
			val, ok := msgs.(string)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddOneStringToPayload(val)
G
godchen 已提交
144 145
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
146 147 148
		}
	case 1:
		switch w.colType {
G
godchen 已提交
149
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
150 151 152 153 154
			val, ok := msgs.([]byte)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBinaryVectorToPayload(val, dim[0])
G
godchen 已提交
155
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
156 157 158 159 160
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatVectorToPayload(val, dim[0])
G
godchen 已提交
161 162
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
163 164 165 166 167 168 169 170 171
		}
	default:
		return errors.New("incorrect input numbers")
	}
}

func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
172
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
173 174 175 176 177 178
	}

	cMsgs := (*C.bool)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddBooleanToPayload(w.payloadWriterPtr, cMsgs, cLength)
179
	return HandleCStatus(&status, "AddBoolToPayload failed")
X
XuanYang-cn 已提交
180 181
}

G
godchen 已提交
182 183 184 185 186 187 188 189 190 191 192 193
// AddByteToPayload passes msgs to the C writer as int8 values via
// C.AddInt8ToPayload (the bytes are reinterpreted in place, not copied).
// It returns an error when msgs is empty or when the C call reports a
// non-zero status.
func (w *PayloadWriter) AddByteToPayload(msgs []byte) error {
	length := len(msgs)
	if length <= 0 {
		return errors.New("can't add empty msgs into payload")
	}
	cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
	// Report this method's own name so the log entry points at the right
	// caller, as the other Add*ToPayload methods do.
	return HandleCStatus(&status, "AddByteToPayload failed")
}

X
XuanYang-cn 已提交
194 195 196
func (w *PayloadWriter) AddInt8ToPayload(msgs []int8) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
197
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
198 199 200 201 202
	}
	cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
203
	return HandleCStatus(&status, "AddInt8ToPayload failed")
X
XuanYang-cn 已提交
204 205 206 207 208
}

func (w *PayloadWriter) AddInt16ToPayload(msgs []int16) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
209
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
210 211 212 213 214 215
	}

	cMsgs := (*C.int16_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt16ToPayload(w.payloadWriterPtr, cMsgs, cLength)
216
	return HandleCStatus(&status, "AddInt16ToPayload failed")
X
XuanYang-cn 已提交
217 218 219 220 221
}

func (w *PayloadWriter) AddInt32ToPayload(msgs []int32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
222
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
223 224 225 226 227 228
	}

	cMsgs := (*C.int32_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt32ToPayload(w.payloadWriterPtr, cMsgs, cLength)
229
	return HandleCStatus(&status, "AddInt32ToPayload failed")
X
XuanYang-cn 已提交
230 231 232 233 234
}

func (w *PayloadWriter) AddInt64ToPayload(msgs []int64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
235
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
236 237 238 239 240 241
	}

	cMsgs := (*C.int64_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt64ToPayload(w.payloadWriterPtr, cMsgs, cLength)
242
	return HandleCStatus(&status, "AddInt64ToPayload failed")
X
XuanYang-cn 已提交
243 244 245 246 247
}

func (w *PayloadWriter) AddFloatToPayload(msgs []float32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
248
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
249 250 251 252 253 254
	}

	cMsgs := (*C.float)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddFloatToPayload(w.payloadWriterPtr, cMsgs, cLength)
255
	return HandleCStatus(&status, "AddFloatToPayload failed")
X
XuanYang-cn 已提交
256 257 258 259 260
}

func (w *PayloadWriter) AddDoubleToPayload(msgs []float64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
261
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
262 263 264 265 266 267
	}

	cMsgs := (*C.double)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddDoubleToPayload(w.payloadWriterPtr, cMsgs, cLength)
268
	return HandleCStatus(&status, "AddDoubleToPayload failed")
Z
zhenshan.cao 已提交
269 270 271
}

func (w *PayloadWriter) AddOneStringToPayload(msg string) error {
X
XuanYang-cn 已提交
272 273
	length := len(msg)
	if length == 0 {
Z
zhenshan.cao 已提交
274 275
		return errors.New("can't add empty string into payload")
	}
X
XuanYang-cn 已提交
276 277 278 279 280

	cmsg := C.CString(msg)
	clength := C.int(length)
	defer C.free(unsafe.Pointer(cmsg))

281 282
	status := C.AddOneStringToPayload(w.payloadWriterPtr, cmsg, clength)
	return HandleCStatus(&status, "AddOneStringToPayload failed")
X
XuanYang-cn 已提交
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
}

// dimension > 0 && (%8 == 0)
func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error {
	length := len(binVec)
	if length <= 0 {
		return errors.New("can't add empty binVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

	cBinVec := (*C.uint8_t)(&binVec[0])
	cDim := C.int(dim)
	cLength := C.int(length / (dim / 8))

299 300
	status := C.AddBinaryVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength)
	return HandleCStatus(&status, "AddBinaryVectorToPayload failed")
X
XuanYang-cn 已提交
301 302
}

J
jaime 已提交
303
// AddFloatVectorToPayload dimension > 0 && (%8 == 0)
X
XuanYang-cn 已提交
304 305 306 307 308 309 310 311 312
func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) error {
	length := len(floatVec)
	if length <= 0 {
		return errors.New("can't add empty floatVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

C
Cai Yudong 已提交
313
	cVec := (*C.float)(&floatVec[0])
X
XuanYang-cn 已提交
314 315 316
	cDim := C.int(dim)
	cLength := C.int(length / dim)

317 318
	status := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength)
	return HandleCStatus(&status, "AddFloatVectorToPayload failed")
Z
zhenshan.cao 已提交
319 320 321
}

func (w *PayloadWriter) FinishPayloadWriter() error {
322 323
	status := C.FinishPayloadWriter(w.payloadWriterPtr)
	return HandleCStatus(&status, "FinishPayloadWriter failed")
Z
zhenshan.cao 已提交
324 325 326 327
}

func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) {
	cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr)
328
	pointer := uintptr(unsafe.Pointer(cb.data))
Z
zhenshan.cao 已提交
329 330 331 332
	length := int(cb.length)
	if length <= 0 {
		return nil, errors.New("empty buffer")
	}
333 334 335 336 337 338 339 340

	var data []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
	sh.Data = pointer
	sh.Len = length
	sh.Cap = length

	return data, nil
Z
zhenshan.cao 已提交
341 342 343 344 345 346 347
}

// GetPayloadLengthFromWriter returns the value reported by
// C.GetPayloadLengthFromWriter converted to int. The error result is always nil.
func (w *PayloadWriter) GetPayloadLengthFromWriter() (int, error) {
	return int(C.GetPayloadLengthFromWriter(w.payloadWriterPtr)), nil
}

348 349
func (w *PayloadWriter) ReleasePayloadWriter() {
	C.ReleasePayloadWriter(w.payloadWriterPtr)
Z
zhenshan.cao 已提交
350 351
}

352 353
func (w *PayloadWriter) Close() {
	w.ReleasePayloadWriter()
Z
zhenshan.cao 已提交
354 355 356 357 358 359
}

func NewPayloadReader(colType schemapb.DataType, buf []byte) (*PayloadReader, error) {
	if len(buf) == 0 {
		return nil, errors.New("create Payload reader failed, buffer is empty")
	}
N
neza2017 已提交
360 361 362 363
	r := C.NewPayloadReader(C.int(colType), (*C.uint8_t)(unsafe.Pointer(&buf[0])), C.long(len(buf)))
	if r == nil {
		return nil, errors.New("failed to read parquet from buffer")
	}
X
XuanYang-cn 已提交
364 365 366
	return &PayloadReader{payloadReaderPtr: r, colType: colType}, nil
}

367
// GetDataFromPayload returns data,length from payload, returns err if failed
X
XuanYang-cn 已提交
368 369 370 371 372 373 374 375 376 377
// Params:
//      `idx`: String index
// Return:
//      `interface{}`: all types.
//      `int`: length, only meaningful to FLOAT/BINARY VECTOR type.
//      `error`: error.
func (r *PayloadReader) GetDataFromPayload(idx ...int) (interface{}, int, error) {
	switch len(idx) {
	case 1:
		switch r.colType {
G
godchen 已提交
378
		case schemapb.DataType_String:
X
XuanYang-cn 已提交
379 380
			val, err := r.GetOneStringFromPayload(idx[0])
			return val, 0, err
G
godchen 已提交
381
		default:
C
Cai Yudong 已提交
382
			return nil, 0, errors.New("unknown type")
X
XuanYang-cn 已提交
383 384 385
		}
	case 0:
		switch r.colType {
G
godchen 已提交
386
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
387 388
			val, err := r.GetBoolFromPayload()
			return val, 0, err
G
godchen 已提交
389
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
390 391
			val, err := r.GetInt8FromPayload()
			return val, 0, err
G
godchen 已提交
392
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
393 394
			val, err := r.GetInt16FromPayload()
			return val, 0, err
G
godchen 已提交
395
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
396 397
			val, err := r.GetInt32FromPayload()
			return val, 0, err
G
godchen 已提交
398
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
399 400
			val, err := r.GetInt64FromPayload()
			return val, 0, err
G
godchen 已提交
401
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
402 403
			val, err := r.GetFloatFromPayload()
			return val, 0, err
G
godchen 已提交
404
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
405 406
			val, err := r.GetDoubleFromPayload()
			return val, 0, err
G
godchen 已提交
407
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
408
			return r.GetBinaryVectorFromPayload()
G
godchen 已提交
409
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
410 411
			return r.GetFloatVectorFromPayload()
		default:
C
Cai Yudong 已提交
412
			return nil, 0, errors.New("unknown type")
X
XuanYang-cn 已提交
413 414 415 416
		}
	default:
		return nil, 0, errors.New("incorrect number of index")
	}
Z
zhenshan.cao 已提交
417 418
}

419
// ReleasePayloadReader release payload reader.
420 421
func (r *PayloadReader) ReleasePayloadReader() {
	C.ReleasePayloadReader(r.payloadReaderPtr)
Z
zhenshan.cao 已提交
422 423
}

424
// GetBoolFromPayload returns bool slice from payload.
X
XuanYang-cn 已提交
425
func (r *PayloadReader) GetBoolFromPayload() ([]bool, error) {
G
godchen 已提交
426
	if r.colType != schemapb.DataType_Bool {
N
neza2017 已提交
427 428 429
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
430 431 432
	var cMsg *C.bool
	var cSize C.int

433 434 435
	status := C.GetBoolFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetBoolFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
436 437 438 439 440 441
	}

	slice := (*[1 << 28]bool)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

G
godchen 已提交
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
// GetByteFromPayload returns the payload values of an Int8 column as a
// []byte (the int8 values are reinterpreted in place, not copied).
//
// It returns an error when the reader's column type is not Int8 or when
// C.GetInt8FromPayload reports a non-zero status. The returned slice is a
// view over the memory handed back by the C call.
// NOTE(review): that memory is presumably owned by the C reader — confirm
// how long it stays valid.
func (r *PayloadReader) GetByteFromPayload() ([]byte, error) {
	if r.colType != schemapb.DataType_Int8 {
		return nil, errors.New("incorrect data type")
	}

	var cMsg *C.int8_t
	var cSize C.int

	status := C.GetInt8FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	// Report this method's own name so the log entry points at the right
	// caller, as the other Get*FromPayload methods do.
	if err := HandleCStatus(&status, "GetByteFromPayload failed"); err != nil {
		return nil, err
	}

	slice := (*[1 << 28]byte)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

460
// GetInt8FromPayload returns int8 slice from payload
X
XuanYang-cn 已提交
461
func (r *PayloadReader) GetInt8FromPayload() ([]int8, error) {
G
godchen 已提交
462
	if r.colType != schemapb.DataType_Int8 {
N
neza2017 已提交
463 464 465
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
466 467 468
	var cMsg *C.int8_t
	var cSize C.int

469 470 471
	status := C.GetInt8FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt8FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
472 473 474 475 476 477 478
	}

	slice := (*[1 << 28]int8)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt16FromPayload() ([]int16, error) {
G
godchen 已提交
479
	if r.colType != schemapb.DataType_Int16 {
N
neza2017 已提交
480 481 482
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
483 484 485
	var cMsg *C.int16_t
	var cSize C.int

486 487 488
	status := C.GetInt16FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt16FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
489 490 491 492 493 494 495
	}

	slice := (*[1 << 28]int16)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt32FromPayload() ([]int32, error) {
G
godchen 已提交
496
	if r.colType != schemapb.DataType_Int32 {
N
neza2017 已提交
497 498 499
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
500 501 502
	var cMsg *C.int32_t
	var cSize C.int

503 504 505
	status := C.GetInt32FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt32FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
506 507 508 509 510 511 512
	}

	slice := (*[1 << 28]int32)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt64FromPayload() ([]int64, error) {
G
godchen 已提交
513
	if r.colType != schemapb.DataType_Int64 {
N
neza2017 已提交
514 515 516
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
517 518 519
	var cMsg *C.int64_t
	var cSize C.int

520 521 522
	status := C.GetInt64FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt64FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
523 524 525 526 527 528 529
	}

	slice := (*[1 << 28]int64)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetFloatFromPayload() ([]float32, error) {
G
godchen 已提交
530
	if r.colType != schemapb.DataType_Float {
N
neza2017 已提交
531 532 533
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
534 535 536
	var cMsg *C.float
	var cSize C.int

537 538 539
	status := C.GetFloatFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetFloatFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
540 541 542 543 544 545 546
	}

	slice := (*[1 << 28]float32)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetDoubleFromPayload() ([]float64, error) {
G
godchen 已提交
547
	if r.colType != schemapb.DataType_Double {
N
neza2017 已提交
548 549 550
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
551 552 553
	var cMsg *C.double
	var cSize C.int

554 555 556
	status := C.GetDoubleFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetDoubleFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
557 558 559 560 561 562
	}

	slice := (*[1 << 28]float64)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

Z
zhenshan.cao 已提交
563
func (r *PayloadReader) GetOneStringFromPayload(idx int) (string, error) {
G
godchen 已提交
564
	if r.colType != schemapb.DataType_String {
N
neza2017 已提交
565 566 567
		return "", errors.New("incorrect data type")
	}

Z
zhenshan.cao 已提交
568
	var cStr *C.char
X
XuanYang-cn 已提交
569 570
	var cSize C.int

571 572 573
	status := C.GetOneStringFromPayload(r.payloadReaderPtr, C.int(idx), &cStr, &cSize)
	if err := HandleCStatus(&status, "GetOneStringFromPayload failed"); err != nil {
		return "", err
Z
zhenshan.cao 已提交
574
	}
X
XuanYang-cn 已提交
575 576 577
	return C.GoStringN(cStr, cSize), nil
}

578
// GetBinaryVectorFromPayload returns vector, dimension, error
X
XuanYang-cn 已提交
579
func (r *PayloadReader) GetBinaryVectorFromPayload() ([]byte, int, error) {
G
godchen 已提交
580
	if r.colType != schemapb.DataType_BinaryVector {
N
neza2017 已提交
581 582 583
		return nil, 0, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
584 585 586 587
	var cMsg *C.uint8_t
	var cDim C.int
	var cLen C.int

588 589 590
	status := C.GetBinaryVectorFromPayload(r.payloadReaderPtr, &cMsg, &cDim, &cLen)
	if err := HandleCStatus(&status, "GetBinaryVectorFromPayload failed"); err != nil {
		return nil, 0, err
X
XuanYang-cn 已提交
591 592 593 594 595 596 597
	}
	length := (cDim / 8) * cLen

	slice := (*[1 << 28]byte)(unsafe.Pointer(cMsg))[:length:length]
	return slice, int(cDim), nil
}

598
// GetFloatVectorFromPayload returns vector, dimension, error
X
XuanYang-cn 已提交
599
func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error) {
G
godchen 已提交
600
	if r.colType != schemapb.DataType_FloatVector {
N
neza2017 已提交
601 602 603
		return nil, 0, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
604 605 606 607
	var cMsg *C.float
	var cDim C.int
	var cLen C.int

608 609 610
	status := C.GetFloatVectorFromPayload(r.payloadReaderPtr, &cMsg, &cDim, &cLen)
	if err := HandleCStatus(&status, "GetFloatVectorFromPayload failed"); err != nil {
		return nil, 0, err
X
XuanYang-cn 已提交
611 612 613 614 615
	}
	length := cDim * cLen

	slice := (*[1 << 28]float32)(unsafe.Pointer(cMsg))[:length:length]
	return slice, int(cDim), nil
Z
zhenshan.cao 已提交
616 617 618 619 620 621 622
}

// GetPayloadLengthFromReader returns the value reported by
// C.GetPayloadLengthFromReader converted to int. The error result is always nil.
func (r *PayloadReader) GetPayloadLengthFromReader() (int, error) {
	return int(C.GetPayloadLengthFromReader(r.payloadReaderPtr)), nil
}

623
// Close closes the payload reader
624 625
func (r *PayloadReader) Close() {
	r.ReleasePayloadReader()
Z
zhenshan.cao 已提交
626
}
627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645

// HandleCStatus converts a status returned from a cgo call into a Go error.
//
// A zero error_code yields nil and leaves the status untouched. Otherwise
// the code is looked up in commonpb.ErrorCode_name (falling back to
// "UnknownError"), the C error message is copied into Go and then freed with
// C.free, a warning prefixed with extraInfo is logged, and an error of the
// form "[<name>] <message>" is returned. extraInfo appears only in the log
// entry, not in the returned error.
func HandleCStatus(status *C.CStatus, extraInfo string) error {
	code := status.error_code
	if code == 0 {
		return nil
	}

	// Free the C-allocated message once it has been copied and reported.
	defer C.free(unsafe.Pointer(status.error_msg))
	detail := C.GoString(status.error_msg)

	name, known := commonpb.ErrorCode_name[int32(code)]
	if !known {
		name = "UnknownError"
	}

	finalMsg := fmt.Sprintf("[%s] %s", name, detail)
	log.Warn(fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg))
	return errors.New(finalMsg)
}