payload.go 12.0 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
G
godchen 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
G
godchen 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
G
godchen 已提交
16

Z
zhenshan.cao 已提交
17 18 19
package storage

/*
20
#cgo pkg-config: milvus_storage
Z
zhenshan.cao 已提交
21 22

#include <stdlib.h>
23
#include "storage/parquet_c.h"
Z
zhenshan.cao 已提交
24 25 26
*/
import "C"
import (
X
xige-16 已提交
27
	"fmt"
28
	"reflect"
C
Cai Yudong 已提交
29
	"unsafe"
S
sunby 已提交
30

31
	"github.com/cockroachdb/errors"
E
Enwei Jiao 已提交
32
	"github.com/golang/protobuf/proto"
33

34 35
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
E
Enwei Jiao 已提交
36
	"github.com/milvus-io/milvus/pkg/log"
Z
zhenshan.cao 已提交
37 38
)

39
// PayloadWriterInterface abstracts PayloadWriter
S
sunby 已提交
40 41 42
type PayloadWriterInterface interface {
	AddDataToPayload(msgs interface{}, dim ...int) error
	AddBoolToPayload(msgs []bool) error
G
godchen 已提交
43
	AddByteToPayload(msgs []byte) error
S
sunby 已提交
44 45 46 47 48 49 50
	AddInt8ToPayload(msgs []int8) error
	AddInt16ToPayload(msgs []int16) error
	AddInt32ToPayload(msgs []int32) error
	AddInt64ToPayload(msgs []int64) error
	AddFloatToPayload(msgs []float32) error
	AddDoubleToPayload(msgs []float64) error
	AddOneStringToPayload(msgs string) error
E
Enwei Jiao 已提交
51 52
	AddOneArrayToPayload(msg *schemapb.ScalarField) error
	AddOneJSONToPayload(msg []byte) error
S
sunby 已提交
53 54 55 56 57
	AddBinaryVectorToPayload(binVec []byte, dim int) error
	AddFloatVectorToPayload(binVec []float32, dim int) error
	FinishPayloadWriter() error
	GetPayloadBufferFromWriter() ([]byte, error)
	GetPayloadLengthFromWriter() (int, error)
58 59
	ReleasePayloadWriter()
	Close()
S
sunby 已提交
60 61
}

62
// PayloadReaderInterface abstracts PayloadReader
S
sunby 已提交
63
type PayloadReaderInterface interface {
64
	GetDataFromPayload() (interface{}, int, error)
S
sunby 已提交
65
	GetBoolFromPayload() ([]bool, error)
G
godchen 已提交
66
	GetByteFromPayload() ([]byte, error)
S
sunby 已提交
67 68 69 70 71 72
	GetInt8FromPayload() ([]int8, error)
	GetInt16FromPayload() ([]int16, error)
	GetInt32FromPayload() ([]int32, error)
	GetInt64FromPayload() ([]int64, error)
	GetFloatFromPayload() ([]float32, error)
	GetDoubleFromPayload() ([]float64, error)
73
	GetStringFromPayload() ([]string, error)
E
Enwei Jiao 已提交
74 75
	GetArrayFromPayload() ([]*schemapb.ScalarField, error)
	GetJSONFromPayload() ([][]byte, error)
S
sunby 已提交
76 77 78
	GetBinaryVectorFromPayload() ([]byte, int, error)
	GetFloatVectorFromPayload() ([]float32, int, error)
	GetPayloadLengthFromReader() (int, error)
79 80
	ReleasePayloadReader() error
	Close() error
S
sunby 已提交
81
}
C
Cai Yudong 已提交
82

83
// PayloadWriter writes data into payload
G
godchen 已提交
84 85 86 87
type PayloadWriter struct {
	payloadWriterPtr C.CPayloadWriter
	colType          schemapb.DataType
}
X
XuanYang-cn 已提交
88

89
// NewPayloadWriter is constructor of PayloadWriter
Y
yah01 已提交
90 91
func NewPayloadWriter(colType schemapb.DataType, dim ...int) (PayloadWriterInterface, error) {
	return NewPurePayloadWriter(colType, dim...)
X
XuanYang-cn 已提交
92 93
}

