From d91facc86d512b940417198e4bfb5bcc3354ef4f Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Thu, 30 Jul 2020 09:37:27 +0800 Subject: [PATCH] fix collection ci (#3061) * fix set config issue Signed-off-by: yudong.cai * add auto_flush_interval observer in DBImpl Signed-off-by: yudong.cai * fix some issues in SegmentReader Signed-off-by: yudong.cai * update CreateCollection interface Signed-off-by: yudong.cai * fix clang-format Signed-off-by: yudong.cai * update Signed-off-by: yudong.cai --- core/src/config/ConfigMgr.cpp | 205 ++++++++++-------- core/src/config/ConfigMgr.h | 2 +- core/src/db/DBImpl.cpp | 13 ++ core/src/db/DBImpl.h | 6 +- core/src/db/Types.cpp | 2 +- core/src/db/Types.h | 2 +- core/src/segment/SegmentReader.cpp | 20 +- core/src/server/delivery/ReqHandler.cpp | 7 +- core/src/server/delivery/ReqHandler.h | 4 +- core/src/server/delivery/request/BaseReq.h | 10 +- .../delivery/request/CreateCollectionReq.cpp | 56 +++-- .../delivery/request/CreateCollectionReq.h | 12 +- .../delivery/request/GetCollectionInfoReq.cpp | 15 +- .../server/grpc_impl/GrpcRequestHandler.cpp | 59 +++-- .../src/server/grpc_impl/GrpcRequestHandler.h | 3 - .../web_impl/handler/WebRequestHandler.cpp | 31 ++- 16 files changed, 238 insertions(+), 209 deletions(-) diff --git a/core/src/config/ConfigMgr.cpp b/core/src/config/ConfigMgr.cpp index cd5171a2..1b2d1dac 100644 --- a/core/src/config/ConfigMgr.cpp +++ b/core/src/config/ConfigMgr.cpp @@ -69,107 +69,132 @@ ConfigMgr ConfigMgr::instance; ConfigMgr::ConfigMgr() { config_list_ = { /* version */ - CreateStringConfig("version", false, &config.version.value, "unknown", nullptr, nullptr), + {"version", CreateStringConfig("version", false, &config.version.value, "unknown", nullptr, nullptr)}, /* cluster */ - CreateBoolConfig("cluster.enable", false, &config.cluster.enable.value, false, nullptr, nullptr), - CreateEnumConfig("cluster.role", false, &ClusterRoleMap, &config.cluster.role.value, ClusterRole::RW, nullptr, - nullptr), + {"cluster.enable", + CreateBoolConfig("cluster.enable", false, &config.cluster.enable.value, false, nullptr, nullptr)}, + {"cluster.role", CreateEnumConfig("cluster.role", false, &ClusterRoleMap, &config.cluster.role.value, + ClusterRole::RW, nullptr, nullptr)}, /* general */ - CreateStringConfig("general.timezone", false, &config.general.timezone.value, "UTC+8", nullptr, nullptr), - CreateStringConfig("general.meta_uri", false, &config.general.meta_uri.value, "sqlite://:@:/", nullptr, - nullptr), + {"general.timezone", + CreateStringConfig("general.timezone", false, &config.general.timezone.value, "UTC+8", nullptr, nullptr)}, + {"general.meta_uri", CreateStringConfig("general.meta_uri", false, &config.general.meta_uri.value, + "sqlite://:@:/", nullptr, nullptr)}, /* network */ - CreateStringConfig("network.bind.address", false, &config.network.bind.address.value, "0.0.0.0", nullptr, - nullptr), - CreateIntegerConfig("network.bind.port", false, 0, 65535, &config.network.bind.port.value, 19530, nullptr, - nullptr), - CreateBoolConfig("network.http.enable", false, &config.network.http.enable.value, true, nullptr, nullptr), - CreateIntegerConfig("network.http.port", false, 0, 65535, &config.network.http.port.value, 19121, nullptr, - nullptr), + {"network.bind.address", CreateStringConfig("network.bind.address", false, &config.network.bind.address.value, + "0.0.0.0", nullptr, nullptr)}, + {"network.bind.port", CreateIntegerConfig("network.bind.port", false, 0, 65535, &config.network.bind.port.value, + 19530, nullptr, nullptr)}, + {"network.http.enable", + CreateBoolConfig("network.http.enable", false, &config.network.http.enable.value, true, nullptr, nullptr)}, + {"network.http.port", CreateIntegerConfig("network.http.port", false, 0, 65535, &config.network.http.port.value, + 19121, nullptr, nullptr)}, /* storage */ - CreateStringConfig("storage.path", false, &config.storage.path.value, "/var/lib/milvus", nullptr, nullptr), - CreateIntegerConfig("storage.auto_flush_interval", false, 0, std::numeric_limits::max(), - &config.storage.auto_flush_interval.value, 1, nullptr, nullptr), - CreateIntegerConfig("storage.file_cleanup_timeout", false, 0, 3600, &config.storage.file_cleanup_timeout.value, - 10, nullptr, nullptr), + {"storage.path", + CreateStringConfig("storage.path", false, &config.storage.path.value, "/var/lib/milvus", nullptr, nullptr)}, + {"storage.auto_flush_interval", + CreateIntegerConfig("storage.auto_flush_interval", true, 0, std::numeric_limits::max(), + &config.storage.auto_flush_interval.value, 1, nullptr, nullptr)}, + {"storage.file_cleanup_timeout", + CreateIntegerConfig("storage.file_cleanup_timeout", false, 0, 3600, &config.storage.file_cleanup_timeout.value, + 10, nullptr, nullptr)}, /* wal */ - CreateBoolConfig("wal.enable", false, &config.wal.enable.value, true, nullptr, nullptr), - CreateBoolConfig("wal.recovery_error_ignore", false, &config.wal.recovery_error_ignore.value, false, nullptr, - nullptr), - CreateSizeConfig("wal.buffer_size", false, 64 * MB, 4096 * MB, &config.wal.buffer_size.value, 256 * MB, nullptr, - nullptr), - CreateStringConfig("wal.path", false, &config.wal.path.value, "/var/lib/milvus/wal", nullptr, nullptr), + {"wal.enable", CreateBoolConfig("wal.enable", false, &config.wal.enable.value, true, nullptr, nullptr)}, + {"wal.recovery_error_ignore", + CreateBoolConfig("wal.recovery_error_ignore", false, &config.wal.recovery_error_ignore.value, false, nullptr, + nullptr)}, + {"wal.buffer_size", CreateSizeConfig("wal.buffer_size", false, 64 * MB, 4096 * MB, + &config.wal.buffer_size.value, 256 * MB, nullptr, nullptr)}, + {"wal.path", + CreateStringConfig("wal.path", false, &config.wal.path.value, "/var/lib/milvus/wal", nullptr, nullptr)}, /* cache */ - CreateSizeConfig("cache.cache_size", true, 0, std::numeric_limits::max(), - &config.cache.cache_size.value, 4 * GB, nullptr, nullptr), - CreateFloatingConfig("cache.cpu_cache_threshold", false, 0.0, 1.0, &config.cache.cpu_cache_threshold.value, 0.7, - nullptr, nullptr), - CreateSizeConfig("cache.insert_buffer_size", false, 0, std::numeric_limits::max(), - &config.cache.insert_buffer_size.value, 1 * GB, nullptr, nullptr), - CreateBoolConfig("cache.cache_insert_data", false, &config.cache.cache_insert_data.value, false, nullptr, - nullptr), - CreateStringConfig("cache.preload_collection", false, &config.cache.preload_collection.value, "", nullptr, - nullptr), + {"cache.cache_size", CreateSizeConfig("cache.cache_size", true, 0, std::numeric_limits::max(), + &config.cache.cache_size.value, 4 * GB, nullptr, nullptr)}, + {"cache.cpu_cache_threshold", + CreateFloatingConfig("cache.cpu_cache_threshold", false, 0.0, 1.0, &config.cache.cpu_cache_threshold.value, + 0.7, nullptr, nullptr)}, + {"cache.insert_buffer_size", + CreateSizeConfig("cache.insert_buffer_size", false, 0, std::numeric_limits::max(), + &config.cache.insert_buffer_size.value, 1 * GB, nullptr, nullptr)}, + {"cache.cache_insert_data", CreateBoolConfig("cache.cache_insert_data", false, + &config.cache.cache_insert_data.value, false, nullptr, nullptr)}, + {"cache.preload_collection", CreateStringConfig("cache.preload_collection", false, + &config.cache.preload_collection.value, "", nullptr, nullptr)}, /* gpu */ - CreateBoolConfig("gpu.enable", false, &config.gpu.enable.value, false, nullptr, nullptr), - CreateSizeConfig("gpu.cache_size", true, 0, std::numeric_limits::max(), &config.gpu.cache_size.value, - 1 * GB, nullptr, nullptr), - CreateFloatingConfig("gpu.cache_threshold", false, 0.0, 1.0, &config.gpu.cache_threshold.value, 0.7, nullptr, - nullptr), - CreateIntegerConfig("gpu.gpu_search_threshold", true, 0, std::numeric_limits::max(), - &config.gpu.gpu_search_threshold.value, 1000, nullptr, nullptr), - CreateStringConfig("gpu.search_devices", false, &config.gpu.search_devices.value, "gpu0", nullptr, nullptr), - CreateStringConfig("gpu.build_index_devices", false, &config.gpu.build_index_devices.value, "gpu0", nullptr, - nullptr), + {"gpu.enable", CreateBoolConfig("gpu.enable", false, &config.gpu.enable.value, false, nullptr, nullptr)}, + {"gpu.cache_size", CreateSizeConfig("gpu.cache_size", true, 0, std::numeric_limits::max(), + &config.gpu.cache_size.value, 1 * GB, nullptr, nullptr)}, + {"gpu.cache_threshold", CreateFloatingConfig("gpu.cache_threshold", false, 0.0, 1.0, + &config.gpu.cache_threshold.value, 0.7, nullptr, nullptr)}, + {"gpu.gpu_search_threshold", + CreateIntegerConfig("gpu.gpu_search_threshold", true, 0, std::numeric_limits::max(), + &config.gpu.gpu_search_threshold.value, 1000, nullptr, nullptr)}, + {"gpu.search_devices", + CreateStringConfig("gpu.search_devices", false, &config.gpu.search_devices.value, "gpu0", nullptr, nullptr)}, + {"gpu.build_index_devices", + CreateStringConfig("gpu.build_index_devices", false, &config.gpu.build_index_devices.value, "gpu0", nullptr, + nullptr)}, /* log */ - CreateStringConfig("logs.level", false, &config.logs.level.value, "debug", nullptr, nullptr), - CreateBoolConfig("logs.trace.enable", false, &config.logs.trace.enable.value, true, nullptr, nullptr), - CreateStringConfig("logs.path", false, &config.logs.path.value, "/var/lib/milvus/logs", nullptr, nullptr), - CreateSizeConfig("logs.max_log_file_size", false, 512 * MB, 4096 * MB, &config.logs.max_log_file_size.value, - 1024 * MB, nullptr, nullptr), - CreateIntegerConfig("logs.log_rotate_num", false, 0, 1024, &config.logs.log_rotate_num.value, 0, nullptr, - nullptr), + {"logs.level", CreateStringConfig("logs.level", false, &config.logs.level.value, "debug", nullptr, nullptr)}, + {"logs.trace.enable", + CreateBoolConfig("logs.trace.enable", false, &config.logs.trace.enable.value, true, nullptr, nullptr)}, + {"logs.path", + CreateStringConfig("logs.path", false, &config.logs.path.value, "/var/lib/milvus/logs", nullptr, nullptr)}, + {"logs.max_log_file_size", CreateSizeConfig("logs.max_log_file_size", false, 512 * MB, 4096 * MB, + &config.logs.max_log_file_size.value, 1024 * MB, nullptr, nullptr)}, + {"logs.log_rotate_num", CreateIntegerConfig("logs.log_rotate_num", false, 0, 1024, + &config.logs.log_rotate_num.value, 0, nullptr, nullptr)}, /* metric */ - CreateBoolConfig("metric.enable", false, &config.metric.enable.value, false, nullptr, nullptr), - CreateStringConfig("metric.address", false, &config.metric.address.value, "127.0.0.1", nullptr, nullptr), - CreateIntegerConfig("metric.port", false, 1024, 65535, &config.metric.port.value, 9091, nullptr, nullptr), + {"metric.enable", + CreateBoolConfig("metric.enable", false, &config.metric.enable.value, false, nullptr, nullptr)}, + {"metric.address", + CreateStringConfig("metric.address", false, &config.metric.address.value, "127.0.0.1", nullptr, nullptr)}, + {"metric.port", + CreateIntegerConfig("metric.port", false, 1024, 65535, &config.metric.port.value, 9091, nullptr, nullptr)}, /* tracing */ - CreateStringConfig("tracing.json_config_path", false, &config.tracing.json_config_path.value, "", nullptr, - nullptr), + {"tracing.json_config_path", CreateStringConfig("tracing.json_config_path", false, + &config.tracing.json_config_path.value, "", nullptr, nullptr)}, /* invisible */ /* engine */ - CreateIntegerConfig("engine.search_combine_nq", true, 0, std::numeric_limits::max(), - &config.engine.search_combine_nq.value, 64, nullptr, nullptr), - CreateIntegerConfig("engine.use_blas_threshold", true, 0, std::numeric_limits::max(), - &config.engine.use_blas_threshold.value, 1100, nullptr, nullptr), - CreateIntegerConfig("engine.omp_thread_num", true, 0, std::numeric_limits::max(), - &config.engine.omp_thread_num.value, 0, nullptr, nullptr), - CreateEnumConfig("engine.simd_type", false, &SimdMap, &config.engine.simd_type.value, SimdType::AUTO, nullptr, - nullptr), + {"engine.search_combine_nq", + CreateIntegerConfig("engine.search_combine_nq", true, 0, std::numeric_limits::max(), + &config.engine.search_combine_nq.value, 64, nullptr, nullptr)}, + {"engine.use_blas_threshold", + CreateIntegerConfig("engine.use_blas_threshold", true, 0, std::numeric_limits::max(), + &config.engine.use_blas_threshold.value, 1100, nullptr, nullptr)}, + {"engine.omp_thread_num", + CreateIntegerConfig("engine.omp_thread_num", true, 0, std::numeric_limits::max(), + &config.engine.omp_thread_num.value, 0, nullptr, nullptr)}, + {"engine.simd_type", CreateEnumConfig("engine.simd_type", false, &SimdMap, &config.engine.simd_type.value, + SimdType::AUTO, nullptr, nullptr)}, /* db */ - CreateFloatingConfig("db.archive_disk_threshold", false, 0.0, 1.0, &config.db.archive_disk_threshold.value, 0.0, - nullptr, nullptr), - CreateIntegerConfig("db.archive_days_threshold", false, 0, std::numeric_limits::max(), - &config.db.archive_days_threshold.value, 0, nullptr, nullptr), + {"db.archive_disk_threshold", + CreateFloatingConfig("db.archive_disk_threshold", false, 0.0, 1.0, &config.db.archive_disk_threshold.value, + 0.0, nullptr, nullptr)}, + {"db.archive_days_threshold", + CreateIntegerConfig("db.archive_days_threshold", false, 0, std::numeric_limits::max(), + &config.db.archive_days_threshold.value, 0, nullptr, nullptr)}, }; } void ConfigMgr::Init() { std::lock_guard lock(GetConfigMutex()); - for (auto& config : config_list_) config->Init(); + for (auto& kv : config_list_) { + kv.second->Init(); + } } void @@ -187,33 +212,41 @@ ConfigMgr::Load(const std::string& path) { void ConfigMgr::Set(const std::string& name, const std::string& value, bool update) { - for (auto& config : config_list_) { - if (std::strcmp(name.c_str(), config->name_) == 0) { - std::lock_guard lock(GetConfigMutex()); - if (not update || config->modifiable_) { - ThrowIfNotSuccess(config->Set(value, update)); - Notify(name); - return; - } + try { + auto& config = config_list_.at(name); + std::unique_lock lock(GetConfigMutex()); + /* update=false when loading from config file */ + if (not update) { + ThrowIfNotSuccess(config->Set(value, update)); + } else if (config->modifiable_) { + /* set manually */ + ThrowIfNotSuccess(config->Set(value, update)); + lock.unlock(); + Notify(name); } + } catch (...) { + throw "Config " + name + " not found."; } - throw "Config " + name + " not found."; } std::string ConfigMgr::Get(const std::string& name) const { - for (auto& config : config_list_) - if (std::strcmp(name.c_str(), config->name_) == 0) { - std::lock_guard lock(GetConfigMutex()); - return config->Get(); - } - throw "Config " + name + " not found."; + try { + auto& config = config_list_.at(name); + std::lock_guard lock(GetConfigMutex()); + return config->Get(); + } catch (...) { + throw "Config " + name + " not found."; + } } std::string ConfigMgr::Dump() const { std::stringstream ss; - for (auto& config : config_list_) ss << config->name_ << ": " << config->Get() << std::endl; + for (auto& kv : config_list_) { + auto& config = kv.second; + ss << config->name_ << ": " << config->Get() << std::endl; + } return ss.str(); } diff --git a/core/src/config/ConfigMgr.h b/core/src/config/ConfigMgr.h index 48f0f928..802ae250 100644 --- a/core/src/config/ConfigMgr.h +++ b/core/src/config/ConfigMgr.h @@ -82,7 +82,7 @@ class ConfigMgr { Notify(const std::string& name); private: - std::vector config_list_; + std::unordered_map config_list_; std::mutex mutex_; std::unordered_map> observers_; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index fa519bfc..1192d761 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -73,10 +73,16 @@ DBImpl::DBImpl(const DBOptions& options) // mxlog_config.mxlog_path = options_.mxlog_path_; // wal_mgr_ = std::make_shared(mxlog_config); } + + /* watch on storage.auto_flush_interval */ + ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this); + Start(); } DBImpl::~DBImpl() { + ConfigMgr::GetInstance().Detach("storage.auto_flush_interval", this); + Stop(); } @@ -1284,5 +1290,12 @@ DBImpl::ResumeIfLast() { } } +void +DBImpl::ConfigUpdate(const std::string& name) { + if (name == "storage.auto_flush_interval") { + options_.auto_flush_interval_ = config.storage.auto_flush_interval(); + } +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 395a91d7..90112507 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -23,13 +23,14 @@ #include "db/DB.h" +#include "config/ConfigMgr.h" #include "utils/ThreadPool.h" #include "wal/WalManager.h" namespace milvus { namespace engine { -class DBImpl : public DB { +class DBImpl : public DB, public ConfigObserver { public: explicit DBImpl(const DBOptions& options); @@ -115,6 +116,9 @@ class DBImpl : public DB { Status Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold = 0.0) override; + void + ConfigUpdate(const std::string& name) override; + private: void InternalFlush(const std::string& collection_name = ""); diff --git a/core/src/db/Types.cpp b/core/src/db/Types.cpp index 8dc2d103..fd71ccce 100644 --- a/core/src/db/Types.cpp +++ b/core/src/db/Types.cpp @@ -22,7 +22,7 @@ const char* DEFAULT_DELETED_DOCS_NAME = "_del"; const char* DEFAULT_INDEX_NAME = "_idx"; const char* DEFAULT_INDEX_COMPRESS_NAME = "_compress"; -const char* PARAM_COLLECTION_DIMENSION = "dim"; +const char* PARAM_DIMENSION = "dim"; const char* PARAM_INDEX_METRIC_TYPE = "metric_type"; const char* PARAM_INDEX_EXTRA_PARAMS = "extra_params"; const char* PARAM_SEGMENT_SIZE = "segment_size"; diff --git a/core/src/db/Types.h b/core/src/db/Types.h index 8ab9a106..95b29d13 100644 --- a/core/src/db/Types.h +++ b/core/src/db/Types.h @@ -85,7 +85,7 @@ extern const char* DEFAULT_DELETED_DOCS_NAME; extern const char* DEFAULT_INDEX_NAME; extern const char* DEFAULT_INDEX_COMPRESS_NAME; -extern const char* PARAM_COLLECTION_DIMENSION; +extern const char* PARAM_DIMENSION; extern const char* PARAM_INDEX_METRIC_TYPE; extern const char* PARAM_INDEX_EXTRA_PARAMS; extern const char* PARAM_SEGMENT_SIZE; diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index 463cd3d9..dfdf6258 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -72,9 +72,9 @@ SegmentReader::Initialize() { int64_t field_width = 0; int64_t dimension = params[knowhere::meta::DIM]; if (ftype == engine::FIELD_TYPE::VECTOR_BINARY) { - field_width += (dimension / 8); + field_width = (dimension / 8); } else { - field_width += (dimension * sizeof(float)); + field_width = (dimension * sizeof(float)); } segment_ptr_->AddField(name, ftype, field_width); } else { @@ -241,12 +241,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex std::make_shared(segment_commit->GetRowCount()); segment::DeletedDocsPtr deleted_docs_ptr; - auto status = LoadDeletedDocs(deleted_docs_ptr); - if (!status.ok()) { - return status; - } - - if (deleted_docs_ptr) { + STATUS_CHECK(LoadDeletedDocs(deleted_docs_ptr)); + if (deleted_docs_ptr != nullptr) { auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); for (auto& offset : deleted_docs) { concurrent_bitset_ptr->set(offset); @@ -255,10 +251,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex // load uids std::vector uids; - status = LoadUids(uids); - if (!status.ok()) { - return status; - } + STATUS_CHECK(LoadUids(uids)); knowhere::BinarySet index_data; knowhere::BinaryPtr raw_data, compress_data; @@ -278,14 +271,13 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex // if index not specified, or index file not created, return IDMAP auto index_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX); if (index_visitor == nullptr || index_visitor->GetFile() == nullptr) { - auto& snapshot = segment_visitor_->GetSnapshot(); auto& json = field->GetParams(); if (json.find(knowhere::meta::DIM) == json.end()) { return Status(DB_ERROR, "Vector field dimension undefined"); } int64_t dimension = json[knowhere::meta::DIM]; std::vector raw; - LoadField(field_name, raw); + STATUS_CHECK(LoadField(field_name, raw)); auto dataset = knowhere::GenDataset(segment_commit->GetRowCount(), dimension, raw.data()); knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance(); diff --git a/core/src/server/delivery/ReqHandler.cpp b/core/src/server/delivery/ReqHandler.cpp index c6002d88..1764837c 100644 --- a/core/src/server/delivery/ReqHandler.cpp +++ b/core/src/server/delivery/ReqHandler.cpp @@ -43,11 +43,8 @@ namespace server { Status ReqHandler::CreateCollection(const std::shared_ptr& context, const std::string& collection_name, - std::unordered_map& field_types, - std::unordered_map& field_index_params, - std::unordered_map& field_params, milvus::json& json_param) { - BaseReqPtr req_ptr = CreateCollectionReq::Create(context, collection_name, field_types, field_index_params, - field_params, json_param); + std::unordered_map& fields, milvus::json& json_param) { + BaseReqPtr req_ptr = CreateCollectionReq::Create(context, collection_name, fields, json_param); ReqScheduler::ExecReq(req_ptr); return req_ptr->status(); } diff --git a/core/src/server/delivery/ReqHandler.h b/core/src/server/delivery/ReqHandler.h index 84750026..b4417843 100644 --- a/core/src/server/delivery/ReqHandler.h +++ b/core/src/server/delivery/ReqHandler.h @@ -32,9 +32,7 @@ class ReqHandler { Status CreateCollection(const std::shared_ptr& context, const std::string& collection_name, - std::unordered_map& field_types, - std::unordered_map& field_index_params, - std::unordered_map& field_params, milvus::json& json_params); + std::unordered_map& fields, milvus::json& json_params); Status DropCollection(const std::shared_ptr& context, const std::string& collection_name); diff --git a/core/src/server/delivery/request/BaseReq.h b/core/src/server/delivery/request/BaseReq.h index 3af39d43..cd31dee0 100644 --- a/core/src/server/delivery/request/BaseReq.h +++ b/core/src/server/delivery/request/BaseReq.h @@ -32,11 +32,15 @@ namespace milvus { namespace server { +struct FieldSchema { + engine::FieldType field_type_; + milvus::json field_params_; + milvus::json index_params_; +}; + struct CollectionSchema { std::string collection_name_; - std::unordered_map field_types_; - std::unordered_map index_params_; - std::unordered_map field_params_; + std::unordered_map fields_; milvus::json extra_params_; }; diff --git a/core/src/server/delivery/request/CreateCollectionReq.cpp b/core/src/server/delivery/request/CreateCollectionReq.cpp index c5033109..fef25444 100644 --- a/core/src/server/delivery/request/CreateCollectionReq.cpp +++ b/core/src/server/delivery/request/CreateCollectionReq.cpp @@ -30,25 +30,18 @@ namespace server { CreateCollectionReq::CreateCollectionReq(const std::shared_ptr& context, const std::string& collection_name, - std::unordered_map& field_types, - std::unordered_map& field_index_params, - std::unordered_map& field_params, + std::unordered_map& fields, milvus::json& extra_params) : BaseReq(context, BaseReq::kCreateCollection), collection_name_(collection_name), - field_types_(field_types), - field_index_params_(field_index_params), - field_params_(field_params), + fields_(fields), extra_params_(extra_params) { } BaseReqPtr CreateCollectionReq::Create(const std::shared_ptr& context, const std::string& collection_name, - std::unordered_map& field_types, - std::unordered_map& field_index_params, - std::unordered_map& field_params, milvus::json& extra_params) { - return std::shared_ptr( - new CreateCollectionReq(context, collection_name, field_types, field_index_params, field_params, extra_params)); + std::unordered_map& fields, milvus::json& extra_params) { + return std::shared_ptr(new CreateCollectionReq(context, collection_name, fields, extra_params)); } Status @@ -69,37 +62,42 @@ CreateCollectionReq::OnExecute() { // step 2: create snapshot collection context engine::snapshot::CreateCollectionContext create_collection_context; - auto ss_collection_schema = std::make_shared(collection_name_, extra_params_); - create_collection_context.collection = ss_collection_schema; - for (auto& field_type : field_types_) { - std::string field_name = field_type.first; - auto index_params = field_index_params_.at(field_name); + auto collection_schema = std::make_shared(collection_name_, extra_params_); + create_collection_context.collection = collection_schema; + for (auto& field_kv : fields_) { + auto& field_name = field_kv.first; + auto& field_schema = field_kv.second; + + auto& field_type = field_schema.field_type_; + auto& field_params = field_schema.field_params_; + auto& index_params = field_schema.index_params_; + + std::cout << index_params.dump() << std::endl; std::string index_name; if (index_params.contains("name")) { index_name = index_params["name"]; } - int64_t dimension = 0; - json field_params; - if (!field_params_.at(field_name).empty()) { - field_params = json::parse(field_params_.at(field_name)); - if (field_type.second == engine::meta::DataType::VECTOR_FLOAT || - field_type.second == engine::meta::DataType::VECTOR_BINARY) { - if (!field_params.contains(engine::PARAM_COLLECTION_DIMENSION)) { - return Status{milvus::SERVER_INVALID_VECTOR_DIMENSION, - "Dimension should be defined in vector field extra_params"}; - } + std::cout << field_params.dump() << std::endl; + if (field_type == engine::FieldType::VECTOR_FLOAT || field_type == engine::FieldType::VECTOR_BINARY) { + if (!field_params.contains(engine::PARAM_DIMENSION)) { + return Status(SERVER_INVALID_VECTOR_DIMENSION, "Dimension not defined in field_params"); } } - auto field = std::make_shared(field_name, 0, (engine::FieldType)field_type.second, - field_params); - + auto field = std::make_shared(field_name, 0, field_type, field_params); auto field_element = std::make_shared( 0, 0, index_name, engine::FieldElementType::FET_INDEX, index_params); create_collection_context.fields_schema[field] = {field_element}; } + if (!extra_params_.contains(engine::PARAM_SEGMENT_SIZE)) { + return Status(SERVER_UNEXPECTED_ERROR, "Segment size not defined"); + } else { + auto segment_size = extra_params_[engine::PARAM_SEGMENT_SIZE].get(); + STATUS_CHECK(ValidateCollectionIndexFileSize(segment_size)); + } + // step 3: create collection status = DBWrapper::DB()->CreateCollection(create_collection_context); fiu_do_on("CreateCollectionReq.OnExecute.invalid_db_execute", diff --git a/core/src/server/delivery/request/CreateCollectionReq.h b/core/src/server/delivery/request/CreateCollectionReq.h index 4a811cbf..c6fee6c7 100644 --- a/core/src/server/delivery/request/CreateCollectionReq.h +++ b/core/src/server/delivery/request/CreateCollectionReq.h @@ -26,24 +26,18 @@ class CreateCollectionReq : public BaseReq { public: static BaseReqPtr Create(const std::shared_ptr& context, const std::string& collection_name, - std::unordered_map& field_types, - std::unordered_map& field_index_params, - std::unordered_map& field_params, milvus::json& extra_params); + std::unordered_map& fields, milvus::json& extra_params); protected: CreateCollectionReq(const std::shared_ptr& context, const std::string& collection_name, - std::unordered_map& field_types, - std::unordered_map& field_index_params, - std::unordered_map& field_params, milvus::json& extra_params); + std::unordered_map& fields, milvus::json& extra_params); Status OnExecute() override; private: const std::string collection_name_; - std::unordered_map field_types_; - std::unordered_map field_index_params_; - std::unordered_map field_params_; + std::unordered_map fields_; milvus::json extra_params_; }; diff --git a/core/src/server/delivery/request/GetCollectionInfoReq.cpp b/core/src/server/delivery/request/GetCollectionInfoReq.cpp index 5652b782..07584da2 100644 --- a/core/src/server/delivery/request/GetCollectionInfoReq.cpp +++ b/core/src/server/delivery/request/GetCollectionInfoReq.cpp @@ -57,21 +57,22 @@ GetCollectionInfoReq::OnExecute() { continue; } - milvus::json json_index_param; + milvus::json field_index_param; auto field_elements = field_kv.second; for (const auto& element : field_elements) { if (element->GetFtype() == (engine::snapshot::FTYPE_TYPE)engine::FieldElementType::FET_INDEX) { - json_index_param = element->GetParams().dump(); + field_index_param = element->GetParams(); break; } } auto field_name = field->GetName(); - collection_schema_.field_types_.insert( - std::make_pair(field_name, (engine::meta::DataType)field->GetFtype())); - collection_schema_.index_params_.insert(std::make_pair(field_name, json_index_param)); - milvus::json json_extra_param = field->GetParams(); - collection_schema_.field_params_.insert(std::make_pair(field_name, json_extra_param)); + FieldSchema field_schema; + field_schema.field_type_ = (engine::FieldType)field->GetFtype(); + field_schema.field_params_ = field->GetParams(); + field_schema.index_params_ = field_index_param; + + collection_schema_.fields_.insert(std::make_pair(field_name, field_schema)); } rc.ElapseFromBegin("done"); diff --git a/core/src/server/grpc_impl/GrpcRequestHandler.cpp b/core/src/server/grpc_impl/GrpcRequestHandler.cpp index e0d5713f..76710f23 100644 --- a/core/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/core/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -626,34 +626,31 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext* context, const ::mil CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - std::unordered_map field_types; - std::unordered_map field_index_params; - std::unordered_map field_params; + std::unordered_map fields; + if (request->fields_size() > MAXIMUM_FIELD_NUM) { Status status = Status{SERVER_INVALID_FIELD_NUM, "Maximum field's number should be limited to 64"}; LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); SET_RESPONSE(response, status, context); return ::grpc::Status::OK; } + for (int i = 0; i < request->fields_size(); ++i) { const auto& field = request->fields(i); - auto field_name = field.name(); - field_types.insert(std::make_pair(field_name, (engine::meta::DataType)field.type())); - milvus::json index_param; - for (int j = 0; j < field.index_params_size(); j++) { - index_param[field.index_params(j).key()] = field.index_params(j).value(); - } - field_index_params.insert(std::make_pair(field_name, index_param)); + FieldSchema field_schema; + field_schema.field_type_ = (engine::FieldType)field.type(); // Currently only one extra_param - if (request->fields(i).extra_params_size() != 0) { - auto extra_params = std::make_pair(request->fields(i).name(), request->fields(i).extra_params(0).value()); - field_params.insert(extra_params); - } else { - auto extra_params = std::make_pair(request->fields(i).name(), ""); - field_params.insert(extra_params); + if (field.extra_params_size() != 0) { + field_schema.field_params_ = json::parse(field.extra_params(0).value()); } + + for (int j = 0; j < field.index_params_size(); j++) { + field_schema.index_params_[field.index_params(j).key()] = field.index_params(j).value(); + } + + fields[field.name()] = field_schema; } milvus::json json_params; @@ -664,8 +661,7 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext* context, const ::mil } } - Status status = req_handler_.CreateCollection(GetContext(context), request->collection_name(), field_types, - field_index_params, field_params, json_params); + Status status = req_handler_.CreateCollection(GetContext(context), request->collection_name(), fields, json_params); LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); SET_RESPONSE(response, status, context) @@ -981,20 +977,25 @@ GrpcRequestHandler::DescribeCollection(::grpc::ServerContext* context, const ::m } response->set_collection_name(request->collection_name()); - auto field_it = collection_schema.field_types_.begin(); - for (; field_it != collection_schema.field_types_.end(); field_it++) { + for (auto& field_kv : collection_schema.fields_) { auto field = response->add_fields(); - field->set_name(field_it->first); - field->set_type((milvus::grpc::DataType)field_it->second); - for (auto& json_param : collection_schema.index_params_.at(field_it->first).items()) { - auto grpc_index_param = field->add_index_params(); - grpc_index_param->set_key(json_param.key()); - grpc_index_param->set_value(json_param.value()); - } + auto& field_name = field_kv.first; + auto& field_schema = field_kv.second; + + field->set_name(field_name); + field->set_type((milvus::grpc::DataType)field_schema.field_type_); + auto grpc_field_param = field->add_extra_params(); grpc_field_param->set_key(EXTRA_PARAM_KEY); - grpc_field_param->set_value(collection_schema.field_params_.at(field_it->first).dump()); + grpc_field_param->set_value(field_schema.field_params_.dump()); + + for (auto& item : field_schema.index_params_.items()) { + auto grpc_index_param = field->add_index_params(); + grpc_index_param->set_key(item.key()); + grpc_index_param->set_value(item.value()); + } } + auto grpc_extra_param = response->add_extra_params(); grpc_extra_param->set_key(EXTRA_PARAM_KEY); grpc_extra_param->set_value(collection_schema.extra_params_.dump()); @@ -1699,8 +1700,6 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc: CollectionSchema collection_schema; status = req_handler_.GetCollectionInfo(GetContext(context), request->collection_name(), collection_schema); - field_type_ = collection_schema.field_types_; - auto grpc_entity = response->mutable_entities(); if (!status.ok()) { SET_RESPONSE(response->mutable_status(), status, context); diff --git a/core/src/server/grpc_impl/GrpcRequestHandler.h b/core/src/server/grpc_impl/GrpcRequestHandler.h index 870454dc..565d180f 100644 --- a/core/src/server/grpc_impl/GrpcRequestHandler.h +++ b/core/src/server/grpc_impl/GrpcRequestHandler.h @@ -333,11 +333,8 @@ class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service, private: ReqHandler req_handler_; - // std::unordered_map<::grpc::ServerContext*, std::shared_ptr> context_map_; std::unordered_map> context_map_; std::shared_ptr tracer_; - std::unordered_map field_type_; - // std::unordered_map<::grpc::ServerContext*, std::unique_ptr> span_map_; mutable std::mt19937_64 random_num_generator_; mutable std::mutex random_mutex_; diff --git a/core/src/server/web_impl/handler/WebRequestHandler.cpp b/core/src/server/web_impl/handler/WebRequestHandler.cpp index 584c7e11..fed2df34 100644 --- a/core/src/server/web_impl/handler/WebRequestHandler.cpp +++ b/core/src/server/web_impl/handler/WebRequestHandler.cpp @@ -158,7 +158,7 @@ WebRequestHandler::GetCollectionMetaInfo(const std::string& collection_name, nlo STATUS_CHECK(req_handler_.CountEntities(context_ptr_, collection_name, count)); json_out["collection_name"] = schema.collection_name_; - json_out["dimension"] = schema.extra_params_[engine::PARAM_COLLECTION_DIMENSION].get(); + json_out["dimension"] = schema.extra_params_[engine::PARAM_DIMENSION].get(); json_out["index_file_size"] = schema.extra_params_[engine::PARAM_SEGMENT_SIZE].get(); json_out["metric_type"] = schema.extra_params_[engine::PARAM_INDEX_METRIC_TYPE].get(); json_out["index_params"] = schema.extra_params_[engine::PARAM_INDEX_EXTRA_PARAMS].get(); @@ -699,7 +699,6 @@ WebRequestHandler::Search(const std::string& collection_name, const nlohmann::js if (!status.ok()) { return Status{UNEXPECTED_ERROR, "DescribeHybridCollection failed"}; } - field_type_ = collection_schema.field_types_; milvus::json extra_params; if (json.contains("fields")) { @@ -1202,38 +1201,38 @@ WebRequestHandler::CreateHybridCollection(const milvus::server::web::OString& bo std::string collection_name = json_str["collection_name"]; // TODO(yukun): do checking - std::unordered_map field_types; - std::unordered_map field_index_params; - std::unordered_map field_extra_params; + std::unordered_map fields; for (auto& field : json_str["fields"]) { + FieldSchema field_schema; std::string field_name = field["field_name"]; - std::string field_type = field["field_type"]; - auto extra_params = field["extra_params"]; + + field_schema.field_params_ = field["extra_params"]; + + const std::string& field_type = field["field_type"]; if (field_type == "int8") { - field_types.insert(std::make_pair(field_name, engine::meta::DataType::INT8)); + field_schema.field_type_ = engine::FieldType::INT8; } else if (field_type == "int16") { - field_types.insert(std::make_pair(field_name, engine::meta::DataType::INT16)); + field_schema.field_type_ = engine::FieldType::INT16; } else if (field_type == "int32") { - field_types.insert(std::make_pair(field_name, engine::meta::DataType::INT32)); + field_schema.field_type_ = engine::FieldType::INT32; } else if (field_type == "int64") { - field_types.insert(std::make_pair(field_name, engine::meta::DataType::INT64)); + field_schema.field_type_ = engine::FieldType::INT64; } else if (field_type == "float") { - field_types.insert(std::make_pair(field_name, engine::meta::DataType::FLOAT)); + field_schema.field_type_ = engine::FieldType::FLOAT; } else if (field_type == "double") { - field_types.insert(std::make_pair(field_name, engine::meta::DataType::DOUBLE)); + field_schema.field_type_ = engine::FieldType::DOUBLE; } else if (field_type == "vector") { } else { std::string msg = field_name + " has wrong field_type"; RETURN_STATUS_DTO(BODY_PARSE_FAIL, msg.c_str()); } - field_extra_params.insert(std::make_pair(field_name, extra_params.dump())); + fields[field_name] = field_schema; } milvus::json json_params; - auto status = req_handler_.CreateCollection(context_ptr_, collection_name, field_types, field_index_params, - field_extra_params, json_params); + auto status = req_handler_.CreateCollection(context_ptr_, collection_name, fields, json_params); ASSIGN_RETURN_STATUS_DTO(status) } -- GitLab