payload.go 17.2 KB
Newer Older
G
godchen 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

Z
zhenshan.cao 已提交
12 13 14 15 16
package storage

/*
#cgo CFLAGS: -I${SRCDIR}/cwrapper

17
#cgo LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -lwrapper -lparquet -larrow -larrow_bundled_dependencies -lstdc++ -lm
Z
zhenshan.cao 已提交
18 19 20 21 22
#include <stdlib.h>
#include "ParquetWrapper.h"
*/
import "C"
import (
S
sunby 已提交
23
	"errors"
24
	"fmt"
25
	"reflect"
C
Cai Yudong 已提交
26
	"unsafe"
S
sunby 已提交
27

28
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
29 30
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
Z
zhenshan.cao 已提交
31 32
)

33
// PayloadWriterInterface abstracts PayloadWriter so that callers can depend on
// the writing operations without depending on the concrete cgo-backed type.
type PayloadWriterInterface interface {
	// AddDataToPayload adds msgs using the typed method that matches the
	// writer's column type; dim is only supplied for vector columns.
	AddDataToPayload(msgs interface{}, dim ...int) error
	// AddBoolToPayload adds a batch of bool values.
	AddBoolToPayload(msgs []bool) error
	// AddInt8ToPayload adds a batch of int8 values.
	AddInt8ToPayload(msgs []int8) error
	// AddInt16ToPayload adds a batch of int16 values.
	AddInt16ToPayload(msgs []int16) error
	// AddInt32ToPayload adds a batch of int32 values.
	AddInt32ToPayload(msgs []int32) error
	// AddInt64ToPayload adds a batch of int64 values.
	AddInt64ToPayload(msgs []int64) error
	// AddFloatToPayload adds a batch of float32 values.
	AddFloatToPayload(msgs []float32) error
	// AddDoubleToPayload adds a batch of float64 values.
	AddDoubleToPayload(msgs []float64) error
	// AddOneStringToPayload adds a single string value.
	AddOneStringToPayload(msgs string) error
	// AddBinaryVectorToPayload adds binary vectors of dimension dim packed in binVec.
	AddBinaryVectorToPayload(binVec []byte, dim int) error
	// AddFloatVectorToPayload adds float vectors of dimension dim packed in binVec.
	AddFloatVectorToPayload(binVec []float32, dim int) error
	// FinishPayloadWriter finishes writing.
	FinishPayloadWriter() error
	// GetPayloadBufferFromWriter returns the bytes produced by the writer.
	GetPayloadBufferFromWriter() ([]byte, error)
	// GetPayloadLengthFromWriter returns the payload length reported by the writer.
	GetPayloadLengthFromWriter() (int, error)
	// ReleasePayloadWriter releases the underlying writer.
	ReleasePayloadWriter()
	// Close releases the underlying writer.
	Close()
}

53
// PayloadReaderInterface abstracts PayloadReader so that callers can depend on
// the reading operations without depending on the concrete cgo-backed type.
type PayloadReaderInterface interface {
	// GetDataFromPayload returns the payload data using the typed getter that
	// matches the reader's column type; idx is only supplied for string columns.
	GetDataFromPayload(idx ...int) (interface{}, int, error)
	// GetBoolFromPayload returns the bool values of the payload.
	GetBoolFromPayload() ([]bool, error)
	// GetInt8FromPayload returns the int8 values of the payload.
	GetInt8FromPayload() ([]int8, error)
	// GetInt16FromPayload returns the int16 values of the payload.
	GetInt16FromPayload() ([]int16, error)
	// GetInt32FromPayload returns the int32 values of the payload.
	GetInt32FromPayload() ([]int32, error)
	// GetInt64FromPayload returns the int64 values of the payload.
	GetInt64FromPayload() ([]int64, error)
	// GetFloatFromPayload returns the float32 values of the payload.
	GetFloatFromPayload() ([]float32, error)
	// GetDoubleFromPayload returns the float64 values of the payload.
	GetDoubleFromPayload() ([]float64, error)
	// GetOneStringFromPayload returns the string stored at index idx.
	GetOneStringFromPayload(idx int) (string, error)
	// GetBinaryVectorFromPayload returns the packed binary vectors and their dimension.
	GetBinaryVectorFromPayload() ([]byte, int, error)
	// GetFloatVectorFromPayload returns the packed float vectors and their dimension.
	GetFloatVectorFromPayload() ([]float32, int, error)
	// GetPayloadLengthFromReader returns the payload length reported by the reader.
	GetPayloadLengthFromReader() (int, error)
	// ReleasePayloadReader releases the underlying reader.
	ReleasePayloadReader()
	// Close releases the underlying reader.
	Close()
}
C
Cai Yudong 已提交
70