94
// AddDataToPayload adds @msgs into payload, if @msgs is vector, dimension should be specified by @dim
X
XuanYang-cn 已提交
95 96 97 98
func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
	switch len(dim) {
	case 0:
		switch w.colType {
G
godchen 已提交
99
		case schemapb.DataType_Bool:
X
XuanYang-cn 已提交
100 101 102 103 104
			val, ok := msgs.([]bool)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBoolToPayload(val)
G
godchen 已提交
105
		case schemapb.DataType_Int8:
X
XuanYang-cn 已提交
106 107 108 109 110
			val, ok := msgs.([]int8)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt8ToPayload(val)
G
godchen 已提交
111
		case schemapb.DataType_Int16:
X
XuanYang-cn 已提交
112 113 114 115 116
			val, ok := msgs.([]int16)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt16ToPayload(val)
G
godchen 已提交
117
		case schemapb.DataType_Int32:
X
XuanYang-cn 已提交
118 119 120 121 122
			val, ok := msgs.([]int32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt32ToPayload(val)
G
godchen 已提交
123
		case schemapb.DataType_Int64:
X
XuanYang-cn 已提交
124 125 126 127 128
			val, ok := msgs.([]int64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddInt64ToPayload(val)
G
godchen 已提交
129
		case schemapb.DataType_Float:
X
XuanYang-cn 已提交
130 131 132 133 134
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatToPayload(val)
G
godchen 已提交
135
		case schemapb.DataType_Double:
X
XuanYang-cn 已提交
136 137 138 139 140
			val, ok := msgs.([]float64)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddDoubleToPayload(val)
X
xige-16 已提交
141
		case schemapb.DataType_String, schemapb.DataType_VarChar:
X
XuanYang-cn 已提交
142 143 144 145 146
			val, ok := msgs.(string)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddOneStringToPayload(val)
E
Enwei Jiao 已提交
147 148 149 150 151 152 153 154 155 156 157 158
		case schemapb.DataType_Array:
			val, ok := msgs.(*schemapb.ScalarField)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddOneArrayToPayload(val)
		case schemapb.DataType_JSON:
			val, ok := msgs.([]byte)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddOneJSONToPayload(val)
G
godchen 已提交
159 160
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
161 162 163
		}
	case 1:
		switch w.colType {
G
godchen 已提交
164
		case schemapb.DataType_BinaryVector:
X
XuanYang-cn 已提交
165 166 167 168 169
			val, ok := msgs.([]byte)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddBinaryVectorToPayload(val, dim[0])
G
godchen 已提交
170
		case schemapb.DataType_FloatVector:
X
XuanYang-cn 已提交
171 172 173 174 175
			val, ok := msgs.([]float32)
			if !ok {
				return errors.New("incorrect data type")
			}
			return w.AddFloatVectorToPayload(val, dim[0])
G
godchen 已提交
176 177
		default:
			return errors.New("incorrect datatype")
X
XuanYang-cn 已提交
178 179 180 181 182 183
		}
	default:
		return errors.New("incorrect input numbers")
	}
}

184
// AddBoolToPayload adds @msgs into payload
X
XuanYang-cn 已提交
185 186 187
func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
188
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
189 190 191 192 193 194
	}

	cMsgs := (*C.bool)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddBooleanToPayload(w.payloadWriterPtr, cMsgs, cLength)
195
	return HandleCStatus(&status, "AddBoolToPayload failed")
X
XuanYang-cn 已提交
196 197
}

198
// AddByteToPayload adds @msgs into payload
G
godchen 已提交
199 200 201 202 203 204 205 206 207 208 209 210
func (w *PayloadWriter) AddByteToPayload(msgs []byte) error {
	length := len(msgs)
	if length <= 0 {
		return errors.New("can't add empty msgs into payload")
	}
	cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
	return HandleCStatus(&status, "AddInt8ToPayload failed")
}

X
XuanYang-cn 已提交
211 212 213
func (w *PayloadWriter) AddInt8ToPayload(msgs []int8) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
214
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
215 216 217 218 219
	}
	cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength)
220
	return HandleCStatus(&status, "AddInt8ToPayload failed")
X
XuanYang-cn 已提交
221 222 223 224 225
}

