payload.go 17.0 KB
Newer Older
G
godchen 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

Z
zhenshan.cao 已提交
12 13 14 15 16
package storage

/*
#cgo CFLAGS: -I${SRCDIR}/cwrapper

F
FluorineDog 已提交
17
#cgo LDFLAGS: -L${SRCDIR}/cwrapper/output -lwrapper -lparquet -larrow -lthrift -lutf8proc -lstdc++ -lm
Z
zhenshan.cao 已提交
18 19 20 21 22
#include <stdlib.h>
#include "ParquetWrapper.h"
*/
import "C"
import (
S
sunby 已提交
23
	"errors"
24
	"fmt"
C
Cai Yudong 已提交
25
	"unsafe"
S
sunby 已提交
26

27
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
28 29
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
Z
zhenshan.cao 已提交
30 31
)

S
sunby 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
// PayloadWriterInterface declares the operations of a payload writer: adding
// typed data (scalars, one string at a time, or vectors with a dimension),
// finishing the payload, reading back its buffer and length, and releasing it.
type PayloadWriterInterface interface {
	// AddDataToPayload adds msgs to the payload; dim is an optional argument.
	AddDataToPayload(msgs interface{}, dim ...int) error
	AddBoolToPayload(msgs []bool) error
	AddInt8ToPayload(msgs []int8) error
	AddInt16ToPayload(msgs []int16) error
	AddInt32ToPayload(msgs []int32) error
	AddInt64ToPayload(msgs []int64) error
	AddFloatToPayload(msgs []float32) error
	AddDoubleToPayload(msgs []float64) error
	AddOneStringToPayload(msgs string) error
	AddBinaryVectorToPayload(binVec []byte, dim int) error
	AddFloatVectorToPayload(binVec []float32, dim int) error
	FinishPayloadWriter() error
	GetPayloadBufferFromWriter() ([]byte, error)
	GetPayloadLengthFromWriter() (int, error)
	ReleasePayloadWriter() error
	Close() error
}

// PayloadReaderInterface declares the operations of a payload reader: fetching
// typed data (scalars, one string by index, or vectors together with an int),
// querying the payload length, and releasing the reader.
type PayloadReaderInterface interface {
	// GetDataFromPayload returns the data as an interface{}; idx is an optional argument.
	GetDataFromPayload(idx ...int) (interface{}, int, error)
	GetBoolFromPayload() ([]bool, error)
	GetInt8FromPayload() ([]int8, error)
	GetInt16FromPayload() ([]int16, error)
	GetInt32FromPayload() ([]int32, error)
	GetInt64FromPayload() ([]int64, error)
	GetFloatFromPayload() ([]float32, error)
	GetDoubleFromPayload() ([]float64, error)
	GetOneStringFromPayload(idx int) (string, error)
	GetBinaryVectorFromPayload() ([]byte, int, error)
	GetFloatVectorFromPayload() ([]float32, int, error)
	GetPayloadLengthFromReader() (int, error)
	ReleasePayloadReader() error
	Close() error
}
C
Cai Yudong 已提交
67

G
godchen 已提交
68 69 70 71
// PayloadWriter pairs a C payload writer handle with the column data type
// the writer was created for.
type PayloadWriter struct {
	// payloadWriterPtr is the handle passed to every C writer call.
	payloadWriterPtr C.CPayloadWriter
	// colType is the column data type given at construction time.
	colType          schemapb.DataType
}
X
XuanYang-cn 已提交
72

G
godchen 已提交
73 74 75 76
// PayloadReader pairs a C payload reader handle with the column data type
// the reader was created for.
type PayloadReader struct {
	// payloadReaderPtr is the handle passed to every C reader call.
	payloadReaderPtr C.CPayloadReader
	// colType is the column data type given at construction time; the typed
	// getters compare against it before calling into C.
	colType          schemapb.DataType
}
Z
zhenshan.cao 已提交
77 78 79 80 81 82

func NewPayloadWriter(colType schemapb.DataType) (*PayloadWriter, error) {
	w := C.NewPayloadWriter(C.int(colType))
	if w == nil {
		return nil, errors.New("create Payload writer failed")
	}
X
XuanYang-cn 已提交
83 84 85 86 87 88 89
	return &PayloadWriter{payloadWriterPtr: w, colType: colType}, nil
}