G
godchen 已提交
71 72 73 74
// PayloadWriter writes values of one column type into a payload through the
// C wrapper declared in ParquetWrapper.h.
type PayloadWriter struct {
	// payloadWriterPtr is the handle returned by C.NewPayloadWriter.
	payloadWriterPtr C.CPayloadWriter
	// colType is the column data type the writer was created for; it selects
	// the typed method used by AddDataToPayload.
	colType          schemapb.DataType
}
X
XuanYang-cn 已提交
75

G
godchen 已提交
76 77 78 79
// PayloadReader reads values of one column type from a payload through the
// C wrapper declared in ParquetWrapper.h.
type PayloadReader struct {
	// payloadReaderPtr is the handle returned by C.NewPayloadReader.
	payloadReaderPtr C.CPayloadReader
	// colType is the column data type the reader was created for; every typed
	// getter rejects calls that do not match it.
	colType          schemapb.DataType
}
Z
zhenshan.cao 已提交
80 81 82 83 84 85

func NewPayloadWriter(colType schemapb.DataType) (*PayloadWriter, error) {
	w := C.NewPayloadWriter(C.int(colType))
	if w == nil {
		return nil, errors.New("create Payload writer failed")
	}
X
XuanYang-cn 已提交
86 87 88 89 90 91 92
	return &PayloadWriter{payloadWriterPtr: w, colType: colType}, nil
}

func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
	switch len(dim) {
	case 0:
		switch w.colType {
G
godchen 已提交
93
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
94 95 96 97 98
			val, ok := msgs.([]bool)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBoolToPayload(val)
G
godchen 已提交
99
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
100 101 102 103 104
			val, ok := msgs.([]int8)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt8ToPayload(val)
G
godchen 已提交
105
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
106 107 108 109 110
			val, ok := msgs.([]int16)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt16ToPayload(val)
G
godchen 已提交
111
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
112 113 114 115 116
			val, ok := msgs.([]int32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt32ToPayload(val)
G
godchen 已提交
117
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
118 119 120 121 122
			val, ok := msgs.([]int64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt64ToPayload(val)
G
godchen 已提交
123
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
124 125 126 127 128
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatToPayload(val)
G
godchen 已提交
129
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
130 131 132 133 134
			val, ok := msgs.([]float64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddDoubleToPayload(val)
G
godchen 已提交
135
		case schemapb.DataType_String:
X
XuanYang-cn 已提交
136 137 138 139 140
			val, ok := msgs.(string)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddOneStringToPayload(val)
G
godchen 已提交
141 142
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
143 144 145
		}
	case 1:
		switch w.colType {
G
godchen 已提交
146
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
147 148 149 150 151
			val, ok := msgs.([]byte)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBinaryVectorToPayload(val, dim[0])
G
godchen 已提交
152
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
153 154 155 156 157
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatVectorToPayload(val, dim[0])
G
godchen 已提交
158 159
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
160 161 162 163 164 165 166 167 168
		}
	default:
		return errors.New("incorrect input numbers")
	}
}

func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
169
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
170 171 172 173 174 175
	}

	cMsgs := (*C.bool)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddBooleanToPayload(w.payloadWriterPtr, cMsgs, cLength)
176
	return HandleCStatus(&status, "AddBoolToPayload failed")
X
XuanYang-cn 已提交
177 178 179 180 181
}