func (w *PayloadWriter) AddInt16ToPayload(msgs []int16) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
226
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
227 228 229 230 231 232
	}

	cMsgs := (*C.int16_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt16ToPayload(w.payloadWriterPtr, cMsgs, cLength)
233
	return HandleCStatus(&status, "AddInt16ToPayload failed")
X
XuanYang-cn 已提交
234 235 236 237 238
}

func (w *PayloadWriter) AddInt32ToPayload(msgs []int32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
239
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
240 241 242 243 244 245
	}

	cMsgs := (*C.int32_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt32ToPayload(w.payloadWriterPtr, cMsgs, cLength)
246
	return HandleCStatus(&status, "AddInt32ToPayload failed")
X
XuanYang-cn 已提交
247 248 249 250 251
}

func (w *PayloadWriter) AddInt64ToPayload(msgs []int64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
252
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
253 254 255 256 257 258
	}

	cMsgs := (*C.int64_t)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddInt64ToPayload(w.payloadWriterPtr, cMsgs, cLength)
259
	return HandleCStatus(&status, "AddInt64ToPayload failed")
X
XuanYang-cn 已提交
260 261 262 263 264
}

func (w *PayloadWriter) AddFloatToPayload(msgs []float32) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
265
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
266 267 268 269 270 271
	}

	cMsgs := (*C.float)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddFloatToPayload(w.payloadWriterPtr, cMsgs, cLength)
272
	return HandleCStatus(&status, "AddFloatToPayload failed")
X
XuanYang-cn 已提交
273 274 275 276 277
}

func (w *PayloadWriter) AddDoubleToPayload(msgs []float64) error {
	length := len(msgs)
	if length <= 0 {
S
sunby 已提交
278
		return errors.New("can't add empty msgs into payload")
X
XuanYang-cn 已提交
279 280 281 282 283 284
	}

	cMsgs := (*C.double)(unsafe.Pointer(&msgs[0]))
	cLength := C.int(length)

	status := C.AddDoubleToPayload(w.payloadWriterPtr, cMsgs, cLength)
285
	return HandleCStatus(&status, "AddDoubleToPayload failed")
Z
zhenshan.cao 已提交
286 287 288
}

func (w *PayloadWriter) AddOneStringToPayload(msg string) error {
X
XuanYang-cn 已提交
289 290 291 292 293
	length := len(msg)
	cmsg := C.CString(msg)
	clength := C.int(length)
	defer C.free(unsafe.Pointer(cmsg))

294
	// the C.AddOneStringToPayload can handle empty string
295 296
	status := C.AddOneStringToPayload(w.payloadWriterPtr, cmsg, clength)
	return HandleCStatus(&status, "AddOneStringToPayload failed")
X
XuanYang-cn 已提交
297 298
}

E
Enwei Jiao 已提交
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
// AddOneArrayToPayload marshals msg with protobuf and adds the encoded bytes
// into the payload as one value.
//
// It returns an error when marshalling fails (the cause is wrapped), when the
// encoding is empty, or when the C writer reports a non-zero status. The empty
// case used to panic on &bytes[0]; it is now rejected the same way the other
// Add* methods reject empty input.
func (w *PayloadWriter) AddOneArrayToPayload(msg *schemapb.ScalarField) error {
	bytes, err := proto.Marshal(msg)
	if err != nil {
		return errors.Wrap(err, "Marshal ListValue failed")
	}

	length := len(bytes)
	if length == 0 {
		// A message with nothing set encodes to zero bytes; taking the address
		// of bytes[0] would panic.
		return errors.New("can't add empty msgs into payload")
	}

	// bytes is Go-managed memory that is only lent to C for this call, so it
	// must not be released with C.free.
	cmsg := (*C.uint8_t)(unsafe.Pointer(&bytes[0]))
	clength := C.int(length)

	status := C.AddOneArrayToPayload(w.payloadWriterPtr, cmsg, clength)
	return HandleCStatus(&status, "AddOneArrayToPayload failed")
}

