From 014a0e860cd03b75c89036eed0e6e30704eb841d Mon Sep 17 00:00:00 2001
From: groot
Date: Mon, 18 May 2020 22:21:53 -0500
Subject: [PATCH] QPS decrease (#2372)

* optimize merge strategy

Signed-off-by: yhmo

* #2365

Signed-off-by: groot

* fix typo

Signed-off-by: groot

* optimize search

Signed-off-by: groot

* code format

Signed-off-by: groot
---
 core/src/db/DBImpl.cpp                      |  19 ++-
 core/src/db/DBImpl.h                        |   4 +-
 core/src/db/merge/MergeAdaptiveStrategy.cpp |  91 ++++++++++++
 core/src/db/merge/MergeAdaptiveStrategy.h   |  29 ++++
 core/src/db/merge/MergeLayeredStrategy.cpp  |   3 +-
 core/src/db/merge/MergeManager.h            |  19 +++
 core/src/db/merge/MergeManagerImpl.cpp      |   8 +-
 core/src/db/merge/MergeManagerImpl.h        |   6 +
 core/src/db/meta/Meta.h                     |   5 +
 core/src/db/meta/MySQLMetaImpl.cpp          | 110 +++++++++++++++
 core/src/db/meta/MySQLMetaImpl.h            |   5 +
 core/src/db/meta/SqliteMetaImpl.cpp         | 146 +++++++++++++++-----
 core/src/db/meta/SqliteMetaImpl.h           |   5 +
 13 files changed, 406 insertions(+), 44 deletions(-)
 create mode 100644 core/src/db/merge/MergeAdaptiveStrategy.cpp
 create mode 100644 core/src/db/merge/MergeAdaptiveStrategy.h

diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp
index f8a5a5bf..39d757c9 100644
--- a/core/src/db/DBImpl.cpp
+++ b/core/src/db/DBImpl.cpp
@@ -1393,9 +1393,10 @@ DBImpl::CreateIndex(const std::shared_ptr& context, const std::
         }
     }
 
-    // step 3: let merge file thread finish
-    // to avoid duplicate data bug
-    WaitMergeFileFinish();
+    // step 3: wait merge file thread finished to avoid duplicate data bug
+    WaitMergeFileFinish();  // let merge file thread finish
+    StartMergeTask(true);   // start force-merge task
+    WaitMergeFileFinish();  // let force-merge file thread finish
 
     // step 4: wait and build index
     status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
@@ -1897,7 +1898,7 @@ DBImpl::StartMetricTask() {
 }
 
 void
-DBImpl::StartMergeTask() {
+DBImpl::StartMergeTask(bool force_merge_all) {
     // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
     // merge task has been finished?
    {
@@ -1927,7 +1928,7 @@ DBImpl::StartMergeTask() {
         // start merge file thread
         merge_thread_results_.push_back(
-            merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_));
+            merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_, force_merge_all));
         merge_collection_ids_.clear();
     }
 }
@@ -2031,14 +2032,20 @@ DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& fi
 }
 
 void
-DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
+DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
     // LOG_ENGINE_TRACE_ << " Background merge thread start";
 
     Status status;
     for (auto& collection_id : collection_ids) {
         const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
+        auto old_strategy = merge_mgr_ptr_->Strategy();
+        if (force_merge_all) {
+            merge_mgr_ptr_->UseStrategy(MergeStrategyType::ADAPTIVE);
+        }
+        auto status = merge_mgr_ptr_->MergeFiles(collection_id);
+        merge_mgr_ptr_->UseStrategy(old_strategy);
         if (!status.ok()) {
             LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id
                               << " reason:" << status.message();
diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h
index 7a3a2032..69a52f72 100644
--- a/core/src/db/DBImpl.h
+++ b/core/src/db/DBImpl.h
@@ -228,10 +228,10 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
     StartMetricTask();
 
     void
-    StartMergeTask();
+    StartMergeTask(bool force_merge_all = false);
 
     void
-    BackgroundMerge(std::set<std::string> collection_ids);
+    BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);
 
     Status
     MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
 
diff --git a/core/src/db/merge/MergeAdaptiveStrategy.cpp b/core/src/db/merge/MergeAdaptiveStrategy.cpp
new file mode 100644
index 00000000..bf6626df
--- /dev/null
+++ b/core/src/db/merge/MergeAdaptiveStrategy.cpp
@@ -0,0 +1,91 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+#include "db/merge/MergeAdaptiveStrategy.h"
+#include "utils/Log.h"
+
+#include <algorithm>
+#include <vector>
+
+namespace milvus {
+namespace engine {
+namespace {
+struct {
+    bool
+    operator()(meta::SegmentSchema& left, meta::SegmentSchema& right) const {
+        return left.file_size_ > right.file_size_;
+    }
+} CompareSegment;
+}  // namespace
+
+Status
+MergeAdaptiveStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
+    meta::SegmentsSchema sort_files;
+    meta::SegmentsSchema& files = files_holder.HoldFiles();
+    for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
+        meta::SegmentSchema& file = *iter;
+        if (file.index_file_size_ > 0 && file.file_size_ > file.index_file_size_) {
+            // file that does not need to merge
+            continue;
+        }
+        sort_files.push_back(file);
+    }
+
+    // no need to merge single file
+    if (sort_files.size() < 2) {
+        return Status::OK();
+    }
+
+    // two files, simply merge them
+    if (sort_files.size() == 2) {
+        files_groups.emplace_back(sort_files);
+        return Status::OK();
+    }
+
+    // arrange files by file size in descending order
+    std::sort(sort_files.begin(), sort_files.end(), CompareSegment);
+
+    // pick files to merge
+    int64_t index_file_size = sort_files[0].index_file_size_;
+    while (true) {
+        meta::SegmentsSchema temp_group;
+        int64_t sum_size = 0;
+        for (auto iter = sort_files.begin(); iter != sort_files.end();) {
+            meta::SegmentSchema& file = *iter;
+            if (sum_size + file.file_size_ <= index_file_size) {
+                temp_group.push_back(file);
+                sum_size += file.file_size_;
+                iter = sort_files.erase(iter);
+            } else {
+                if ((iter + 1 == sort_files.end()) && sum_size < index_file_size) {
+                    temp_group.push_back(file);
+                    sort_files.erase(iter);
+                    break;
+                } else {
+                    ++iter;
+                }
+            }
+        }
+
+        if (!temp_group.empty()) {
+            files_groups.emplace_back(temp_group);
+        }
+
+        if (sort_files.empty()) {
+            break;
+        }
+    }
+
+    return Status::OK();
+}
+
+}  // namespace engine
+}  // namespace milvus
diff --git a/core/src/db/merge/MergeAdaptiveStrategy.h b/core/src/db/merge/MergeAdaptiveStrategy.h
new file mode 100644
index 00000000..79e4ef57
--- /dev/null
+++ b/core/src/db/merge/MergeAdaptiveStrategy.h
@@ -0,0 +1,29 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "db/merge/MergeStrategy.h"
+#include "utils/Status.h"
+
+namespace milvus {
+namespace engine {
+
+class MergeAdaptiveStrategy : public MergeStrategy {
+ public:
+    Status
+    RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override;
+};  // MergeAdaptiveStrategy
+
+}  // namespace engine
+}  // namespace milvus
diff --git a/core/src/db/merge/MergeLayeredStrategy.cpp b/core/src/db/merge/MergeLayeredStrategy.cpp
index f5ab9a9f..9568fe46 100644
--- a/core/src/db/merge/MergeLayeredStrategy.cpp
+++ b/core/src/db/merge/MergeLayeredStrategy.cpp
@@ -40,8 +40,7 @@ MergeLayeredStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGr
     for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
         meta::SegmentSchema& file = *iter;
         if (file.index_file_size_ > 0 && file.file_size_ > (size_t)(file.index_file_size_)) {
-            // release file that no need to merge
-            files_holder.UnmarkFile(file);
+            // file that does not need to merge
             continue;
         }
 
diff --git a/core/src/db/merge/MergeManager.h b/core/src/db/merge/MergeManager.h
index 698e64e5..ddc24aba 100644
--- a/core/src/db/merge/MergeManager.h
+++ b/core/src/db/merge/MergeManager.h
@@ -24,15 +24,34 @@ namespace milvus {
 namespace engine {
 
+// 1. SIMPLE
+//    merge in the old way: merge files one by one, stop merging once the merged file size exceeds index_file_size
+// 2. LAYERED
+//    distribute files into several groups according to file size
+//    firstly, define layers by file size: 4MB, 16MB, 64MB, 256MB, 1024MB
+//    if file size is between 0MB~4MB, put it into layer "4"
+//    if file size is between 4MB~16MB, put it into layer "16"
+//    if file size is between 16MB~64MB, put it into layer "64"
+//    if file size is between 64MB~256MB, put it into layer "256"
+//    if file size is between 256MB~1024MB, put it into layer "1024"
+//    secondly, merge files within each group
+//    thirdly, if a file was created more than 30 seconds ago and is still un-merged, force merge with upper layer files
+// 3. ADAPTIVE
+//    pick files whose total size is close to index_file_size and merge them
 enum class MergeStrategyType {
     SIMPLE = 1,
     LAYERED = 2,
+    ADAPTIVE = 3,
 };
 
 class MergeManager {
  public:
+    virtual MergeStrategyType
+    Strategy() const = 0;
+
     virtual Status
     UseStrategy(MergeStrategyType type) = 0;
+
     virtual Status
     MergeFiles(const std::string& collection_id) = 0;
 };  // MergeManager
 
diff --git a/core/src/db/merge/MergeManagerImpl.cpp b/core/src/db/merge/MergeManagerImpl.cpp
index 50254484..963c7569 100644
--- a/core/src/db/merge/MergeManagerImpl.cpp
+++ b/core/src/db/merge/MergeManagerImpl.cpp
@@ -10,6 +10,7 @@
 // or implied. See the License for the specific language governing permissions and limitations under the License.
 #include "db/merge/MergeManagerImpl.h"
+#include "db/merge/MergeAdaptiveStrategy.h"
 #include "db/merge/MergeLayeredStrategy.h"
 #include "db/merge/MergeSimpleStrategy.h"
 #include "db/merge/MergeStrategy.h"
@@ -21,7 +22,7 @@ namespace milvus {
 namespace engine {
 
 MergeManagerImpl::MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type)
-    : meta_ptr_(meta_ptr), options_(options) {
+    : meta_ptr_(meta_ptr), options_(options), strategy_type_(type) {
     UseStrategy(type);
 }
 
@@ -36,12 +37,17 @@ MergeManagerImpl::UseStrategy(MergeStrategyType type) {
             strategy_ = std::make_shared<MergeLayeredStrategy>();
             break;
         }
+        case MergeStrategyType::ADAPTIVE: {
+            strategy_ = std::make_shared<MergeAdaptiveStrategy>();
+            break;
+        }
         default: {
             std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type);
             LOG_ENGINE_ERROR_ << msg;
             throw Exception(DB_ERROR, msg);
         }
     }
+    strategy_type_ = type;
 
     return Status::OK();
 }
diff --git a/core/src/db/merge/MergeManagerImpl.h b/core/src/db/merge/MergeManagerImpl.h
index 257bf100..7772bc0f 100644
--- a/core/src/db/merge/MergeManagerImpl.h
+++ b/core/src/db/merge/MergeManagerImpl.h
@@ -31,6 +31,11 @@ class MergeManagerImpl : public MergeManager {
  public:
     MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type);
 
+    MergeStrategyType
+    Strategy() const override {
+        return strategy_type_;
+    }
+
     Status
     UseStrategy(MergeStrategyType type) override;
 
@@ -41,6 +46,7 @@ class MergeManagerImpl : public MergeManager {
     meta::MetaPtr meta_ptr_;
     DBOptions options_;
 
+    MergeStrategyType strategy_type_ = MergeStrategyType::SIMPLE;
     MergeStrategyPtr strategy_;
 };  // MergeManagerImpl
 
diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h
index abb26b2d..02d46463 100644
--- a/core/src/db/meta/Meta.h
+++ b/core/src/db/meta/Meta.h
@@ -13,6 +13,7 @@
 #include
 #include
+#include <set>
 #include
 #include
 
@@ -124,6 +125,10 @@ class Meta {
     virtual Status
     FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) = 0;
 
+    virtual Status
+    FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
+                    FilesHolder& files_holder) = 0;
+
     virtual Status
     FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) = 0;
 
diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp
index ae98a28f..a7bb6bf1 100644
--- a/core/src/db/meta/MySQLMetaImpl.cpp
+++ b/core/src/db/meta/MySQLMetaImpl.cpp
@@ -1692,6 +1692,116 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file
     }
 }
 
+Status
+MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
+                               FilesHolder& files_holder) {
+    try {
+        server::MetricCollector metric;
+
+        // get root collection information
+        CollectionSchema collection_schema;
+        collection_schema.collection_id_ = root_collection;
+        auto status = DescribeCollection(collection_schema);
+        if (!status.ok()) {
+            return status;
+        }
+
+        // distribute id array to batches
+        const int64_t batch_size = 50;
+        std::vector<std::vector<std::string>> id_groups;
+        std::vector<std::string> temp_group = {root_collection};
+        int64_t count = 1;
+        for (auto& id : partition_id_array) {
+            temp_group.push_back(id);
+            count++;
+            if (count >= batch_size) {
+                id_groups.emplace_back(temp_group);
+                temp_group.clear();
+                count = 0;
+            }
+        }
+
+        if (!temp_group.empty()) {
+            id_groups.emplace_back(temp_group);
+        }
+
+        // perform query batch by batch
+        int64_t files_count = 0;
+        Status ret;
+        for (auto group : id_groups) {
+            mysqlpp::StoreQueryResult res;
+            {
+                mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
+
+                bool is_null_connection = (connectionPtr == nullptr);
+                if (is_null_connection) {
+                    return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
+                }
+
+                // to ensure UpdateCollectionFiles to be an atomic operation
+                std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+
+                mysqlpp::Query statement = connectionPtr->query();
+                statement
+                    << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
+                    << " FROM " << META_TABLEFILES << " WHERE table_id in (";
+                for (size_t i = 0; i < group.size(); i++) {
+                    statement << mysqlpp::quote << group[i];
+                    if (i != group.size() - 1) {
+                        statement << ",";
+                    }
+                }
+                statement << ")";
+
+                // End
+                statement << " AND"
+                          << " (file_type = " << std::to_string(SegmentSchema::RAW)
+                          << " OR file_type = " << std::to_string(SegmentSchema::TO_INDEX)
+                          << " OR file_type = " << std::to_string(SegmentSchema::INDEX) << ");";
+
+                LOG_ENGINE_DEBUG_ << "FilesToSearch: " << statement.str();
+
+                res = statement.store();
+            }  // Scoped Connection
+
+            for (auto& resRow : res) {
+                SegmentSchema collection_file;
+                collection_file.id_ = resRow["id"];  // implicit conversion
+                resRow["table_id"].to_string(collection_file.collection_id_);
+                resRow["segment_id"].to_string(collection_file.segment_id_);
+                collection_file.index_file_size_ = collection_schema.index_file_size_;
+                collection_file.engine_type_ = resRow["engine_type"];
+                collection_file.index_params_ = collection_schema.index_params_;
+                collection_file.metric_type_ = collection_schema.metric_type_;
+                resRow["file_id"].to_string(collection_file.file_id_);
+                collection_file.file_type_ = resRow["file_type"];
+                collection_file.file_size_ = resRow["file_size"];
+                collection_file.row_count_ = resRow["row_count"];
+                collection_file.date_ = resRow["date"];
+                collection_file.dimension_ = collection_schema.dimension_;
+
+                auto status = utils::GetCollectionFilePath(options_, collection_file);
+                if (!status.ok()) {
+                    ret = status;
+                    continue;
+                }
+
+                files_holder.MarkFile(collection_file);
+                files_count++;
+            }
+        }
+
+        if (files_count == 0) {
+            LOG_ENGINE_DEBUG_ << "No file to search for collection: " << root_collection;
+        } else {
+            LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << root_collection;
+        }
+        return ret;
+    } catch (std::exception& e) {
+        return HandleException("Failed to get files to search", e.what());
+    }
+}
+
 Status
 MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) {
     try {
diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h
index d30c69db..708063e4 100644
--- a/core/src/db/meta/MySQLMetaImpl.h
+++ b/core/src/db/meta/MySQLMetaImpl.h
@@ -15,6 +15,7 @@
 #include
 #include
+#include <set>
 #include
 #include
 
@@ -109,6 +110,10 @@ class MySQLMetaImpl : public Meta {
     Status
     FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;
 
+    Status
+    FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
+                    FilesHolder& files_holder) override;
+
     Status
     FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;
 
diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp
index 373d15ae..d188522f 100644
--- a/core/src/db/meta/SqliteMetaImpl.cpp
+++ b/core/src/db/meta/SqliteMetaImpl.cpp
@@ -286,13 +286,14 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
         decltype(ConnectorPtr->select(select_columns)) selected;
         if (is_root) {
             selected = ConnectorPtr->select(select_columns,
-                                            where(c(&CollectionSchema::collection_id_) == collection_id
-                                                  and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
-                                                  and c(&CollectionSchema::owner_collection_) == ""));
+                                            where(c(&CollectionSchema::collection_id_) == collection_id
+                                                      and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
+                                                      and c(&CollectionSchema::owner_collection_) == ""));
         } else {
             selected = ConnectorPtr->select(select_columns,
-                                            where(c(&CollectionSchema::collection_id_) == collection_id
-                                                  and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+                                            where(c(&CollectionSchema::collection_id_) == collection_id
+                                                      and c(&CollectionSchema::state_)
+                                                          != (int)CollectionSchema::TO_DELETE));
         }
 
         if (selected.size() == 1) {
@@ -1118,6 +1119,99 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
     }
 }
 
+Status
+SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
+                                const std::set<std::string>& partition_id_array,
+                                FilesHolder& files_holder) {
+    try {
+        server::MetricCollector metric;
+
+        // get root collection information
+        CollectionSchema collection_schema;
+        collection_schema.collection_id_ = root_collection;
+        auto status = DescribeCollection(collection_schema);
+        if (!status.ok()) {
+            return status;
+        }
+
+        // distribute id array to batches
+        const int64_t batch_size = 50;
+        std::vector<std::vector<std::string>> id_groups;
+        std::vector<std::string> temp_group = {root_collection};
+        int64_t count = 1;
+        for (auto& id : partition_id_array) {
+            temp_group.push_back(id);
+            count++;
+            if (count >= batch_size) {
+                id_groups.emplace_back(temp_group);
+                temp_group.clear();
+                count = 0;
+            }
+        }
+
+        if (!temp_group.empty()) {
+            id_groups.emplace_back(temp_group);
+        }
+
+        // perform query batch by batch
+        int64_t files_count = 0;
+        Status ret;
+        for (auto group : id_groups) {
+            auto select_columns =
+                columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
+                        &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                        &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
+
+            auto match_collectionid = in(&SegmentSchema::collection_id_, group);
+
+            std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
+                                           (int)SegmentSchema::INDEX};
+            auto match_type = in(&SegmentSchema::file_type_, file_types);
+            decltype(ConnectorPtr->select(select_columns)) selected;
+            {
+                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
+                std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+                auto filter = where(match_collectionid and match_type);
+                selected = ConnectorPtr->select(select_columns, filter);
+            }
+
+            for (auto& file : selected) {
+                SegmentSchema collection_file;
+                collection_file.id_ = std::get<0>(file);
+                collection_file.collection_id_ = std::get<1>(file);
+                collection_file.segment_id_ = std::get<2>(file);
+                collection_file.file_id_ = std::get<3>(file);
+                collection_file.file_type_ = std::get<4>(file);
+                collection_file.file_size_ = std::get<5>(file);
+                collection_file.row_count_ = std::get<6>(file);
+                collection_file.date_ = std::get<7>(file);
+                collection_file.engine_type_ = std::get<8>(file);
+                collection_file.dimension_ = collection_schema.dimension_;
+                collection_file.index_file_size_ = collection_schema.index_file_size_;
+                collection_file.index_params_ = collection_schema.index_params_;
+                collection_file.metric_type_ = collection_schema.metric_type_;
+
+                auto status = utils::GetCollectionFilePath(options_, collection_file);
+                if (!status.ok()) {
+                    ret = status;
+                    continue;
+                }
+
+                files_holder.MarkFile(collection_file);
+                files_count++;
+            }
+        }
+        if (files_count == 0) {
+            LOG_ENGINE_DEBUG_ << "No file to search for collection: " << root_collection;
+        } else {
+            LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << root_collection;
+        }
+        return ret;
+    } catch (std::exception& e) {
+        return HandleException("Encounter exception when iterate index files", e.what());
+    }
+}
+
 Status
 SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) {
     try {
@@ -1315,29 +1409,21 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
             file_schema.metric_type_ = collection_schema.metric_type_;
 
             switch (file_schema.file_type_) {
-                case (int)SegmentSchema::RAW:
-                    ++raw_count;
+                case (int)SegmentSchema::RAW:++raw_count;
                     break;
-                case (int)SegmentSchema::NEW:
-                    ++new_count;
+                case (int)SegmentSchema::NEW:++new_count;
                     break;
-                case (int)SegmentSchema::NEW_MERGE:
-                    ++new_merge_count;
+                case (int)SegmentSchema::NEW_MERGE:++new_merge_count;
                     break;
-                case (int)SegmentSchema::NEW_INDEX:
-                    ++new_index_count;
+                case (int)SegmentSchema::NEW_INDEX:++new_index_count;
                     break;
-                case (int)SegmentSchema::TO_INDEX:
-                    ++to_index_count;
+                case (int)SegmentSchema::TO_INDEX:++to_index_count;
                     break;
-                case (int)SegmentSchema::INDEX:
-                    ++index_count;
+                case (int)SegmentSchema::INDEX:++index_count;
                     break;
-                case (int)SegmentSchema::BACKUP:
-                    ++backup_count;
-                    break;
-                default:
+                case (int)SegmentSchema::BACKUP:++backup_count;
                     break;
+                default:break;
             }
 
             auto status = utils::GetCollectionFilePath(options_, file_schema);
@@ -1351,11 +1437,9 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
         std::string msg = "Get collection files by type.";
         for (int file_type : file_types) {
             switch (file_type) {
-                case (int)SegmentSchema::RAW:
-                    msg = msg + " raw files:" + std::to_string(raw_count);
+                case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count);
                     break;
-                case (int)SegmentSchema::NEW:
-                    msg = msg + " new files:" + std::to_string(new_count);
+                case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count);
                     break;
                 case (int)SegmentSchema::NEW_MERGE:
                     msg = msg + " new_merge files:" + std::to_string(new_merge_count);
                     break;
                 case (int)SegmentSchema::NEW_INDEX:
                     msg = msg + " new_index files:" + std::to_string(new_index_count);
                     break;
-                case (int)SegmentSchema::TO_INDEX:
-                    msg = msg + " to_index files:" + std::to_string(to_index_count);
-                    break;
-                case (int)SegmentSchema::INDEX:
-                    msg = msg + " index files:" + std::to_string(index_count);
+                case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count);
                     break;
-                case (int)SegmentSchema::BACKUP:
-                    msg = msg + " backup files:" + std::to_string(backup_count);
+                case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count);
                     break;
-                default:
+                case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count);
                     break;
+                default:break;
             }
         }
         LOG_ENGINE_DEBUG_ << msg;
diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h
index 6a9684ea..d10a7599 100644
--- a/core/src/db/meta/SqliteMetaImpl.h
+++ b/core/src/db/meta/SqliteMetaImpl.h
@@ -12,6 +12,7 @@
 #pragma once
 
 #include
+#include <set>
 #include
 #include
 
@@ -111,6 +112,10 @@ class SqliteMetaImpl : public Meta {
     Status
     FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;
 
+    Status
+    FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
+                    FilesHolder& files_holder) override;
+
     Status
     FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;
 
-- 
GitLab
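
Editor's note: the comment block this patch adds to MergeManager.h is the quickest way to grasp the new ADAPTIVE strategy: sort the un-merged segments by size and greedily pack them into groups whose total size stays close to index_file_size, so each force-merge round produces a near-full segment. The sketch below is an illustration only, not code from the patch; GroupBySize, the bare int64_t sizes, and the oversized-file fallback are hypothetical stand-ins for the real meta::SegmentSchema / FilesHolder plumbing, and the real RegroupFiles additionally pulls the last remaining file into an under-filled group instead of splitting it off.

// Illustrative sketch only, assuming segment sizes are given as plain numbers.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

// Greedy packing: repeatedly fill a group with the largest remaining sizes that
// still fit under `target`, mirroring the grouping idea behind MergeAdaptiveStrategy.
std::vector<std::vector<int64_t>>
GroupBySize(std::vector<int64_t> sizes, int64_t target) {
    std::sort(sizes.begin(), sizes.end(), std::greater<int64_t>());  // biggest first
    std::vector<std::vector<int64_t>> groups;
    while (!sizes.empty()) {
        std::vector<int64_t> group;
        int64_t sum = 0;
        for (auto it = sizes.begin(); it != sizes.end();) {
            if (sum + *it <= target) {
                sum += *it;              // this file still fits into the current group
                group.push_back(*it);
                it = sizes.erase(it);
            } else {
                ++it;                    // too big for this group, try a smaller file
            }
        }
        if (group.empty()) {             // nothing fit: emit the oversized file alone
            group.push_back(sizes.front());
            sizes.erase(sizes.begin());
        }
        groups.push_back(std::move(group));
    }
    return groups;
}

int main() {
    // e.g. index_file_size = 1024 (MB), segment sizes in MB
    for (const auto& group : GroupBySize({900, 600, 400, 300, 100, 50}, 1024)) {
        int64_t sum = 0;
        for (int64_t s : group) {
            std::cout << s << " ";
            sum += s;
        }
        std::cout << "-> " << sum << "\n";   // prints each group and its total size
    }
    return 0;
}

Sorting in descending order lets the largest segments seed each group, which keeps the number of merge rounds (and the resulting write amplification) small; that appears to be why RegroupFiles sorts with CompareSegment before packing.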