func (w *PayloadWriter) AddInt8ToPayload(msgs []int8) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
182
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
183 184 185 186 187
	}
	cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
188
	return HandleCStatus(&status, "AddInt8ToPayload failed")
X
XuanYang-cn 已提交
189 190 191 192 193
}

func (w *PayloadWriter) AddInt16ToPayload(msgs []int16) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
194
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
195 196 197 198 199 200
	}

	cMsgs := (*C.int16_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt16ToPayload(w.payloadWriterPtr, cMsgs, cLength)
201
	return HandleCStatus(&status, "AddInt16ToPayload failed")
X
XuanYang-cn 已提交
202 203 204 205 206
}

func (w *PayloadWriter) AddInt32ToPayload(msgs []int32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
207
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
208 209 210 211 212 213
	}

	cMsgs := (*C.int32_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt32ToPayload(w.payloadWriterPtr, cMsgs, cLength)
214
	return HandleCStatus(&status, "AddInt32ToPayload failed")
X
XuanYang-cn 已提交
215 216 217 218 219
}

func (w *PayloadWriter) AddInt64ToPayload(msgs []int64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
220
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
221 222 223 224 225 226
	}

	cMsgs := (*C.int64_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt64ToPayload(w.payloadWriterPtr, cMsgs, cLength)
227
	return HandleCStatus(&status, "AddInt64ToPayload failed")
X
XuanYang-cn 已提交
228 229 230 231 232
}

func (w *PayloadWriter) AddFloatToPayload(msgs []float32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
233
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
234 235 236 237 238 239
	}

	cMsgs := (*C.float)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddFloatToPayload(w.payloadWriterPtr, cMsgs, cLength)
240
	return HandleCStatus(&status, "AddFloatToPayload failed")
X
XuanYang-cn 已提交
241 242 243 244 245
}

func (w *PayloadWriter) AddDoubleToPayload(msgs []float64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
246
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
247 248 249 250 251 252
	}

	cMsgs := (*C.double)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddDoubleToPayload(w.payloadWriterPtr, cMsgs, cLength)
253
	return HandleCStatus(&status, "AddDoubleToPayload failed")
Z
zhenshan.cao 已提交
254 255 256
}

func (w *PayloadWriter) AddOneStringToPayload(msg string) error {
X
XuanYang-cn 已提交
257 258
	length := len(msg)
	if length == 0 {
Z
zhenshan.cao 已提交
259 260
		return errors.New("can't add empty string into payload")
	}
X
XuanYang-cn 已提交
261 262 263 264 265

	cmsg := C.CString(msg)
	clength := C.int(length)
	defer C.free(unsafe.Pointer(cmsg))

266 267
	status := C.AddOneStringToPayload(w.payloadWriterPtr, cmsg, clength)
	return HandleCStatus(&status, "AddOneStringToPayload failed")
X
XuanYang-cn 已提交
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
}

// dimension > 0 && (%8 == 0)
func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error {
	length := len(binVec)
	if length <= 0 {
		return errors.New("can't add empty binVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

	cBinVec := (*C.uint8_t)(&binVec[0])
	cDim := C.int(dim)
	cLength := C.int(length / (dim / 8))

284 285
	status := C.AddBinaryVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength)
	return HandleCStatus(&status, "AddBinaryVectorToPayload failed")
X
XuanYang-cn 已提交
286 287 288 289 290 291 292 293 294 295 296 297
}

// dimension > 0 && (%8 == 0)
func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) error {
	length := len(floatVec)
	if length <= 0 {
		return errors.New("can't add empty floatVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

C
Cai Yudong 已提交
298
	cVec := (*C.float)(&floatVec[0])
X
XuanYang-cn 已提交
299 300 301
	cDim := C.int(dim)
	cLength := C.int(length / dim)

302 303
	status := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength)
	return HandleCStatus(&status, "AddFloatVectorToPayload failed")
Z
zhenshan.cao 已提交
304 305 306
}

func (w *PayloadWriter) FinishPayloadWriter() error {
307 308
	status := C.FinishPayloadWriter(w.payloadWriterPtr)
	return HandleCStatus(&status, "FinishPayloadWriter failed")
Z
zhenshan.cao 已提交
309 310 311 312
}

func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) {
	cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr)