func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
	switch len(dim) {
	case 0:
		switch w.colType {
G
godchen 已提交
90
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
91 92 93 94 95
			val, ok := msgs.([]bool)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBoolToPayload(val)
G
godchen 已提交
96
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
97 98 99 100 101
			val, ok := msgs.([]int8)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt8ToPayload(val)
G
godchen 已提交
102
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
103 104 105 106 107
			val, ok := msgs.([]int16)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt16ToPayload(val)
G
godchen 已提交
108
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
109 110 111 112 113
			val, ok := msgs.([]int32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt32ToPayload(val)
G
godchen 已提交
114
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
115 116 117 118 119
			val, ok := msgs.([]int64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt64ToPayload(val)
G
godchen 已提交
120
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
121 122 123 124 125
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatToPayload(val)
G
godchen 已提交
126
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
127 128 129 130 131
			val, ok := msgs.([]float64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddDoubleToPayload(val)
G
godchen 已提交
132
		case schemapb.DataType_String:
X
XuanYang-cn 已提交
133 134 135 136 137
			val, ok := msgs.(string)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddOneStringToPayload(val)
G
godchen 已提交
138 139
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
140 141 142
		}
	case 1:
		switch w.colType {
G
godchen 已提交
143
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
144 145 146 147 148
			val, ok := msgs.([]byte)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBinaryVectorToPayload(val, dim[0])
G
godchen 已提交
149
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
150 151 152 153 154
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatVectorToPayload(val, dim[0])
G
godchen 已提交
155 156
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
157 158 159 160 161 162 163 164 165
		}
	default:
		return errors.New("incorrect input numbers")
	}
}

func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
166
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
167 168 169 170 171 172
	}

	cMsgs := (*C.bool)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddBooleanToPayload(w.payloadWriterPtr, cMsgs, cLength)
173
	return HandleCStatus(&status, "AddBoolToPayload failed")
X
XuanYang-cn 已提交
174 175 176 177 178
}

func (w *PayloadWriter) AddInt8ToPayload(msgs []int8) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
179
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
180 181 182 183 184
	}
	cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
185
	return HandleCStatus(&status, "AddInt8ToPayload failed")
X
XuanYang-cn 已提交
186 187 188 189 190
}

func (w *PayloadWriter) AddInt16ToPayload(msgs []int16) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
191
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
192 193 194 195 196 197
	}

	cMsgs := (*C.int16_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt16ToPayload(w.payloadWriterPtr, cMsgs, cLength)
198
	return HandleCStatus(&status, "AddInt16ToPayload failed")
X
XuanYang-cn 已提交
199 200 201 202 203
}

func (w *PayloadWriter) AddInt32ToPayload(msgs []int32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
204
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
205 206 207 208 209 210
	}

	cMsgs := (*C.int32_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt32ToPayload(w.payloadWriterPtr, cMsgs, cLength)
211
	return HandleCStatus(&status, "AddInt32ToPayload failed")
X
XuanYang-cn 已提交
212 213 214 215 216
}

func (w *PayloadWriter) AddInt64ToPayload(msgs []int64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
217
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
218 219 220 221 222 223
	}

	cMsgs := (*C.int64_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt64ToPayload(w.payloadWriterPtr, cMsgs, cLength)
224
	return HandleCStatus(&status, "AddInt64ToPayload failed")
X
XuanYang-cn 已提交
225 226 227 228 229
}

func (w *PayloadWriter) AddFloatToPayload(msgs []float32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
230
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
231 232 233 234 235 236
	}

	cMsgs := (*C.float)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddFloatToPayload(w.payloadWriterPtr, cMsgs, cLength)
237
	return HandleCStatus(&status, "AddFloatToPayload failed")
X
XuanYang-cn 已提交
238 239 240 241 242
}

func (w *PayloadWriter) AddDoubleToPayload(msgs []float64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
243
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
244 245 246 247 248 249
	}

	cMsgs := (*C.double)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddDoubleToPayload(w.payloadWriterPtr, cMsgs, cLength)
250
	return HandleCStatus(&status, "AddDoubleToPayload failed")
Z
zhenshan.cao 已提交
251 252 253
}

func (w *PayloadWriter) AddOneStringToPayload(msg string) error {
X
XuanYang-cn 已提交
254 255
	length := len(msg)
	if length == 0 {
Z
zhenshan.cao 已提交
256 257
		return errors.New("can't add empty string into payload")
	}
X
XuanYang-cn 已提交
258 259 260 261 262

	cmsg := C.CString(msg)
	clength := C.int(length)
	defer C.free(unsafe.Pointer(cmsg))

263 264
	status := C.AddOneStringToPayload(w.payloadWriterPtr, cmsg, clength)
	return HandleCStatus(&status, "AddOneStringToPayload failed")
X
XuanYang-cn 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
}

