Unverified commit 1f705a20, authored by groot, committed by GitHub

define binary data type (#3115)

* define binary data type
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* typo
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* refine code
Signed-off-by: yhmo <yihua.mo@zilliz.com>
Co-authored-by: shengjun.li <shengjun.li@zilliz.com>
Parent 19c5dc07
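
The core of this change replaces bare std::vector<uint8_t> buffers with a reference-counted engine::BinaryData object across the codec, segment, and DB layers. As a minimal orientation sketch (the field name and surrounding variables are illustrative, not taken from this commit), call sites after the refactor look like this:

engine::BinaryDataPtr raw;                     // starts as nullptr
segment_reader->LoadField("my_field", raw);    // callee allocates and fills
if (raw != nullptr) {
    const uint8_t* bytes = raw->data_.data();  // underlying byte buffer
    int64_t num_bytes = raw->Size();           // same as raw->data_.size()
    // the same pointer can be stored into a DataChunk without copying:
    data_chunk->fixed_fields_["my_field"] = raw;
}

Because BinaryData derives from cache::DataObj, the buffer can also be handed to the cache layer, and passing shared pointers lets a chunk and a segment share one allocation instead of copying byte vectors.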
......@@ -20,9 +20,8 @@
#include <fcntl.h>
#include <unistd.h>
#include <algorithm>
#include <memory>
#include <boost/filesystem.hpp>
#include <memory>
#include "utils/Exception.h"
#include "utils/Log.h"
......@@ -32,7 +31,7 @@ namespace milvus {
namespace codec {
void
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, std::vector<uint8_t>& raw) {
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) {
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
......@@ -42,15 +41,16 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
size_t num_bytes;
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
raw.resize(num_bytes);
fs_ptr->reader_ptr_->read(raw.data(), num_bytes);
raw = std::make_shared<engine::BinaryData>();
raw->data_.resize(num_bytes);
fs_ptr->reader_ptr_->read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->close();
}
void
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes,
std::vector<uint8_t>& raw) {
engine::BinaryDataPtr& raw) {
if (offset < 0 || num_bytes <= 0) {
std::string err_msg = "Invalid input to read: " + file_path;
LOG_ENGINE_ERROR_ << err_msg;
......@@ -73,15 +73,16 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
}
raw.resize(num_bytes);
raw = std::make_shared<engine::BinaryData>();
raw->data_.resize(num_bytes);
fs_ptr->reader_ptr_->seekg(offset);
fs_ptr->reader_ptr_->read(raw.data(), num_bytes);
fs_ptr->reader_ptr_->read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->close();
}
void
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges,
std::vector<uint8_t>& raw) {
engine::BinaryDataPtr& raw) {
if (read_ranges.empty()) {
return;
}
......@@ -106,13 +107,13 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
total_bytes += range.num_bytes_;
}
raw.clear();
raw.resize(total_bytes);
raw = std::make_shared<engine::BinaryData>();
raw->data_.resize(total_bytes);
int64_t poz = 0;
for (auto& range : read_ranges) {
int64_t offset = range.offset_ + sizeof(size_t);
fs_ptr->reader_ptr_->seekg(offset);
fs_ptr->reader_ptr_->read(raw.data() + poz, range.num_bytes_);
fs_ptr->reader_ptr_->read(raw->data_.data() + poz, range.num_bytes_);
poz += range.num_bytes_;
}
......@@ -120,16 +121,21 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
}
void
BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::vector<uint8_t>& raw) {
BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const engine::BinaryDataPtr& raw) {
if (raw == nullptr) {
return;
}
if (!fs_ptr->writer_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t num_bytes = raw.size();
size_t num_bytes = raw->data_.size();
fs_ptr->writer_ptr_->write(&num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->write((void*)raw.data(), num_bytes);
fs_ptr->writer_ptr_->write((void*)(raw->data_.data()), num_bytes);
fs_ptr->writer_ptr_->close();
}
......
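For orientation, the block files handled above use a simple framing: a size_t byte count followed by the payload, which is why the ranged Read seeks to range.offset_ + sizeof(size_t). A hedged sketch of the same framing using plain fstream (the real code goes through fs_ptr->writer_ptr_/reader_ptr_ with the error handling shown above):

#include <fstream>
#include <string>
#include <vector>

void WriteBlock(const std::string& path, const std::vector<uint8_t>& payload) {
    std::ofstream out(path, std::ios::binary);
    size_t num_bytes = payload.size();
    out.write(reinterpret_cast<const char*>(&num_bytes), sizeof(num_bytes));
    out.write(reinterpret_cast<const char*>(payload.data()), num_bytes);
}

void ReadBlock(const std::string& path, std::vector<uint8_t>& payload) {
    std::ifstream in(path, std::ios::binary);
    size_t num_bytes = 0;
    in.read(reinterpret_cast<char*>(&num_bytes), sizeof(num_bytes));
    payload.resize(num_bytes);
    in.read(reinterpret_cast<char*>(payload.data()), num_bytes);
}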
......@@ -21,6 +21,7 @@
#include <string>
#include <vector>
#include "db/Types.h"
#include "knowhere/common/BinarySet.h"
#include "storage/FSHandler.h"
......@@ -41,18 +42,18 @@ class BlockFormat {
BlockFormat() = default;
void
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, std::vector<uint8_t>& raw);
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw);
void
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes,
std::vector<uint8_t>& raw);
engine::BinaryDataPtr& raw);
void
Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges,
std::vector<uint8_t>& raw);
engine::BinaryDataPtr& raw);
void
Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::vector<uint8_t>& raw);
Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const engine::BinaryDataPtr& raw);
// No copy and move
BlockFormat(const BlockFormat&) = delete;
......
......@@ -128,10 +128,15 @@ VectorIndexFormat::ReadCompress(const storage::FSHandlerPtr& fs_ptr, const std::
}
void
VectorIndexFormat::ConvertRaw(const std::vector<uint8_t>& raw, knowhere::BinaryPtr& data) {
VectorIndexFormat::ConvertRaw(const engine::BinaryDataPtr& raw, knowhere::BinaryPtr& data) {
data = std::make_shared<knowhere::Binary>();
data->size = raw.size();
data->data = std::shared_ptr<uint8_t[]>(new uint8_t[data->size]);
if (raw == nullptr) {
return;
}
data->size = raw->Size();
data->data = std::shared_ptr<uint8_t[]>(new uint8_t[data->size], std::default_delete<uint8_t[]>());
memcpy(data->data.get(), raw->data_.data(), data->size);
}
void
......
......@@ -44,7 +44,7 @@ class VectorIndexFormat {
ReadCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& data);
void
ConvertRaw(const std::vector<uint8_t>& raw, knowhere::BinaryPtr& data);
ConvertRaw(const engine::BinaryDataPtr& raw, knowhere::BinaryPtr& data);
void
ConstructIndex(const std::string& index_name, knowhere::BinarySet& index_data, knowhere::BinaryPtr& raw_data,
......
......@@ -212,18 +212,25 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
CHECK_INITIALIZED;
auto ctx = context;
// check uid existence/validation
// check uid params
bool has_uid = false;
for (auto& pair : ctx.fields_schema) {
if (pair.first->GetFtype() == DataType::UID) {
if (pair.first->GetName() == DEFAULT_UID_NAME) {
has_uid = true;
json params = pair.first->GetParams();
if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
params[PARAM_UID_AUTOGEN] = true;
pair.first->SetParams(params);
}
break;
}
}
// add uid field if not specified
if (!has_uid) {
auto uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, milvus::engine::DataType::UID);
json params;
params[PARAM_UID_AUTOGEN] = true;
auto uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, DataType::INT64, params);
auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
0, 0, DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
......@@ -480,14 +487,47 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
return Status(DB_NOT_FOUND, "Fail to get partition " + partition_name);
}
// Generate id
if (data_chunk->fixed_fields_.find(engine::DEFAULT_UID_NAME) == data_chunk->fixed_fields_.end()) {
auto id_field = ss->GetField(DEFAULT_UID_NAME);
if (id_field == nullptr) {
return Status(DB_ERROR, "Field '_id' not found");
}
auto& params = id_field->GetParams();
bool auto_increment = true;
if (params.find(PARAM_UID_AUTOGEN) != params.end()) {
auto_increment = params[PARAM_UID_AUTOGEN];
}
// id is auto increment, but client provides id, return error
FIXEDX_FIELD_MAP& fields = data_chunk->fixed_fields_;
if (auto_increment) {
auto pair = fields.find(engine::DEFAULT_UID_NAME);
if (pair != fields.end() && pair->second != nullptr) {
return Status(DB_ERROR, "Field '_id' is auto increment, no need to provide id");
}
}
// id is not auto increment, but client doesn't provide id, return error
if (!auto_increment) {
auto pair = fields.find(engine::DEFAULT_UID_NAME);
if (pair == fields.end() || pair->second == nullptr) {
return Status(DB_ERROR, "Field '_id' is user defined");
}
}
// generate id
DataChunkPtr new_chunk = std::make_shared<DataChunk>();
new_chunk->fixed_fields_ = data_chunk->fixed_fields_;
new_chunk->variable_fields_ = data_chunk->variable_fields_;
new_chunk->count_ = data_chunk->count_;
if (auto_increment) {
SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
IDNumbers ids;
STATUS_CHECK(id_generator.GetNextIDNumbers(data_chunk->count_, ids));
FIXED_FIELD_DATA& id_data = data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME];
id_data.resize(ids.size() * sizeof(int64_t));
memcpy(id_data.data(), ids.data(), ids.size() * sizeof(int64_t));
BinaryDataPtr id_data = std::make_shared<BinaryData>();
id_data->data_.resize(ids.size() * sizeof(int64_t));
memcpy(id_data->data_.data(), ids.data(), ids.size() * sizeof(int64_t));
new_chunk->fixed_fields_[engine::DEFAULT_UID_NAME] = id_data;
}
if (options_.wal_enable_) {
......@@ -509,8 +549,8 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
record.lsn = 0;
record.collection_id = collection_name;
record.partition_tag = partition_name;
record.data_chunk = data_chunk;
record.length = data_chunk->count_;
record.data_chunk = new_chunk;
record.length = new_chunk->count_;
record.type = wal::MXLogType::Entity;
STATUS_CHECK(ExecWalRecord(record));
......
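The insert path above enforces a simple contract derived from the new auto_increment param on the _id field: an auto-increment field rejects client-supplied ids (the server generates them through SafeIDGenerator), while a user-defined field requires them. A condensed restatement of the checks, not the actual function:

bool client_gave_ids = (fields.find(engine::DEFAULT_UID_NAME) != fields.end() &&
                        fields[engine::DEFAULT_UID_NAME] != nullptr);
if (auto_increment && client_gave_ids) {
    return Status(DB_ERROR, "Field '_id' is auto increment, no need to provide id");
}
if (!auto_increment && !client_gave_ids) {
    return Status(DB_ERROR, "Field '_id' is user defined");
}
// otherwise: generate ids if auto_increment, else keep the client's ids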
......@@ -16,6 +16,7 @@ namespace milvus {
namespace engine {
const char* DEFAULT_UID_NAME = "_uid";
const char* PARAM_UID_AUTOGEN = "auto_increment";
const char* DEFAULT_RAW_DATA_NAME = "_raw";
const char* DEFAULT_BLOOM_FILTER_NAME = "_blf";
......
......@@ -22,6 +22,7 @@
#include <utility>
#include <vector>
#include "cache/DataObj.h"
#include "knowhere/index/vector_index/VecIndex.h"
#include "utils/Json.h"
......@@ -53,18 +54,39 @@ enum DataType {
STRING = 20,
UID = 30,
VECTOR_BINARY = 100,
VECTOR_FLOAT = 101,
};
class BinaryData : public cache::DataObj {
public:
int64_t
Size() {
return data_.size();
}
public:
std::vector<uint8_t> data_;
};
using BinaryDataPtr = std::shared_ptr<BinaryData>;
class VaribleData : public cache::DataObj {
public:
int64_t
Size() {
return data_.size();
}
public:
std::vector<uint8_t> data_;
std::vector<int64_t> offset_;
};
using VaribleDataPtr = std::shared_ptr<VaribleData>;
using FIELD_TYPE_MAP = std::unordered_map<std::string, DataType>;
using FIELD_WIDTH_MAP = std::unordered_map<std::string, int64_t>;
using FIXED_FIELD_DATA = std::vector<uint8_t>;
using FIXEDX_FIELD_MAP = std::unordered_map<std::string, FIXED_FIELD_DATA>;
using VARIABLE_FIELD_DATA = std::vector<std::string>;
using VARIABLE_FIELD_MAP = std::unordered_map<std::string, VARIABLE_FIELD_DATA>;
using FIXEDX_FIELD_MAP = std::unordered_map<std::string, BinaryDataPtr>;
using VARIABLE_FIELD_MAP = std::unordered_map<std::string, VaribleDataPtr>;
using VECTOR_INDEX_MAP = std::unordered_map<std::string, knowhere::VecIndexPtr>;
using STRUCTURED_INDEX_MAP = std::unordered_map<std::string, knowhere::IndexPtr>;
......@@ -116,6 +138,7 @@ using File2ErrArray = std::map<std::string, std::vector<std::string>>;
using Table2FileErr = std::map<std::string, File2ErrArray>;
extern const char* DEFAULT_UID_NAME;
extern const char* PARAM_UID_AUTOGEN;
extern const char* DEFAULT_RAW_DATA_NAME;
extern const char* DEFAULT_BLOOM_FILTER_NAME;
......
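With the typedef change above, each fixed field in a DataChunk now holds a shared BinaryDataPtr rather than an owned byte vector. A minimal sketch of populating one entry, mirroring the test helper near the end of this diff (the field name is illustrative):

#include <cstring>     // memcpy
#include "db/Types.h"  // BinaryData / FIXEDX_FIELD_MAP as defined in this commit

std::vector<int64_t> values = {1, 2, 3};
auto raw = std::make_shared<milvus::engine::BinaryData>();
raw->data_.resize(values.size() * sizeof(int64_t));
memcpy(raw->data_.data(), values.data(), raw->data_.size());

milvus::engine::FIXEDX_FIELD_MAP fields;  // maps field name -> BinaryDataPtr
fields["field_1"] = raw;                  // shares the buffer, no copy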
......@@ -51,10 +51,14 @@ namespace engine {
namespace {
template <typename T>
knowhere::IndexPtr
CreateSortedIndex(std::vector<uint8_t>& raw_data) {
auto count = raw_data.size() / sizeof(T);
CreateSortedIndex(engine::BinaryDataPtr& raw_data) {
if (raw_data == nullptr) {
return nullptr;
}
auto count = raw_data->data_.size() / sizeof(T);
auto index_ptr =
std::make_shared<knowhere::StructuredIndexSort<T>>(count, reinterpret_cast<const T*>(raw_data.data()));
std::make_shared<knowhere::StructuredIndexSort<T>>(count, reinterpret_cast<const T*>(raw_data->data_.data()));
return std::static_pointer_cast<knowhere::Index>(index_ptr);
}
} // namespace
......@@ -97,14 +101,13 @@ ExecutionEngineImpl::LoadForSearch(const query::QueryPtr& query_ptr) {
}
Status
ExecutionEngineImpl::CreateStructuredIndex(const DataType field_type, std::vector<uint8_t>& raw_data,
ExecutionEngineImpl::CreateStructuredIndex(const DataType field_type, engine::BinaryDataPtr& raw_data,
knowhere::IndexPtr& index_ptr) {
switch (field_type) {
case engine::DataType::INT32: {
index_ptr = CreateSortedIndex<int32_t>(raw_data);
break;
}
case engine::DataType::UID:
case engine::DataType::INT64: {
index_ptr = CreateSortedIndex<int64_t>(raw_data);
break;
......@@ -145,7 +148,7 @@ ExecutionEngineImpl::Load(const TargetFields& field_names) {
if (!index_exist) {
// for structured field, create a simple sorted index for it
// we also can do this in BuildIndex step, but for now we do this in Load step
std::vector<uint8_t> raw_data;
BinaryDataPtr raw_data;
segment_reader_->LoadField(name, raw_data);
STATUS_CHECK(CreateStructuredIndex(field_type, raw_data, index_ptr));
segment_ptr->SetStructuredIndex(name, index_ptr);
......@@ -155,7 +158,7 @@ ExecutionEngineImpl::Load(const TargetFields& field_names) {
// index not yet build, load raw data
if (!index_exist) {
std::vector<uint8_t> raw;
BinaryDataPtr raw;
segment_reader_->LoadField(name, raw);
}
......@@ -287,8 +290,6 @@ ExecutionEngineImpl::Search(ExecutionEngineContext& context) {
if (field->GetFtype() == (int)engine::DataType::VECTOR_FLOAT ||
field->GetFtype() == (int)engine::DataType::VECTOR_BINARY) {
segment_ptr->GetVectorIndex(field->GetName(), vec_index);
} else if (type == (int)engine::DataType::UID) {
continue;
} else {
attr_type.insert(std::make_pair(field->GetName(), (engine::DataType)type));
}
......
......@@ -49,7 +49,7 @@ class ExecutionEngineImpl : public ExecutionEngine {
CreateVecIndex(const std::string& index_name);
Status
CreateStructuredIndex(const engine::DataType field_type, std::vector<uint8_t>& raw_data,
CreateStructuredIndex(const engine::DataType field_type, engine::BinaryDataPtr& raw_data,
knowhere::IndexPtr& index_ptr);
Status
......
......@@ -69,8 +69,11 @@ MemManagerImpl::ValidateChunk(int64_t collection_id, const DataChunkPtr& chunk)
LOG_ENGINE_ERROR_ << err_msg;
return Status(DB_ERROR, err_msg);
}
if (iter->second == nullptr) {
continue;
}
size_t data_size = iter->second.size();
size_t data_size = iter->second->data_.size();
snapshot::FieldPtr field = ss->GetField(name);
DataType ftype = static_cast<DataType>(field->GetFtype());
......@@ -106,7 +109,6 @@ MemManagerImpl::ValidateChunk(int64_t collection_id, const DataChunkPtr& chunk)
return Status(DB_ERROR, err_msg + name);
}
break;
case DataType::UID:
case DataType::INT64:
if (data_size != chunk->count_ * sizeof(uint64_t)) {
return Status(DB_ERROR, err_msg + name);
......
......@@ -145,7 +145,6 @@ MemSegment::GetSingleEntitySize(int64_t& single_size) {
case DataType::INT32:
single_size += sizeof(uint32_t);
break;
case DataType::UID:
case DataType::INT64:
single_size += sizeof(uint64_t);
break;
......@@ -202,13 +201,13 @@ MemSegment::Delete(segment::doc_id_t doc_id) {
segment_writer_ptr_->GetSegment(segment_ptr);
// Check whether the doc_id is present; if yes, delete its corresponding buffer
engine::FIXED_FIELD_DATA raw_data;
engine::BinaryDataPtr raw_data;
auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
if (!status.ok()) {
return Status::OK();
}
int64_t* uids = reinterpret_cast<int64_t*>(raw_data.data());
int64_t* uids = reinterpret_cast<int64_t*>(raw_data->data_.data());
int64_t row_count = segment_ptr->GetRowCount();
for (int64_t i = 0; i < row_count; i++) {
if (doc_id == uids[i]) {
......@@ -231,13 +230,13 @@ MemSegment::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
std::sort(temp.begin(), temp.end());
engine::FIXED_FIELD_DATA raw_data;
engine::BinaryDataPtr raw_data;
auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
if (!status.ok()) {
return Status::OK();
}
int64_t* uids = reinterpret_cast<int64_t*>(raw_data.data());
int64_t* uids = reinterpret_cast<int64_t*>(raw_data->data_.data());
int64_t row_count = segment_ptr->GetRowCount();
size_t deleted = 0;
for (int64_t i = 0; i < row_count; ++i) {
......
......@@ -1306,58 +1306,58 @@ const char descriptor_table_protodef_milvus_2eproto[] PROTOBUF_SECTION_VARIABLE(
"ame\030\001 \001(\t\022\033\n\023partition_tag_array\030\002 \003(\t\0220"
"\n\rgeneral_query\030\003 \001(\0132\031.milvus.grpc.Gene"
"ralQuery\022/\n\014extra_params\030\004 \003(\0132\031.milvus."
"grpc.KeyValuePair*\236\001\n\010DataType\022\010\n\004NONE\020\000"
"grpc.KeyValuePair*\221\001\n\010DataType\022\010\n\004NONE\020\000"
"\022\010\n\004BOOL\020\001\022\010\n\004INT8\020\002\022\t\n\005INT16\020\003\022\t\n\005INT32"
"\020\004\022\t\n\005INT64\020\005\022\t\n\005FLOAT\020\n\022\n\n\006DOUBLE\020\013\022\n\n\006"
"STRING\020\024\022\021\n\rVECTOR_BINARY\020d\022\020\n\014VECTOR_FL"
"OAT\020e\022\013\n\006VECTOR\020\310\001*C\n\017CompareOperator\022\006\n"
"\002LT\020\000\022\007\n\003LTE\020\001\022\006\n\002EQ\020\002\022\006\n\002GT\020\003\022\007\n\003GTE\020\004\022"
"\006\n\002NE\020\005*8\n\005Occur\022\013\n\007INVALID\020\000\022\010\n\004MUST\020\001\022"
"\n\n\006SHOULD\020\002\022\014\n\010MUST_NOT\020\0032\335\r\n\rMilvusServ"
"ice\022\?\n\020CreateCollection\022\024.milvus.grpc.Ma"
"pping\032\023.milvus.grpc.Status\"\000\022F\n\rHasColle"
"ction\022\033.milvus.grpc.CollectionName\032\026.mil"
"vus.grpc.BoolReply\"\000\022I\n\022DescribeCollecti"
"on\022\033.milvus.grpc.CollectionName\032\024.milvus"
".grpc.Mapping\"\000\022Q\n\017CountCollection\022\033.mil"
"vus.grpc.CollectionName\032\037.milvus.grpc.Co"
"llectionRowCount\"\000\022J\n\017ShowCollections\022\024."
"milvus.grpc.Command\032\037.milvus.grpc.Collec"
"tionNameList\"\000\022P\n\022ShowCollectionInfo\022\033.m"
"ilvus.grpc.CollectionName\032\033.milvus.grpc."
"CollectionInfo\"\000\022D\n\016DropCollection\022\033.mil"
"vus.grpc.CollectionName\032\023.milvus.grpc.St"
"atus\"\000\022=\n\013CreateIndex\022\027.milvus.grpc.Inde"
"xParam\032\023.milvus.grpc.Status\"\000\022C\n\rDescrib"
"eIndex\022\027.milvus.grpc.IndexParam\032\027.milvus"
".grpc.IndexParam\"\000\022;\n\tDropIndex\022\027.milvus"
".grpc.IndexParam\032\023.milvus.grpc.Status\"\000\022"
"E\n\017CreatePartition\022\033.milvus.grpc.Partiti"
"onParam\032\023.milvus.grpc.Status\"\000\022E\n\014HasPar"
"tition\022\033.milvus.grpc.PartitionParam\032\026.mi"
"lvus.grpc.BoolReply\"\000\022K\n\016ShowPartitions\022"
"\033.milvus.grpc.CollectionName\032\032.milvus.gr"
"pc.PartitionList\"\000\022C\n\rDropPartition\022\033.mi"
"lvus.grpc.PartitionParam\032\023.milvus.grpc.S"
"tatus\"\000\022<\n\006Insert\022\030.milvus.grpc.InsertPa"
"ram\032\026.milvus.grpc.EntityIds\"\000\022E\n\rGetEnti"
"tyByID\022\033.milvus.grpc.EntityIdentity\032\025.mi"
"lvus.grpc.Entities\"\000\022H\n\014GetEntityIDs\022\036.m"
"ilvus.grpc.GetEntityIDsParam\032\026.milvus.gr"
"pc.EntityIds\"\000\022>\n\006Search\022\030.milvus.grpc.S"
"earchParam\032\030.milvus.grpc.QueryResult\"\000\022P"
"\n\017SearchInSegment\022!.milvus.grpc.SearchIn"
"SegmentParam\032\030.milvus.grpc.QueryResult\"\000"
"\0227\n\003Cmd\022\024.milvus.grpc.Command\032\030.milvus.g"
"rpc.StringReply\"\000\022A\n\nDeleteByID\022\034.milvus"
".grpc.DeleteByIDParam\032\023.milvus.grpc.Stat"
"us\"\000\022G\n\021PreloadCollection\022\033.milvus.grpc."
"CollectionName\032\023.milvus.grpc.Status\"\000\0227\n"
"\005Flush\022\027.milvus.grpc.FlushParam\032\023.milvus"
".grpc.Status\"\000\022=\n\007Compact\022\033.milvus.grpc."
"CollectionName\032\023.milvus.grpc.Status\"\000\022B\n"
"\010SearchPB\022\032.milvus.grpc.SearchParamPB\032\030."
"milvus.grpc.QueryResult\"\000b\006proto3"
"OAT\020e*C\n\017CompareOperator\022\006\n\002LT\020\000\022\007\n\003LTE\020"
"\001\022\006\n\002EQ\020\002\022\006\n\002GT\020\003\022\007\n\003GTE\020\004\022\006\n\002NE\020\005*8\n\005Oc"
"cur\022\013\n\007INVALID\020\000\022\010\n\004MUST\020\001\022\n\n\006SHOULD\020\002\022\014"
"\n\010MUST_NOT\020\0032\335\r\n\rMilvusService\022\?\n\020Create"
"Collection\022\024.milvus.grpc.Mapping\032\023.milvu"
"s.grpc.Status\"\000\022F\n\rHasCollection\022\033.milvu"
"s.grpc.CollectionName\032\026.milvus.grpc.Bool"
"Reply\"\000\022I\n\022DescribeCollection\022\033.milvus.g"
"rpc.CollectionName\032\024.milvus.grpc.Mapping"
"\"\000\022Q\n\017CountCollection\022\033.milvus.grpc.Coll"
"ectionName\032\037.milvus.grpc.CollectionRowCo"
"unt\"\000\022J\n\017ShowCollections\022\024.milvus.grpc.C"
"ommand\032\037.milvus.grpc.CollectionNameList\""
"\000\022P\n\022ShowCollectionInfo\022\033.milvus.grpc.Co"
"llectionName\032\033.milvus.grpc.CollectionInf"
"o\"\000\022D\n\016DropCollection\022\033.milvus.grpc.Coll"
"ectionName\032\023.milvus.grpc.Status\"\000\022=\n\013Cre"
"ateIndex\022\027.milvus.grpc.IndexParam\032\023.milv"
"us.grpc.Status\"\000\022C\n\rDescribeIndex\022\027.milv"
"us.grpc.IndexParam\032\027.milvus.grpc.IndexPa"
"ram\"\000\022;\n\tDropIndex\022\027.milvus.grpc.IndexPa"
"ram\032\023.milvus.grpc.Status\"\000\022E\n\017CreatePart"
"ition\022\033.milvus.grpc.PartitionParam\032\023.mil"
"vus.grpc.Status\"\000\022E\n\014HasPartition\022\033.milv"
"us.grpc.PartitionParam\032\026.milvus.grpc.Boo"
"lReply\"\000\022K\n\016ShowPartitions\022\033.milvus.grpc"
".CollectionName\032\032.milvus.grpc.PartitionL"
"ist\"\000\022C\n\rDropPartition\022\033.milvus.grpc.Par"
"titionParam\032\023.milvus.grpc.Status\"\000\022<\n\006In"
"sert\022\030.milvus.grpc.InsertParam\032\026.milvus."
"grpc.EntityIds\"\000\022E\n\rGetEntityByID\022\033.milv"
"us.grpc.EntityIdentity\032\025.milvus.grpc.Ent"
"ities\"\000\022H\n\014GetEntityIDs\022\036.milvus.grpc.Ge"
"tEntityIDsParam\032\026.milvus.grpc.EntityIds\""
"\000\022>\n\006Search\022\030.milvus.grpc.SearchParam\032\030."
"milvus.grpc.QueryResult\"\000\022P\n\017SearchInSeg"
"ment\022!.milvus.grpc.SearchInSegmentParam\032"
"\030.milvus.grpc.QueryResult\"\000\0227\n\003Cmd\022\024.mil"
"vus.grpc.Command\032\030.milvus.grpc.StringRep"
"ly\"\000\022A\n\nDeleteByID\022\034.milvus.grpc.DeleteB"
"yIDParam\032\023.milvus.grpc.Status\"\000\022G\n\021Prelo"
"adCollection\022\033.milvus.grpc.CollectionNam"
"e\032\023.milvus.grpc.Status\"\000\0227\n\005Flush\022\027.milv"
"us.grpc.FlushParam\032\023.milvus.grpc.Status\""
"\000\022=\n\007Compact\022\033.milvus.grpc.CollectionNam"
"e\032\023.milvus.grpc.Status\"\000\022B\n\010SearchPB\022\032.m"
"ilvus.grpc.SearchParamPB\032\030.milvus.grpc.Q"
"ueryResult\"\000b\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_milvus_2eproto_deps[1] = {
&::descriptor_table_status_2eproto,
......@@ -1406,7 +1406,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_mil
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_milvus_2eproto_once;
static bool descriptor_table_milvus_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_milvus_2eproto = {
&descriptor_table_milvus_2eproto_initialized, descriptor_table_protodef_milvus_2eproto, "milvus.proto", 6233,
&descriptor_table_milvus_2eproto_initialized, descriptor_table_protodef_milvus_2eproto, "milvus.proto", 6220,
&descriptor_table_milvus_2eproto_once, descriptor_table_milvus_2eproto_sccs, descriptor_table_milvus_2eproto_deps, 39, 1,
schemas, file_default_instances, TableStruct_milvus_2eproto::offsets,
file_level_metadata_milvus_2eproto, 40, file_level_enum_descriptors_milvus_2eproto, file_level_service_descriptors_milvus_2eproto,
......@@ -1433,7 +1433,6 @@ bool DataType_IsValid(int value) {
case 20:
case 100:
case 101:
case 200:
return true;
default:
return false;
......
......@@ -237,13 +237,12 @@ enum DataType : int {
STRING = 20,
VECTOR_BINARY = 100,
VECTOR_FLOAT = 101,
VECTOR = 200,
DataType_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
DataType_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool DataType_IsValid(int value);
constexpr DataType DataType_MIN = NONE;
constexpr DataType DataType_MAX = VECTOR;
constexpr DataType DataType_MAX = VECTOR_FLOAT;
constexpr int DataType_ARRAYSIZE = DataType_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* DataType_descriptor();
......
......@@ -22,7 +22,6 @@ enum DataType {
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
VECTOR = 200;
}
/**
......
......@@ -51,7 +51,6 @@ Segment::AddField(const std::string& field_name, DataType field_type, int64_t fi
case DataType::INT32:
real_field_width = sizeof(uint32_t);
break;
case DataType::UID:
case DataType::INT64:
real_field_width = sizeof(uint64_t);
break;
......@@ -92,12 +91,16 @@ Segment::AddChunk(const DataChunkPtr& chunk_ptr, int64_t from, int64_t to) {
// check input
for (auto& iter : chunk_ptr->fixed_fields_) {
if (iter.second == nullptr) {
return Status(DB_ERROR, "illegal field: " + iter.first);
}
auto width_iter = fixed_fields_width_.find(iter.first);
if (width_iter == fixed_fields_width_.end()) {
return Status(DB_ERROR, "field not yet defined: " + iter.first);
}
if (iter.second.size() != width_iter->second * chunk_ptr->count_) {
if (iter.second->Size() != width_iter->second * chunk_ptr->count_) {
return Status(DB_ERROR, "illegal field: " + iter.first);
}
}
......@@ -107,21 +110,27 @@ Segment::AddChunk(const DataChunkPtr& chunk_ptr, int64_t from, int64_t to) {
for (auto& width_iter : fixed_fields_width_) {
auto input = chunk_ptr->fixed_fields_.find(width_iter.first);
auto& data = fixed_fields_[width_iter.first];
size_t origin_bytes = data.size();
if (data == nullptr) {
fixed_fields_[width_iter.first] = input->second;
continue;
}
size_t origin_bytes = data->data_.size();
int64_t add_bytes = add_count * width_iter.second;
int64_t previous_bytes = row_count_ * width_iter.second;
int64_t target_bytes = previous_bytes + add_bytes;
data.resize(target_bytes);
data->data_.resize(target_bytes);
if (input == chunk_ptr->fixed_fields_.end()) {
// this field is not provided, pad with zeros
memset(data.data() + origin_bytes, 0, target_bytes - origin_bytes);
memset(data->data_.data() + origin_bytes, 0, target_bytes - origin_bytes);
} else {
// pad the gap with zeros
if (origin_bytes < previous_bytes) {
memset(data.data() + origin_bytes, 0, previous_bytes - origin_bytes);
memset(data->data_.data() + origin_bytes, 0, previous_bytes - origin_bytes);
}
// copy input into this field
memcpy(data.data() + previous_bytes, input->second.data() + from * width_iter.second, add_bytes);
memcpy(data->data_.data() + previous_bytes, input->second->data_.data() + from * width_iter.second,
add_bytes);
}
}
......@@ -132,12 +141,20 @@ Segment::AddChunk(const DataChunkPtr& chunk_ptr, int64_t from, int64_t to) {
Status
Segment::DeleteEntity(int64_t offset) {
if (offset > row_count_) {
return Status(DB_ERROR, "Invalid input");
}
for (auto& pair : fixed_fields_) {
int64_t width = fixed_fields_width_[pair.first];
if (width != 0) {
auto step = offset * width;
FIXED_FIELD_DATA& data = pair.second;
data.erase(data.begin() + step, data.begin() + step + width);
BinaryDataPtr& data = pair.second;
if (data == nullptr) {
continue;
}
data->data_.erase(data->data_.begin() + step, data->data_.begin() + step + width);
}
}
......@@ -167,7 +184,7 @@ Segment::GetFixedFieldWidth(const std::string& field_name, int64_t& width) {
}
Status
Segment::GetFixedFieldData(const std::string& field_name, FIXED_FIELD_DATA& data) {
Segment::GetFixedFieldData(const std::string& field_name, BinaryDataPtr& data) {
auto iter = fixed_fields_.find(field_name);
if (iter == fixed_fields_.end()) {
return Status(DB_ERROR, "invalid field name: " + field_name);
......
......@@ -52,7 +52,7 @@ class Segment {
GetFixedFieldWidth(const std::string& field_name, int64_t& width);
Status
GetFixedFieldData(const std::string& field_name, FIXED_FIELD_DATA& data);
GetFixedFieldData(const std::string& field_name, BinaryDataPtr& data);
Status
GetVectorIndex(const std::string& field_name, knowhere::VecIndexPtr& index);
......
......@@ -101,7 +101,7 @@ SegmentReader::Load() {
}
Status
SegmentReader::LoadField(const std::string& field_name, std::vector<uint8_t>& raw) {
SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& raw) {
try {
engine::FIXEDX_FIELD_MAP& field_map = segment_ptr_->GetFixedFields();
auto pair = field_map.find(field_name);
......@@ -133,10 +133,10 @@ SegmentReader::LoadFields() {
for (auto& iter : field_visitors_map) {
const engine::snapshot::FieldPtr& field = iter.second->GetField();
std::string name = field->GetName();
engine::FIXED_FIELD_DATA raw_data;
engine::BinaryDataPtr raw_data;
auto status = segment_ptr_->GetFixedFieldData(name, raw_data);
if (!status.ok() || raw_data.empty()) {
if (!status.ok() || raw_data == nullptr) {
auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(
dir_collections_, element_visitor->GetFile());
......@@ -149,7 +149,7 @@ SegmentReader::LoadFields() {
Status
SegmentReader::LoadEntities(const std::string& field_name, const std::vector<int64_t>& offsets,
std::vector<uint8_t>& raw) {
engine::BinaryDataPtr& raw) {
try {
auto field_visitor = segment_visitor_->GetFieldVisitor(field_name);
auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
......@@ -185,16 +185,18 @@ SegmentReader::LoadFieldsEntities(const std::vector<std::string>& fields_name, c
}
data_chunk->count_ += offsets.size();
for (auto& name : fields_name) {
engine::FIXED_FIELD_DATA raw_data;
engine::BinaryDataPtr raw_data;
auto status = LoadEntities(name, offsets, raw_data);
if (!status.ok()) {
if (!status.ok() || raw_data == nullptr) {
return status;
}
if (!data_chunk->fixed_fields_[name].empty()) {
auto chunk_size = data_chunk->fixed_fields_[name].size();
auto raw_data_size = raw_data.size();
data_chunk->fixed_fields_[name].resize(chunk_size + raw_data_size);
memcpy(data_chunk->fixed_fields_[name].data() + chunk_size, raw_data.data(), raw_data_size);
auto& target_data = data_chunk->fixed_fields_[name];
if (target_data != nullptr) {
auto chunk_size = target_data->Size();
auto raw_data_size = raw_data->Size();
target_data->data_.resize(chunk_size + raw_data_size);
memcpy(target_data->data_.data() + chunk_size, raw_data->data_.data(), raw_data_size);
} else {
data_chunk->fixed_fields_[name] = raw_data;
}
......@@ -204,22 +206,22 @@ SegmentReader::LoadFieldsEntities(const std::vector<std::string>& fields_name, c
Status
SegmentReader::LoadUids(std::vector<int64_t>& uids) {
std::vector<uint8_t> raw;
engine::BinaryDataPtr raw;
auto status = LoadField(engine::DEFAULT_UID_NAME, raw);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << status.message();
return status;
}
if (raw.size() % sizeof(int64_t) != 0) {
if (raw->data_.size() % sizeof(int64_t) != 0) {
std::string err_msg = "Failed to load uids: illegal file size";
LOG_ENGINE_ERROR_ << err_msg;
return Status(DB_ERROR, err_msg);
}
uids.clear();
uids.resize(raw.size() / sizeof(int64_t));
memcpy(uids.data(), raw.data(), raw.size());
uids.resize(raw->data_.size() / sizeof(int64_t));
memcpy(uids.data(), raw->data_.data(), raw->data_.size());
return Status::OK();
}
......@@ -264,7 +266,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
knowhere::BinaryPtr raw_data, compress_data;
auto read_raw = [&]() -> void {
engine::FIXED_FIELD_DATA fixed_data;
engine::BinaryDataPtr fixed_data;
auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data);
if (status.ok()) {
ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data);
......@@ -283,9 +285,10 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
return Status(DB_ERROR, "Vector field dimension undefined");
}
int64_t dimension = json[knowhere::meta::DIM];
std::vector<uint8_t> raw;
engine::BinaryDataPtr raw;
STATUS_CHECK(LoadField(field_name, raw));
auto dataset = knowhere::GenDataset(segment_commit->GetRowCount(), dimension, raw.data());
auto dataset = knowhere::GenDataset(segment_commit->GetRowCount(), dimension, raw->data_.data());
knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance();
index_ptr =
......
......@@ -37,13 +37,13 @@ class SegmentReader {
Load();
Status
LoadField(const std::string& field_name, std::vector<uint8_t>& raw);
LoadField(const std::string& field_name, engine::BinaryDataPtr& raw);
Status
LoadFields();
Status
LoadEntities(const std::string& field_name, const std::vector<int64_t>& offsets, std::vector<uint8_t>& raw);
LoadEntities(const std::string& field_name, const std::vector<int64_t>& offsets, engine::BinaryDataPtr& raw);
Status
LoadFieldsEntities(const std::vector<std::string>& fields_name, const std::vector<int64_t>& offsets,
......
......@@ -112,7 +112,7 @@ SegmentWriter::Serialize() {
}
Status
SegmentWriter::WriteField(const std::string& file_path, const engine::FIXED_FIELD_DATA& raw) {
SegmentWriter::WriteField(const std::string& file_path, const engine::BinaryDataPtr& raw) {
try {
auto& ss_codec = codec::Codec::instance();
ss_codec.GetBlockFormat()->Write(fs_ptr_, file_path, raw);
......@@ -134,7 +134,7 @@ SegmentWriter::WriteFields() {
for (auto& iter : field_visitors_map) {
const engine::snapshot::FieldPtr& field = iter.second->GetField();
std::string name = field->GetName();
engine::FIXED_FIELD_DATA raw_data;
engine::BinaryDataPtr raw_data;
segment_ptr_->GetFixedFieldData(name, raw_data);
auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW);
......@@ -161,7 +161,7 @@ SegmentWriter::WriteBloomFilter() {
try {
TimeRecorder recorder("SegmentWriter::WriteBloomFilter");
engine::FIXED_FIELD_DATA uid_data;
engine::BinaryDataPtr uid_data;
auto status = segment_ptr_->GetFixedFieldData(engine::DEFAULT_UID_NAME, uid_data);
if (!status.ok()) {
return status;
......@@ -179,7 +179,7 @@ SegmentWriter::WriteBloomFilter() {
segment::IdBloomFilterPtr bloom_filter_ptr;
ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr);
int64_t* uids = (int64_t*)(uid_data.data());
int64_t* uids = (int64_t*)(uid_data->data_.data());
int64_t row_count = segment_ptr_->GetRowCount();
for (int64_t i = 0; i < row_count; i++) {
bloom_filter_ptr->Add(uids[i]);
......@@ -332,13 +332,13 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
for (auto& iter : field_visitors_map) {
const engine::snapshot::FieldPtr& field = iter.second->GetField();
std::string name = field->GetName();
engine::FIXED_FIELD_DATA raw_data;
engine::BinaryDataPtr raw_data;
segment_reader->LoadField(name, raw_data);
chunk->fixed_fields_[name] = raw_data;
}
auto& uid_data = chunk->fixed_fields_[engine::DEFAULT_UID_NAME];
chunk->count_ = uid_data.size() / sizeof(int64_t);
chunk->count_ = uid_data->data_.size() / sizeof(int64_t);
status = AddChunk(chunk);
if (!status.ok()) {
return status;
......
......@@ -96,7 +96,7 @@ class SegmentWriter {
Initialize();
Status
WriteField(const std::string& file_path, const engine::FIXED_FIELD_DATA& raw);
WriteField(const std::string& file_path, const engine::BinaryDataPtr& raw);
Status
WriteFields();
......
......@@ -77,6 +77,14 @@ CreateCollectionReq::OnExecute() {
index_name = index_params["name"];
}
// validate id field
if (field_name == engine::DEFAULT_UID_NAME) {
if (field_type != engine::DataType::INT64) {
return Status(DB_ERROR, "Field '_id' data type must be int64");
}
}
// validate vector field dimension
if (field_type == engine::DataType::VECTOR_FLOAT || field_type == engine::DataType::VECTOR_BINARY) {
if (!field_params.contains(engine::PARAM_DIMENSION)) {
return Status(SERVER_INVALID_VECTOR_DIMENSION, "Dimension not defined in field_params");
......
......@@ -51,9 +51,6 @@ GetCollectionInfoReq::OnExecute() {
collection_schema_.extra_params_ = collection->GetParams();
for (auto& field_kv : field_mappings) {
auto field = field_kv.first;
if (field->GetFtype() == (engine::snapshot::FTYPE_TYPE)engine::DataType::UID) {
continue;
}
milvus::json field_index_param;
auto field_elements = field_kv.second;
......
......@@ -67,13 +67,17 @@ InsertReq::OnExecute() {
engine::DataChunkPtr data_chunk = std::make_shared<engine::DataChunk>();
data_chunk->count_ = row_count_;
data_chunk->fixed_fields_.swap(chunk_data_);
for (auto& pair : chunk_data_) {
engine::BinaryDataPtr bin = std::make_shared<engine::BinaryData>();
bin->data_.swap(pair.second);
data_chunk->fixed_fields_.insert(std::make_pair(pair.first, bin));
}
status = DBWrapper::DB()->Insert(collection_name_, partition_name_, data_chunk);
if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "Insert", 0, status.message().c_str());
return status;
}
chunk_data_[engine::DEFAULT_UID_NAME] = data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME];
chunk_data_[engine::DEFAULT_UID_NAME] = data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME]->data_;
rc.ElapseFromBegin("done");
} catch (std::exception& ex) {
......
......@@ -271,18 +271,18 @@ CopyDataChunkToEntity(const engine::DataChunkPtr& data_chunk,
std::string name = it.first->GetName();
// judge whether data exists
std::vector<uint8_t> data = data_chunk->fixed_fields_[name];
if (data.empty())
engine::BinaryDataPtr data = data_chunk->fixed_fields_[name];
if (data == nullptr || data->data_.empty())
continue;
auto single_size = data.size() / id_size;
auto single_size = data->data_.size() / id_size;
if (type == engine::DataType::UID) {
if (name == engine::DEFAULT_UID_NAME) {
int64_t int64_value;
auto int64_size = single_size * sizeof(int8_t) / sizeof(int64_t);
for (int i = 0; i < id_size; i++) {
auto offset = i * single_size;
memcpy(&int64_value, data.data() + offset, single_size);
memcpy(&int64_value, data->data_.data() + offset, single_size);
response->add_ids(int64_value);
}
continue;
......@@ -302,7 +302,7 @@ CopyDataChunkToEntity(const engine::DataChunkPtr& data_chunk,
for (int i = 0; i < id_size; i++) {
auto vector_row_record = vector_record->add_records();
auto offset = i * single_size;
memcpy(binary_vector.data(), data.data() + offset, single_size);
memcpy(binary_vector.data(), data->data_.data() + offset, single_size);
vector_row_record->mutable_binary_data()->resize(binary_vector.size());
memcpy(vector_row_record->mutable_binary_data()->data(), binary_vector.data(), binary_vector.size());
}
......@@ -315,7 +315,7 @@ CopyDataChunkToEntity(const engine::DataChunkPtr& data_chunk,
for (int i = 0; i < id_size; i++) {
auto vector_row_record = vector_record->add_records();
auto offset = i * single_size;
memcpy(float_vector.data(), data.data() + offset, single_size);
memcpy(float_vector.data(), data->data_.data() + offset, single_size);
vector_row_record->mutable_float_data()->Resize(vector_size, 0.0);
memcpy(vector_row_record->mutable_float_data()->mutable_data(), float_vector.data(),
float_vector.size() * sizeof(float));
......@@ -329,7 +329,7 @@ CopyDataChunkToEntity(const engine::DataChunkPtr& data_chunk,
auto int32_size = single_size * sizeof(int8_t) / sizeof(int32_t);
for (int i = 0; i < id_size; i++) {
auto offset = i * single_size;
memcpy(&int32_value, data.data() + offset, single_size);
memcpy(&int32_value, data->data_.data() + offset, single_size);
attr_record->add_int32_value(int32_value);
}
} else if (type == engine::DataType::INT64) {
......@@ -338,7 +338,7 @@ CopyDataChunkToEntity(const engine::DataChunkPtr& data_chunk,
auto int64_size = single_size * sizeof(int8_t) / sizeof(int64_t);
for (int i = 0; i < id_size; i++) {
auto offset = i * single_size;
memcpy(&int64_value, data.data() + offset, single_size);
memcpy(&int64_value, data->data_.data() + offset, single_size);
attr_record->add_int64_value(int64_value);
}
} else if (type == engine::DataType::DOUBLE) {
......@@ -347,7 +347,7 @@ CopyDataChunkToEntity(const engine::DataChunkPtr& data_chunk,
auto int32_size = single_size * sizeof(int8_t) / sizeof(double);
for (int i = 0; i < id_size; i++) {
auto offset = i * single_size;
memcpy(&double_value, data.data() + offset, single_size);
memcpy(&double_value, data->data_.data() + offset, single_size);
attr_record->add_double_value(double_value);
}
} else if (type == engine::DataType::FLOAT) {
......@@ -356,7 +356,7 @@ CopyDataChunkToEntity(const engine::DataChunkPtr& data_chunk,
auto float_size = single_size * sizeof(int8_t) / sizeof(float);
for (int i = 0; i < id_size; i++) {
auto offset = i * single_size;
memcpy(&float_value, data.data() + offset, single_size);
memcpy(&float_value, data->data_.data() + offset, single_size);
attr_record->add_float_value(float_value);
}
}
......
......@@ -814,12 +814,12 @@ WebRequestHandler::GetEntityByIDs(const std::string& collection_name, const std:
if (!status.ok()) {
return status;
}
std::vector<uint8_t> id_array = data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME];
std::vector<uint8_t> id_array = data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME]->data_;
for (const auto& it : field_mappings) {
std::string name = it.first->GetName();
uint64_t type = it.first->GetFtype();
std::vector<uint8_t> data = data_chunk->fixed_fields_[name];
std::vector<uint8_t>& data = data_chunk->fixed_fields_[name]->data_;
if (type == engine::DataType::VECTOR_BINARY) {
engine::VectorsData vectors_data;
memcpy(vectors_data.binary_data_.data(), data.data(), data.size());
......
......@@ -95,9 +95,10 @@ BuildEntities(uint64_t n, uint64_t batch_index, milvus::engine::DataChunkPtr& da
vectors.id_array_.push_back(n * batch_index + i);
}
milvus::engine::FIXED_FIELD_DATA& raw = data_chunk->fixed_fields_[VECTOR_FIELD_NAME];
raw.resize(vectors.float_data_.size() * sizeof(float));
memcpy(raw.data(), vectors.float_data_.data(), vectors.float_data_.size() * sizeof(float));
milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
raw->data_.resize(vectors.float_data_.size() * sizeof(float));
memcpy(raw->data_.data(), vectors.float_data_.data(), vectors.float_data_.size() * sizeof(float));
data_chunk->fixed_fields_[VECTOR_FIELD_NAME] = raw;
std::vector<int32_t> value_0;
std::vector<int64_t> value_1;
......@@ -115,21 +116,24 @@ BuildEntities(uint64_t n, uint64_t batch_index, milvus::engine::DataChunkPtr& da
}
{
milvus::engine::FIXED_FIELD_DATA& raw = data_chunk->fixed_fields_["field_0"];
raw.resize(value_0.size() * sizeof(int32_t));
memcpy(raw.data(), value_0.data(), value_0.size() * sizeof(int32_t));
milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
raw->data_.resize(value_0.size() * sizeof(int32_t));
memcpy(raw->data_.data(), value_0.data(), value_0.size() * sizeof(int32_t));
data_chunk->fixed_fields_["field_0"] = raw;
}
{
milvus::engine::FIXED_FIELD_DATA& raw = data_chunk->fixed_fields_["field_1"];
raw.resize(value_1.size() * sizeof(int64_t));
memcpy(raw.data(), value_1.data(), value_1.size() * sizeof(int64_t));
milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
raw->data_.resize(value_1.size() * sizeof(int64_t));
memcpy(raw->data_.data(), value_1.data(), value_1.size() * sizeof(int64_t));
data_chunk->fixed_fields_["field_1"] = raw;
}
{
milvus::engine::FIXED_FIELD_DATA& raw = data_chunk->fixed_fields_["field_2"];
raw.resize(value_2.size() * sizeof(double));
memcpy(raw.data(), value_2.data(), value_2.size() * sizeof(double));
milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
raw->data_.resize(value_2.size() * sizeof(double));
memcpy(raw->data_.data(), value_2.data(), value_2.size() * sizeof(double));
data_chunk->fixed_fields_["field_2"] = raw;
}
}
} // namespace
......
......@@ -39,7 +39,7 @@ CreateCollection(std::shared_ptr<DBImpl> db, const std::string& collection_name,
int64_t field_id = 0;
/* field uid */
auto uid_field = std::make_shared<Field>(milvus::engine::DEFAULT_UID_NAME, 0,
milvus::engine::DataType::UID, milvus::engine::snapshot::JEmpty, field_id);
milvus::engine::DataType::INT64, milvus::engine::snapshot::JEmpty, field_id);
auto uid_field_element_blt = std::make_shared<FieldElement>(collection_id, field_id,
milvus::engine::DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
auto uid_field_element_del = std::make_shared<FieldElement>(collection_id, field_id,
......