From 7cb09006bbd837d5a7de1544ad5d445da0d99a73 Mon Sep 17 00:00:00 2001 From: yukun Date: Thu, 23 Jul 2020 21:04:30 +0800 Subject: [PATCH] Collection and Search call SSDBImpl (#2988) * Fix test_rpc Signed-off-by: fishpenguin * Fix test_search.py and test_index.py Signed-off-by: fishpenguin * CreateCollection to SSDBImpl Signed-off-by: fishpenguin * Collection to SSDBImpl Signed-off-by: fishpenguin * Search to SSDBImpl Signed-off-by: fishpenguin * code format Signed-off-by: fishpenguin * Fix DescribeCollection Signed-off-by: fishpenguin * Index to SSDB Signed-off-by: fishpenguin --- core/src/db/meta/MetaTypes.h | 2 + core/src/server/ValidationUtil.cpp | 5 +- core/src/server/ValidationUtil.h | 3 +- core/src/server/delivery/RequestHandler.cpp | 40 +----- core/src/server/delivery/RequestHandler.h | 18 +-- .../CreateHybridCollectionRequest.cpp | 17 ++- .../CreateHybridIndexRequest.cpp | 136 ------------------ .../hybrid_request/CreateHybridIndexRequest.h | 45 ------ .../DescribeHybridCollectionRequest.cpp | 30 ++-- .../hybrid_request/HybridSearchRequest.cpp | 74 ++++------ .../hybrid_request/HybridSearchRequest.h | 19 +-- .../delivery/request/CompactRequest.cpp | 14 +- .../request/CountCollectionRequest.cpp | 14 +- .../request/CreateCollectionRequest.cpp | 114 --------------- .../request/CreateCollectionRequest.h | 43 ------ .../delivery/request/CreateIndexRequest.cpp | 30 ++-- .../delivery/request/DeleteByIDRequest.cpp | 22 ++- .../delivery/request/DeleteByIDRequest.h | 6 +- .../request/DescribeCollectionRequest.cpp | 79 ---------- .../request/DescribeCollectionRequest.h | 41 ------ .../delivery/request/DescribeIndexRequest.cpp | 25 +++- .../request/DropCollectionRequest.cpp | 16 +-- .../delivery/request/DropIndexRequest.cpp | 17 +-- .../server/delivery/request/FlushRequest.cpp | 13 +- .../delivery/request/HasCollectionRequest.cpp | 2 +- .../request/PreloadCollectionRequest.cpp | 20 +-- .../request/ShowCollectionInfoRequest.cpp | 12 +- .../request/ShowCollectionsRequest.cpp | 2 +- .../server/grpc_impl/GrpcRequestHandler.cpp | 47 +++--- .../web_impl/handler/WebRequestHandler.cpp | 35 ++--- core/unittest/ssdb/CMakeLists.txt | 1 + 31 files changed, 224 insertions(+), 718 deletions(-) delete mode 100644 core/src/server/delivery/hybrid_request/CreateHybridIndexRequest.cpp delete mode 100644 core/src/server/delivery/hybrid_request/CreateHybridIndexRequest.h delete mode 100644 core/src/server/delivery/request/CreateCollectionRequest.cpp delete mode 100644 core/src/server/delivery/request/CreateCollectionRequest.h delete mode 100644 core/src/server/delivery/request/DescribeCollectionRequest.cpp delete mode 100644 core/src/server/delivery/request/DescribeCollectionRequest.h diff --git a/core/src/db/meta/MetaTypes.h b/core/src/db/meta/MetaTypes.h index 03866856..56b0a6b3 100644 --- a/core/src/db/meta/MetaTypes.h +++ b/core/src/db/meta/MetaTypes.h @@ -23,6 +23,8 @@ namespace milvus { namespace engine { +static const char* DIMENSION = "dim"; + // TODO(linxj): replace with VecIndex::IndexType enum class EngineType { INVALID = 0, diff --git a/core/src/server/ValidationUtil.cpp b/core/src/server/ValidationUtil.cpp index fe6d8f2a..3b22457d 100644 --- a/core/src/server/ValidationUtil.cpp +++ b/core/src/server/ValidationUtil.cpp @@ -255,8 +255,7 @@ ValidateCollectionIndexType(int32_t index_type) { } Status -ValidateIndexParams(const milvus::json& index_params, const engine::meta::CollectionSchema& collection_schema, - int32_t index_type) { +ValidateIndexParams(const milvus::json& index_params, int64_t dimension, int32_t index_type) { switch (index_type) { case (int32_t)engine::EngineType::FAISS_IDMAP: case (int32_t)engine::EngineType::FAISS_BIN_IDMAP: { @@ -286,7 +285,7 @@ ValidateIndexParams(const milvus::json& index_params, const engine::meta::Collec // special check for 'm' parameter std::vector resset; - milvus::knowhere::IVFPQConfAdapter::GetValidMList(collection_schema.dimension_, resset); + milvus::knowhere::IVFPQConfAdapter::GetValidMList(dimension, resset); int64_t m_value = index_params[knowhere::IndexParams::m]; if (resset.empty()) { std::string msg = "Invalid collection dimension, unable to get reasonable values for 'm'"; diff --git a/core/src/server/ValidationUtil.h b/core/src/server/ValidationUtil.h index dc509dcf..1e089c20 100644 --- a/core/src/server/ValidationUtil.h +++ b/core/src/server/ValidationUtil.h @@ -40,8 +40,7 @@ extern Status ValidateCollectionIndexType(int32_t index_type); extern Status -ValidateIndexParams(const milvus::json& index_params, const engine::meta::CollectionSchema& collection_schema, - int32_t index_type); +ValidateIndexParams(const milvus::json& index_params, int64_t dimension, int32_t index_type); extern Status ValidateSearchParams(const milvus::json& search_params, const engine::meta::CollectionSchema& collection_schema, diff --git a/core/src/server/delivery/RequestHandler.cpp b/core/src/server/delivery/RequestHandler.cpp index 04e408c0..81a19190 100644 --- a/core/src/server/delivery/RequestHandler.cpp +++ b/core/src/server/delivery/RequestHandler.cpp @@ -18,11 +18,9 @@ #include "server/delivery/request/CmdRequest.h" #include "server/delivery/request/CompactRequest.h" #include "server/delivery/request/CountCollectionRequest.h" -#include "server/delivery/request/CreateCollectionRequest.h" #include "server/delivery/request/CreateIndexRequest.h" #include "server/delivery/request/CreatePartitionRequest.h" #include "server/delivery/request/DeleteByIDRequest.h" -#include "server/delivery/request/DescribeCollectionRequest.h" #include "server/delivery/request/DescribeIndexRequest.h" #include "server/delivery/request/DropCollectionRequest.h" #include "server/delivery/request/DropIndexRequest.h" @@ -42,7 +40,6 @@ #include "server/delivery/request/ShowPartitionsRequest.h" #include "server/delivery/hybrid_request/CreateHybridCollectionRequest.h" -#include "server/delivery/hybrid_request/CreateHybridIndexRequest.h" #include "server/delivery/hybrid_request/DescribeHybridCollectionRequest.h" #include "server/delivery/hybrid_request/GetEntityByIDRequest.h" #include "server/delivery/hybrid_request/HybridSearchRequest.h" @@ -51,16 +48,6 @@ namespace milvus { namespace server { -Status -RequestHandler::CreateCollection(const std::shared_ptr& context, const std::string& collection_name, - int64_t dimension, int64_t index_file_size, int64_t metric_type) { - BaseRequestPtr request_ptr = - CreateCollectionRequest::Create(context, collection_name, dimension, index_file_size, metric_type); - RequestScheduler::ExecRequest(request_ptr); - - return request_ptr->status(); -} - Status RequestHandler::HasCollection(const std::shared_ptr& context, const std::string& collection_name, bool& has_collection) { @@ -156,15 +143,6 @@ RequestHandler::SearchByID(const std::shared_ptr& context, const std::s return request_ptr->status(); } -Status -RequestHandler::DescribeCollection(const std::shared_ptr& context, const std::string& collection_name, - CollectionSchema& collection_schema) { - BaseRequestPtr request_ptr = DescribeCollectionRequest::Create(context, collection_name, collection_schema); - RequestScheduler::ExecRequest(request_ptr); - - return request_ptr->status(); -} - Status RequestHandler::CountCollection(const std::shared_ptr& context, const std::string& collection_name, int64_t& count) { @@ -333,26 +311,14 @@ RequestHandler::GetEntityByID(const std::shared_ptr& context, const std } Status -RequestHandler::HybridSearch(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partition_list, query::GeneralQueryPtr& general_query, - query::QueryPtr& query_ptr, milvus::json& json_params, - std::vector& field_names, engine::QueryResult& result) { - BaseRequestPtr request_ptr = HybridSearchRequest::Create(context, collection_name, partition_list, general_query, - query_ptr, json_params, field_names, result); +RequestHandler::HybridSearch(const std::shared_ptr& context, const query::QueryPtr& query_ptr, + const milvus::json& json_params, engine::QueryResultPtr& result) { + BaseRequestPtr request_ptr = HybridSearchRequest::Create(context, query_ptr, json_params, result); RequestScheduler::ExecRequest(request_ptr); return request_ptr->status(); } -Status -RequestHandler::CreateHybridIndex(const std::shared_ptr& context, const std::string& collection_name, - const std::vector& field_names, const milvus::json& json_params) { - BaseRequestPtr request_ptr = CreateHybridIndexRequest::Create(context, collection_name, field_names, json_params); - - RequestScheduler::ExecRequest(request_ptr); - return request_ptr->status(); -} - } // namespace server } // namespace milvus diff --git a/core/src/server/delivery/RequestHandler.h b/core/src/server/delivery/RequestHandler.h index 2808649b..6c879419 100644 --- a/core/src/server/delivery/RequestHandler.h +++ b/core/src/server/delivery/RequestHandler.h @@ -29,10 +29,6 @@ class RequestHandler { public: RequestHandler() = default; - Status - CreateCollection(const std::shared_ptr& context, const std::string& collection_name, int64_t dimension, - int64_t index_file_size, int64_t metric_type); - Status HasCollection(const std::shared_ptr& context, const std::string& collection_name, bool& has_collection); @@ -72,10 +68,6 @@ class RequestHandler { const std::vector& id_array, int64_t topk, const milvus::json& extra_params, const std::vector& partition_list, TopKQueryResult& result); - Status - DescribeCollection(const std::shared_ptr& context, const std::string& collection_name, - CollectionSchema& collection_schema); - Status CountCollection(const std::shared_ptr& context, const std::string& collection_name, int64_t& count); @@ -150,14 +142,8 @@ class RequestHandler { std::vector& attrs, std::vector& vectors); Status - HybridSearch(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partition_list, query::GeneralQueryPtr& general_query, - query::QueryPtr& query_ptr, milvus::json& json_params, std::vector& field_names, - engine::QueryResult& result); - - Status - CreateHybridIndex(const std::shared_ptr& context, const std::string& collection_name, - const std::vector& field_names, const milvus::json& json_params); + HybridSearch(const std::shared_ptr& context, const query::QueryPtr& query_ptr, + const milvus::json& json_params, engine::QueryResultPtr& result); }; } // namespace server diff --git a/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp b/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp index 14e46aa8..9557f830 100644 --- a/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp +++ b/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp @@ -19,6 +19,7 @@ #include "utils/TimeRecorder.h" #include +#include #include #include #include @@ -123,8 +124,20 @@ CreateHybridCollectionRequest::OnExecute() { collection_info.metric_type_ = metric_type; } - // step 3: create collection - status = DBWrapper::DB()->CreateHybridCollection(collection_info, fields_schema); + // step 3: create snapshot collection + engine::snapshot::CreateCollectionContext create_collection_context; + auto ss_collection_schema = std::make_shared(collection_name_); + create_collection_context.collection = ss_collection_schema; + for (const auto& schema : fields_schema.fields_schema_) { + auto field = std::make_shared( + schema.field_name_, 0, (engine::FieldType)schema.field_type_, json::parse(schema.field_params_)); + + auto field_element = std::make_shared( + 0, 0, schema.index_name_, engine::FieldElementType::FET_INDEX, json::parse(schema.index_param_)); + create_collection_context.fields_schema[field] = {field_element}; + } + + status = DBWrapper::SSDB()->CreateCollection(create_collection_context); fiu_do_on("CreateHybridCollectionRequest.OnExecute.invalid_db_execute", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { diff --git a/core/src/server/delivery/hybrid_request/CreateHybridIndexRequest.cpp b/core/src/server/delivery/hybrid_request/CreateHybridIndexRequest.cpp deleted file mode 100644 index de7f2cce..00000000 --- a/core/src/server/delivery/hybrid_request/CreateHybridIndexRequest.cpp +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#include "server/delivery/hybrid_request/CreateHybridIndexRequest.h" -#include "db/Utils.h" -#include "server/DBWrapper.h" -#include "server/ValidationUtil.h" -#include "utils/Log.h" -#include "utils/TimeRecorder.h" - -#include -#include -#include -#include - -namespace milvus { -namespace server { - -CreateHybridIndexRequest::CreateHybridIndexRequest(const std::shared_ptr& context, - const std::string& collection_name, - const std::vector& field_names, - const milvus::json& extra_params) - : BaseRequest(context, BaseRequest::kCreateHybridIndex), - collection_name_(collection_name), - field_names_(field_names), - extra_params_(extra_params) { -} - -BaseRequestPtr -CreateHybridIndexRequest::Create(const std::shared_ptr& context, - const std::string& collection_name, const std::vector& field_names, - const milvus::json& extra_params) { - return std::shared_ptr( - new CreateHybridIndexRequest(context, collection_name, field_names, extra_params)); -} - -Status -CreateHybridIndexRequest::OnExecute() { - try { - std::string hdr = "CreateIndexRequest(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); - - // step 1: check arguments - auto status = ValidateCollectionName(collection_name_); - if (!status.ok()) { - return status; - } - - // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - engine::meta::hybrid::FieldsSchema fields_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema); - - std::unordered_map attr_types; - for (const auto& schema : fields_schema.fields_schema_) { - attr_types.insert(std::make_pair(schema.field_name_, (engine::meta::hybrid::DataType)schema.field_type_)); - } - - fiu_do_on("CreateIndexRequest.OnExecute.not_has_collection", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - fiu_do_on("CreateIndexRequest.OnExecute.throw_std.exception", throw std::exception()); - if (!status.ok()) { - if (status.code() == DB_NOT_FOUND) { - return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); - } else { - return status; - } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } - } - - int32_t index_type = 0; - if (extra_params_.contains("index_type")) { - std::string index_type_str = extra_params_["index_type"]; - index_type = (int32_t)engine::s_map_engine_type.at(index_type_str); - } - - status = ValidateCollectionIndexType(index_type); - if (!status.ok()) { - return status; - } - - status = ValidateIndexParams(extra_params_, collection_schema, index_type); - if (!status.ok()) { - return status; - } - - // step 2: binary and float vector support different index/metric type, need to adapt here - int32_t adapter_index_type = index_type; - if (engine::utils::IsBinaryMetricType(collection_schema.metric_type_)) { // binary vector not allow - if (adapter_index_type == static_cast(engine::EngineType::FAISS_IDMAP)) { - adapter_index_type = static_cast(engine::EngineType::FAISS_BIN_IDMAP); - } else if (adapter_index_type == static_cast(engine::EngineType::FAISS_IVFFLAT)) { - adapter_index_type = static_cast(engine::EngineType::FAISS_BIN_IVFFLAT); - } else { - return Status(SERVER_INVALID_INDEX_TYPE, "Invalid index type for collection metric type"); - } - } - - rc.RecordSection("check validation"); - - // // step 3: create index - // status = DBWrapper::DB()->CreateStructuredIndex(context_, collection_name_, field_names_); - // fiu_do_on("CreateIndexRequest.OnExecute.create_index_fail", - // status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - // if (!status.ok()) { - // return status; - // } - // - // engine::CollectionIndex index; - // index.engine_type_ = adapter_index_type; - // index.extra_params_ = extra_params_; - // status = DBWrapper::DB()->CreateIndex(context_, collection_name_, index); - // if (!status.ok()) { - // return status; - // } - } catch (std::exception& ex) { - return Status(SERVER_UNEXPECTED_ERROR, ex.what()); - } - - return Status::OK(); -} - -} // namespace server -} // namespace milvus diff --git a/core/src/server/delivery/hybrid_request/CreateHybridIndexRequest.h b/core/src/server/delivery/hybrid_request/CreateHybridIndexRequest.h deleted file mode 100644 index 40586bc7..00000000 --- a/core/src/server/delivery/hybrid_request/CreateHybridIndexRequest.h +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include -#include -#include -#include - -#include "server/delivery/request/BaseRequest.h" - -namespace milvus { -namespace server { - -class CreateHybridIndexRequest : public BaseRequest { - public: - static BaseRequestPtr - Create(const std::shared_ptr& context, const std::string& collection_name, - const std::vector& field_names, const milvus::json& extra_params); - - protected: - CreateHybridIndexRequest(const std::shared_ptr& context, - const std::string& collection_name, const std::vector& field_names, - const milvus::json& extra_params); - - Status - OnExecute() override; - - private: - const std::string collection_name_; - std::vector field_names_; - const milvus::json& extra_params_; -}; - -} // namespace server -} // namespace milvus diff --git a/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp b/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp index 0ee7513b..d0c750d7 100644 --- a/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp +++ b/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp @@ -48,17 +48,32 @@ DescribeHybridCollectionRequest::OnExecute() { TimeRecorderAuto rc(hdr); try { - engine::meta::CollectionSchema collection_schema; - engine::meta::hybrid::FieldsSchema fields_schema; - collection_schema.collection_id_ = collection_name_; - auto status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema); - fiu_do_on("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + auto status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); if (!status.ok()) { return status; } + collection_schema_.collection_name_ = collection_name_; + collection_schema_.extra_params_ = collection->GetParams(); + engine::meta::hybrid::FieldsSchema fields_info; + for (auto field_it = fields_schema.begin(); field_it != fields_schema.end(); field_it++) { + engine::meta::hybrid::FieldSchema schema; + auto field = field_it->first; + schema.field_name_ = field->GetName(); + schema.field_type_ = (int)field->GetFtype(); + schema.field_params_ = field->GetParams().dump(); + auto field_elements = field_it->second; + for (const auto& element : field_elements) { + if (element->GetFtype() == (int)engine::FieldElementType::FET_INDEX) { + schema.index_name_ = element->GetName(); + schema.index_param_ = element->GetParams().dump(); + break; + } + } + } - for (const auto& schema : fields_schema.fields_schema_) { + for (const auto& schema : fields_info.fields_schema_) { auto field_name = schema.field_name_; collection_schema_.field_types_.insert( std::make_pair(field_name, (engine::meta::hybrid::DataType)schema.field_type_)); @@ -67,7 +82,6 @@ DescribeHybridCollectionRequest::OnExecute() { milvus::json json_extra_param = milvus::json::parse(schema.field_params_); collection_schema_.field_params_.insert(std::make_pair(field_name, json_extra_param)); } - collection_schema_.extra_params_["segment_size"] = collection_schema.index_file_size_ / engine::MB; } catch (std::exception& ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } diff --git a/core/src/server/delivery/hybrid_request/HybridSearchRequest.cpp b/core/src/server/delivery/hybrid_request/HybridSearchRequest.cpp index 1acc354c..ae5c0618 100644 --- a/core/src/server/delivery/hybrid_request/HybridSearchRequest.cpp +++ b/core/src/server/delivery/hybrid_request/HybridSearchRequest.cpp @@ -31,65 +31,53 @@ namespace milvus { namespace server { HybridSearchRequest::HybridSearchRequest(const std::shared_ptr& context, - const std::string& collection_name, std::vector& partition_list, - query::GeneralQueryPtr& general_query, query::QueryPtr& query_ptr, - milvus::json& json_params, std::vector& field_names, - engine::QueryResult& result) + const query::QueryPtr& query_ptr, const milvus::json& json_params, + engine::QueryResultPtr& result) : BaseRequest(context, BaseRequest::kHybridSearch), - collection_name_(collection_name), - partition_list_(partition_list), - general_query_(general_query), query_ptr_(query_ptr), json_params_(json_params), - field_names_(field_names), result_(result) { } BaseRequestPtr -HybridSearchRequest::Create(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partition_list, query::GeneralQueryPtr& general_query, - query::QueryPtr& query_ptr, milvus::json& json_params, - std::vector& field_names, engine::QueryResult& result) { - return std::shared_ptr(new HybridSearchRequest(context, collection_name, partition_list, general_query, - query_ptr, json_params, field_names, result)); +HybridSearchRequest::Create(const std::shared_ptr& context, const query::QueryPtr& query_ptr, + const milvus::json& json_params, engine::QueryResultPtr& result) { + return std::shared_ptr(new HybridSearchRequest(context, query_ptr, json_params, result)); } Status HybridSearchRequest::OnExecute() { try { fiu_do_on("HybridSearchRequest.OnExecute.throw_std_exception", throw std::exception()); - std::string hdr = "SearchRequest(table=" + collection_name_; + std::string hdr = "SearchRequest(table=" + query_ptr_->collection_id; TimeRecorder rc(hdr); // step 1: check table name - auto status = ValidateCollectionName(collection_name_); + auto status = ValidateCollectionName(query_ptr_->collection_id); if (!status.ok()) { return status; } // step 2: check table existence // only process root table, ignore partition table - engine::meta::CollectionSchema collection_schema; - engine::meta::hybrid::FieldsSchema fields_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(query_ptr_->collection_id, collection, fields_schema); fiu_do_on("HybridSearchRequest.OnExecute.describe_table_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { - return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); + return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(query_ptr_->collection_id)); } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } } + int64_t dimension = collection->GetParams()[engine::DIMENSION].get(); + // step 3: check partition tags - status = ValidatePartitionTags(partition_list_); + status = ValidatePartitionTags(query_ptr_->partitions); fiu_do_on("HybridSearchRequest.OnExecute.invalid_partition_tags", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { @@ -97,7 +85,14 @@ HybridSearchRequest::OnExecute() { return status; } - // step 3: check field names + // step 4: Get field info + std::unordered_map field_types; + for (auto& schema : fields_schema) { + field_types.insert( + std::make_pair(schema.first->GetName(), (engine::meta::hybrid::DataType)schema.first->GetFtype())); + } + + // step 5: check field names if (json_params_.contains("fields")) { if (json_params_["fields"].is_array()) { for (auto& name : json_params_["fields"]) { @@ -106,8 +101,8 @@ HybridSearchRequest::OnExecute() { return status; } bool find_field_name = false; - for (const auto& schema : fields_schema.fields_schema_) { - if (name.get() == schema.field_name_) { + for (const auto& schema : field_types) { + if (name.get() == schema.first) { find_field_name = true; break; } @@ -115,20 +110,13 @@ HybridSearchRequest::OnExecute() { if (not find_field_name) { return Status{SERVER_INVALID_FIELD_NAME, "Field: " + name.get() + " not exist"}; } - field_names_.emplace_back(name.get()); + query_ptr_->field_names.emplace_back(name.get()); } } } - std::unordered_map attr_type; - for (auto& field_schema : fields_schema.fields_schema_) { - attr_type.insert( - std::make_pair(field_schema.field_name_, (engine::meta::hybrid::DataType)field_schema.field_type_)); - } - - result_.row_num_ = 0; - status = DBWrapper::DB()->HybridQuery(context_, collection_name_, partition_list_, general_query_, query_ptr_, - field_names_, attr_type, result_); + result_->row_num_ = 0; + status = DBWrapper::SSDB()->Query(context_, query_ptr_, result_); #ifdef ENABLE_CPU_PROFILING ProfilerStop(); @@ -138,13 +126,13 @@ HybridSearchRequest::OnExecute() { if (!status.ok()) { return status; } - fiu_do_on("HybridSearchRequest.OnExecute.empty_result_ids", result_.result_ids_.clear()); - if (result_.result_ids_.empty()) { + fiu_do_on("HybridSearchRequest.OnExecute.empty_result_ids", result_->result_ids_.clear()); + if (result_->result_ids_.empty()) { auto vector_query = query_ptr_->vectors.begin()->second; if (!vector_query->query_vector.binary_data.empty()) { - result_.row_num_ = vector_query->query_vector.binary_data.size() * 8 / collection_schema.dimension_; + result_->row_num_ = vector_query->query_vector.binary_data.size() * 8 / dimension; } else if (!vector_query->query_vector.float_data.empty()) { - result_.row_num_ = vector_query->query_vector.float_data.size() / collection_schema.dimension_; + result_->row_num_ = vector_query->query_vector.float_data.size() / dimension; } return Status::OK(); // empty table } diff --git a/core/src/server/delivery/hybrid_request/HybridSearchRequest.h b/core/src/server/delivery/hybrid_request/HybridSearchRequest.h index 420aa41c..a439dc84 100644 --- a/core/src/server/delivery/hybrid_request/HybridSearchRequest.h +++ b/core/src/server/delivery/hybrid_request/HybridSearchRequest.h @@ -13,7 +13,7 @@ #include "server/delivery/request/BaseRequest.h" -#include +#include #include #include #include @@ -24,27 +24,20 @@ namespace server { class HybridSearchRequest : public BaseRequest { public: static BaseRequestPtr - Create(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partition_list, query::GeneralQueryPtr& general_query, query::QueryPtr& query_ptr, - milvus::json& json_params, std::vector& field_names, engine::QueryResult& result); + Create(const std::shared_ptr& context, const query::QueryPtr& query_ptr, + const milvus::json& json_params, engine::QueryResultPtr& result); protected: - HybridSearchRequest(const std::shared_ptr& context, const std::string& collection_name, - std::vector& partition_list, query::GeneralQueryPtr& general_query, - query::QueryPtr& query_ptr, milvus::json& json_params, std::vector& field_names, - engine::QueryResult& result); + HybridSearchRequest(const std::shared_ptr& context, const query::QueryPtr& query_ptr, + const milvus::json& json_params, engine::QueryResultPtr& result); Status OnExecute() override; private: - const std::string collection_name_; - std::vector partition_list_; - milvus::query::GeneralQueryPtr general_query_; milvus::query::QueryPtr query_ptr_; milvus::json json_params_; - std::vector& field_names_; - engine::QueryResult& result_; + engine::QueryResultPtr& result_; }; } // namespace server diff --git a/core/src/server/delivery/request/CompactRequest.cpp b/core/src/server/delivery/request/CompactRequest.cpp index a46887ab..54b78bf2 100644 --- a/core/src/server/delivery/request/CompactRequest.cpp +++ b/core/src/server/delivery/request/CompactRequest.cpp @@ -22,6 +22,8 @@ #include "utils/TimeRecorder.h" #include +#include +#include namespace milvus { namespace server { @@ -52,25 +54,21 @@ CompactRequest::OnExecute() { } // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } } rc.RecordSection("check validation"); // step 2: check collection existence - status = DBWrapper::DB()->Compact(context_, collection_name_, compact_threshold_); + status = DBWrapper::SSDB()->Compact(context_, collection_name_, compact_threshold_); if (!status.ok()) { return status; } diff --git a/core/src/server/delivery/request/CountCollectionRequest.cpp b/core/src/server/delivery/request/CountCollectionRequest.cpp index 142c33b2..0e8c6a67 100644 --- a/core/src/server/delivery/request/CountCollectionRequest.cpp +++ b/core/src/server/delivery/request/CountCollectionRequest.cpp @@ -18,6 +18,8 @@ #include #include +#include +#include namespace milvus { namespace server { @@ -46,26 +48,22 @@ CountCollectionRequest::OnExecute() { } // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } } rc.RecordSection("check validation"); // step 2: get row count uint64_t row_count = 0; - status = DBWrapper::DB()->GetCollectionRowCount(collection_name_, row_count); + status = DBWrapper::SSDB()->GetCollectionRowCount(collection_name_, row_count); fiu_do_on("CountCollectionRequest.OnExecute.db_not_found", status = Status(DB_NOT_FOUND, "")); fiu_do_on("CountCollectionRequest.OnExecute.status_error", status = Status(SERVER_UNEXPECTED_ERROR, "")); fiu_do_on("CountCollectionRequest.OnExecute.throw_std_exception", throw std::exception()); diff --git a/core/src/server/delivery/request/CreateCollectionRequest.cpp b/core/src/server/delivery/request/CreateCollectionRequest.cpp deleted file mode 100644 index b7a3d071..00000000 --- a/core/src/server/delivery/request/CreateCollectionRequest.cpp +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#include "server/delivery/request/CreateCollectionRequest.h" -#include "db/Utils.h" -#include "server/DBWrapper.h" -#include "server/ValidationUtil.h" -#include "server/delivery/request/BaseRequest.h" -#include "utils/Log.h" -#include "utils/TimeRecorder.h" - -#include -#include -#include - -namespace milvus { -namespace server { - -CreateCollectionRequest::CreateCollectionRequest(const std::shared_ptr& context, - const std::string& collection_name, int64_t dimension, - int64_t index_file_size, int64_t metric_type) - : BaseRequest(context, BaseRequest::kCreateCollection), - collection_name_(collection_name), - dimension_(dimension), - index_file_size_(index_file_size), - metric_type_(metric_type) { -} - -BaseRequestPtr -CreateCollectionRequest::Create(const std::shared_ptr& context, - const std::string& collection_name, int64_t dimension, int64_t index_file_size, - int64_t metric_type) { - return std::shared_ptr( - new CreateCollectionRequest(context, collection_name, dimension, index_file_size, metric_type)); -} - -Status -CreateCollectionRequest::OnExecute() { - std::string hdr = - "CreateCollectionRequest(collection=" + collection_name_ + ", dimension=" + std::to_string(dimension_) + ")"; - TimeRecorderAuto rc(hdr); - - try { - // step 1: check arguments - auto status = ValidateCollectionName(collection_name_); - if (!status.ok()) { - return status; - } - - status = ValidateTableDimension(dimension_, metric_type_); - if (!status.ok()) { - return status; - } - - status = ValidateCollectionIndexFileSize(index_file_size_); - fiu_do_on("CreateCollectionRequest.OnExecute.invalid_index_file_size", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - if (!status.ok()) { - return status; - } - - status = ValidateCollectionIndexMetricType(metric_type_); - if (!status.ok()) { - return status; - } - - rc.RecordSection("check validation"); - - // step 2: construct collection schema - engine::meta::CollectionSchema collection_info; - collection_info.collection_id_ = collection_name_; - collection_info.dimension_ = static_cast(dimension_); - collection_info.index_file_size_ = index_file_size_; - collection_info.metric_type_ = metric_type_; - - // some metric type only support binary vector, adapt the index type - if (engine::utils::IsBinaryMetricType(metric_type_)) { - if (collection_info.engine_type_ == static_cast(engine::EngineType::FAISS_IDMAP)) { - collection_info.engine_type_ = static_cast(engine::EngineType::FAISS_BIN_IDMAP); - } else if (collection_info.engine_type_ == static_cast(engine::EngineType::FAISS_IVFFLAT)) { - collection_info.engine_type_ = static_cast(engine::EngineType::FAISS_BIN_IVFFLAT); - } - } - - // step 3: create collection - status = DBWrapper::DB()->CreateCollection(collection_info); - fiu_do_on("CreateCollectionRequest.OnExecute.db_already_exist", status = Status(milvus::DB_ALREADY_EXIST, "")); - fiu_do_on("CreateCollectionRequest.OnExecute.create_collection_fail", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - fiu_do_on("CreateCollectionRequest.OnExecute.throw_std_exception", throw std::exception()); - if (!status.ok()) { - // collection could exist - if (status.code() == DB_ALREADY_EXIST) { - return Status(SERVER_INVALID_COLLECTION_NAME, status.message()); - } - return status; - } - } catch (std::exception& ex) { - return Status(SERVER_UNEXPECTED_ERROR, ex.what()); - } - - return Status::OK(); -} - -} // namespace server -} // namespace milvus diff --git a/core/src/server/delivery/request/CreateCollectionRequest.h b/core/src/server/delivery/request/CreateCollectionRequest.h deleted file mode 100644 index dda8a8bc..00000000 --- a/core/src/server/delivery/request/CreateCollectionRequest.h +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include -#include - -#include "server/delivery/request/BaseRequest.h" - -namespace milvus { -namespace server { - -class CreateCollectionRequest : public BaseRequest { - public: - static BaseRequestPtr - Create(const std::shared_ptr& context, const std::string& collection_name, - int64_t dimension, int64_t index_file_size, int64_t metric_type); - - protected: - CreateCollectionRequest(const std::shared_ptr& context, const std::string& collection_name, - int64_t dimension, int64_t index_file_size, int64_t metric_type); - - Status - OnExecute() override; - - private: - const std::string collection_name_; - int64_t dimension_; - int64_t index_file_size_; - int64_t metric_type_; -}; - -} // namespace server -} // namespace milvus diff --git a/core/src/server/delivery/request/CreateIndexRequest.cpp b/core/src/server/delivery/request/CreateIndexRequest.cpp index 685f7497..cd3da41c 100644 --- a/core/src/server/delivery/request/CreateIndexRequest.cpp +++ b/core/src/server/delivery/request/CreateIndexRequest.cpp @@ -19,6 +19,8 @@ #include #include #include +#include +#include namespace milvus { namespace server { @@ -64,10 +66,9 @@ CreateIndexRequest::OnExecute() { } // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - engine::meta::hybrid::FieldsSchema fields_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); fiu_do_on("CreateIndexRequest.OnExecute.not_has_collection", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); fiu_do_on("CreateIndexRequest.OnExecute.throw_std.exception", throw std::exception()); @@ -77,9 +78,16 @@ CreateIndexRequest::OnExecute() { } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); + } + + int64_t dimension; + for (auto field_it = fields_schema.begin(); field_it != fields_schema.end(); field_it++) { + auto type = field_it->first->GetFtype(); + if (type == (int64_t)engine::meta::hybrid::DataType::VECTOR_FLOAT || + type == (int64_t)engine::meta::hybrid::DataType::VECTOR_BINARY) { + auto params = field_it->first->GetParams(); + dimension = params[engine::DIMENSION].get(); + break; } } @@ -97,7 +105,7 @@ CreateIndexRequest::OnExecute() { return status; } - status = ValidateIndexParams(json_params_, collection_schema, index_type); + status = ValidateIndexParams(json_params_, dimension, index_type); if (!status.ok()) { return status; } @@ -114,9 +122,6 @@ CreateIndexRequest::OnExecute() { } else if (adapter_index_type == static_cast(engine::EngineType::FAISS_IVFFLAT)) { adapter_index_type = static_cast(engine::EngineType::FAISS_BIN_IVFFLAT); } - // else { - // return Status(SERVER_INVALID_INDEX_TYPE, "Invalid index type for collection metric type"); - // } } rc.RecordSection("check validation"); @@ -128,8 +133,9 @@ CreateIndexRequest::OnExecute() { } index.engine_type_ = adapter_index_type; + index.index_name_ = index_name_; index.extra_params_ = json_params_; - status = DBWrapper::DB()->CreateIndex(context_, collection_name_, index); + status = DBWrapper::SSDB()->CreateIndex(context_, collection_name_, field_name_, index); fiu_do_on("CreateIndexRequest.OnExecute.create_index_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { diff --git a/core/src/server/delivery/request/DeleteByIDRequest.cpp b/core/src/server/delivery/request/DeleteByIDRequest.cpp index 647fbe93..b1a2f408 100644 --- a/core/src/server/delivery/request/DeleteByIDRequest.cpp +++ b/core/src/server/delivery/request/DeleteByIDRequest.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include "server/DBWrapper.h" @@ -30,14 +31,14 @@ namespace milvus { namespace server { DeleteByIDRequest::DeleteByIDRequest(const std::shared_ptr& context, - const std::string& collection_name, const std::vector& vector_ids) - : BaseRequest(context, BaseRequest::kDeleteByID), collection_name_(collection_name), vector_ids_(vector_ids) { + const std::string& collection_name, const std::vector& entity_ids) + : BaseRequest(context, BaseRequest::kDeleteByID), collection_name_(collection_name), entity_ids_(entity_ids) { } BaseRequestPtr DeleteByIDRequest::Create(const std::shared_ptr& context, const std::string& collection_name, - const std::vector& vector_ids) { - return std::shared_ptr(new DeleteByIDRequest(context, collection_name, vector_ids)); + const std::vector& entity_ids) { + return std::shared_ptr(new DeleteByIDRequest(context, collection_name, entity_ids)); } Status @@ -52,21 +53,16 @@ DeleteByIDRequest::OnExecute() { } // step 2: check collection existence - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } } - // Check collection's index type supports delete #ifdef MILVUS_SUPPORT_SPTAG if (collection_schema.engine_type_ == (int32_t)engine::EngineType::SPTAG_BKT || @@ -80,7 +76,7 @@ DeleteByIDRequest::OnExecute() { rc.RecordSection("check validation"); - status = DBWrapper::DB()->DeleteEntities(collection_name_, vector_ids_); + status = DBWrapper::SSDB()->DeleteEntities(collection_name_, entity_ids_); if (!status.ok()) { return status; } diff --git a/core/src/server/delivery/request/DeleteByIDRequest.h b/core/src/server/delivery/request/DeleteByIDRequest.h index 2ebadd5f..18e02002 100644 --- a/core/src/server/delivery/request/DeleteByIDRequest.h +++ b/core/src/server/delivery/request/DeleteByIDRequest.h @@ -30,18 +30,18 @@ class DeleteByIDRequest : public BaseRequest { public: static BaseRequestPtr Create(const std::shared_ptr& context, const std::string& collection_name, - const std::vector& vector_ids); + const std::vector& entity_ids); protected: DeleteByIDRequest(const std::shared_ptr& context, const std::string& collection_name, - const std::vector& vector_ids); + const std::vector& entity_ids); Status OnExecute() override; private: const std::string collection_name_; - const std::vector& vector_ids_; + const std::vector& entity_ids_; }; } // namespace server diff --git a/core/src/server/delivery/request/DescribeCollectionRequest.cpp b/core/src/server/delivery/request/DescribeCollectionRequest.cpp deleted file mode 100644 index cffb8767..00000000 --- a/core/src/server/delivery/request/DescribeCollectionRequest.cpp +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#include "server/delivery/request/DescribeCollectionRequest.h" -#include "server/DBWrapper.h" -#include "server/ValidationUtil.h" -#include "utils/Log.h" -#include "utils/TimeRecorder.h" - -#include -#include - -namespace milvus { -namespace server { - -DescribeCollectionRequest::DescribeCollectionRequest(const std::shared_ptr& context, - const std::string& collection_name, CollectionSchema& schema) - : BaseRequest(context, BaseRequest::kDescribeCollection), collection_name_(collection_name), schema_(schema) { -} - -BaseRequestPtr -DescribeCollectionRequest::Create(const std::shared_ptr& context, - const std::string& collection_name, CollectionSchema& schema) { - return std::shared_ptr(new DescribeCollectionRequest(context, collection_name, schema)); -} - -Status -DescribeCollectionRequest::OnExecute() { - std::string hdr = "DescribeCollectionRequest(collection=" + collection_name_ + ")"; - TimeRecorderAuto rc(hdr); - - try { - // step 1: check arguments - auto status = ValidateCollectionName(collection_name_); - if (!status.ok()) { - return status; - } - - // step 2: get collection info - // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); - fiu_do_on("DescribeCollectionRequest.OnExecute.describe_collection_fail", - status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - fiu_do_on("DescribeCollectionRequest.OnExecute.throw_std_exception", throw std::exception()); - if (!status.ok()) { - if (status.code() == DB_NOT_FOUND) { - return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); - } else { - return status; - } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } - } - - schema_.collection_name_ = collection_schema.collection_id_; - schema_.dimension_ = static_cast(collection_schema.dimension_); - schema_.index_file_size_ = collection_schema.index_file_size_; - schema_.metric_type_ = collection_schema.metric_type_; - } catch (std::exception& ex) { - return Status(SERVER_UNEXPECTED_ERROR, ex.what()); - } - - return Status::OK(); -} - -} // namespace server -} // namespace milvus diff --git a/core/src/server/delivery/request/DescribeCollectionRequest.h b/core/src/server/delivery/request/DescribeCollectionRequest.h deleted file mode 100644 index d7639896..00000000 --- a/core/src/server/delivery/request/DescribeCollectionRequest.h +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include -#include - -#include "server/delivery/request/BaseRequest.h" - -namespace milvus { -namespace server { - -class DescribeCollectionRequest : public BaseRequest { - public: - static BaseRequestPtr - Create(const std::shared_ptr& context, const std::string& collection_name, - CollectionSchema& schema); - - protected: - DescribeCollectionRequest(const std::shared_ptr& context, - const std::string& collection_name, CollectionSchema& schema); - - Status - OnExecute() override; - - private: - const std::string collection_name_; - CollectionSchema& schema_; -}; - -} // namespace server -} // namespace milvus diff --git a/core/src/server/delivery/request/DescribeIndexRequest.cpp b/core/src/server/delivery/request/DescribeIndexRequest.cpp index b556ff9e..b33d3fcf 100644 --- a/core/src/server/delivery/request/DescribeIndexRequest.cpp +++ b/core/src/server/delivery/request/DescribeIndexRequest.cpp @@ -17,6 +17,8 @@ #include #include +#include +#include namespace milvus { namespace server { @@ -46,24 +48,33 @@ DescribeIndexRequest::OnExecute() { } // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); + fiu_do_on("DropIndexRequest.OnExecute.collection_not_exist", + status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); + } + + // TODO(yukun): Currently field name is vector field name + std::string field_name; + for (auto field_it = fields_schema.begin(); field_it != fields_schema.end(); field_it++) { + auto type = field_it->first->GetFtype(); + if (type == (int64_t)engine::meta::hybrid::DataType::VECTOR_FLOAT || + type == (int64_t)engine::meta::hybrid::DataType::VECTOR_BINARY) { + field_name = field_it->first->GetName(); + break; } } // step 2: check collection existence engine::CollectionIndex index; - status = DBWrapper::DB()->DescribeIndex(collection_name_, index); + status = DBWrapper::SSDB()->DescribeIndex(collection_name_, field_name, index); if (!status.ok()) { return status; } diff --git a/core/src/server/delivery/request/DropCollectionRequest.cpp b/core/src/server/delivery/request/DropCollectionRequest.cpp index ae8f1260..025df4a3 100644 --- a/core/src/server/delivery/request/DropCollectionRequest.cpp +++ b/core/src/server/delivery/request/DropCollectionRequest.cpp @@ -17,6 +17,7 @@ #include #include +#include #include namespace milvus { @@ -47,9 +48,10 @@ DropCollectionRequest::OnExecute() { // step 2: check collection existence // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); fiu_do_on("DropCollectionRequest.OnExecute.db_not_found", status = Status(milvus::DB_NOT_FOUND, "")); fiu_do_on("DropCollectionRequest.OnExecute.describe_collection_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); @@ -60,16 +62,12 @@ DropCollectionRequest::OnExecute() { } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } } rc.RecordSection("check validation"); // step 3: Drop collection - status = DBWrapper::DB()->DropCollection(collection_name_); + status = DBWrapper::SSDB()->DropCollection(collection_name_); fiu_do_on("DropCollectionRequest.OnExecute.drop_collection_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { @@ -77,7 +75,7 @@ DropCollectionRequest::OnExecute() { } // step 4: flush to trigger CleanUpFilesWithTTL - status = DBWrapper::DB()->Flush(); + status = DBWrapper::SSDB()->Flush(); rc.ElapseFromBegin("total cost"); } catch (std::exception& ex) { diff --git a/core/src/server/delivery/request/DropIndexRequest.cpp b/core/src/server/delivery/request/DropIndexRequest.cpp index c0c828a2..991bd669 100644 --- a/core/src/server/delivery/request/DropIndexRequest.cpp +++ b/core/src/server/delivery/request/DropIndexRequest.cpp @@ -17,6 +17,8 @@ #include #include +#include +#include namespace milvus { namespace server { @@ -50,12 +52,9 @@ DropIndexRequest::OnExecute() { } // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - engine::meta::hybrid::FieldsSchema fields_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema); - - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); fiu_do_on("DropIndexRequest.OnExecute.collection_not_exist", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { @@ -64,16 +63,12 @@ DropIndexRequest::OnExecute() { } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } } rc.RecordSection("check validation"); // step 2: drop index - status = DBWrapper::DB()->DropIndex(collection_name_); + status = DBWrapper::SSDB()->DropIndex(collection_name_, field_name_); fiu_do_on("DropIndexRequest.OnExecute.drop_index_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { return status; diff --git a/core/src/server/delivery/request/FlushRequest.cpp b/core/src/server/delivery/request/FlushRequest.cpp index f8bec5ef..fef4198c 100644 --- a/core/src/server/delivery/request/FlushRequest.cpp +++ b/core/src/server/delivery/request/FlushRequest.cpp @@ -18,6 +18,7 @@ #include "server/delivery/request/FlushRequest.h" #include +#include #include "server/DBWrapper.h" #include "utils/Log.h" @@ -58,22 +59,18 @@ FlushRequest::OnExecute() { // flush specified collections for (auto& name : collection_names_) { // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = name; - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(name, collection, fields_schema); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(name)); } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(name)); - } } - status = DBWrapper::DB()->Flush(name); + status = DBWrapper::SSDB()->Flush(name); if (!status.ok()) { return status; } diff --git a/core/src/server/delivery/request/HasCollectionRequest.cpp b/core/src/server/delivery/request/HasCollectionRequest.cpp index fc142965..cb6e853a 100644 --- a/core/src/server/delivery/request/HasCollectionRequest.cpp +++ b/core/src/server/delivery/request/HasCollectionRequest.cpp @@ -47,7 +47,7 @@ HasCollectionRequest::OnExecute() { } // step 2: check collection existence - status = DBWrapper::DB()->HasNativeCollection(collection_name_, has_collection_); + status = DBWrapper::SSDB()->HasCollection(collection_name_, has_collection_); fiu_do_on("HasCollectionRequest.OnExecute.throw_std_exception", throw std::exception()); return status; diff --git a/core/src/server/delivery/request/PreloadCollectionRequest.cpp b/core/src/server/delivery/request/PreloadCollectionRequest.cpp index 93087a65..984d6c8a 100644 --- a/core/src/server/delivery/request/PreloadCollectionRequest.cpp +++ b/core/src/server/delivery/request/PreloadCollectionRequest.cpp @@ -17,6 +17,8 @@ #include #include +#include +#include namespace milvus { namespace server { @@ -45,24 +47,26 @@ PreloadCollectionRequest::OnExecute() { } // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } + } + + // TODO(yukun): if PreloadCollection interface needs to add field names as params + std::vector field_names; + for (auto field_it : fields_schema) { + field_names.emplace_back(field_it.first->GetName()); } // step 2: force load collection data into cache // load each segment and insert into cache even cache capacity is not enough - status = DBWrapper::DB()->PreloadCollection(context_, collection_name_, true); + status = DBWrapper::SSDB()->LoadCollection(context_, collection_name_, field_names, true); fiu_do_on("PreloadCollectionRequest.OnExecute.preload_collection_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); fiu_do_on("PreloadCollectionRequest.OnExecute.throw_std_exception", throw std::exception()); diff --git a/core/src/server/delivery/request/ShowCollectionInfoRequest.cpp b/core/src/server/delivery/request/ShowCollectionInfoRequest.cpp index 98e05893..791df023 100644 --- a/core/src/server/delivery/request/ShowCollectionInfoRequest.cpp +++ b/core/src/server/delivery/request/ShowCollectionInfoRequest.cpp @@ -22,6 +22,7 @@ #include "utils/TimeRecorder.h" #include +#include #include namespace milvus { @@ -53,22 +54,19 @@ ShowCollectionInfoRequest::OnExecute() { // step 2: check collection existence // only process root collection, ignore partition collection - engine::meta::CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_name_; - status = DBWrapper::DB()->DescribeCollection(collection_schema); + engine::snapshot::CollectionPtr collection; + std::unordered_map> fields_schema; + status = DBWrapper::SSDB()->DescribeCollection(collection_name_, collection, fields_schema); if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); } else { return status; } - } else { - if (!collection_schema.owner_collection_.empty()) { - return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); - } } // step 3: get partitions + // TODO(yukun): SSDBImpl::GetCollectionInfo has not implemented yet status = DBWrapper::DB()->GetCollectionInfo(collection_name_, collection_info_); if (!status.ok()) { return status; diff --git a/core/src/server/delivery/request/ShowCollectionsRequest.cpp b/core/src/server/delivery/request/ShowCollectionsRequest.cpp index d117b795..625a13dc 100644 --- a/core/src/server/delivery/request/ShowCollectionsRequest.cpp +++ b/core/src/server/delivery/request/ShowCollectionsRequest.cpp @@ -38,7 +38,7 @@ ShowCollectionsRequest::OnExecute() { TimeRecorderAuto rc("ShowCollectionsRequest"); std::vector names; - auto status = DBWrapper::DB()->AllCollections(names); + auto status = DBWrapper::SSDB()->AllCollections(names); fiu_do_on("ShowCollectionsRequest.OnExecute.show_collections_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { diff --git a/core/src/server/grpc_impl/GrpcRequestHandler.cpp b/core/src/server/grpc_impl/GrpcRequestHandler.cpp index eb493c3d..aec4f236 100644 --- a/core/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/core/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -1314,22 +1314,21 @@ GrpcRequestHandler::SearchPB(::grpc::ServerContext* context, const ::milvus::grp } } - engine::QueryResult result; + engine::QueryResultPtr result = std::make_shared(); std::vector field_names; - status = request_handler_.HybridSearch(GetContext(context), request->collection_name(), partition_list, - general_query, query_ptr, json_params, field_names, result); + status = request_handler_.HybridSearch(GetContext(context), query_ptr, json_params, result); // step 6: construct and return result - response->set_row_num(result.row_num_); + response->set_row_num(result->row_num_); auto grpc_entity = response->mutable_entities(); - ConstructEntityResults(result.attrs_, result.vectors_, field_names, grpc_entity); - grpc_entity->mutable_ids()->Resize(static_cast(result.result_ids_.size()), 0); - memcpy(grpc_entity->mutable_ids()->mutable_data(), result.result_ids_.data(), - result.result_ids_.size() * sizeof(int64_t)); + ConstructEntityResults(result->attrs_, result->vectors_, field_names, grpc_entity); + grpc_entity->mutable_ids()->Resize(static_cast(result->result_ids_.size()), 0); + memcpy(grpc_entity->mutable_ids()->mutable_data(), result->result_ids_.data(), + result->result_ids_.size() * sizeof(int64_t)); - response->mutable_distances()->Resize(static_cast(result.result_distances_.size()), 0.0); - memcpy(response->mutable_distances()->mutable_data(), result.result_distances_.data(), - result.result_distances_.size() * sizeof(float)); + response->mutable_distances()->Resize(static_cast(result->result_distances_.size()), 0.0); + memcpy(response->mutable_distances()->mutable_data(), result->result_distances_.data(), + result->result_distances_.size() * sizeof(float)); LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__); SET_RESPONSE(response->mutable_status(), status, context); @@ -1631,6 +1630,8 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc: query::BooleanQueryPtr boolean_query = std::make_shared(); query::QueryPtr query_ptr = std::make_shared(); + query_ptr->collection_id = request->collection_name(); + std::unordered_map vectors; status = DeserializeJsonToBoolQuery(request->vector_param(), request->dsl(), boolean_query, vectors); @@ -1663,6 +1664,8 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc: partition_list[i] = request->partition_tag_array(i); } + query_ptr->partitions = partition_list; + milvus::json json_params; for (int i = 0; i < request->extra_params_size(); i++) { const ::milvus::grpc::KeyValuePair& extra = request->extra_params(i); @@ -1671,22 +1674,20 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc: } } - engine::QueryResult result; - std::vector field_names; - status = request_handler_.HybridSearch(GetContext(context), request->collection_name(), partition_list, - general_query, query_ptr, json_params, field_names, result); + engine::QueryResultPtr result = std::make_shared(); + status = request_handler_.HybridSearch(GetContext(context), query_ptr, json_params, result); // step 6: construct and return result - response->set_row_num(result.row_num_); - ConstructEntityResults(result.attrs_, result.vectors_, field_names, grpc_entity); + response->set_row_num(result->row_num_); + ConstructEntityResults(result->attrs_, result->vectors_, query_ptr->field_names, grpc_entity); - grpc_entity->mutable_ids()->Resize(static_cast(result.result_ids_.size()), 0); - memcpy(grpc_entity->mutable_ids()->mutable_data(), result.result_ids_.data(), - result.result_ids_.size() * sizeof(int64_t)); + grpc_entity->mutable_ids()->Resize(static_cast(result->result_ids_.size()), 0); + memcpy(grpc_entity->mutable_ids()->mutable_data(), result->result_ids_.data(), + result->result_ids_.size() * sizeof(int64_t)); - response->mutable_distances()->Resize(static_cast(result.result_distances_.size()), 0.0); - memcpy(response->mutable_distances()->mutable_data(), result.result_distances_.data(), - result.result_distances_.size() * sizeof(float)); + response->mutable_distances()->Resize(static_cast(result->result_distances_.size()), 0.0); + memcpy(response->mutable_distances()->mutable_data(), result->result_distances_.data(), + result->result_distances_.size() * sizeof(float)); LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__); SET_RESPONSE(response->mutable_status(), status, context); diff --git a/core/src/server/web_impl/handler/WebRequestHandler.cpp b/core/src/server/web_impl/handler/WebRequestHandler.cpp index 63ad58d3..350d18ed 100644 --- a/core/src/server/web_impl/handler/WebRequestHandler.cpp +++ b/core/src/server/web_impl/handler/WebRequestHandler.cpp @@ -90,7 +90,8 @@ WebRequestHandler::AddStatusToJson(nlohmann::json& json, int64_t code, const std Status WebRequestHandler::IsBinaryCollection(const std::string& collection_name, bool& bin) { CollectionSchema schema; - auto status = request_handler_.DescribeCollection(context_ptr_, collection_name, schema); + auto status = Status::OK(); + // status = request_handler_.DescribeCollection(context_ptr_, collection_name, schema); if (status.ok()) { auto metric = engine::MetricType(schema.metric_type_); bin = engine::MetricType::HAMMING == metric || engine::MetricType::JACCARD == metric || @@ -136,7 +137,8 @@ WebRequestHandler::CopyRecordsFromJson(const nlohmann::json& json, engine::Vecto Status WebRequestHandler::GetCollectionMetaInfo(const std::string& collection_name, nlohmann::json& json_out) { CollectionSchema schema; - auto status = request_handler_.DescribeCollection(context_ptr_, collection_name, schema); + auto status = Status::OK(); + // status = request_handler_.DescribeCollection(context_ptr_, collection_name, schema); if (!status.ok()) { return status; } @@ -835,37 +837,35 @@ WebRequestHandler::HybridSearch(const std::string& collection_name, const nlohma query_ptr_->root = general_query->bin; - engine::QueryResult result; - std::vector field_names; - status = request_handler_.HybridSearch(context_ptr_, collection_name, partition_tags, general_query, query_ptr_, - extra_params, field_names, result); + engine::QueryResultPtr result = std::make_shared(); + status = request_handler_.HybridSearch(context_ptr_, query_ptr_, extra_params, result); if (!status.ok()) { return status; } nlohmann::json result_json; - result_json["num"] = result.row_num_; - if (result.row_num_ == 0) { + result_json["num"] = result->row_num_; + if (result->row_num_ == 0) { result_json["result"] = std::vector(); result_str = result_json.dump(); return Status::OK(); } - auto step = result.result_ids_.size() / result.row_num_; + auto step = result->result_ids_.size() / result->row_num_; nlohmann::json search_result_json; - for (int64_t i = 0; i < result.row_num_; i++) { + for (int64_t i = 0; i < result->row_num_; i++) { nlohmann::json raw_result_json; for (size_t j = 0; j < step; j++) { nlohmann::json one_result_json; - one_result_json["id"] = std::to_string(result.result_ids_.at(i * step + j)); - one_result_json["distance"] = std::to_string(result.result_distances_.at(i * step + j)); + one_result_json["id"] = std::to_string(result->result_ids_.at(i * step + j)); + one_result_json["distance"] = std::to_string(result->result_distances_.at(i * step + j)); raw_result_json.emplace_back(one_result_json); } search_result_json.emplace_back(raw_result_json); } nlohmann::json attr_json; - ConvertRowToColumnJson(result.attrs_, field_names, result.row_num_, attr_json); + ConvertRowToColumnJson(result->attrs_, query_ptr_->field_names, result->row_num_, attr_json); result_json["Entity"] = attr_json; result_json["result"] = search_result_json; result_str = result_json.dump(); @@ -1261,10 +1261,11 @@ WebRequestHandler::CreateCollection(const CollectionRequestDto::ObjectWrapper& c RETURN_STATUS_DTO(ILLEGAL_METRIC_TYPE, "metric_type is illegal") } - auto status = request_handler_.CreateCollection( - context_ptr_, collection_schema->collection_name->std_str(), collection_schema->dimension, - collection_schema->index_file_size, - static_cast(MetricNameMap.at(collection_schema->metric_type->std_str()))); + auto status = Status::OK(); + // auto status = request_handler_.CreateCollection( + // context_ptr_, collection_schema->collection_name->std_str(), collection_schema->dimension, + // collection_schema->index_file_size, + // static_cast(MetricNameMap.at(collection_schema->metric_type->std_str()))); ASSIGN_RETURN_STATUS_DTO(status) } diff --git a/core/unittest/ssdb/CMakeLists.txt b/core/unittest/ssdb/CMakeLists.txt index 0cd2e9c1..2f54f9fb 100644 --- a/core/unittest/ssdb/CMakeLists.txt +++ b/core/unittest/ssdb/CMakeLists.txt @@ -31,6 +31,7 @@ add_executable(test_ssdb ${codecs_snapshot_files} ${config_files} ${config_handler_files} + ${db_attr_files} ${db_main_files} ${db_attr_files} ${db_engine_files} -- GitLab