// AddOneJSONToPayload adds the bytes of msg into the payload as one JSON
// value.
//
// An empty or nil msg is rejected with an error; previously it panicked on
// &msg[0]. Any non-zero status from C is converted to an error by
// HandleCStatus.
func (w *PayloadWriter) AddOneJSONToPayload(msg []byte) error {
	length := len(msg)
	if length == 0 {
		return errors.New("can't add empty msgs into payload")
	}

	// msg is Go-managed memory lent to C for the duration of the call only.
	cmsg := (*C.uint8_t)(unsafe.Pointer(&msg[0]))
	clength := C.int(length)

	status := C.AddOneJSONToPayload(w.payloadWriterPtr, cmsg, clength)
	return HandleCStatus(&status, "AddOneJSONToPayload failed")
}

324
// AddBinaryVectorToPayload dimension > 0 && (%8 == 0)
X
XuanYang-cn 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337
func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error {
	length := len(binVec)
	if length <= 0 {
		return errors.New("can't add empty binVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

	cBinVec := (*C.uint8_t)(&binVec[0])
	cDim := C.int(dim)
	cLength := C.int(length / (dim / 8))

338 339
	status := C.AddBinaryVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength)
	return HandleCStatus(&status, "AddBinaryVectorToPayload failed")
X
XuanYang-cn 已提交
340 341
}

J
jaime 已提交
342
// AddFloatVectorToPayload dimension > 0 && (%8 == 0)
X
XuanYang-cn 已提交
343 344 345 346 347 348 349 350 351
func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) error {
	length := len(floatVec)
	if length <= 0 {
		return errors.New("can't add empty floatVec into payload")
	}
	if dim <= 0 {
		return errors.New("dimension should be greater than 0")
	}

C
Cai Yudong 已提交
352
	cVec := (*C.float)(&floatVec[0])
X
XuanYang-cn 已提交
353 354 355
	cDim := C.int(dim)
	cLength := C.int(length / dim)

356 357
	status := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength)
	return HandleCStatus(&status, "AddFloatVectorToPayload failed")
Z
zhenshan.cao 已提交
358 359 360
}

func (w *PayloadWriter) FinishPayloadWriter() error {
361 362
	status := C.FinishPayloadWriter(w.payloadWriterPtr)
	return HandleCStatus(&status, "FinishPayloadWriter failed")
Z
zhenshan.cao 已提交
363 364 365 366
}

func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) {
	cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr)
367
	pointer := uintptr(unsafe.Pointer(cb.data))
Z
zhenshan.cao 已提交
368 369 370 371
	length := int(cb.length)
	if length <= 0 {
		return nil, errors.New("empty buffer")
	}
372 373 374 375 376 377 378 379

	var data []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
	sh.Data = pointer
	sh.Len = length
	sh.Cap = length

	return data, nil
Z
zhenshan.cao 已提交
380 381 382 383 384 385 386
}

// GetPayloadLengthFromWriter returns the length reported by
// C.GetPayloadLengthFromWriter. The error result is always nil.
func (w *PayloadWriter) GetPayloadLengthFromWriter() (int, error) {
	return int(C.GetPayloadLengthFromWriter(w.payloadWriterPtr)), nil
}

387 388
func (w *PayloadWriter) ReleasePayloadWriter() {
	C.ReleasePayloadWriter(w.payloadWriterPtr)
Z
zhenshan.cao 已提交
389 390
}

391 392
func (w *PayloadWriter) Close() {
	w.ReleasePayloadWriter()
Z
zhenshan.cao 已提交
393
}
E
Enwei Jiao 已提交
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412

// HandleCStatus deals with the status returned from a CGO call.
//
// It returns nil when status.error_code is 0. Otherwise it copies
// status.error_msg into a Go string, frees the C string, logs a warning that
// includes extraInfo, and returns an error of the form "[<name>] <msg>", where
// <name> is looked up in commonpb.ErrorCode_name and falls back to
// "UnknownError". extraInfo appears only in the log line, not in the returned
// error.
func HandleCStatus(status *C.CStatus, extraInfo string) error {
	code := status.error_code
	if code == 0 {
		return nil
	}

	// The message is copied by GoString below; the C copy is freed on return.
	defer C.free(unsafe.Pointer(status.error_msg))

	name, known := commonpb.ErrorCode_name[int32(code)]
	if !known {
		name = "UnknownError"
	}

	finalMsg := fmt.Sprintf("[%s] %s", name, C.GoString(status.error_msg))
	log.Warn(fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg))
	return errors.New(finalMsg)
}