// dimension > 0 && (%8 == 0)
func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error {
	length := len(binVec)
	if length <= 0 {
		return errors.New("can't add empty binVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

	cBinVec := (*C.uint8_t)(&binVec[0])
	cDim := C.int(dim)
	cLength := C.int(length / (dim / 8))

281 282
	status := C.AddBinaryVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength)
	return HandleCStatus(&status, "AddBinaryVectorToPayload failed")
X
XuanYang-cn 已提交
283 284 285 286 287 288 289 290 291 292 293 294
}

// dimension > 0 && (%8 == 0)
func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) error {
	length := len(floatVec)
	if length <= 0 {
		return errors.New("can't add empty floatVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

C
Cai Yudong 已提交
295
	cVec := (*C.float)(&floatVec[0])
X
XuanYang-cn 已提交
296 297 298
	cDim := C.int(dim)
	cLength := C.int(length / dim)

299 300
	status := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength)
	return HandleCStatus(&status, "AddFloatVectorToPayload failed")
Z
zhenshan.cao 已提交
301 302 303
}

func (w *PayloadWriter) FinishPayloadWriter() error {
304 305
	status := C.FinishPayloadWriter(w.payloadWriterPtr)
	return HandleCStatus(&status, "FinishPayloadWriter failed")
Z
zhenshan.cao 已提交
306 307 308 309 310 311 312 313 314
}

func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) {
	cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr)
	pointer := unsafe.Pointer(cb.data)
	length := int(cb.length)
	if length <= 0 {
		return nil, errors.New("empty buffer")
	}
X
XuanYang-cn 已提交
315
	// refer to: https://github.com/golang/go/wiki/cgo#turning-c-arrays-into-go-slices
Z
zhenshan.cao 已提交
316 317 318 319 320 321 322 323 324 325
	slice := (*[1 << 28]byte)(pointer)[:length:length]
	return slice, nil
}

// GetPayloadLengthFromWriter returns the length reported by the C writer,
// converted to int. The returned error is always nil.
func (w *PayloadWriter) GetPayloadLengthFromWriter() (int, error) {
	return int(C.GetPayloadLengthFromWriter(w.payloadWriterPtr)), nil
}

func (w *PayloadWriter) ReleasePayloadWriter() error {
326 327
	status := C.ReleasePayloadWriter(w.payloadWriterPtr)
	return HandleCStatus(&status, "ReleasePayloadWriter failed")
Z
zhenshan.cao 已提交
328 329 330 331 332 333 334 335 336 337
}

// Close releases the writer; it is equivalent to ReleasePayloadWriter.
func (w *PayloadWriter) Close() error {
	return w.ReleasePayloadWriter()
}

func NewPayloadReader(colType schemapb.DataType, buf []byte) (*PayloadReader, error) {
	if len(buf) == 0 {
		return nil, errors.New("create Payload reader failed, buffer is empty")
	}
N
neza2017 已提交
338 339 340 341
	r := C.NewPayloadReader(C.int(colType), (*C.uint8_t)(unsafe.Pointer(&buf[0])), C.long(len(buf)))
	if r == nil {
		return nil, errors.New("failed to read parquet from buffer")
	}
X
XuanYang-cn 已提交
342 343 344 345 346 347 348 349 350 351 352 353 354
	return &PayloadReader{payloadReaderPtr: r, colType: colType}, nil
}