313
	pointer := uintptr(unsafe.Pointer(cb.data))
Z
zhenshan.cao 已提交
314 315 316 317
	length := int(cb.length)
	if length <= 0 {
		return nil, errors.New("empty buffer")
	}
318 319 320 321 322 323 324 325

	var data []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
	sh.Data = pointer
	sh.Len = length
	sh.Cap = length

	return data, nil
Z
zhenshan.cao 已提交
326 327 328 329 330 331 332
}

// GetPayloadLengthFromWriter returns the length reported by the C writer.
// The error result is always nil.
func (w *PayloadWriter) GetPayloadLengthFromWriter() (int, error) {
	return int(C.GetPayloadLengthFromWriter(w.payloadWriterPtr)), nil
}

333 334
func (w *PayloadWriter) ReleasePayloadWriter() {
	C.ReleasePayloadWriter(w.payloadWriterPtr)
Z
zhenshan.cao 已提交
335 336
}

337 338
func (w *PayloadWriter) Close() {
	w.ReleasePayloadWriter()
Z
zhenshan.cao 已提交
339 340 341 342 343 344
}

func NewPayloadReader(colType schemapb.DataType, buf []byte) (*PayloadReader, error) {
	if len(buf) == 0 {
		return nil, errors.New("create Payload reader failed, buffer is empty")
	}
N
neza2017 已提交
345 346 347 348
	r := C.NewPayloadReader(C.int(colType), (*C.uint8_t)(unsafe.Pointer(&buf[0])), C.long(len(buf)))
	if r == nil {
		return nil, errors.New("failed to read parquet from buffer")
	}
X
XuanYang-cn 已提交
349 350 351
	return &PayloadReader{payloadReaderPtr: r, colType: colType}, nil
}

352
// GetDataFromPayload returns data,length from payload, returns err if failed
X
XuanYang-cn 已提交
353 354 355 356 357 358 359 360 361 362
// Params:
//      `idx`: String index
// Return:
//      `interface{}`: all types.
//      `int`: length, only meaningful to FLOAT/BINARY VECTOR type.
//      `error`: error.
func (r *PayloadReader) GetDataFromPayload(idx ...int) (interface{}, int, error) {
	switch len(idx) {
	case 1:
		switch r.colType {
G
godchen 已提交
363
		case schemapb.DataType_String:
X
XuanYang-cn 已提交
364 365
			val, err := r.GetOneStringFromPayload(idx[0])
			return val, 0, err
G
godchen 已提交
366
		default:
C
Cai Yudong 已提交
367
			return nil, 0, errors.New("unknown type")
X
XuanYang-cn 已提交
368 369 370
		}
	case 0:
		switch r.colType {
G
godchen 已提交
371
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
372 373
			val, err := r.GetBoolFromPayload()
			return val, 0, err
G
godchen 已提交
374
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
375 376
			val, err := r.GetInt8FromPayload()
			return val, 0, err
G
godchen 已提交
377
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
378 379
			val, err := r.GetInt16FromPayload()
			return val, 0, err
G
godchen 已提交
380
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
381 382
			val, err := r.GetInt32FromPayload()
			return val, 0, err
G
godchen 已提交
383
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
384 385
			val, err := r.GetInt64FromPayload()
			return val, 0, err
G
godchen 已提交
386
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
387 388
			val, err := r.GetFloatFromPayload()
			return val, 0, err
G
godchen 已提交
389
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
390 391
			val, err := r.GetDoubleFromPayload()
			return val, 0, err
G
godchen 已提交
392
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
393
			return r.GetBinaryVectorFromPayload()
G
godchen 已提交
394
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
395 396
			return r.GetFloatVectorFromPayload()
		default:
C
Cai Yudong 已提交
397
			return nil, 0, errors.New("unknown type")
X
XuanYang-cn 已提交
398 399 400 401
		}
	default:
		return nil, 0, errors.New("incorrect number of index")
	}
Z
zhenshan.cao 已提交
402 403
}

