From 26502bcbd8afaa18f609e93871f3f8d426db14ea Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Thu, 23 Jul 2020 17:31:13 +0800 Subject: [PATCH] grpc partition related APIs use snapshot (#2982) * opt interface Signed-off-by: yudong.cai * update partition grpc APIs Signed-off-by: yudong.cai --- core/CMakeLists.txt | 1 - core/src/db/DBFactory.cpp | 4 -- core/src/db/DBImpl.cpp | 1 + core/src/db/DBImpl.h | 1 - core/src/db/SSDB.h | 6 +-- core/src/db/SSDBImpl.cpp | 5 +- core/src/db/SSDBImpl.h | 3 +- core/src/server/delivery/RequestHandler.cpp | 2 +- core/src/server/delivery/RequestHandler.h | 2 +- .../src/server/delivery/request/BaseRequest.h | 12 ----- .../request/CreatePartitionRequest.cpp | 53 +++++-------------- .../delivery/request/CreatePartitionRequest.h | 2 +- .../delivery/request/DropPartitionRequest.cpp | 44 ++++----------- .../request/ShowPartitionsRequest.cpp | 49 ++++------------- .../delivery/request/ShowPartitionsRequest.h | 6 +-- .../server/grpc_impl/GrpcRequestHandler.cpp | 8 +-- .../web_impl/handler/WebRequestHandler.cpp | 16 +++--- 17 files changed, 59 insertions(+), 156 deletions(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 0a66ab11..91170134 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -71,7 +71,6 @@ endif () set_milvus_definition(MILVUS_WITH_PROMETHEUS "MILVUS_WITH_PROMETHEUS") set_milvus_definition(ENABLE_CPU_PROFILING "ENABLE_CPU_PROFILING") set_milvus_definition(MILVUS_WITH_FIU "FIU_ENABLE") -set_milvus_definition(MILVUS_USE_SNAPSHOT "MILVUS_USE_SNAPSHOT") config_summary() diff --git a/core/src/db/DBFactory.cpp b/core/src/db/DBFactory.cpp index 0cf8b485..645da8e8 100644 --- a/core/src/db/DBFactory.cpp +++ b/core/src/db/DBFactory.cpp @@ -10,12 +10,8 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/DBFactory.h" -#ifdef MILVUS_USE_SNAPSHOT -#include "SSDBImpl.h" -#else #include "DBImpl.h" #include "SSDBImpl.h" -#endif #include "meta/MetaFactory.h" #include "meta/MySQLMetaImpl.h" #include "meta/SqliteMetaImpl.h" diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 7e580bf3..eb2236e4 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 58dcf6c8..7465fa80 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -13,7 +13,6 @@ #include #include -#include #include #include #include diff --git a/core/src/db/SSDB.h b/core/src/db/SSDB.h index e6592b8d..ab77ebcc 100644 --- a/core/src/db/SSDB.h +++ b/core/src/db/SSDB.h @@ -11,7 +11,6 @@ #pragma once -#include #include #include #include @@ -54,8 +53,9 @@ class SSDB { DropCollection(const std::string& name) = 0; virtual Status - DescribeCollection(const std::string& collection_name, snapshot::CollectionPtr& collection, - std::map>& fields_schema) = 0; + DescribeCollection( + const std::string& collection_name, snapshot::CollectionPtr& collection, + std::unordered_map>& fields_schema) = 0; virtual Status HasCollection(const std::string& collection_name, bool& has_or_not) = 0; diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index 34b9b403..cec07ff2 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -234,8 +234,9 @@ SSDBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) { } Status -SSDBImpl::DescribeCollection(const std::string& collection_name, snapshot::CollectionPtr& collection, - std::map>& fields_schema) { +SSDBImpl::DescribeCollection( + const std::string& collection_name, snapshot::CollectionPtr& collection, + std::unordered_map>& fields_schema) { CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index 3c18ab4d..cf64e541 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -13,7 +13,6 @@ #include #include -#include #include #include #include @@ -50,7 +49,7 @@ class SSDBImpl : public SSDB { Status DescribeCollection(const std::string& collection_name, snapshot::CollectionPtr& collection, - std::map>& fields_schema) override; + std::unordered_map>& fields_schema); Status HasCollection(const std::string& collection_name, bool& has_or_not) override; diff --git a/core/src/server/delivery/RequestHandler.cpp b/core/src/server/delivery/RequestHandler.cpp index d787a1c7..04e408c0 100644 --- a/core/src/server/delivery/RequestHandler.cpp +++ b/core/src/server/delivery/RequestHandler.cpp @@ -246,7 +246,7 @@ RequestHandler::HasPartition(const std::shared_ptr& context, const std: Status RequestHandler::ShowPartitions(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partitions) { + std::vector& partitions) { BaseRequestPtr request_ptr = ShowPartitionsRequest::Create(context, collection_name, partitions); RequestScheduler::ExecRequest(request_ptr); diff --git a/core/src/server/delivery/RequestHandler.h b/core/src/server/delivery/RequestHandler.h index 820ba507..2808649b 100644 --- a/core/src/server/delivery/RequestHandler.h +++ b/core/src/server/delivery/RequestHandler.h @@ -110,7 +110,7 @@ class RequestHandler { Status ShowPartitions(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partitions); + std::vector& partitions); Status DropPartition(const std::shared_ptr& context, const std::string& collection_name, const std::string& tag); diff --git a/core/src/server/delivery/request/BaseRequest.h b/core/src/server/delivery/request/BaseRequest.h index 526d6fe1..ac350ef7 100644 --- a/core/src/server/delivery/request/BaseRequest.h +++ b/core/src/server/delivery/request/BaseRequest.h @@ -102,18 +102,6 @@ struct IndexParam { } }; -struct PartitionParam { - std::string collection_name_; - std::string tag_; - - PartitionParam() = default; - - PartitionParam(const std::string& collection_name, const std::string& tag) { - collection_name_ = collection_name; - tag_ = tag; - } -}; - class Context; class BaseRequest { diff --git a/core/src/server/delivery/request/CreatePartitionRequest.cpp b/core/src/server/delivery/request/CreatePartitionRequest.cpp index b5528043..d1a84eac 100644 --- a/core/src/server/delivery/request/CreatePartitionRequest.cpp +++ b/core/src/server/delivery/request/CreatePartitionRequest.cpp @@ -43,69 +43,44 @@ CreatePartitionRequest::OnExecute() { try { // step 1: check arguments - auto status = ValidateCollectionName(collection_name_); - fiu_do_on("CreatePartitionRequest.OnExecute.invalid_collection_name", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - if (!status.ok()) { - return status; - } - if (tag_ == milvus::engine::DEFAULT_PARTITON_TAG) { return Status(SERVER_INVALID_PARTITION_TAG, "'_default' is built-in partition tag"); } - status = ValidatePartitionTags({tag_}); - fiu_do_on("CreatePartitionRequest.OnExecute.invalid_partition_name", + auto status = ValidatePartitionTags({tag_}); + fiu_do_on("CreatePartitionRequest.OnExecute.invalid_partition_tags", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { return status; } // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); - fiu_do_on("CreatePartitionRequest.OnExecute.invalid_partition_tags", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - if (!status.ok()) { - if (status.code() == DB_NOT_FOUND) { - return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); - } else { - return status; - } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } + bool exist = false; + status = DBWrapper::DB()->HasCollection(collection_name_, exist); + if (!exist) { + return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } // check partition total count - std::vector schema_array; - status = DBWrapper::DB()->ShowPartitions(collection_name_, schema_array); - if (schema_array.size() >= MAX_PARTITION_LIMIT) { - return Status(SERVER_UNSUPPORTED_ERROR, "The number of partitions exceeds the upper limit(4096)"); + std::vector partition_names; + status = DBWrapper::SSDB()->ShowPartitions(collection_name_, partition_names); + if (partition_names.size() >= MAX_PARTITION_LIMIT) { + std::stringstream err_ss; + err_ss << "The number of partitions exceeds the upper limit (" << MAX_PARTITION_LIMIT << ")"; + return Status(SERVER_UNSUPPORTED_ERROR, err_ss.str()); } rc.RecordSection("check validation"); // step 2: create partition - status = DBWrapper::DB()->CreatePartition(collection_name_, "", tag_); - fiu_do_on("CreatePartitionRequest.OnExecute.db_already_exist", status = Status(milvus::DB_ALREADY_EXIST, "")); + status = DBWrapper::SSDB()->CreatePartition(collection_name_, tag_); fiu_do_on("CreatePartitionRequest.OnExecute.create_partition_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); fiu_do_on("CreatePartitionRequest.OnExecute.throw_std_exception", throw std::exception()); - if (!status.ok()) { - // partition could exist - if (status.code() == DB_ALREADY_EXIST) { - return Status(SERVER_INVALID_COLLECTION_NAME, status.message()); - } - return status; - } + return status; } catch (std::exception& ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } - - return Status::OK(); } } // namespace server diff --git a/core/src/server/delivery/request/CreatePartitionRequest.h b/core/src/server/delivery/request/CreatePartitionRequest.h index c1809ae4..f3f76c25 100644 --- a/core/src/server/delivery/request/CreatePartitionRequest.h +++ b/core/src/server/delivery/request/CreatePartitionRequest.h @@ -21,7 +21,7 @@ namespace server { class CreatePartitionRequest : public BaseRequest { public: static BaseRequestPtr - Create(const std::shared_ptr& context, const std::string& partition_name, + Create(const std::shared_ptr& context, const std::string& collection_name, const std::string& tag); protected: diff --git a/core/src/server/delivery/request/DropPartitionRequest.cpp b/core/src/server/delivery/request/DropPartitionRequest.cpp index f1132719..b18767e7 100644 --- a/core/src/server/delivery/request/DropPartitionRequest.cpp +++ b/core/src/server/delivery/request/DropPartitionRequest.cpp @@ -38,50 +38,24 @@ DropPartitionRequest::OnExecute() { std::string hdr = "DropPartitionRequest(collection=" + collection_name_ + ", partition_tag=" + tag_ + ")"; TimeRecorderAuto rc(hdr); - std::string collection_name = collection_name_; - std::string partition_tag = tag_; - - // step 1: check collection name - auto status = ValidateCollectionName(collection_name); - fiu_do_on("DropPartitionRequest.OnExecute.invalid_collection_name", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - if (!status.ok()) { - return status; - } - - // step 2: check partition tag - if (partition_tag == milvus::engine::DEFAULT_PARTITON_TAG) { + /* check partition tag */ + if (tag_ == milvus::engine::DEFAULT_PARTITON_TAG) { std::string msg = "Default partition cannot be dropped."; LOG_SERVER_ERROR_ << msg; return Status(SERVER_INVALID_COLLECTION_NAME, msg); } - status = ValidatePartitionTags({partition_tag}); - if (!status.ok()) { - return status; - } - - // step 3: check collection - // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); - if (!status.ok()) { - if (status.code() == DB_NOT_FOUND) { - return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); - } else { - return status; - } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } + /* check collection */ + bool exist = false; + auto status = DBWrapper::DB()->HasCollection(collection_name_, exist); + if (!exist) { + return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } rc.RecordSection("check validation"); - // step 4: drop partition - return DBWrapper::DB()->DropPartitionByTag(collection_name, partition_tag); + /* drop partition */ + return DBWrapper::SSDB()->DropPartition(collection_name_, tag_); } } // namespace server diff --git a/core/src/server/delivery/request/ShowPartitionsRequest.cpp b/core/src/server/delivery/request/ShowPartitionsRequest.cpp index 983c910d..d9856171 100644 --- a/core/src/server/delivery/request/ShowPartitionsRequest.cpp +++ b/core/src/server/delivery/request/ShowPartitionsRequest.cpp @@ -24,7 +24,7 @@ namespace server { ShowPartitionsRequest::ShowPartitionsRequest(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partition_list) + std::vector& partition_list) : BaseRequest(context, BaseRequest::kShowPartitions), collection_name_(collection_name), partition_list_(partition_list) { @@ -32,7 +32,7 @@ ShowPartitionsRequest::ShowPartitionsRequest(const std::shared_ptr& context, - const std::string& collection_name, std::vector& partition_list) { + const std::string& collection_name, std::vector& partition_list) { return std::shared_ptr(new ShowPartitionsRequest(context, collection_name, partition_list)); } @@ -41,47 +41,18 @@ ShowPartitionsRequest::OnExecute() { std::string hdr = "ShowPartitionsRequest(collection=" + collection_name_ + ")"; TimeRecorderAuto rc(hdr); - // step 1: check collection name - auto status = ValidateCollectionName(collection_name_); - fiu_do_on("ShowPartitionsRequest.OnExecute.invalid_collection_name", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - if (!status.ok()) { - return status; - } - - // step 2: check collection existence - // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); - if (!status.ok()) { - if (status.code() == DB_NOT_FOUND) { - return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); - } else { - return status; - } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } + /* check collection existence */ + bool exist = false; + auto status = DBWrapper::DB()->HasCollection(collection_name_, exist); + if (!exist) { + return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } - // step 3: get partitions - std::vector schema_array; - status = DBWrapper::DB()->ShowPartitions(collection_name_, schema_array); + /* get partitions */ + status = DBWrapper::SSDB()->ShowPartitions(collection_name_, partition_list_); fiu_do_on("ShowPartitionsRequest.OnExecute.show_partition_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - if (!status.ok()) { - return status; - } - - partition_list_.clear(); - partition_list_.emplace_back(collection_name_, milvus::engine::DEFAULT_PARTITON_TAG); - for (auto& schema : schema_array) { - partition_list_.emplace_back(schema.owner_collection_, schema.partition_tag_); - } - - return Status::OK(); + return status; } } // namespace server diff --git a/core/src/server/delivery/request/ShowPartitionsRequest.h b/core/src/server/delivery/request/ShowPartitionsRequest.h index 1ef1b99e..f044d402 100644 --- a/core/src/server/delivery/request/ShowPartitionsRequest.h +++ b/core/src/server/delivery/request/ShowPartitionsRequest.h @@ -24,18 +24,18 @@ class ShowPartitionsRequest : public BaseRequest { public: static BaseRequestPtr Create(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partition_list); + std::vector& partition_list); protected: ShowPartitionsRequest(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partition_list); + std::vector& partition_list); Status OnExecute() override; private: const std::string collection_name_; - std::vector& partition_list_; + std::vector& partition_list_; }; } // namespace server diff --git a/core/src/server/grpc_impl/GrpcRequestHandler.cpp b/core/src/server/grpc_impl/GrpcRequestHandler.cpp index 1d7f4d13..eb493c3d 100644 --- a/core/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/core/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -1137,10 +1137,10 @@ GrpcRequestHandler::ShowPartitions(::grpc::ServerContext* context, const ::milvu CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__); - std::vector partitions; - Status status = request_handler_.ShowPartitions(GetContext(context), request->collection_name(), partitions); - for (auto& partition : partitions) { - response->add_partition_tag_array(partition.tag_); + std::vector partition_names; + Status status = request_handler_.ShowPartitions(GetContext(context), request->collection_name(), partition_names); + for (auto& pn : partition_names) { + response->add_partition_tag_array(pn); } LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__); diff --git a/core/src/server/web_impl/handler/WebRequestHandler.cpp b/core/src/server/web_impl/handler/WebRequestHandler.cpp index 531b3b7a..63ad58d3 100644 --- a/core/src/server/web_impl/handler/WebRequestHandler.cpp +++ b/core/src/server/web_impl/handler/WebRequestHandler.cpp @@ -1507,27 +1507,27 @@ WebRequestHandler::ShowPartitions(const OString& collection_name, const OQueryPa all_required = required_str == "True" || required_str == "true"; } - std::vector partitions; - status = request_handler_.ShowPartitions(context_ptr_, collection_name->std_str(), partitions); + std::vector partition_names; + status = request_handler_.ShowPartitions(context_ptr_, collection_name->std_str(), partition_names); if (!status.ok()) { ASSIGN_RETURN_STATUS_DTO(status) } if (all_required) { offset = 0; - page_size = partitions.size(); + page_size = partition_names.size(); } else { - offset = std::min((size_t)offset, partitions.size()); - page_size = std::min(partitions.size() - offset, (size_t)page_size); + offset = std::min((size_t)offset, partition_names.size()); + page_size = std::min(partition_names.size() - offset, (size_t)page_size); } - partition_list_dto->count = partitions.size(); + partition_list_dto->count = partition_names.size(); partition_list_dto->partitions = partition_list_dto->partitions->createShared(); - if (offset < (int64_t)(partitions.size())) { + if (offset < (int64_t)(partition_names.size())) { for (int64_t i = offset; i < page_size + offset; i++) { auto partition_dto = PartitionFieldsDto::createShared(); - partition_dto->partition_tag = partitions.at(i).tag_.c_str(); + partition_dto->partition_tag = partition_names.at(i).c_str(); partition_list_dto->partitions->pushBack(partition_dto); } } -- GitLab