未验证 提交 c5149c7e 编写于 作者: J Jiquan Long 提交者: GitHub

Replace text codec of pb with binary codec (#16955)

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 63385b7c
......@@ -70,13 +70,15 @@ typedef struct CProto {
typedef struct CLoadFieldDataInfo {
int64_t field_id;
const char* blob;
const uint8_t* blob;
uint64_t blob_size;
int64_t row_count;
} CLoadFieldDataInfo;
typedef struct CLoadDeletedRecordInfo {
void* timestamps;
const char* primary_keys;
const uint8_t* primary_keys;
const uint64_t primary_keys_size;
int64_t row_count;
} CLoadDeletedRecordInfo;
......
......@@ -134,14 +134,13 @@ Insert(CSegmentInterface c_segment,
int64_t size,
const int64_t* row_ids,
const uint64_t* timestamps,
const char* data_info) {
const uint8_t* data_info,
const uint64_t data_info_len) {
try {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto proto = std::string(data_info);
Assert(!proto.empty());
auto insert_data = std::make_unique<milvus::InsertData>();
auto suc = google::protobuf::TextFormat::ParseFromString(proto, insert_data.get());
AssertInfo(suc, "unmarshal field data string failed");
auto suc = insert_data->ParseFromArray(data_info, data_info_len);
AssertInfo(suc, "failed to parse insert data from records");
segment->Insert(reserved_offset, size, row_ids, timestamps, insert_data.get());
return milvus::SuccessCStatus();
......@@ -162,14 +161,16 @@ PreInsert(CSegmentInterface c_segment, int64_t size, int64_t* offset) {
}
CStatus
Delete(
CSegmentInterface c_segment, int64_t reserved_offset, int64_t size, const char* ids, const uint64_t* timestamps) {
Delete(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const uint8_t* ids,
const uint64_t ids_size,
const uint64_t* timestamps) {
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
auto proto = std::string(ids);
Assert(!proto.empty());
auto pks = std::make_unique<milvus::proto::schema::IDs>();
auto suc = google::protobuf::TextFormat::ParseFromString(proto, pks.get());
AssertInfo(suc, "unmarshal field data string failed");
auto suc = pks->ParseFromArray(ids, ids_size);
AssertInfo(suc, "failed to parse pks from ids");
try {
auto res = segment->Delete(reserved_offset, size, pks.get(), timestamps);
return milvus::SuccessCStatus();
......@@ -192,10 +193,8 @@ LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_in
auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
AssertInfo(segment != nullptr, "segment conversion failed");
auto proto = std::string(load_field_data_info.blob);
Assert(!proto.empty());
auto field_data = std::make_unique<milvus::DataArray>();
auto suc = google::protobuf::TextFormat::ParseFromString(proto, field_data.get());
auto suc = field_data->ParseFromArray(load_field_data_info.blob, load_field_data_info.blob_size);
AssertInfo(suc, "unmarshal field data string failed");
auto load_info =
LoadFieldDataInfo{load_field_data_info.field_id, field_data.get(), load_field_data_info.row_count};
......@@ -211,10 +210,8 @@ LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_re
try {
auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
AssertInfo(segment_interface != nullptr, "segment conversion failed");
auto proto = std::string(deleted_record_info.primary_keys);
Assert(!proto.empty());
auto pks = std::make_unique<milvus::proto::schema::IDs>();
auto suc = google::protobuf::TextFormat::ParseFromString(proto, pks.get());
auto suc = pks->ParseFromArray(deleted_record_info.primary_keys, deleted_record_info.primary_keys_size);
AssertInfo(suc, "unmarshal field data string failed");
auto load_info =
LoadDeletedRecordInfo{deleted_record_info.timestamps, pks.get(), deleted_record_info.row_count};
......
......@@ -67,7 +67,8 @@ Insert(CSegmentInterface c_segment,
int64_t size,
const int64_t* row_ids,
const uint64_t* timestamps,
const char* data_info);
const uint8_t* data_info,
const uint64_t data_info_len);
CStatus
PreInsert(CSegmentInterface c_segment, int64_t size, int64_t* offset);
......@@ -90,7 +91,12 @@ DropSealedSegmentIndex(CSegmentInterface c_segment, int64_t field_id);
////////////////////////////// interfaces for SegmentInterface //////////////////////////////
CStatus
Delete(CSegmentInterface c_segment, int64_t reserved_offset, int64_t size, const char* ids, const uint64_t* timestamps);
Delete(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const uint8_t* ids,
const uint64_t ids_size,
const uint64_t* timestamps);
int64_t
PreDelete(CSegmentInterface c_segment, int64_t size);
......
......@@ -609,17 +609,6 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
}
func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps []Timestamp, record *segcorepb.InsertRecord) error {
/*
CStatus
Insert(CSegmentInterface c_segment,
long int reserved_offset,
signed long int size,
const long* primary_keys,
const unsigned long* timestamps,
void* raw_data,
int sizeof_per_row,
signed long int count);
*/
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentType != segmentTypeGrowing {
......@@ -630,10 +619,10 @@ func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [
return errors.New("null seg core pointer")
}
insertRecordBlob := proto.MarshalTextString(record)
cInsertRecordBlob := C.CString(insertRecordBlob)
defer C.free(unsafe.Pointer(cInsertRecordBlob))
insertRecordBlob, err := proto.Marshal(record)
if err != nil {
return fmt.Errorf("failed to marshal insert record: %s", err)
}
var numOfRow = len(entityIDs)
var cOffset = C.int64_t(offset)
......@@ -646,7 +635,8 @@ func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [
cNumOfRows,
cEntityIdsPtr,
cTimestampsPtr,
cInsertRecordBlob)
(*C.uint8_t)(unsafe.Pointer(&insertRecordBlob[0])),
(C.uint64_t)(len(insertRecordBlob)))
if err := HandleCStatus(&status, "Insert failed"); err != nil {
return err
}
......@@ -709,11 +699,12 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps
return fmt.Errorf("invalid data type of primary keys")
}
dataBlob := proto.MarshalTextString(ids)
cDataBlob := C.CString(dataBlob)
defer C.free(unsafe.Pointer(cDataBlob))
dataBlob, err := proto.Marshal(ids)
if err != nil {
return fmt.Errorf("failed to marshal ids: %s", err)
}
status := C.Delete(s.segmentPtr, cOffset, cSize, cDataBlob, cTimestampsPtr)
status := C.Delete(s.segmentPtr, cOffset, cSize, (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])), (C.uint64_t)(len(dataBlob)), cTimestampsPtr)
if err := HandleCStatus(&status, "Delete failed"); err != nil {
return err
}
......@@ -737,21 +728,15 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche
return errors.New(errMsg)
}
dataBlob := proto.MarshalTextString(data)
cDataBlob := C.CString(dataBlob)
defer C.free(unsafe.Pointer(cDataBlob))
dataBlob, err := proto.Marshal(data)
if err != nil {
return err
}
/*
typedef struct CLoadFieldDataInfo {
int64_t field_id;
void* blob;
int64_t row_count;
} CLoadFieldDataInfo;
*/
loadInfo := C.CLoadFieldDataInfo{
field_id: C.int64_t(fieldID),
blob: cDataBlob,
blob: (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])),
blob_size: C.uint64_t(len(dataBlob)),
row_count: C.int64_t(rowCount),
}
......@@ -805,15 +790,16 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps
return fmt.Errorf("invalid data type of primary keys")
}
idsBlob := proto.MarshalTextString(ids)
cIdsBlob := C.CString(idsBlob)
defer C.free(unsafe.Pointer(cIdsBlob))
idsBlob, err := proto.Marshal(ids)
if err != nil {
return err
}
loadInfo := C.CLoadDeletedRecordInfo{
timestamps: unsafe.Pointer(&timestamps[0]),
primary_keys: cIdsBlob,
row_count: C.int64_t(rowCount),
timestamps: unsafe.Pointer(&timestamps[0]),
primary_keys: (*C.uint8_t)(unsafe.Pointer(&idsBlob[0])),
primary_keys_size: C.uint64_t(len(idsBlob)),
row_count: C.int64_t(rowCount),
}
/*
CStatus
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册