404
// ReleasePayloadReader release payload reader.
405 406
func (r *PayloadReader) ReleasePayloadReader() {
	C.ReleasePayloadReader(r.payloadReaderPtr)
Z
zhenshan.cao 已提交
407 408
}

409
// GetBoolFromPayload returns bool slice from payload.
X
XuanYang-cn 已提交
410
func (r *PayloadReader) GetBoolFromPayload() ([]bool, error) {
G
godchen 已提交
411
	if r.colType != schemapb.DataType_Bool {
N
neza2017 已提交
412 413 414
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
415 416 417
	var cMsg *C.bool
	var cSize C.int

418 419 420
	status := C.GetBoolFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetBoolFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
421 422 423 424 425 426
	}

	slice := (*[1 << 28]bool)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

427
// GetInt8FromPayload returns int8 slice from payload
X
XuanYang-cn 已提交
428
func (r *PayloadReader) GetInt8FromPayload() ([]int8, error) {
G
godchen 已提交
429
	if r.colType != schemapb.DataType_Int8 {
N
neza2017 已提交
430 431 432
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
433 434 435
	var cMsg *C.int8_t
	var cSize C.int

436 437 438
	status := C.GetInt8FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt8FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
439 440 441 442 443 444 445
	}

	slice := (*[1 << 28]int8)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt16FromPayload() ([]int16, error) {
G
godchen 已提交
446
	if r.colType != schemapb.DataType_Int16 {
N
neza2017 已提交
447 448 449
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
450 451 452
	var cMsg *C.int16_t
	var cSize C.int

453 454 455
	status := C.GetInt16FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt16FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
456 457 458 459 460 461 462
	}

	slice := (*[1 << 28]int16)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt32FromPayload() ([]int32, error) {
G
godchen 已提交
463
	if r.colType != schemapb.DataType_Int32 {
N
neza2017 已提交
464 465 466
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
467 468 469
	var cMsg *C.int32_t
	var cSize C.int

470 471 472
	status := C.GetInt32FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt32FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
473 474 475 476 477 478 479
	}

	slice := (*[1 << 28]int32)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt64FromPayload() ([]int64, error) {
G
godchen 已提交
480
	if r.colType != schemapb.DataType_Int64 {
N
neza2017 已提交
481 482 483
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
484 485 486
	var cMsg *C.int64_t
	var cSize C.int

487 488 489
	status := C.GetInt64FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt64FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
490 491 492 493 494 495 496
	}

	slice := (*[1 << 28]int64)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetFloatFromPayload() ([]float32, error) {
G
godchen 已提交
497
	if r.colType != schemapb.DataType_Float {
N
neza2017 已提交
498 499 500
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
501 502 503
	var cMsg *C.float
	var cSize C.int

504 505 506
	status := C.GetFloatFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetFloatFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
507 508 509 510 511 512 513
	}

	slice := (*[1 << 28]float32)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetDoubleFromPayload() ([]float64, error) {
G
godchen 已提交
514
	if r.colType != schemapb.DataType_Double {
N
neza2017 已提交
515 516 517
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
518 519 520
	var cMsg *C.double
	var cSize C.int

521 522 523
	status := C.GetDoubleFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetDoubleFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
524 525 526 527 528 529
	}

	slice := (*[1 << 28]float64)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

Z
zhenshan.cao 已提交
530
func (r *PayloadReader) GetOneStringFromPayload(idx int) (string, error) {
G
godchen 已提交
531
	if r.colType != schemapb.DataType_String {
N
neza2017 已提交
532 533 534
		return "", errors.New("incorrect data type")
	}

Z
zhenshan.cao 已提交
535
	var cStr *C.char
X
XuanYang-cn 已提交
536 537
	var cSize C.int

538 539 540
	status := C.GetOneStringFromPayload(r.payloadReaderPtr, C.int(idx), &cStr, &cSize)
	if err := HandleCStatus(&status, "GetOneStringFromPayload failed"); err != nil {
		return "", err
Z
zhenshan.cao 已提交
541
	}
X
XuanYang-cn 已提交
542 543 544
	return C.GoStringN(cStr, cSize), nil
}

545
// GetBinaryVectorFromPayload returns vector, dimension, error
X
XuanYang-cn 已提交
546
func (r *PayloadReader) GetBinaryVectorFromPayload() ([]byte, int, error) {
G
godchen 已提交
547
	if r.colType != schemapb.DataType_BinaryVector {
N
neza2017 已提交
548 549 550
		return nil, 0, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
551 552 553 554
	var cMsg *C.uint8_t
	var cDim C.int
	var cLen C.int

555 556 557
	status := C.GetBinaryVectorFromPayload(r.payloadReaderPtr, &cMsg, &cDim, &cLen)
	if err := HandleCStatus(&status, "GetBinaryVectorFromPayload failed"); err != nil {
		return nil, 0, err
X
XuanYang-cn 已提交
558 559 560 561 562 563 564
	}
	length := (cDim / 8) * cLen

	slice := (*[1 << 28]byte)(unsafe.Pointer(cMsg))[:length:length]
	return slice, int(cDim), nil
}

565
// GetFloatVectorFromPayload returns vector, dimension, error
X
XuanYang-cn 已提交
566
func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error) {
G
godchen 已提交
567
	if r.colType != schemapb.DataType_FloatVector {
N
neza2017 已提交
568 569 570
		return nil, 0, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
571 572 573 574
	var cMsg *C.float
	var cDim C.int
	var cLen C.int

575 576 577
	status := C.GetFloatVectorFromPayload(r.payloadReaderPtr, &cMsg, &cDim, &cLen)
	if err := HandleCStatus(&status, "GetFloatVectorFromPayload failed"); err != nil {
		return nil, 0, err
X
XuanYang-cn 已提交
578 579 580 581 582
	}
	length := cDim * cLen

	slice := (*[1 << 28]float32)(unsafe.Pointer(cMsg))[:length:length]
	return slice, int(cDim), nil
Z
zhenshan.cao 已提交
583 584 585 586 587 588 589
}

// GetPayloadLengthFromReader returns the length reported by the C reader.
// The error result is always nil.
func (r *PayloadReader) GetPayloadLengthFromReader() (int, error) {
	return int(C.GetPayloadLengthFromReader(r.payloadReaderPtr)), nil
}

590 591
func (r *PayloadReader) Close() {
	r.ReleasePayloadReader()
Z
zhenshan.cao 已提交
592
}
593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611

// HandleCStatus converts a status returned from CGO into a Go error.
//
// A zero error_code yields nil. Otherwise the error code is mapped to its
// commonpb.ErrorCode name ("UnknownError" when the code is not known), the C
// error message is copied into Go and then freed, a warning that includes
// extraInfo is logged, and an error of the form "[<name>] <message>" is returned.
func HandleCStatus(status *C.CStatus, extraInfo string) error {
	code := status.error_code
	if code == 0 {
		return nil
	}

	// The C message is freed when this function returns, after it has been
	// copied into a Go string below.
	defer C.free(unsafe.Pointer(status.error_msg))
	message := C.GoString(status.error_msg)

	name, known := commonpb.ErrorCode_name[int32(code)]
	if !known {
		name = "UnknownError"
	}

	detail := fmt.Sprintf("[%s] %s", name, message)
	log.Warn(fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, detail))
	return errors.New(detail)
}