diff --git a/core/src/codecs/default/DefaultAttrsFormat.cpp b/core/src/codecs/default/DefaultAttrsFormat.cpp
index 6e3abbfdbae431998d93b871c4afb0a684262c9d..e4493cfb2aca37f7a6102aaab86d3d5e11804ba8 100644
--- a/core/src/codecs/default/DefaultAttrsFormat.cpp
+++ b/core/src/codecs/default/DefaultAttrsFormat.cpp
@@ -18,6 +18,7 @@
 #include "codecs/default/DefaultAttrsFormat.h"
 
 #include
+#include <fiu-local.h>
 #include
 #include
 #include
@@ -34,7 +35,9 @@ namespace codec {
 void
 DefaultAttrsFormat::read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
                                         off_t offset, size_t num, std::vector<uint8_t>& raw_attrs, size_t& nbytes) {
-    if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
+    auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
+    fiu_do_on("read_attrs_internal_open_file_fail", open_res = false);
+    if (!open_res) {
         std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
@@ -56,7 +59,9 @@ DefaultAttrsFormat::read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, con
 void
 DefaultAttrsFormat::read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
                                        std::vector<int64_t>& uids) {
-    if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
+    auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
+    fiu_do_on("read_uids_internal_open_file_fail", open_res = false);
+    if (!open_res) {
         std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
@@ -76,7 +81,9 @@ DefaultAttrsFormat::read(const milvus::storage::FSHandlerPtr& fs_ptr, milvus::se
     const std::lock_guard<std::mutex> lock(mutex_);
 
     std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
-    if (!boost::filesystem::is_directory(dir_path)) {
+    auto is_directory = boost::filesystem::is_directory(dir_path);
+    fiu_do_on("read_id_directory_false", is_directory = false);
+    if (!is_directory) {
         std::string err_msg = "Directory: " + dir_path + "does not exist";
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
@@ -218,7 +225,9 @@ DefaultAttrsFormat::read_uids(const milvus::storage::FSHandlerPtr& fs_ptr, std::
     const std::lock_guard<std::mutex> lock(mutex_);
 
     std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
-    if (!boost::filesystem::is_directory(dir_path)) {
+    auto is_directory = boost::filesystem::is_directory(dir_path);
+    fiu_do_on("is_directory_false", is_directory = false);
+    if (!is_directory) {
         std::string err_msg = "Directory: " + dir_path + "does not exist";
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp
index 3222eac631cb6915c2f02daea205f8984bd638b8..fb2eb9e5356ff28e748ae6a679949ff19fb4d7e9 100644
--- a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp
+++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp
@@ -17,6 +17,7 @@
 
 #include "codecs/default/DefaultIdBloomFilterFormat.h"
 
+#include <fiu-local.h>
 #include
 #include
 
@@ -37,6 +38,7 @@ DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::I
     const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
     scaling_bloom_t* bloom_filter =
         new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
+    fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
     if (bloom_filter == nullptr) {
         std::string err_msg =
             "Failed to read bloom filter from file: " + bloom_filter_file_path + ". " + std::strerror(errno);
diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp
index 2405ceedf52b7c0bab1d76b54e6e912f0d318e55..c2e36a24f8e693642b0f1047c6ef443d8b4bee62 100644
--- a/core/src/db/DBImpl.cpp
+++ b/core/src/db/DBImpl.cpp
@@ -168,6 +168,7 @@
     }
 
     // background metric thread
+    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
     if (options_.metric_enable_) {
         bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
     }
@@ -1042,6 +1043,7 @@ DBImpl::Flush() {
     LOG_ENGINE_DEBUG_ << "Begin flush all collections";
 
     Status status;
+    fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);
     if (options_.wal_enable_) {
         LOG_ENGINE_DEBUG_ << "WAL flush";
         auto lsn = wal_mgr_->Flush();
@@ -1472,7 +1474,10 @@ DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::Vect
@@ ... @@ DBImpl::StartMergeTask(const std::set<std::string>& merge_collection_ids, bool f
     // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
 }
 
-Status
-DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
-    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
-
-    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
-
-    // step 1: create table file
-    meta::SegmentSchema table_file;
-    table_file.collection_id_ = collection_id;
-    table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
-    Status status = meta_ptr_->CreateHybridCollectionFile(table_file);
-
-    if (!status.ok()) {
-        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
-        return status;
-    }
-
-    // step 2: merge files
-    /*
-    ExecutionEnginePtr index =
-        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
-                             (MetricType)table_file.metric_type_, table_file.nlist_);
-*/
-    meta::SegmentsSchema updated;
-
-    std::string new_segment_dir;
-    utils::GetParentPath(table_file.location_, new_segment_dir);
-    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
-
-    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
-    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
-    for (auto& file : files) {
-        server::CollectMergeFilesMetrics metrics;
-        std::string segment_dir_to_merge;
-        utils::GetParentPath(file.location_, segment_dir_to_merge);
-        segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
-
-        files_holder.UnmarkFile(file);
-
-        auto file_schema = file;
-        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
-        updated.push_back(file_schema);
-        int64_t size = segment_writer_ptr->Size();
-        if (size >= file_schema.index_file_size_) {
-            break;
-        }
-    }
-
-    // step 3: serialize to disk
-    try {
-        status = segment_writer_ptr->Serialize();
-        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
-        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
-    } catch (std::exception& ex) {
-        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
-        LOG_ENGINE_ERROR_ << msg;
-        status = Status(DB_ERROR, msg);
-    }
-
-    if (!status.ok()) {
-        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();
Error: " << status.message(); - - // if failed to serialize merge file to disk - // typical error: out of disk space, out of memory or permission denied - table_file.file_type_ = meta::SegmentSchema::TO_DELETE; - status = meta_ptr_->UpdateCollectionFile(table_file); - LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; - - return status; - } - - // step 4: update table files state - // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size - // else set file type to RAW, no need to build index - if (!utils::IsRawIndexType(table_file.engine_type_)) { - table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_)) - ? meta::SegmentSchema::TO_INDEX - : meta::SegmentSchema::RAW; - } else { - table_file.file_type_ = meta::SegmentSchema::RAW; - } - table_file.file_size_ = segment_writer_ptr->Size(); - table_file.row_count_ = segment_writer_ptr->VectorCount(); - updated.push_back(table_file); - status = meta_ptr_->UpdateCollectionFiles(updated); - LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size() - << " bytes"; - - if (options_.insert_cache_immediately_) { - segment_writer_ptr->Cache(); - } - - return status; -} +// Status +// DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) { +// // const std::lock_guard lock(flush_merge_compact_mutex_); +// +// LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id; +// +// // step 1: create table file +// meta::SegmentSchema table_file; +// table_file.collection_id_ = collection_id; +// table_file.file_type_ = meta::SegmentSchema::NEW_MERGE; +// Status status = meta_ptr_->CreateHybridCollectionFile(table_file); +// +// if (!status.ok()) { +// LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString(); +// return status; +// } +// +// // step 2: merge files +// /* +// ExecutionEnginePtr index = +// EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_, +// (MetricType)table_file.metric_type_, table_file.nlist_); +//*/ +// meta::SegmentsSchema updated; +// +// std::string new_segment_dir; +// utils::GetParentPath(table_file.location_, new_segment_dir); +// auto segment_writer_ptr = std::make_shared(new_segment_dir); +// +// // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal +// milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); +// for (auto& file : files) { +// server::CollectMergeFilesMetrics metrics; +// std::string segment_dir_to_merge; +// utils::GetParentPath(file.location_, segment_dir_to_merge); +// segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_); +// +// files_holder.UnmarkFile(file); +// +// auto file_schema = file; +// file_schema.file_type_ = meta::SegmentSchema::TO_DELETE; +// updated.push_back(file_schema); +// int64_t size = segment_writer_ptr->Size(); +// if (size >= file_schema.index_file_size_) { +// break; +// } +// } +// +// // step 3: serialize to disk +// try { +// status = segment_writer_ptr->Serialize(); +// fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception()); +// fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, "")); +// } catch (std::exception& ex) { +// std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); +// LOG_ENGINE_ERROR_ << msg; +// status 
+//    }
+//
+//    if (!status.ok()) {
+//        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " <<
+//        status.message();
+//
+//        // if failed to serialize merge file to disk
+//        // typical error: out of disk space, out of memory or permission denied
+//        table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
+//        status = meta_ptr_->UpdateCollectionFile(table_file);
+//        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
+//
+//        return status;
+//    }
+//
+//    // step 4: update table files state
+//    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
+//    // else set file type to RAW, no need to build index
+//    if (!utils::IsRawIndexType(table_file.engine_type_)) {
+//        table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
+//                                    ? meta::SegmentSchema::TO_INDEX
+//                                    : meta::SegmentSchema::RAW;
+//    } else {
+//        table_file.file_type_ = meta::SegmentSchema::RAW;
+//    }
+//    table_file.file_size_ = segment_writer_ptr->Size();
+//    table_file.row_count_ = segment_writer_ptr->VectorCount();
+//    updated.push_back(table_file);
+//    status = meta_ptr_->UpdateCollectionFiles(updated);
+//    LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
+//                      << " bytes";
+//
+//    if (options_.insert_cache_immediately_) {
+//        segment_writer_ptr->Cache();
+//    }
+//
+//    return status;
+//}
 
 void
 DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h
index c48853cadbfe707b2c9a638b10fd3f4bae87d51b..7100d4409e03902e24b728883261e8a2685ec419 100644
--- a/core/src/db/DBImpl.h
+++ b/core/src/db/DBImpl.h
@@ -248,8 +248,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
     void
     BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);
 
-    Status
-    MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
+    // Status
+    // MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
 
     void
     StartBuildIndexTask();
diff --git a/core/src/db/insert/MemManagerImpl.cpp b/core/src/db/insert/MemManagerImpl.cpp
index 9de533b1186423bbf3c7cff4ca7b2d78cecbadb1..00bf712436f7dcb403b22b3682858d6b0a0e9bf8 100644
--- a/core/src/db/insert/MemManagerImpl.cpp
+++ b/core/src/db/insert/MemManagerImpl.cpp
@@ -11,6 +11,7 @@
 
 #include "db/insert/MemManagerImpl.h"
 
+#include <fiu-local.h>
 #include
 
 #include "VectorSource.h"
@@ -36,9 +37,9 @@ MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length,
                               const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
     flushed_tables.clear();
     if (GetCurrentMem() > options_.insert_buffer_size_) {
-        LOG_ENGINE_DEBUG_ << "Insert buffer size exceeds limit. Performing force flush";
Performing force flush"; // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge auto status = Flush(flushed_tables, false); + fiu_do_on("MemManagerImpl::InsertVectors_flush_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { return status; } diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index a715f31487b37756e631970b354158cc434d2e5c..7261341a6a12bcf80bcec7a2a25f4f872893fffa 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -174,9 +174,6 @@ class Meta { virtual Status DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) = 0; - - virtual Status - CreateHybridCollectionFile(SegmentSchema& file_schema) = 0; }; // MetaData using MetaPtr = std::shared_ptr; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 3e1f3de7b54860c08c6e97727125cc7189244798..a5b296dce10c0fd54ffae07915029c0ba624d0cd 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -2186,8 +2186,8 @@ MySQLMetaImpl::FilesByTypeEx(const std::vector& collecti mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); bool is_null_connection = (connectionPtr == nullptr); - fiu_do_on("MySQLMetaImpl.FilesByType.null_connection", is_null_connection = true); - fiu_do_on("MySQLMetaImpl.FilesByType.throw_exception", throw std::exception();); + fiu_do_on("MySQLMetaImpl.FilesByTypeEx.null_connection", is_null_connection = true); + fiu_do_on("MySQLMetaImpl.FilesByTypeEx.throw_exception", throw std::exception();); if (is_null_connection) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } @@ -3204,11 +3204,6 @@ MySQLMetaImpl::DescribeHybridCollection(CollectionSchema& collection_schema, hyb return Status::OK(); } -Status -MySQLMetaImpl::CreateHybridCollectionFile(milvus::engine::meta::SegmentSchema& file_schema) { - return Status::OK(); -} - } // namespace meta } // namespace engine } // namespace milvus diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index dce46e637c7b3f3f3399af1b1377ea7a64a9fb6f..44251de1193f95c7d1ba40c171344b62675c5c2a 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -161,9 +161,6 @@ class MySQLMetaImpl : public Meta { Status DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override; - Status - CreateHybridCollectionFile(SegmentSchema& file_schema) override; - private: Status NextFileId(std::string& file_id); diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 87cf7a65562dc2aa99795de10f91c7a0b9836686..d53162a901441972a3b46bd89b45e861c1d1db47 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -49,7 +49,7 @@ namespace { constexpr uint64_t SQL_BATCH_SIZE = 50; -template +template void DistributeBatch(const T& id_array, std::vector>& id_groups) { std::vector temp_group; @@ -86,8 +86,7 @@ StoragePrototype(const std::string& path) { return make_storage( path, make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))), - make_table(META_TABLES, - make_column("id", &CollectionSchema::id_, primary_key()), + make_table(META_TABLES, make_column("id", &CollectionSchema::id_, primary_key()), make_column("table_id", &CollectionSchema::collection_id_, unique()), make_column("state", &CollectionSchema::state_), 
make_column("dimension", &CollectionSchema::dimension_), @@ -210,9 +209,9 @@ SqliteMetaImpl::CreateCollection(CollectionSchema& collection_schema) { NextCollectionId(collection_schema.collection_id_); } else { fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception()); - auto collection = ConnectorPtr->select(columns(&CollectionSchema::state_), - where(c(&CollectionSchema::collection_id_) - == collection_schema.collection_id_)); + auto collection = + ConnectorPtr->select(columns(&CollectionSchema::state_), + where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_)); if (collection.size() == 1) { if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) { return Status(DB_ERROR, @@ -252,19 +251,11 @@ SqliteMetaImpl::DescribeCollection(CollectionSchema& collection_schema) { std::lock_guard meta_lock(meta_mutex_); fiu_do_on("SqliteMetaImpl.DescribeCollection.throw_exception", throw std::exception()); auto groups = ConnectorPtr->select( - columns(&CollectionSchema::id_, - &CollectionSchema::state_, - &CollectionSchema::dimension_, - &CollectionSchema::created_on_, - &CollectionSchema::flag_, - &CollectionSchema::index_file_size_, - &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, - &CollectionSchema::metric_type_, - &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, - &CollectionSchema::version_, - &CollectionSchema::flush_lsn_), + columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, + &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_, + &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_, + &CollectionSchema::version_, &CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -307,14 +298,13 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not decltype(ConnectorPtr->select(select_columns)) selected; if (is_root) { selected = ConnectorPtr->select(select_columns, - where(c(&CollectionSchema::collection_id_) == collection_id - and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE - and c(&CollectionSchema::owner_collection_) == "")); + where(c(&CollectionSchema::collection_id_) == collection_id and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE and + c(&CollectionSchema::owner_collection_) == "")); } else { selected = ConnectorPtr->select(select_columns, - where(c(&CollectionSchema::collection_id_) == collection_id - and c(&CollectionSchema::state_) - != (int)CollectionSchema::TO_DELETE)); + where(c(&CollectionSchema::collection_id_) == collection_id and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); } if (selected.size() == 1) { @@ -337,25 +327,18 @@ SqliteMetaImpl::AllCollections(std::vector& collection_schema_ // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); - auto select_columns = columns(&CollectionSchema::id_, - &CollectionSchema::collection_id_, - &CollectionSchema::dimension_, - &CollectionSchema::created_on_, - &CollectionSchema::flag_, - &CollectionSchema::index_file_size_, - &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, - &CollectionSchema::metric_type_, - 
-                                      &CollectionSchema::owner_collection_,
-                                      &CollectionSchema::partition_tag_,
-                                      &CollectionSchema::version_,
-                                      &CollectionSchema::flush_lsn_);
+        auto select_columns =
+            columns(&CollectionSchema::id_, &CollectionSchema::collection_id_, &CollectionSchema::dimension_,
+                    &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_,
+                    &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_,
+                    &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_,
+                    &CollectionSchema::version_, &CollectionSchema::flush_lsn_);
         decltype(ConnectorPtr->select(select_columns)) selected;
         if (is_root) {
             selected = ConnectorPtr->select(select_columns,
-                                            where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
-                                                      and c(&CollectionSchema::owner_collection_) == ""));
+                                            where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE and
+                                                  c(&CollectionSchema::owner_collection_) == ""));
         } else {
             selected = ConnectorPtr->select(select_columns,
                                             where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
@@ -403,10 +386,9 @@ SqliteMetaImpl::DropCollections(const std::vector<std::string>& collection_id_ar
 
         for (auto group : id_groups) {
             // soft delete collection
-            ConnectorPtr->update_all(
-                set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE),
-                where(in(&CollectionSchema::collection_id_, group)
-                          and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+            ConnectorPtr->update_all(set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE),
+                                     where(in(&CollectionSchema::collection_id_, group) and
+                                           c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
         }
     }
@@ -509,23 +491,18 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
             return status;
         }
 
-        auto select_columns = columns(&SegmentSchema::id_,
-                                      &SegmentSchema::segment_id_,
-                                      &SegmentSchema::file_id_,
-                                      &SegmentSchema::file_type_,
-                                      &SegmentSchema::file_size_,
-                                      &SegmentSchema::row_count_,
-                                      &SegmentSchema::date_,
-                                      &SegmentSchema::engine_type_,
-                                      &SegmentSchema::created_on_);
+        auto select_columns =
+            columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
+                    &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
+                    &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
         decltype(ConnectorPtr->select(select_columns)) selected;
         {
             // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
             std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-            selected = ConnectorPtr->select(select_columns,
-                                            where(c(&SegmentSchema::collection_id_) == collection_id
-                                                      and in(&SegmentSchema::id_, ids) and
-                                                      c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
+            selected = ConnectorPtr->select(
+                select_columns,
+                where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and
+                      c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
         }
 
         Status result;
@@ -559,8 +536,7 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
 }
 
 Status
-SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
-                                              FilesHolder& files_holder) {
+SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) {
     try {
         auto select_columns =
             columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
                     &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
@@ -661,9 +637,8 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t
         // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
         std::lock_guard<std::mutex> meta_lock(meta_mutex_);
 
-        auto selected =
-            ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_),
-                                 where(c(&CollectionSchema::collection_id_) == collection_id));
+        auto selected = ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_),
+                                             where(c(&CollectionSchema::collection_id_) == collection_id));
 
         if (selected.size() > 0) {
             flush_lsn = std::get<0>(selected[0]);
@@ -688,9 +663,9 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
         // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
         std::lock_guard<std::mutex> meta_lock(meta_mutex_);
 
-        auto collections = ConnectorPtr->select(columns(&CollectionSchema::state_),
-                                                where(c(&CollectionSchema::collection_id_)
-                                                          == file_schema.collection_id_));
+        auto collections =
+            ConnectorPtr->select(columns(&CollectionSchema::state_),
+                                 where(c(&CollectionSchema::collection_id_) == file_schema.collection_id_));
 
         // if the collection has been deleted, just mark the collection file as TO_DELETE
         // clean thread will delete the file later
@@ -702,9 +677,8 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
 
         LOG_ENGINE_DEBUG_ << "Update single collection file, file id = " << file_schema.file_id_;
     } catch (std::exception& e) {
-        std::string msg =
-            "Exception update collection file: collection_id = " + file_schema.collection_id_ + " file_id = "
-            + file_schema.file_id_;
+        std::string msg = "Exception update collection file: collection_id = " + file_schema.collection_id_ +
+                          " file_id = " + file_schema.file_id_;
         return HandleException(msg, e.what());
     }
     return Status::OK();
@@ -724,11 +698,10 @@ SqliteMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) {
             if (has_collections.find(file.collection_id_) != has_collections.end()) {
                 continue;
             }
-            auto collections = ConnectorPtr->select(columns(&CollectionSchema::id_),
-                                                    where(
-                                                        c(&CollectionSchema::collection_id_) == file.collection_id_ and
-                                                        c(&CollectionSchema::state_)
-                                                            != (int)CollectionSchema::TO_DELETE));
+            auto collections =
+                ConnectorPtr->select(columns(&CollectionSchema::id_),
+                                     where(c(&CollectionSchema::collection_id_) == file.collection_id_ and
+                                           c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
             if (collections.size() >= 1) {
                 has_collections[file.collection_id_] = true;
             } else {
@@ -791,18 +764,12 @@ SqliteMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Co
 
         auto collections = ConnectorPtr->select(
-            columns(&CollectionSchema::id_,
-                    &CollectionSchema::state_,
-                    &CollectionSchema::dimension_,
-                    &CollectionSchema::created_on_,
-                    &CollectionSchema::flag_,
-                    &CollectionSchema::index_file_size_,
-                    &CollectionSchema::owner_collection_,
-                    &CollectionSchema::partition_tag_,
-                    &CollectionSchema::version_,
-                    &CollectionSchema::flush_lsn_),
-            where(c(&CollectionSchema::collection_id_) == collection_id
-                      and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+            columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_,
+                    &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_,
+                    &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_,
+                    &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
+            where(c(&CollectionSchema::collection_id_) == collection_id and
+                  c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
 
         if (collections.size() > 0) {
             meta::CollectionSchema collection_schema;
@@ -871,8 +838,8 @@ SqliteMetaImpl::DescribeCollectionIndex(const std::string& collection_id, Collec
         auto groups = ConnectorPtr->select(
             columns(&CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_),
-            where(c(&CollectionSchema::collection_id_) == collection_id
-                      and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+            where(c(&CollectionSchema::collection_id_) == collection_id and
+                  c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
 
         if (groups.size() == 1) {
             index.engine_type_ = std::get<0>(groups[0]);
@@ -933,10 +900,8 @@ SqliteMetaImpl::DropCollectionIndex(const std::string& collection_id) {
 }
 
 Status
-SqliteMetaImpl::CreatePartition(const std::string& collection_id,
-                                const std::string& partition_name,
-                                const std::string& tag,
-                                uint64_t lsn) {
+SqliteMetaImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name,
+                                const std::string& tag, uint64_t lsn) {
     USING_SQLITE_WARNING
     server::MetricCollector metric;
@@ -996,11 +961,10 @@ SqliteMetaImpl::HasPartition(const std::string& collection_id, const std::string
         std::string valid_tag = tag;
         server::StringHelpFunctions::TrimStringBlank(valid_tag);
 
-        auto name = ConnectorPtr->select(
-            columns(&CollectionSchema::collection_id_),
-            where(c(&CollectionSchema::owner_collection_) == collection_id
-                      and c(&CollectionSchema::partition_tag_) == valid_tag and
-                      c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+        auto name = ConnectorPtr->select(columns(&CollectionSchema::collection_id_),
+                                         where(c(&CollectionSchema::owner_collection_) == collection_id and
+                                               c(&CollectionSchema::partition_tag_) == valid_tag and
+                                               c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
         if (name.size() > 0) {
             has_or_not = true;
         } else {
@@ -1026,18 +990,10 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id,
         fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception());
 
         auto partitions = ConnectorPtr->select(
-            columns(&CollectionSchema::id_,
-                    &CollectionSchema::state_,
-                    &CollectionSchema::dimension_,
-                    &CollectionSchema::created_on_,
-                    &CollectionSchema::flag_,
-                    &CollectionSchema::index_file_size_,
-                    &CollectionSchema::engine_type_,
-                    &CollectionSchema::index_params_,
-                    &CollectionSchema::metric_type_,
-                    &CollectionSchema::partition_tag_,
-                    &CollectionSchema::version_,
-                    &CollectionSchema::collection_id_,
+            columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_,
+                    &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_,
+                    &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_,
+                    &CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::collection_id_,
                     &CollectionSchema::flush_lsn_),
             where(c(&CollectionSchema::owner_collection_) == collection_id and
                   c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
@@ -1068,8 +1024,7 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id,
 }
 
 Status
-SqliteMetaImpl::GetPartitionName(const std::string& collection_id,
-                                 const std::string& tag,
+SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::string& tag,
                                  std::string& partition_name) {
     try {
         server::MetricCollector metric;
@@ -1080,11 +1035,10 @@ SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::st
         std::string valid_tag = tag;
         server::StringHelpFunctions::TrimStringBlank(valid_tag);
 
-        auto name = ConnectorPtr->select(
-            columns(&CollectionSchema::collection_id_),
-            where(c(&CollectionSchema::owner_collection_) == collection_id
-                      and c(&CollectionSchema::partition_tag_) == valid_tag and
-                      c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+        auto name = ConnectorPtr->select(columns(&CollectionSchema::collection_id_),
+                                         where(c(&CollectionSchema::owner_collection_) == collection_id and
+                                               c(&CollectionSchema::partition_tag_) == valid_tag and
+                                               c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
         if (name.size() > 0) {
             partition_name = std::get<0>(name[0]);
         } else {
@@ -1171,8 +1125,7 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
 }
 
 Status
-SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
-                                const std::set<std::string>& partition_id_array,
+SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
                                 FilesHolder& files_holder) {
     try {
         server::MetricCollector metric;
@@ -1280,11 +1233,10 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file
         }
 
         // get files to merge
-        auto select_columns =
-            columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
-                    &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
-                    &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
-                    &SegmentSchema::created_on_, &SegmentSchema::updated_time_);
+        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
+                                      &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
+                                      &SegmentSchema::created_on_, &SegmentSchema::updated_time_);
         decltype(ConnectorPtr->select(select_columns)) selected;
         {
             // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
@@ -1348,11 +1300,10 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) {
 
         server::MetricCollector metric;
 
-        auto select_columns =
-            columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
-                    &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
-                    &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
-                    &SegmentSchema::created_on_, &SegmentSchema::updated_time_);
+        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
+                                      &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
+                                      &SegmentSchema::created_on_, &SegmentSchema::updated_time_);
         decltype(ConnectorPtr->select(select_columns)) selected;
         {
             // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
@@ -1413,8 +1364,7 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) {
 }
 
 Status
-SqliteMetaImpl::FilesByType(const std::string& collection_id,
-                            const std::vector<int>& file_types,
+SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
                             FilesHolder& files_holder) {
     if (file_types.empty()) {
         return Status(DB_ERROR, "file types array is empty");
     }
@@ -1433,18 +1383,16 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
         }
 
         // get files by type
-        auto select_columns =
-            columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
-                    &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
-                    &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_,
-                    &SegmentSchema::updated_time_);
+        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
+                                      &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
+                                      &SegmentSchema::created_on_, &SegmentSchema::updated_time_);
         decltype(ConnectorPtr->select(select_columns)) selected;
         {
             // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
             std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-            selected = ConnectorPtr->select(select_columns,
-                                            where(in(&SegmentSchema::file_type_, file_types)
-                                                      and c(&SegmentSchema::collection_id_) == collection_id));
+            selected = ConnectorPtr->select(select_columns, where(in(&SegmentSchema::file_type_, file_types) and
+                                                                  c(&SegmentSchema::collection_id_) == collection_id));
         }
 
         if (selected.size() >= 1) {
@@ -1470,21 +1418,29 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
                 file_schema.metric_type_ = collection_schema.metric_type_;
 
                 switch (file_schema.file_type_) {
-                    case (int)SegmentSchema::RAW:++raw_count;
+                    case (int)SegmentSchema::RAW:
+                        ++raw_count;
+                        break;
+                    case (int)SegmentSchema::NEW:
+                        ++new_count;
                         break;
-                    case (int)SegmentSchema::NEW:++new_count;
+                    case (int)SegmentSchema::NEW_MERGE:
+                        ++new_merge_count;
                         break;
-                    case (int)SegmentSchema::NEW_MERGE:++new_merge_count;
+                    case (int)SegmentSchema::NEW_INDEX:
+                        ++new_index_count;
                         break;
-                    case (int)SegmentSchema::NEW_INDEX:++new_index_count;
+                    case (int)SegmentSchema::TO_INDEX:
+                        ++to_index_count;
                         break;
-                    case (int)SegmentSchema::TO_INDEX:++to_index_count;
+                    case (int)SegmentSchema::INDEX:
+                        ++index_count;
                         break;
-                    case (int)SegmentSchema::INDEX:++index_count;
+                    case (int)SegmentSchema::BACKUP:
+                        ++backup_count;
                         break;
-                    case (int)SegmentSchema::BACKUP:++backup_count;
+                    default:
                         break;
-                    default:break;
                 }
 
                 auto status = utils::GetCollectionFilePath(options_, file_schema);
@@ -1498,9 +1454,11 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
         std::string msg = "Get collection files by type.";
         for (int file_type : file_types) {
             switch (file_type) {
-                case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count);
+                case (int)SegmentSchema::RAW:
+                    msg = msg + " raw files:" + std::to_string(raw_count);
                     break;
-                case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count);
+                case (int)SegmentSchema::NEW:
+                    msg = msg + " new files:" + std::to_string(new_count);
                     break;
                 case (int)SegmentSchema::NEW_MERGE:
                     msg = msg + " new_merge files:" + std::to_string(new_merge_count);
@@ -1508,13 +1466,17 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
                 case (int)SegmentSchema::NEW_INDEX:
                     msg = msg + " new_index files:" + std::to_string(new_index_count);
                     break;
-                case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count);
+                case (int)SegmentSchema::TO_INDEX:
+                    msg = msg + " to_index files:" + std::to_string(to_index_count);
+                    break;
+                case (int)SegmentSchema::INDEX:
+                    msg = msg + " index files:" + std::to_string(index_count);
                     break;
-                case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count);
+                case (int)SegmentSchema::BACKUP:
+                    msg = msg + " backup files:" + std::to_string(backup_count);
                     break;
-                case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count);
+                default:
                     break;
-                default:break;
             }
         }
         LOG_ENGINE_DEBUG_ << msg;
@@ -1528,8 +1490,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
 
 Status
 SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections,
-                              const std::vector<int>& file_types,
-                              FilesHolder& files_holder) {
+                              const std::vector<int>& file_types, FilesHolder& files_holder) {
     if (file_types.empty()) {
         return Status(DB_ERROR, "file types array is empty");
     }
@@ -1537,7 +1498,7 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collect
 
     Status ret = Status::OK();
     try {
-        fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());
+        fiu_do_on("SqliteMetaImpl.FilesByTypeEx.throw_exception", throw std::exception());
 
         // distribute id array to batches
         const uint64_t batch_size = 50;
@@ -1563,17 +1524,10 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collect
         int to_index_count = 0, index_count = 0, backup_count = 0;
         for (auto group : id_groups) {
             auto select_columns =
-                columns(&SegmentSchema::id_,
-                        &SegmentSchema::collection_id_,
-                        &SegmentSchema::segment_id_,
-                        &SegmentSchema::file_id_,
-                        &SegmentSchema::file_type_,
-                        &SegmentSchema::file_size_,
-                        &SegmentSchema::row_count_,
-                        &SegmentSchema::date_,
-                        &SegmentSchema::engine_type_,
-                        &SegmentSchema::created_on_,
-                        &SegmentSchema::updated_time_);
+                columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
+                        &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                        &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
+                        &SegmentSchema::created_on_, &SegmentSchema::updated_time_);
             decltype(ConnectorPtr->select(select_columns)) selected;
 
             auto match_collectionid = in(&SegmentSchema::collection_id_, group);
@@ -1609,21 +1563,29 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collect
                 file_schema.metric_type_ = collection_schema.metric_type_;
 
                 switch (file_schema.file_type_) {
-                    case (int)SegmentSchema::RAW:++raw_count;
+                    case (int)SegmentSchema::RAW:
+                        ++raw_count;
+                        break;
+                    case (int)SegmentSchema::NEW:
+                        ++new_count;
                         break;
-                    case (int)SegmentSchema::NEW:++new_count;
+                    case (int)SegmentSchema::NEW_MERGE:
+                        ++new_merge_count;
                         break;
-                    case (int)SegmentSchema::NEW_MERGE:++new_merge_count;
+                    case (int)SegmentSchema::NEW_INDEX:
+                        ++new_index_count;
                         break;
-                    case (int)SegmentSchema::NEW_INDEX:++new_index_count;
+                    case (int)SegmentSchema::TO_INDEX:
+                        ++to_index_count;
                         break;
-                    case (int)SegmentSchema::TO_INDEX:++to_index_count;
+                    case (int)SegmentSchema::INDEX:
+                        ++index_count;
                         break;
-                    case (int)SegmentSchema::INDEX:++index_count;
+                    case (int)SegmentSchema::BACKUP:
+                        ++backup_count;
                         break;
-                    case (int)SegmentSchema::BACKUP:++backup_count;
+                    default:
                         break;
-                    default:break;
                 }
 
                 auto status = utils::GetCollectionFilePath(options_, file_schema);
@@ -1638,9 +1600,11 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collect
         std::string msg = "Get collection files by type.";
         for (int file_type : file_types) {
             switch (file_type) {
-                case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count);
+                case (int)SegmentSchema::RAW:
+                    msg = msg + " raw files:" + std::to_string(raw_count);
                     break;
-                case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count);
+                case (int)SegmentSchema::NEW:
+                    msg = msg + " new files:" + std::to_string(new_count);
                     break;
                 case (int)SegmentSchema::NEW_MERGE:
                     msg = msg + " new_merge files:" + std::to_string(new_merge_count);
@@ -1651,11 +1615,14 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collect
                 case (int)SegmentSchema::TO_INDEX:
                     msg = msg + " to_index files:" + std::to_string(to_index_count);
                     break;
-                case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count);
+                case (int)SegmentSchema::INDEX:
+                    msg = msg + " index files:" + std::to_string(index_count);
+                    break;
+                case (int)SegmentSchema::BACKUP:
+                    msg = msg + " backup files:" + std::to_string(backup_count);
                     break;
-                case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count);
+                default:
                     break;
-                default:break;
             }
         }
         LOG_ENGINE_DEBUG_ << msg;
@@ -1676,17 +1643,10 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hol
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception());
 
-        auto select_columns = columns(&SegmentSchema::id_,
-                                      &SegmentSchema::collection_id_,
-                                      &SegmentSchema::segment_id_,
-                                      &SegmentSchema::file_id_,
-                                      &SegmentSchema::file_type_,
-                                      &SegmentSchema::file_size_,
-                                      &SegmentSchema::row_count_,
-                                      &SegmentSchema::date_,
-                                      &SegmentSchema::engine_type_,
-                                      &SegmentSchema::created_on_,
-                                      &SegmentSchema::updated_time_);
+        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
+                                      &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
+                                      &SegmentSchema::created_on_, &SegmentSchema::updated_time_);
 
         // perform query
         decltype(ConnectorPtr->select(select_columns)) selected;
@@ -1917,8 +1877,8 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
                     // delete file from disk storage
                     utils::DeleteCollectionFilePath(options_, collection_file);
 
-                    LOG_ENGINE_DEBUG_ << "Remove file id:" << collection_file.file_id_ << " location:"
-                                      << collection_file.location_;
+                    LOG_ENGINE_DEBUG_ << "Remove file id:" << collection_file.file_id_
+                                      << " location:" << collection_file.location_;
 
                     collection_ids.insert(collection_file.collection_id_);
                     segment_ids.insert(std::make_pair(collection_file.segment_id_, collection_file));
@@ -1948,9 +1908,9 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
             // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
             std::lock_guard<std::mutex> meta_lock(meta_mutex_);
 
-            auto collections = ConnectorPtr->select(columns(&CollectionSchema::id_, &CollectionSchema::collection_id_),
-                                                    where(
-                                                        c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE));
+            auto collections =
+                ConnectorPtr->select(columns(&CollectionSchema::id_, &CollectionSchema::collection_id_),
+                                     where(c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE));
 
             auto commited = ConnectorPtr->transaction([&]() mutable {
                 for (auto& collection : collections) {
@@ -2039,9 +1999,8 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) {
         {
             // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
             std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-            selected = ConnectorPtr->select(select_columns,
-                                            where(in(&SegmentSchema::file_type_, file_types)
-                                                      and c(&SegmentSchema::collection_id_) == collection_id));
+            selected = ConnectorPtr->select(select_columns, where(in(&SegmentSchema::file_type_, file_types) and
+                                                                  c(&SegmentSchema::collection_id_) == collection_id));
         }
 
         CollectionSchema collection_schema;
@@ -2251,19 +2210,11 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema&
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.DescriCollection.throw_exception", throw std::exception());
         auto groups = ConnectorPtr->select(
-            columns(&CollectionSchema::id_,
-                    &CollectionSchema::state_,
-                    &CollectionSchema::dimension_,
-                    &CollectionSchema::created_on_,
-                    &CollectionSchema::flag_,
-                    &CollectionSchema::index_file_size_,
-                    &CollectionSchema::engine_type_,
-                    &CollectionSchema::index_params_,
-                    &CollectionSchema::metric_type_,
-                    &CollectionSchema::owner_collection_,
-                    &CollectionSchema::partition_tag_,
-                    &CollectionSchema::version_,
-                    &CollectionSchema::flush_lsn_),
+            columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_,
+                    &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_,
+                    &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_,
+                    &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_,
+                    &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
             where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and
                   c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
@@ -2309,57 +2260,6 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema&
     return Status::OK();
 }
 
-Status
-SqliteMetaImpl::CreateHybridCollectionFile(SegmentSchema& file_schema) {
-    USING_SQLITE_WARNING
-    if (file_schema.date_ == EmptyDate) {
-        file_schema.date_ = utils::GetDate();
-    }
-    CollectionSchema collection_schema;
-    hybrid::FieldsSchema fields_schema;
-    collection_schema.collection_id_ = file_schema.collection_id_;
-    auto status = DescribeHybridCollection(collection_schema, fields_schema);
-    if (!status.ok()) {
-        return status;
-    }
-
-    try {
-        fiu_do_on("SqliteMetaImpl.CreateCollectionFile.throw_exception", throw std::exception());
-        server::MetricCollector metric;
-
-        NextFileId(file_schema.file_id_);
-        if (file_schema.segment_id_.empty()) {
-            file_schema.segment_id_ = file_schema.file_id_;
-        }
-        file_schema.dimension_ = collection_schema.dimension_;
-        file_schema.file_size_ = 0;
-        file_schema.row_count_ = 0;
-        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
-        file_schema.updated_time_ = file_schema.created_on_;
-        file_schema.index_file_size_ = collection_schema.index_file_size_;
-        file_schema.index_params_ = collection_schema.index_params_;
-        file_schema.engine_type_ = collection_schema.engine_type_;
-        file_schema.metric_type_ = collection_schema.metric_type_;
-
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-
-        auto id = ConnectorPtr->insert(file_schema);
-        file_schema.id_ = id;
-
-        for (auto field_schema : fields_schema.fields_schema_) {
-            ConnectorPtr->insert(field_schema);
-        }
-
-        LOG_ENGINE_DEBUG_ << "Successfully create collection file, file id = " << file_schema.file_id_;
-        return utils::CreateCollectionFilePath(options_, file_schema);
-    } catch (std::exception& e) {
-        return HandleException("Encounter exception when create collection file", e.what());
-    }
-
-    return Status::OK();
-}
-
 }  // namespace meta
 }  // namespace engine
 }  // namespace milvus
diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h
index 222d7298a37a7f0aedf552185f344c3c7bdcc617..5b3c04314a88e273fb73423f9cdf55ec07f4ea80 100644
--- a/core/src/db/meta/SqliteMetaImpl.h
+++ b/core/src/db/meta/SqliteMetaImpl.h
@@ -163,9 +163,6 @@ class SqliteMetaImpl : public Meta {
     Status
     DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
 
-    Status
-    CreateHybridCollectionFile(SegmentSchema& file_schema) override;
-
  private:
     Status
     NextFileId(std::string& file_id);
diff --git a/core/src/segment/Attr.cpp b/core/src/segment/Attr.cpp
index 3c55c0e165bd40d52e7cf3951b8373c5c3478295..30d4b91a046fc492fe9b21de020953205c4a6035 100644
--- a/core/src/segment/Attr.cpp
+++ b/core/src/segment/Attr.cpp
@@ -34,23 +34,23 @@ Attr::Attr(const std::vector<uint8_t>& data, size_t nbytes, const std::vector<do
 
-void
-Attr::AddAttr(const std::vector<uint8_t>& data, size_t nbytes) {
-    data_.reserve(data_.size() + data.size());
-    data_.insert(data_.end(), std::make_move_iterator(data.begin()), std::make_move_iterator(data.end()));
-    nbytes_ += nbytes;
-}
-
-void
-Attr::AddUids(const std::vector<doc_id_t>& uids) {
-    uids_.reserve(uids_.size() + uids.size());
-    uids_.insert(uids_.end(), std::make_move_iterator(uids.begin()), std::make_move_iterator(uids.end()));
-}
+// void
+// Attr::AddAttr(const std::vector<uint8_t>& data, size_t nbytes) {
+//    data_.reserve(data_.size() + data.size());
+//    data_.insert(data_.end(), std::make_move_iterator(data.begin()), std::make_move_iterator(data.end()));
+//    nbytes_ += nbytes;
+//}
+//
+// void
+// Attr::AddUids(const std::vector<doc_id_t>& uids) {
+//    uids_.reserve(uids_.size() + uids.size());
+//    uids_.insert(uids_.end(), std::make_move_iterator(uids.begin()), std::make_move_iterator(uids.end()));
+//}
 
-void
-Attr::SetName(const std::string& name) {
-    name_ = name;
-}
+// void
+// Attr::SetName(const std::string& name) {
+//    name_ = name;
+//}
 
 const std::vector<uint8_t>&
 Attr::GetData() const {
@@ -87,15 +87,15 @@ Attr::GetCodeLength() const {
     return uids_.size() == 0 ? 0 : nbytes_ / uids_.size();
 }
 
-void
-Attr::Erase(int32_t offset) {
-    auto code_length = GetCodeLength();
-    if (code_length != 0) {
-        auto step = offset * code_length;
-        data_.erase(data_.begin() + step, data_.begin() + step + code_length);
-        uids_.erase(uids_.begin() + offset, uids_.begin() + offset + 1);
-    }
-}
+// void
+// Attr::Erase(int32_t offset) {
+//    auto code_length = GetCodeLength();
+//    if (code_length != 0) {
+//        auto step = offset * code_length;
+//        data_.erase(data_.begin() + step, data_.begin() + step + code_length);
+//        uids_.erase(uids_.begin() + offset, uids_.begin() + offset + 1);
+//    }
+//}
 
 void
 Attr::Erase(std::vector<int32_t>& offsets) {
diff --git a/core/src/segment/Attr.h b/core/src/segment/Attr.h
index 506e50f33bb88a4dab8ea706e14a1627c326314f..f88a5300ac41263775a4f661b7e9978c15a55cbc 100644
--- a/core/src/segment/Attr.h
+++ b/core/src/segment/Attr.h
@@ -30,14 +30,14 @@ class Attr {
 
     Attr();
 
-    void
-    AddAttr(const std::vector<uint8_t>& data, size_t nbytes);
-
-    void
-    AddUids(const std::vector<doc_id_t>& uids);
-
-    void
-    SetName(const std::string& name);
+    // void
+    // AddAttr(const std::vector<uint8_t>& data, size_t nbytes);
+    //
+    // void
+    // AddUids(const std::vector<doc_id_t>& uids);
+    //
+    // void
+    // SetName(const std::string& name);
 
     const std::vector<uint8_t>&
     GetData() const;
@@ -60,8 +60,8 @@ class Attr {
     size_t
     GetCodeLength() const;
 
-    void
-    Erase(int32_t offset);
+    // void
+    // Erase(int32_t offset);
 
     void
     Erase(std::vector<int32_t>& offsets);
diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp
index f66a00e3d781bbd88bc0e311993ce7fc3b1414ca..f6bc412eb3d46c340830ad5fe73d51cdaa9f59a5 100644
--- a/core/src/segment/SegmentWriter.cpp
+++ b/core/src/segment/SegmentWriter.cpp
@@ -69,15 +69,20 @@ SegmentWriter::AddAttrs(const std::string& name, const std::unordered_map<std::s
     auto attr_data_it = attr_data.begin();
     auto attrs = segment_ptr_->attrs_ptr_->attrs;
     for (; attr_data_it != attr_data.end(); ++attr_data_it) {
-        if (attrs.find(attr_data_it->first) != attrs.end()) {
-            segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)
-                ->AddAttr(attr_data_it->second, attr_nbytes.at(attr_data_it->first));
-            segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)->AddUids(uids);
-        } else {
-            AttrPtr attr = std::make_shared<Attr>(attr_data_it->second, attr_nbytes.at(attr_data_it->first), uids,
-                                                  attr_data_it->first);
-            segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr));
-        }
+        AttrPtr attr = std::make_shared<Attr>(attr_data_it->second, attr_nbytes.at(attr_data_it->first), uids,
+                                              attr_data_it->first);
+        segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr));
+
+        // if (attrs.find(attr_data_it->first) != attrs.end()) {
+        //     segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)
+        //         ->AddAttr(attr_data_it->second, attr_nbytes.at(attr_data_it->first));
+        //     segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)->AddUids(uids);
+        // } else {
+        //     AttrPtr attr = std::make_shared<Attr>(attr_data_it->second, attr_nbytes.at(attr_data_it->first),
+        //     uids,
+        //                                           attr_data_it->first);
+        //     segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr));
+        // }
     }
     return Status::OK();
 }
diff --git a/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp b/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp
index 82b186f2fd7064ec0e6c92d18e384ec12dc58a9c..f39e97e23bbcd97e53839279155002a404b18a5b 100644
--- a/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp
+++ b/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp
@@ -57,6 +57,8 @@ CreateHybridCollectionRequest::OnExecute() {
     try {
         // step 1: check arguments
         auto status = ValidationUtil::ValidateCollectionName(collection_name_);
+        fiu_do_on("CreateHybridCollectionRequest.OnExecute.invalid_collection_name",
+                  status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
         if (!status.ok()) {
             return status;
         }
@@ -98,6 +100,8 @@ CreateHybridCollectionRequest::OnExecute() {
 
         // step 3: create collection
         status = DBWrapper::DB()->CreateHybridCollection(collection_info, fields_schema);
+        fiu_do_on("CreateHybridCollectionRequest.OnExecute.invalid_db_execute",
+                  status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
         if (!status.ok()) {
             // collection could exist
             if (status.code() == DB_ALREADY_EXIST) {
diff --git a/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp b/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp
index fa3bf78486a70bcbd43987e0a805f1228f3ccd78..2c50cc7573c348d8eb91ccbb4cbe4c92f97f112e 100644
--- a/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp
+++ b/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp
@@ -53,6 +53,8 @@ DescribeHybridCollectionRequest::OnExecute() {
         engine::meta::hybrid::FieldsSchema fields_schema;
         collection_schema.collection_id_ = collection_name_;
         auto status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema);
+        fiu_do_on("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute",
+                  status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
         if (!status.ok()) {
             return status;
         }
diff --git a/core/src/server/delivery/hybrid_request/HybridSearchRequest.cpp b/core/src/server/delivery/hybrid_request/HybridSearchRequest.cpp
index c4d527b3f5f7755daa13753ecb8daca08422c7ec..aa0c1a09ec0875fbd62bb58d2328a123e5b104dc 100644
--- a/core/src/server/delivery/hybrid_request/HybridSearchRequest.cpp
+++ b/core/src/server/delivery/hybrid_request/HybridSearchRequest.cpp
@@ -116,8 +116,8 @@ HybridSearchRequest::OnExecute() {
         if (!status.ok()) {
             return status;
         }
-        fiu_do_on("HybridSearchRequest.OnExecute.empty_result_ids", result_ids.clear());
-        if (result_ids.empty()) {
+        fiu_do_on("HybridSearchRequest.OnExecute.empty_result_ids", result_.result_ids_.clear());
+        if (result_.result_ids_.empty()) {
             return Status::OK();  // empty table
         }
diff --git a/core/src/server/grpc_impl/GrpcRequestHandler.cpp b/core/src/server/grpc_impl/GrpcRequestHandler.cpp
index 0a680a6188cb6ed0859b3414138ba7c9f53ca158..650d701e111807f149dcb8f1db755e1ba628b804 100644
--- a/core/src/server/grpc_impl/GrpcRequestHandler.cpp
+++ b/core/src/server/grpc_impl/GrpcRequestHandler.cpp
@@ -1159,6 +1159,20 @@ GrpcRequestHandler::DescribeHybridCollection(::grpc::ServerContext* context,
                                              const ::milvus::grpc::CollectionName* request,
                                              ::milvus::grpc::Mapping* response) {
     LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__);
+    std::unordered_map<std::string, engine::meta::hybrid::DataType> field_types;
+    Status status =
+        request_handler_.DescribeHybridCollection(GetContext(context), request->collection_name(), field_types);
+
+    response->mutable_status()->set_error_code((milvus::grpc::ErrorCode)status.code());
+    response->mutable_status()->set_reason(status.message());
+    response->set_collection_name(request->collection_name());
+    auto field_it = field_types.begin();
+    for (; field_it != field_types.end(); field_it++) {
+        auto field = response->add_fields();
+        field->set_name(field_it->first);
+        field->mutable_type()->set_data_type((milvus::grpc::DataType)field_it->second);
+    }
+
+    auto field_it = field_types.begin();
+    for (; field_it != field_types.end(); field_it++) {
+        auto field = response->add_fields();
+        field->set_name(field_it->first);
+        field->mutable_type()->set_data_type((milvus::grpc::DataType)field_it->second);
+    }
+
     CHECK_NULLPTR_RETURN(request);

     LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__);
     return ::grpc::Status::OK;
diff --git a/core/src/utils/LogUtil.cpp b/core/src/utils/LogUtil.cpp
index d6883bafb70491c3ebc42eea0a5c3952ca3ff7a2..869170b0653a76cb5ae8fe8ca40ff03ff58430dd 100644
--- a/core/src/utils/LogUtil.cpp
+++ b/core/src/utils/LogUtil.cpp
@@ -11,11 +11,11 @@

 #include "utils/LogUtil.h"

+#include
 #include
 #include
 #include
-#include
 #include
 #include
diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp
index 1c5882d25d5bdd284091fba022c424922de4fc94..2b9db83c4e100ff52f9c9fd3c3f5fc2ad9444c3c 100644
--- a/core/unittest/db/test_db.cpp
+++ b/core/unittest/db/test_db.cpp
@@ -621,6 +621,11 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) {
     db_->Start();
     db_->Stop();
     fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
+
+    FIU_ENABLE_FIU("options_metric_enable");
+    db_->Start();
+    db_->Stop();
+    fiu_disable("options_metric_enable");
 }

 TEST_F(DBTest, BACK_TIMER_THREAD_2) {
@@ -1219,6 +1224,39 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
     }
 }

+TEST_F(DBTest2, GET_VECTOR_BY_ID_INVALID_TEST) {
+    fiu_init(0);
+
+    milvus::engine::meta::CollectionSchema collection_info = BuildCollectionSchema();
+    auto stat = db_->CreateCollection(collection_info);
+    ASSERT_TRUE(stat.ok());
+
+    uint64_t qb = 1000;
+    milvus::engine::VectorsData qxb;
+    BuildVectors(qb, 0, qxb);
+
+    std::string partition_name = "part_name";
+    std::string partition_tag = "part_tag";
+    stat = db_->CreatePartition(collection_info.collection_id_, partition_name, partition_tag);
+    ASSERT_TRUE(stat.ok());
+
+    std::vector<milvus::engine::VectorsData> vectors;
+    std::vector<int64_t> empty_array;
+    stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
+    ASSERT_FALSE(stat.ok());
+
+    stat = db_->InsertVectors(collection_info.collection_id_, partition_tag, qxb);
+    ASSERT_TRUE(stat.ok());
+
+    db_->Flush(collection_info.collection_id_);
+
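+    // "bloom_filter_nullptr" forces the bloom-filter load to fail, so the ID lookup must surface an error.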
+    fiu_enable("bloom_filter_nullptr", 1, NULL, 0);
+    stat = db_->GetVectorsByID(collection_info, qxb.id_array_, vectors);
+    ASSERT_FALSE(stat.ok());
+    fiu_disable("bloom_filter_nullptr");
+}
+
 TEST_F(DBTest2, GET_VECTOR_IDS_TEST) {
     milvus::engine::meta::CollectionSchema collection_schema = BuildCollectionSchema();
     auto stat = db_->CreateCollection(collection_schema);
diff --git a/core/unittest/db/test_hybrid_db.cpp b/core/unittest/db/test_hybrid_db.cpp
index 1d53fce7fa0e4cab2b806a72d4d37d12098bb2bb..a272bf1b9028bbb71e9a942225bd98e843c96f26 100644
--- a/core/unittest/db/test_hybrid_db.cpp
+++ b/core/unittest/db/test_hybrid_db.cpp
@@ -49,15 +49,13 @@ BuildCollectionSchema(milvus::engine::meta::CollectionSchema& collection_schema,
         fields[i].collection_id_ = COLLECTION_NAME;
         fields[i].field_name_ = "field_" + std::to_string(i);
     }
-    fields[0].field_type_ = (int)milvus::engine::meta::hybrid::DataType::INT32;
-    fields[1].field_type_ = (int)milvus::engine::meta::hybrid::DataType::INT64;
-    fields[2].field_type_ = (int)milvus::engine::meta::hybrid::DataType::FLOAT;
-    fields[3].field_type_ = (int)milvus::engine::meta::hybrid::DataType::VECTOR;
-    fields_schema.fields_schema_ = fields;
+    milvus::engine::meta::hybrid::FieldSchema schema;
+    schema.field_name_ = "field_vector";
+    schema.collection_id_ = COLLECTION_NAME;
+    schema.field_type_ = (int)(milvus::engine::meta::hybrid::DataType::VECTOR);
+    fields.emplace_back(schema);

-    attr_type.insert(std::make_pair("field_0", milvus::engine::meta::hybrid::DataType::INT32));
-    attr_type.insert(std::make_pair("field_1", milvus::engine::meta::hybrid::DataType::INT64));
-    attr_type.insert(std::make_pair("field_2", milvus::engine::meta::hybrid::DataType::FLOAT));
+    fields_schema.fields_schema_ = fields;
 }

 void
@@ -87,7 +85,7 @@ BuildEntity(uint64_t n, uint64_t batch_index, milvus::engine::Entity& entity) {
         vectors.id_array_.push_back(n * batch_index + i);
     }

-    entity.vector_data_.insert(std::make_pair("field_3", vectors));
+    entity.vector_data_.insert(std::make_pair("field_vector", vectors));
     std::vector<int32_t> value_0;
     std::vector<int64_t> value_1;
     std::vector<float> value_2;
@@ -174,6 +172,7 @@ ConstructGeneralQuery(milvus::query::GeneralQueryPtr& general_query, milvus::que
     query_ptr->root = general_query->bin;
     query_ptr->vectors.insert(std::make_pair(vector_placeholder, vector_query));
 }
+
 }  // namespace

 TEST_F(DBTest, HYBRID_DB_TEST) {
@@ -228,9 +227,9 @@ TEST_F(DBTest, HYBRID_SEARCH_TEST) {

     uint64_t qb = 1000;
     milvus::engine::Entity entity;
-    BuildEntity(qb, 0, entity);
+    BuildComplexEntity(qb, 0, entity);

-    std::vector<std::string> field_names = {"field_0", "field_1", "field_2"};
+    std::vector<std::string> field_names = {"field_0", "field_1", "field_2", "field_3", "field_4", "field_5"};

     stat = db_->InsertEntities(COLLECTION_NAME, "", field_names, entity, attr_type);
     ASSERT_TRUE(stat.ok());
@@ -347,5 +346,3 @@ TEST_F(DBTest2, GET_ENTITY_BY_ID_TEST) {
     ASSERT_EQ(vector.vector_count_, 0);
     ASSERT_TRUE(vector.float_data_.empty());
     ASSERT_TRUE(vector.binary_data_.empty());
-    }
-}
diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp
index a331e65fb372ff65041a3fb485c09caa812b4d86..435aabdb5f45466e484eef3d3a04931dbe8e735d 100644
--- a/core/unittest/db/test_meta.cpp
+++ b/core/unittest/db/test_meta.cpp
@@ -295,6 +295,20 @@ TEST_F(MetaTest, FAILED_TEST) {
         ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
         fiu_disable("SqliteMetaImpl.FilesByType.throw_exception");
     }
+    {
+        milvus::engine::meta::FilesHolder files_holder;
+        std::vector<milvus::engine::meta::CollectionSchema> collection_array;
+        milvus::engine::meta::CollectionSchema schema;
+        schema.collection_id_ = collection_id;
+        collection_array.emplace_back(schema);
+        std::vector<int> file_types;
+        file_types.push_back(milvus::engine::meta::SegmentSchema::INDEX);
+        FIU_ENABLE_FIU("SqliteMetaImpl.FilesByTypeEx.throw_exception");
+        status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
+        ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
+        fiu_disable("SqliteMetaImpl.FilesByTypeEx.throw_exception");
+        status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
+    }
     {
         uint64_t size = 0;
         FIU_ENABLE_FIU("SqliteMetaImpl.Size.throw_exception");
@@ -567,6 +581,9 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
         ++i;
     }

+    status = impl.GetCollectionFilesBySegmentId(table_file.segment_id_, files_holder);
+    ASSERT_TRUE(status.ok());
+
     impl.DropAll();
 }
diff --git a/core/unittest/db/test_meta_mysql.cpp b/core/unittest/db/test_meta_mysql.cpp
index 7c2c1df77e871acc29c9e69643c17a824aa6062e..97de12ad766f7c4640103488736b1a578def52a4 100644
--- a/core/unittest/db/test_meta_mysql.cpp
+++ b/core/unittest/db/test_meta_mysql.cpp
@@ -499,6 +499,9 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DISK) {
         ++i;
     }

+    status = impl.GetCollectionFilesBySegmentId(table_file.segment_id_, files_holder);
+    ASSERT_TRUE(status.ok());
+
     status = impl.DropAll();
     ASSERT_TRUE(status.ok());
 }
@@ -709,6 +712,19 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
                 to_index_files_cnt + index_files_cnt;
     ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);

+
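+    // Exercise FilesByTypeEx with a single-collection array; the fault-injected variant is kept commented out below.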
+    std::vector<milvus::engine::meta::CollectionSchema> collection_array;
+    milvus::engine::meta::CollectionSchema schema;
+    schema.collection_id_ = collection_id;
+    collection_array.emplace_back(schema);
+    status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
+    ASSERT_TRUE(status.ok());
+
+    // FIU_ENABLE_FIU("MySQLMetaImpl.FilesByTypeEx.throw_exception");
+    // status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
+    // ASSERT_FALSE(status.ok());
+    // fiu_disable("MySQLMetaImpl.FilesByTypeEx.throw_exception");
+
     FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
     status = impl_->DeleteCollectionFiles({collection_id});
     ASSERT_FALSE(status.ok());
diff --git a/core/unittest/server/test_rpc.cpp b/core/unittest/server/test_rpc.cpp
index afdc0a4016b488079b5cd7cdbd4c80a74da66d3f..d0c1982c28db7b89d489cb943c4e97303bcdc4c0 100644
--- a/core/unittest/server/test_rpc.cpp
+++ b/core/unittest/server/test_rpc.cpp
@@ -1220,6 +1220,11 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
     field_1->mutable_type()->mutable_vector_param()->set_dimension(128);
     field_1->set_name("field_1");

+    milvus::json json_param = {{"metric_type", 1}, {"engine_type", 1}};
+    auto extra_param = field_1->add_extra_params();
+    extra_param->set_key("params");
+    extra_param->set_value(json_param.dump());
+
     handler->CreateHybridCollection(&context, &mapping, &response);

     // Insert Entities
@@ -1281,6 +1286,15 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
     term_query->mutable_int_value()->Resize(static_cast<int>(nq), 0);
     memcpy(term_query->mutable_int_value()->mutable_data(), term_value.data(), nq * sizeof(int64_t));

+    auto range_query = boolean_query_2->add_general_query()->mutable_range_query();
+    range_query->set_field_name("field_0");
+    auto comp1 = range_query->add_operand();
+    comp1->set_operator_(::milvus::grpc::CompareOperator::GTE);
+    comp1->set_operand("0");
+    auto comp2 = range_query->add_operand();
+    comp2->set_operator_(::milvus::grpc::CompareOperator::LTE);
+    comp2->set_operand("100000");
+
     auto vector_query = boolean_query_2->add_general_query()->mutable_vector_query();
     vector_query->set_field_name("field_1");
     vector_query->set_topk(topk);
@@ -1297,10 +1311,148 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
         auto row_record = vector_query->add_records();
         CopyRowRecord(row_record, record);
     }
-    auto extra_param = vector_query->add_extra_params();
+    auto extra_param_1 = vector_query->add_extra_params();
+    extra_param_1->set_key("params");
+    milvus::json param = {{"nprobe", 16}};
+    extra_param_1->set_value(param.dump());
+
+    search_param.set_collection_name("test_hybrid");
+    auto search_extra_param = search_param.add_extra_params();
+    search_extra_param->set_key("params");
+    search_extra_param->set_value("");
+
+    milvus::grpc::TopKQueryResult topk_query_result;
+    handler->HybridSearch(&context, &search_param, &topk_query_result);
+}
+
+TEST_F(RpcHandlerTest, HYBRID_INVALID_TEST) {
+    fiu_init(0);
+
+    ::grpc::ServerContext context;
+    milvus::grpc::Mapping mapping;
+    milvus::grpc::Status response;
+
+    uint64_t row_num = 1000;
+    uint64_t dimension = 128;
+
+    // Create Hybrid Collection
+    mapping.set_collection_name("test_hybrid");
+    auto field_0 = mapping.add_fields();
+    field_0->set_name("field_0");
+    field_0->mutable_type()->set_data_type(::milvus::grpc::DataType::INT64);
+
+    auto field_1 = mapping.add_fields();
+    field_1->mutable_type()->mutable_vector_param()->set_dimension(128);
+    field_1->set_name("field_1");
+
+    milvus::json json_param = {{"metric_type", 1}, {"engine_type", 1}};
+    auto extra_param = field_1->add_extra_params();
     extra_param->set_key("params");
+    extra_param->set_value(json_param.dump());
+
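+    // Turn each request-layer fault point on, call the handler once so the injected error path runs, then turn it off again.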
fiu_enable("CreateHybridCollectionRequest.OnExecute.invalid_collection_name", 1, NULL, 0); + handler->CreateHybridCollection(&context, &mapping, &response); + fiu_disable("CreateHybridCollectionRequest.OnExecute.invalid_collection_name"); + + fiu_enable("CreateHybridCollectionRequest.OnExecute.invalid_db_execute", 1, NULL, 0); + handler->CreateHybridCollection(&context, &mapping, &response); + fiu_disable("CreateHybridCollectionRequest.OnExecute.invalid_db_execute"); + + handler->CreateHybridCollection(&context, &mapping, &response); + milvus::grpc::CollectionName grpc_collection_name; + grpc_collection_name.set_collection_name("test_hybrid"); + fiu_enable("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute", 1, NULL, 0); + handler->DescribeHybridCollection(&context, &grpc_collection_name, &mapping); + fiu_disable("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute"); + handler->DescribeHybridCollection(&context, &grpc_collection_name, &mapping); + + // Insert Entities + milvus::grpc::HInsertParam insert_param; + milvus::grpc::HEntityIDs entity_ids; + insert_param.set_collection_name("test_hybrid"); + + auto entity = insert_param.mutable_entities(); + auto field_name_0 = entity->add_field_names(); + *field_name_0 = "field_0"; + auto field_name_1 = entity->add_field_names(); + *field_name_1 = "field_1"; + + entity->set_row_num(row_num); + std::vector field_value(row_num, 0); + for (uint64_t i = 0; i < row_num; i++) { + field_value[i] = i; + } + entity->set_attr_records(field_value.data(), row_num * sizeof(int64_t)); + + std::vector> vector_field; + vector_field.resize(row_num); + for (uint64_t i = 0; i < row_num; ++i) { + vector_field[i].resize(dimension); + for (uint64_t j = 0; j < dimension; ++j) { + vector_field[i][j] = (float)((i + 10) / (j + 20)); + } + } + auto vector_record = entity->add_result_values(); + for (uint64_t i = 0; i < row_num; ++i) { + auto record = vector_record->mutable_vector_value()->add_value(); + auto vector_data = record->mutable_float_data(); + vector_data->Resize(static_cast(vector_field[i].size()), 0.0); + memcpy(vector_data->mutable_data(), vector_field[i].data(), vector_field[i].size() * sizeof(float)); + } + + fiu_enable("InsertEntityRequest.OnExecute.throw_std_exception", 1, NULL, 0); + handler->InsertEntity(&context, &insert_param, &entity_ids); + fiu_disable("InsertEntityRequest.OnExecute.throw_std_exception"); + handler->InsertEntity(&context, &insert_param, &entity_ids); + + uint64_t nq = 10; + uint64_t topk = 10; + milvus::grpc::HSearchParam search_param; + auto general_query = search_param.mutable_general_query(); + auto boolean_query_1 = general_query->mutable_boolean_query(); + boolean_query_1->set_occur(milvus::grpc::Occur::MUST); + auto general_query_1 = boolean_query_1->add_general_query(); + auto boolean_query_2 = general_query_1->mutable_boolean_query(); + auto term_query = boolean_query_2->add_general_query()->mutable_term_query(); + term_query->set_field_name("field_0"); + std::vector term_value(nq, 0); + for (uint64_t i = 0; i < nq; ++i) { + term_value[i] = i + nq; + } + term_query->set_value_num(nq); + term_query->set_values(term_value.data(), nq * sizeof(int64_t)); + + auto range_query = boolean_query_2->add_general_query()->mutable_range_query(); + range_query->set_field_name("field_0"); + auto comp1 = range_query->add_operand(); + comp1->set_operator_(::milvus::grpc::CompareOperator::GTE); + comp1->set_operand("0"); + auto comp2 = range_query->add_operand(); + 
+    auto range_query = boolean_query_2->add_general_query()->mutable_range_query();
+    range_query->set_field_name("field_0");
+    auto comp1 = range_query->add_operand();
+    comp1->set_operator_(::milvus::grpc::CompareOperator::GTE);
+    comp1->set_operand("0");
+    auto comp2 = range_query->add_operand();
+    comp2->set_operator_(::milvus::grpc::CompareOperator::LTE);
+    comp2->set_operand("100000");
+
+    auto vector_query = boolean_query_2->add_general_query()->mutable_vector_query();
+    vector_query->set_field_name("field_1");
+    vector_query->set_topk(topk);
+    vector_query->set_query_boost(2);
+    std::vector<std::vector<float>> query_vector;
+    query_vector.resize(nq);
+    for (uint64_t i = 0; i < nq; ++i) {
+        query_vector[i].resize(dimension);
+        for (uint64_t j = 0; j < dimension; ++j) {
+            query_vector[i][j] = (float)((j + 1) / (i + dimension));
+        }
+    }
+    for (auto record : query_vector) {
+        auto row_record = vector_query->add_records();
+        CopyRowRecord(row_record, record);
+    }
+    auto extra_param_1 = vector_query->add_extra_params();
+    extra_param_1->set_key("params");
     milvus::json param = {{"nprobe", 16}};
-    extra_param->set_value(param.dump());
+    extra_param_1->set_value(param.dump());

     search_param.set_collection_name("test_hybrid");
     auto search_extra_param = search_param.add_extra_params();