// Params:
//      `idx`: String index
// Return:
//      `interface{}`: all types.
//      `int`: length, only meaningful to FLOAT/BINARY VECTOR type.
//      `error`: error.
func (r *PayloadReader) GetDataFromPayload(idx ...int) (interface{}, int, error) {
	switch len(idx) {
	case 1:
		switch r.colType {
G
godchen 已提交
355
		case schemapb.DataType_String:
X
XuanYang-cn 已提交
356 357
			val, err := r.GetOneStringFromPayload(idx[0])
			return val, 0, err
G
godchen 已提交
358
		default:
C
Cai Yudong 已提交
359
			return nil, 0, errors.New("unknown type")
X
XuanYang-cn 已提交
360 361 362
		}
	case 0:
		switch r.colType {
G
godchen 已提交
363
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
364 365
			val, err := r.GetBoolFromPayload()
			return val, 0, err
G
godchen 已提交
366
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
367 368
			val, err := r.GetInt8FromPayload()
			return val, 0, err
G
godchen 已提交
369
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
370 371
			val, err := r.GetInt16FromPayload()
			return val, 0, err
G
godchen 已提交
372
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
373 374
			val, err := r.GetInt32FromPayload()
			return val, 0, err
G
godchen 已提交
375
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
376 377
			val, err := r.GetInt64FromPayload()
			return val, 0, err
G
godchen 已提交
378
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
379 380
			val, err := r.GetFloatFromPayload()
			return val, 0, err
G
godchen 已提交
381
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
382 383
			val, err := r.GetDoubleFromPayload()
			return val, 0, err
G
godchen 已提交
384
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
385
			return r.GetBinaryVectorFromPayload()
G
godchen 已提交
386
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
387 388
			return r.GetFloatVectorFromPayload()
		default:
C
Cai Yudong 已提交
389
			return nil, 0, errors.New("unknown type")
X
XuanYang-cn 已提交
390 391 392 393
		}
	default:
		return nil, 0, errors.New("incorrect number of index")
	}
Z
zhenshan.cao 已提交
394 395 396
}

func (r *PayloadReader) ReleasePayloadReader() error {
397 398
	status := C.ReleasePayloadReader(r.payloadReaderPtr)
	return HandleCStatus(&status, "ReleasePayloadReader failed")
Z
zhenshan.cao 已提交
399 400
}

