From 1ef8f0fceb37ef0ddf610c4e2985c0a91e6c909e Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Wed, 14 Jun 2023 18:04:38 +0800 Subject: [PATCH] Remove cgo PayloadWriter (#24892) Signed-off-by: Enwei Jiao --- internal/storage/payload.go | 351 +---------------------------- internal/storage/payload_writer.go | 2 +- 2 files changed, 3 insertions(+), 350 deletions(-) diff --git a/internal/storage/payload.go b/internal/storage/payload.go index ab1d77507..ab1c3e8b1 100644 --- a/internal/storage/payload.go +++ b/internal/storage/payload.go @@ -16,29 +16,13 @@ package storage -/* -#cgo pkg-config: milvus_storage - -#include -#include "storage/parquet_c.h" -*/ -import "C" import ( - "fmt" - "reflect" - "unsafe" - - "github.com/cockroachdb/errors" - "github.com/golang/protobuf/proto" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/pkg/log" ) // PayloadWriterInterface abstracts PayloadWriter type PayloadWriterInterface interface { - AddDataToPayload(msgs interface{}, dim ...int) error + AddDataToPayload(msgs any, dim ...int) error AddBoolToPayload(msgs []bool) error AddByteToPayload(msgs []byte) error AddInt8ToPayload(msgs []int8) error @@ -61,7 +45,7 @@ type PayloadWriterInterface interface { // PayloadReaderInterface abstracts PayloadReader type PayloadReaderInterface interface { - GetDataFromPayload() (interface{}, int, error) + GetDataFromPayload() (any, int, error) GetBoolFromPayload() ([]bool, error) GetByteFromPayload() ([]byte, error) GetInt8FromPayload() ([]int8, error) @@ -79,334 +63,3 @@ type PayloadReaderInterface interface { ReleasePayloadReader() error Close() error } - -// PayloadWriter writes data into payload -type PayloadWriter struct { - payloadWriterPtr C.CPayloadWriter - colType schemapb.DataType -} - -// NewPayloadWriter is constructor of PayloadWriter -func NewPayloadWriter(colType schemapb.DataType, dim ...int) (PayloadWriterInterface, error) { - return NewPurePayloadWriter(colType, dim...) -} - -// AddDataToPayload adds @msgs into payload, if @msgs is vector, dimension should be specified by @dim -func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error { - switch len(dim) { - case 0: - switch w.colType { - case schemapb.DataType_Bool: - val, ok := msgs.([]bool) - if !ok { - return errors.New("incorrect data type") - } - return w.AddBoolToPayload(val) - case schemapb.DataType_Int8: - val, ok := msgs.([]int8) - if !ok { - return errors.New("incorrect data type") - } - return w.AddInt8ToPayload(val) - case schemapb.DataType_Int16: - val, ok := msgs.([]int16) - if !ok { - return errors.New("incorrect data type") - } - return w.AddInt16ToPayload(val) - case schemapb.DataType_Int32: - val, ok := msgs.([]int32) - if !ok { - return errors.New("incorrect data type") - } - return w.AddInt32ToPayload(val) - case schemapb.DataType_Int64: - val, ok := msgs.([]int64) - if !ok { - return errors.New("incorrect data type") - } - return w.AddInt64ToPayload(val) - case schemapb.DataType_Float: - val, ok := msgs.([]float32) - if !ok { - return errors.New("incorrect data type") - } - return w.AddFloatToPayload(val) - case schemapb.DataType_Double: - val, ok := msgs.([]float64) - if !ok { - return errors.New("incorrect data type") - } - return w.AddDoubleToPayload(val) - case schemapb.DataType_String, schemapb.DataType_VarChar: - val, ok := msgs.(string) - if !ok { - return errors.New("incorrect data type") - } - return w.AddOneStringToPayload(val) - case schemapb.DataType_Array: - val, ok := msgs.(*schemapb.ScalarField) - if !ok { - return errors.New("incorrect data type") - } - return w.AddOneArrayToPayload(val) - case schemapb.DataType_JSON: - val, ok := msgs.([]byte) - if !ok { - return errors.New("incorrect data type") - } - return w.AddOneJSONToPayload(val) - default: - return errors.New("incorrect datatype") - } - case 1: - switch w.colType { - case schemapb.DataType_BinaryVector: - val, ok := msgs.([]byte) - if !ok { - return errors.New("incorrect data type") - } - return w.AddBinaryVectorToPayload(val, dim[0]) - case schemapb.DataType_FloatVector: - val, ok := msgs.([]float32) - if !ok { - return errors.New("incorrect data type") - } - return w.AddFloatVectorToPayload(val, dim[0]) - default: - return errors.New("incorrect datatype") - } - default: - return errors.New("incorrect input numbers") - } -} - -// AddBoolToPayload adds @msgs into payload -func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error { - length := len(msgs) - if length <= 0 { - return errors.New("can't add empty msgs into payload") - } - - cMsgs := (*C.bool)(unsafe.Pointer(&msgs[0])) - cLength := C.int(length) - - status := C.AddBooleanToPayload(w.payloadWriterPtr, cMsgs, cLength) - return HandleCStatus(&status, "AddBoolToPayload failed") -} - -// AddByteToPayload adds @msgs into payload -func (w *PayloadWriter) AddByteToPayload(msgs []byte) error { - length := len(msgs) - if length <= 0 { - return errors.New("can't add empty msgs into payload") - } - cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0])) - cLength := C.int(length) - - status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength) - return HandleCStatus(&status, "AddInt8ToPayload failed") -} - -func (w *PayloadWriter) AddInt8ToPayload(msgs []int8) error { - length := len(msgs) - if length <= 0 { - return errors.New("can't add empty msgs into payload") - } - cMsgs := (*C.int8_t)(unsafe.Pointer(&msgs[0])) - cLength := C.int(length) - - status := C.AddInt8ToPayload(w.payloadWriterPtr, cMsgs, cLength) - return HandleCStatus(&status, "AddInt8ToPayload failed") -} - -func (w *PayloadWriter) AddInt16ToPayload(msgs []int16) error { - length := len(msgs) - if length <= 0 { - return errors.New("can't add empty msgs into payload") - } - - cMsgs := (*C.int16_t)(unsafe.Pointer(&msgs[0])) - cLength := C.int(length) - - status := C.AddInt16ToPayload(w.payloadWriterPtr, cMsgs, cLength) - return HandleCStatus(&status, "AddInt16ToPayload failed") -} - -func (w *PayloadWriter) AddInt32ToPayload(msgs []int32) error { - length := len(msgs) - if length <= 0 { - return errors.New("can't add empty msgs into payload") - } - - cMsgs := (*C.int32_t)(unsafe.Pointer(&msgs[0])) - cLength := C.int(length) - - status := C.AddInt32ToPayload(w.payloadWriterPtr, cMsgs, cLength) - return HandleCStatus(&status, "AddInt32ToPayload failed") -} - -func (w *PayloadWriter) AddInt64ToPayload(msgs []int64) error { - length := len(msgs) - if length <= 0 { - return errors.New("can't add empty msgs into payload") - } - - cMsgs := (*C.int64_t)(unsafe.Pointer(&msgs[0])) - cLength := C.int(length) - - status := C.AddInt64ToPayload(w.payloadWriterPtr, cMsgs, cLength) - return HandleCStatus(&status, "AddInt64ToPayload failed") -} - -func (w *PayloadWriter) AddFloatToPayload(msgs []float32) error { - length := len(msgs) - if length <= 0 { - return errors.New("can't add empty msgs into payload") - } - - cMsgs := (*C.float)(unsafe.Pointer(&msgs[0])) - cLength := C.int(length) - - status := C.AddFloatToPayload(w.payloadWriterPtr, cMsgs, cLength) - return HandleCStatus(&status, "AddFloatToPayload failed") -} - -func (w *PayloadWriter) AddDoubleToPayload(msgs []float64) error { - length := len(msgs) - if length <= 0 { - return errors.New("can't add empty msgs into payload") - } - - cMsgs := (*C.double)(unsafe.Pointer(&msgs[0])) - cLength := C.int(length) - - status := C.AddDoubleToPayload(w.payloadWriterPtr, cMsgs, cLength) - return HandleCStatus(&status, "AddDoubleToPayload failed") -} - -func (w *PayloadWriter) AddOneStringToPayload(msg string) error { - length := len(msg) - cmsg := C.CString(msg) - clength := C.int(length) - defer C.free(unsafe.Pointer(cmsg)) - - // the C.AddOneStringToPayload can handle empty string - status := C.AddOneStringToPayload(w.payloadWriterPtr, cmsg, clength) - return HandleCStatus(&status, "AddOneStringToPayload failed") -} - -func (w *PayloadWriter) AddOneArrayToPayload(msg *schemapb.ScalarField) error { - bytes, err := proto.Marshal(msg) - if err != nil { - return errors.New("Marshal ListValue failed") - } - - length := len(bytes) - cmsg := (*C.uint8_t)(unsafe.Pointer(&bytes[0])) - clength := C.int(length) - // defer C.free(unsafe.Pointer(cmsg)) - - status := C.AddOneArrayToPayload(w.payloadWriterPtr, cmsg, clength) - return HandleCStatus(&status, "AddOneArrayToPayload failed") -} - -func (w *PayloadWriter) AddOneJSONToPayload(msg []byte) error { - bytes := msg - length := len(bytes) - cmsg := (*C.uint8_t)(unsafe.Pointer(&bytes[0])) - clength := C.int(length) - - status := C.AddOneJSONToPayload(w.payloadWriterPtr, cmsg, clength) - return HandleCStatus(&status, "AddOneJSONToPayload failed") -} - -// AddBinaryVectorToPayload dimension > 0 && (%8 == 0) -func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error { - length := len(binVec) - if length <= 0 { - return errors.New("can't add empty binVec into payload") - } - if dim <= 0 { - return errors.New("dimension should be greater than 0") - } - - cBinVec := (*C.uint8_t)(&binVec[0]) - cDim := C.int(dim) - cLength := C.int(length / (dim / 8)) - - status := C.AddBinaryVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength) - return HandleCStatus(&status, "AddBinaryVectorToPayload failed") -} - -// AddFloatVectorToPayload dimension > 0 && (%8 == 0) -func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) error { - length := len(floatVec) - if length <= 0 { - return errors.New("can't add empty floatVec into payload") - } - if dim <= 0 { - return errors.New("dimension should be greater than 0") - } - - cVec := (*C.float)(&floatVec[0]) - cDim := C.int(dim) - cLength := C.int(length / dim) - - status := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength) - return HandleCStatus(&status, "AddFloatVectorToPayload failed") -} - -func (w *PayloadWriter) FinishPayloadWriter() error { - status := C.FinishPayloadWriter(w.payloadWriterPtr) - return HandleCStatus(&status, "FinishPayloadWriter failed") -} - -func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) { - cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr) - pointer := uintptr(unsafe.Pointer(cb.data)) - length := int(cb.length) - if length <= 0 { - return nil, errors.New("empty buffer") - } - - var data []byte - sh := (*reflect.SliceHeader)(unsafe.Pointer(&data)) - sh.Data = pointer - sh.Len = length - sh.Cap = length - - return data, nil -} - -func (w *PayloadWriter) GetPayloadLengthFromWriter() (int, error) { - length := C.GetPayloadLengthFromWriter(w.payloadWriterPtr) - return int(length), nil -} - -func (w *PayloadWriter) ReleasePayloadWriter() { - C.ReleasePayloadWriter(w.payloadWriterPtr) -} - -func (w *PayloadWriter) Close() { - w.ReleasePayloadWriter() -} - -// HandleCStatus deal with the error returned from CGO -func HandleCStatus(status *C.CStatus, extraInfo string) error { - if status.error_code == 0 { - return nil - } - errorCode := status.error_code - errorName, ok := commonpb.ErrorCode_name[int32(errorCode)] - if !ok { - errorName = "UnknownError" - } - errorMsg := C.GoString(status.error_msg) - defer C.free(unsafe.Pointer(status.error_msg)) - - finalMsg := fmt.Sprintf("[%s] %s", errorName, errorMsg) - logMsg := fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg) - log.Warn(logMsg) - return errors.New(finalMsg) -} diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go index 2f44cd53f..b58056cb9 100644 --- a/internal/storage/payload_writer.go +++ b/internal/storage/payload_writer.go @@ -47,7 +47,7 @@ type NativePayloadWriter struct { releaseOnce sync.Once } -func NewPurePayloadWriter(colType schemapb.DataType, dim ...int) (*NativePayloadWriter, error) { +func NewPayloadWriter(colType schemapb.DataType, dim ...int) (PayloadWriterInterface, error) { var arrowType arrow.DataType if typeutil.IsVectorType(colType) { if len(dim) != 1 { -- GitLab