提交 4b887cf0 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

add PreloadTable and some parameters

See merge request megasearch/milvus!355

Former-commit-id: b9e0a21d137dfb22983d1ed4e71e1f91354bd49c
......@@ -28,6 +28,7 @@ public:
virtual Status HasTable(const std::string& table_id, bool& has_or_not_) = 0;
virtual Status AllTables(std::vector<meta::TableSchema>& table_schema_array) = 0;
virtual Status GetTableRowCount(const std::string& table_id, uint64_t& row_count) = 0;
virtual Status PreloadTable(const std::string& table_id) = 0;
virtual Status InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) = 0;
......
......@@ -127,6 +127,46 @@ Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
return meta_ptr_->AllTables(table_schema_array);
}
Status DBImpl::PreloadTable(const std::string &table_id) {
meta::DatePartionedTableFilesSchema files;
meta::DatesT dates;
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
if (!status.ok()) {
return status;
}
int64_t size = 0;
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t available_size = cache_total - cache_usage;
for(auto &day_files : files) {
for (auto &file : day_files.second) {
ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
if(engine == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status::Error("Invalid engine type");
}
size += engine->PhysicalSize();
if (size > available_size) {
break;
} else {
try {
//step 1: load index
engine->Load(true);
} catch (std::exception &ex) {
std::string msg = "load to cache exception" + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
return Status::Error(msg);
}
}
}
}
return Status::OK();
}
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
return meta_ptr_->Count(table_id, row_count);
}
......
......@@ -51,6 +51,9 @@ class DBImpl : public DB {
Status
AllTables(std::vector<meta::TableSchema> &table_schema_array) override;
Status
PreloadTable(const std::string &table_id) override;
Status
GetTableRowCount(const std::string &table_id, uint64_t &row_count) override;
......
......@@ -838,6 +838,7 @@ class InsertParam :
enum : int {
kRowRecordArrayFieldNumber = 2,
kRowIdArrayFieldNumber = 3,
kTableNameFieldNumber = 1,
};
// repeated .milvus.grpc.RowRecord row_record_array = 2;
......@@ -851,6 +852,17 @@ class InsertParam :
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::RowRecord >&
row_record_array() const;
// repeated int64 row_id_array = 3;
int row_id_array_size() const;
void clear_row_id_array();
::PROTOBUF_NAMESPACE_ID::int64 row_id_array(int index) const;
void set_row_id_array(int index, ::PROTOBUF_NAMESPACE_ID::int64 value);
void add_row_id_array(::PROTOBUF_NAMESPACE_ID::int64 value);
const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
row_id_array() const;
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >*
mutable_row_id_array();
// string table_name = 1;
void clear_table_name();
const std::string& table_name() const;
......@@ -868,6 +880,8 @@ class InsertParam :
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::RowRecord > row_record_array_;
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 > row_id_array_;
mutable std::atomic<int> _row_id_array_cached_byte_size_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr table_name_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
......@@ -1139,6 +1153,7 @@ class SearchParam :
kQueryRangeArrayFieldNumber = 3,
kTableNameFieldNumber = 1,
kTopkFieldNumber = 4,
kNprobeFieldNumber = 5,
};
// repeated .milvus.grpc.RowRecord query_record_array = 2;
int query_record_array_size() const;
......@@ -1178,6 +1193,11 @@ class SearchParam :
::PROTOBUF_NAMESPACE_ID::int64 topk() const;
void set_topk(::PROTOBUF_NAMESPACE_ID::int64 value);
// int64 nprobe = 5;
void clear_nprobe();
::PROTOBUF_NAMESPACE_ID::int64 nprobe() const;
void set_nprobe(::PROTOBUF_NAMESPACE_ID::int64 value);
// @@protoc_insertion_point(class_scope:milvus.grpc.SearchParam)
private:
class _Internal;
......@@ -1187,6 +1207,7 @@ class SearchParam :
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::Range > query_range_array_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr table_name_;
::PROTOBUF_NAMESPACE_ID::int64 topk_;
::PROTOBUF_NAMESPACE_ID::int64 nprobe_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
};
......@@ -2312,6 +2333,7 @@ class Index :
kNlistFieldNumber = 2,
kIndexTypeFieldNumber = 1,
kIndexFileSizeFieldNumber = 3,
kMetricTypeFieldNumber = 4,
};
// int64 nlist = 2;
void clear_nlist();
......@@ -2328,6 +2350,11 @@ class Index :
::PROTOBUF_NAMESPACE_ID::int32 index_file_size() const;
void set_index_file_size(::PROTOBUF_NAMESPACE_ID::int32 value);
// int32 metric_type = 4;
void clear_metric_type();
::PROTOBUF_NAMESPACE_ID::int32 metric_type() const;
void set_metric_type(::PROTOBUF_NAMESPACE_ID::int32 value);
// @@protoc_insertion_point(class_scope:milvus.grpc.Index)
private:
class _Internal;
......@@ -2336,6 +2363,7 @@ class Index :
::PROTOBUF_NAMESPACE_ID::int64 nlist_;
::PROTOBUF_NAMESPACE_ID::int32 index_type_;
::PROTOBUF_NAMESPACE_ID::int32 index_file_size_;
::PROTOBUF_NAMESPACE_ID::int32 metric_type_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
};
......@@ -3059,6 +3087,36 @@ InsertParam::row_record_array() const {
return row_record_array_;
}
// repeated int64 row_id_array = 3;
inline int InsertParam::row_id_array_size() const {
return row_id_array_.size();
}
inline void InsertParam::clear_row_id_array() {
row_id_array_.Clear();
}
inline ::PROTOBUF_NAMESPACE_ID::int64 InsertParam::row_id_array(int index) const {
// @@protoc_insertion_point(field_get:milvus.grpc.InsertParam.row_id_array)
return row_id_array_.Get(index);
}
inline void InsertParam::set_row_id_array(int index, ::PROTOBUF_NAMESPACE_ID::int64 value) {
row_id_array_.Set(index, value);
// @@protoc_insertion_point(field_set:milvus.grpc.InsertParam.row_id_array)
}
inline void InsertParam::add_row_id_array(::PROTOBUF_NAMESPACE_ID::int64 value) {
row_id_array_.Add(value);
// @@protoc_insertion_point(field_add:milvus.grpc.InsertParam.row_id_array)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
InsertParam::row_id_array() const {
// @@protoc_insertion_point(field_list:milvus.grpc.InsertParam.row_id_array)
return row_id_array_;
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >*
InsertParam::mutable_row_id_array() {
// @@protoc_insertion_point(field_mutable_list:milvus.grpc.InsertParam.row_id_array)
return &row_id_array_;
}
// -------------------------------------------------------------------
// VectorIds
......@@ -3267,6 +3325,20 @@ inline void SearchParam::set_topk(::PROTOBUF_NAMESPACE_ID::int64 value) {
// @@protoc_insertion_point(field_set:milvus.grpc.SearchParam.topk)
}
// int64 nprobe = 5;
inline void SearchParam::clear_nprobe() {
nprobe_ = PROTOBUF_LONGLONG(0);
}
inline ::PROTOBUF_NAMESPACE_ID::int64 SearchParam::nprobe() const {
// @@protoc_insertion_point(field_get:milvus.grpc.SearchParam.nprobe)
return nprobe_;
}
inline void SearchParam::set_nprobe(::PROTOBUF_NAMESPACE_ID::int64 value) {
nprobe_ = value;
// @@protoc_insertion_point(field_set:milvus.grpc.SearchParam.nprobe)
}
// -------------------------------------------------------------------
// SearchInFilesParam
......@@ -3825,6 +3897,20 @@ inline void Index::set_index_file_size(::PROTOBUF_NAMESPACE_ID::int32 value) {
// @@protoc_insertion_point(field_set:milvus.grpc.Index.index_file_size)
}
// int32 metric_type = 4;
inline void Index::clear_metric_type() {
metric_type_ = 0;
}
inline ::PROTOBUF_NAMESPACE_ID::int32 Index::metric_type() const {
// @@protoc_insertion_point(field_get:milvus.grpc.Index.metric_type)
return metric_type_;
}
inline void Index::set_metric_type(::PROTOBUF_NAMESPACE_ID::int32 value) {
metric_type_ = value;
// @@protoc_insertion_point(field_set:milvus.grpc.Index.metric_type)
}
// -------------------------------------------------------------------
// IndexParam
......
......@@ -43,6 +43,7 @@ message RowRecord {
message InsertParam {
string table_name = 1;
repeated RowRecord row_record_array = 2;
repeated int64 row_id_array = 3; //optional
}
/**
......@@ -61,6 +62,7 @@ message SearchParam {
repeated RowRecord query_record_array = 2;
repeated Range query_range_array = 3;
int64 topk = 4;
int64 nprobe = 5;
}
/**
......@@ -125,6 +127,7 @@ message Index {
int32 index_type = 1;
int64 nlist = 2;
int32 index_file_size = 3;
int32 metric_type = 4;
}
/**
......
......@@ -328,7 +328,14 @@ ClientProxy::DeleteByRange(milvus::Range &range, const std::string &table_name)
Status
ClientProxy::PreloadTable(const std::string &table_name) const {
try {
::milvus::grpc::TableName grpc_table_name;
grpc_table_name.set_table_name(table_name);
Status status = client_ptr_->PreloadTable(grpc_table_name);
return status;
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
}
}
IndexParam
......
......@@ -234,7 +234,7 @@ GrpcClient::Cmd(std::string &result,
result = response.string_reply();
if (!grpc_status.ok()) {
std::cerr << "Ping gRPC failed!" << std::endl;
std::cerr << "Cmd gRPC failed!" << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
......@@ -246,6 +246,24 @@ GrpcClient::Cmd(std::string &result,
return Status::OK();
}
Status
GrpcClient::PreloadTable(milvus::grpc::TableName &table_name) {
ClientContext context;
::milvus::grpc::Status response;
::grpc::Status grpc_status = stub_->PreloadTable(&context, table_name, &response);
if (!grpc_status.ok()) {
std::cerr << "PreloadTable gRPC failed!" << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {
std::cerr << response.reason() << std::endl;
return Status(StatusCode::ServerFailed, response.reason());
}
return Status::OK();
}
Status
GrpcClient::Disconnect() {
stub_.release();
......
......@@ -175,7 +175,12 @@ GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context,
GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = PreloadTableTask::Create(request->table_name());
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
}
::grpc::Status
......
......@@ -706,6 +706,45 @@ CmdTask::OnExecute() {
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PreloadTableTask::PreloadTableTask(const std::string &table_name)
: GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name) {
}
BaseTaskPtr
PreloadTableTask::Create(const std::string &table_name){
return std::shared_ptr<GrpcBaseTask>(new PreloadTableTask(table_name));
}
ServerError
PreloadTableTask::OnExecute() {
try {
TimeRecorder rc("PreloadTableTask");
//step 1: check arguments
ServerError res = ValidationUtil::ValidateTableName(table_name_);
if (res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name_);
}
//step 2: check table existence
engine::Status stat = DBWrapper::DB()->PreloadTable(table_name_);
if (!stat.ok()) {
return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
}
rc.ElapseFromBegin("totally cost");
} catch (std::exception &ex) {
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
return SERVER_SUCCESS;
}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册