X
XuanYang-cn 已提交
401
func (r *PayloadReader) GetBoolFromPayload() ([]bool, error) {
G
godchen 已提交
402
	if r.colType != schemapb.DataType_Bool {
N
neza2017 已提交
403 404 405
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
406 407 408
	var cMsg *C.bool
	var cSize C.int

409 410 411
	status := C.GetBoolFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetBoolFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
412 413 414 415 416 417 418
	}

	slice := (*[1 << 28]bool)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt8FromPayload() ([]int8, error) {
G
godchen 已提交
419
	if r.colType != schemapb.DataType_Int8 {
N
neza2017 已提交
420 421 422
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
423 424 425
	var cMsg *C.int8_t
	var cSize C.int

426 427 428
	status := C.GetInt8FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt8FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
429 430 431 432 433 434 435
	}

	slice := (*[1 << 28]int8)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt16FromPayload() ([]int16, error) {
G
godchen 已提交
436
	if r.colType != schemapb.DataType_Int16 {
N
neza2017 已提交
437 438 439
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
440 441 442
	var cMsg *C.int16_t
	var cSize C.int

443 444 445
	status := C.GetInt16FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt16FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
446 447 448 449 450 451 452
	}

	slice := (*[1 << 28]int16)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt32FromPayload() ([]int32, error) {
G
godchen 已提交
453
	if r.colType != schemapb.DataType_Int32 {
N
neza2017 已提交
454 455 456
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
457 458 459
	var cMsg *C.int32_t
	var cSize C.int

460 461 462
	status := C.GetInt32FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt32FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
463 464 465 466 467 468 469
	}

	slice := (*[1 << 28]int32)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetInt64FromPayload() ([]int64, error) {
G
godchen 已提交
470
	if r.colType != schemapb.DataType_Int64 {
N
neza2017 已提交
471 472 473
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
474 475 476
	var cMsg *C.int64_t
	var cSize C.int

477 478 479
	status := C.GetInt64FromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetInt64FromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
480 481 482 483 484 485 486
	}

	slice := (*[1 << 28]int64)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetFloatFromPayload() ([]float32, error) {
G
godchen 已提交
487
	if r.colType != schemapb.DataType_Float {
N
neza2017 已提交
488 489 490
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
491 492 493
	var cMsg *C.float
	var cSize C.int

494 495 496
	status := C.GetFloatFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetFloatFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
497 498 499 500 501 502 503
	}

	slice := (*[1 << 28]float32)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

func (r *PayloadReader) GetDoubleFromPayload() ([]float64, error) {
G
godchen 已提交
504
	if r.colType != schemapb.DataType_Double {
N
neza2017 已提交
505 506 507
		return nil, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
508 509 510
	var cMsg *C.double
	var cSize C.int

511 512 513
	status := C.GetDoubleFromPayload(r.payloadReaderPtr, &cMsg, &cSize)
	if err := HandleCStatus(&status, "GetDoubleFromPayload failed"); err != nil {
		return nil, err
X
XuanYang-cn 已提交
514 515 516 517 518 519
	}

	slice := (*[1 << 28]float64)(unsafe.Pointer(cMsg))[:cSize:cSize]
	return slice, nil
}

Z
zhenshan.cao 已提交
520
func (r *PayloadReader) GetOneStringFromPayload(idx int) (string, error) {
G
godchen 已提交
521
	if r.colType != schemapb.DataType_String {
N
neza2017 已提交
522 523 524
		return "", errors.New("incorrect data type")
	}

Z
zhenshan.cao 已提交
525
	var cStr *C.char
X
XuanYang-cn 已提交
526 527
	var cSize C.int

528 529 530
	status := C.GetOneStringFromPayload(r.payloadReaderPtr, C.int(idx), &cStr, &cSize)
	if err := HandleCStatus(&status, "GetOneStringFromPayload failed"); err != nil {
		return "", err
Z
zhenshan.cao 已提交
531
	}
X
XuanYang-cn 已提交
532 533 534 535 536
	return C.GoStringN(cStr, cSize), nil
}

// ,dimension, error
func (r *PayloadReader) GetBinaryVectorFromPayload() ([]byte, int, error) {
G
godchen 已提交
537
	if r.colType != schemapb.DataType_BinaryVector {
N
neza2017 已提交
538 539 540
		return nil, 0, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
541 542 543 544
	var cMsg *C.uint8_t
	var cDim C.int
	var cLen C.int

545 546 547
	status := C.GetBinaryVectorFromPayload(r.payloadReaderPtr, &cMsg, &cDim, &cLen)
	if err := HandleCStatus(&status, "GetBinaryVectorFromPayload failed"); err != nil {
		return nil, 0, err
X
XuanYang-cn 已提交
548 549 550 551 552 553 554 555 556
	}
	length := (cDim / 8) * cLen

	slice := (*[1 << 28]byte)(unsafe.Pointer(cMsg))[:length:length]
	return slice, int(cDim), nil
}

// ,dimension, error
func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error) {
G
godchen 已提交
557
	if r.colType != schemapb.DataType_FloatVector {
N
neza2017 已提交
558 559 560
		return nil, 0, errors.New("incorrect data type")
	}

X
XuanYang-cn 已提交
561 562 563 564
	var cMsg *C.float
	var cDim C.int
	var cLen C.int

565 566 567
	status := C.GetFloatVectorFromPayload(r.payloadReaderPtr, &cMsg, &cDim, &cLen)
	if err := HandleCStatus(&status, "GetFloatVectorFromPayload failed"); err != nil {
		return nil, 0, err
X
XuanYang-cn 已提交
568 569 570 571 572
	}
	length := cDim * cLen

	slice := (*[1 << 28]float32)(unsafe.Pointer(cMsg))[:length:length]
	return slice, int(cDim), nil
Z
zhenshan.cao 已提交
573 574 575 576 577 578 579 580 581 582
}

// GetPayloadLengthFromReader returns the length reported by the C reader,
// converted to int. The returned error is always nil.
func (r *PayloadReader) GetPayloadLengthFromReader() (int, error) {
	return int(C.GetPayloadLengthFromReader(r.payloadReaderPtr)), nil
}

// Close releases the reader; it is equivalent to ReleasePayloadReader.
func (r *PayloadReader) Close() error {
	return r.ReleasePayloadReader()
}
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601

// HandleCStatus converts a status returned from a cgo call into a Go error.
//
// A zero error_code yields nil. Otherwise the code is mapped to its name in
// commonpb.ErrorCode_name ("UnknownError" when absent), a warning containing
// extraInfo is logged, and an error of the form "[name] message" is returned.
// The C error message is freed before returning.
func HandleCStatus(status *C.CStatus, extraInfo string) error {
	code := status.error_code
	if code == 0 {
		return nil
	}

	name := "UnknownError"
	if known, ok := commonpb.ErrorCode_name[int32(code)]; ok {
		name = known
	}

	// Copy the message into Go memory first; the C string is freed on return.
	msg := C.GoString(status.error_msg)
	defer C.free(unsafe.Pointer(status.error_msg))

	finalMsg := fmt.Sprintf("[%s] %s", name, msg)
	log.Warn(fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg))
	return errors.New(finalMsg)
}