提交 47303d25 编写于 作者: G groot 提交者: jinhai

QPS decrease (#2372)

* optimize merge strategy
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* #2365
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* fix typo
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* optimize search
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* code format
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 1b8a06da
......@@ -1624,9 +1624,10 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
}
}
// step 3: let merge file thread finish
// to avoid duplicate data bug
WaitMergeFileFinish();
// step 3: wait merge file thread finished to avoid duplicate data bug
WaitMergeFileFinish(); // let merge file thread finish
StartMergeTask(true); // start force-merge task
WaitMergeFileFinish(); // let force-merge file thread finish
// step 4: wait and build index
status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
......@@ -2150,7 +2151,7 @@ DBImpl::StartMetricTask() {
}
void
DBImpl::StartMergeTask() {
DBImpl::StartMergeTask(bool force_merge_all) {
// LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
// merge task has been finished?
{
......@@ -2180,7 +2181,7 @@ DBImpl::StartMergeTask() {
// start merge file thread
merge_thread_results_.push_back(
merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_));
merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_, force_merge_all));
merge_collection_ids_.clear();
}
}
......@@ -2284,14 +2285,20 @@ DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& fi
}
void
DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
// LOG_ENGINE_TRACE_ << " Background merge thread start";
Status status;
for (auto& collection_id : collection_ids) {
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
auto old_strategy = merge_mgr_ptr_->Strategy();
if (force_merge_all) {
merge_mgr_ptr_->UseStrategy(MergeStrategyType::ADAPTIVE);
}
auto status = merge_mgr_ptr_->MergeFiles(collection_id);
merge_mgr_ptr_->UseStrategy(old_strategy);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id
<< " reason:" << status.message();
......
......@@ -238,10 +238,10 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
StartMetricTask();
void
StartMergeTask();
StartMergeTask(bool force_merge_all = false);
void
BackgroundMerge(std::set<std::string> collection_ids);
BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);
Status
MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeAdaptiveStrategy.h"
#include "utils/Log.h"
#include <algorithm>
#include <vector>
namespace milvus {
namespace engine {
namespace {
struct {
bool
operator()(meta::SegmentSchema& left, meta::SegmentSchema& right) const {
return left.file_size_ > right.file_size_;
}
} CompareSegment;
} // namespace
Status
MergeAdaptiveStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
meta::SegmentsSchema sort_files;
meta::SegmentsSchema& files = files_holder.HoldFiles();
for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
meta::SegmentSchema& file = *iter;
if (file.index_file_size_ > 0 && file.file_size_ > file.index_file_size_) {
// file that no need to merge
continue;
}
sort_files.push_back(file);
}
// no need to merge single file
if (sort_files.size() < 2) {
return Status::OK();
}
// two files, simply merge them
if (sort_files.size() == 2) {
files_groups.emplace_back(sort_files);
return Status::OK();
}
// arrange files by file size in descending order
std::sort(sort_files.begin(), sort_files.end(), CompareSegment);
// pick files to merge
int64_t index_file_size = sort_files[0].index_file_size_;
while (true) {
meta::SegmentsSchema temp_group;
int64_t sum_size = 0;
for (auto iter = sort_files.begin(); iter != sort_files.end();) {
meta::SegmentSchema& file = *iter;
if (sum_size + file.file_size_ <= index_file_size) {
temp_group.push_back(file);
sum_size += file.file_size_;
iter = sort_files.erase(iter);
} else {
if ((iter + 1 == sort_files.end()) && sum_size < index_file_size) {
temp_group.push_back(file);
sort_files.erase(iter);
break;
} else {
++iter;
}
}
}
if (!temp_group.empty()) {
files_groups.emplace_back(temp_group);
}
if (sort_files.empty()) {
break;
}
}
return Status::OK();
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <vector>
#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MergeAdaptiveStrategy : public MergeStrategy {
public:
Status
RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override;
}; // MergeSimpleStrategy
} // namespace engine
} // namespace milvus
......@@ -40,8 +40,7 @@ MergeLayeredStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGr
for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
meta::SegmentSchema& file = *iter;
if (file.index_file_size_ > 0 && file.file_size_ > (size_t)(file.index_file_size_)) {
// release file that no need to merge
files_holder.UnmarkFile(file);
// file that no need to merge
continue;
}
......
......@@ -24,15 +24,34 @@
namespace milvus {
namespace engine {
// 1. SIMPLE
// merge in old way, merge files one by one, stop merge until file size exceed index_file_size
// 2. LAYERED
// distribute files to several groups according to file size
// firstly, define layers by file size: 4MB, 16MB, 64MB, 256MB, 1024MB
// if file size between 0MB~4MB, put it into layer "4"
// if file size between 4MB~16MB, put it into layer "16"
// if file size between 16MB~64MB, put it into layer "64"
// if file size between 64MB~256MB, put it into layer "256"
// if file size between 256MB~1024MB, put it into layer "1024"
// secondly, merge files for each group
// third, if some file's create time is 30 seconds ago, and it still un-merged, force merge with upper layer files
// 3. ADAPTIVE
// Pick files that sum of size is close to index_file_size, merge them
enum class MergeStrategyType {
SIMPLE = 1,
LAYERED = 2,
ADAPTIVE = 3,
};
class MergeManager {
public:
virtual MergeStrategyType
Strategy() const = 0;
virtual Status
UseStrategy(MergeStrategyType type) = 0;
virtual Status
MergeFiles(const std::string& collection_id) = 0;
}; // MergeManager
......
......@@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeManagerImpl.h"
#include "db/merge/MergeAdaptiveStrategy.h"
#include "db/merge/MergeLayeredStrategy.h"
#include "db/merge/MergeSimpleStrategy.h"
#include "db/merge/MergeStrategy.h"
......@@ -21,7 +22,7 @@ namespace milvus {
namespace engine {
MergeManagerImpl::MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type)
: meta_ptr_(meta_ptr), options_(options) {
: meta_ptr_(meta_ptr), options_(options), strategy_type_(type) {
UseStrategy(type);
}
......@@ -36,12 +37,17 @@ MergeManagerImpl::UseStrategy(MergeStrategyType type) {
strategy_ = std::make_shared<MergeLayeredStrategy>();
break;
}
case MergeStrategyType::ADAPTIVE: {
strategy_ = std::make_shared<MergeAdaptiveStrategy>();
break;
}
default: {
std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type);
LOG_ENGINE_ERROR_ << msg;
throw Exception(DB_ERROR, msg);
}
}
strategy_type_ = type;
return Status::OK();
}
......
......@@ -31,6 +31,11 @@ class MergeManagerImpl : public MergeManager {
public:
MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type);
MergeStrategyType
Strategy() const override {
return strategy_type_;
}
Status
UseStrategy(MergeStrategyType type) override;
......@@ -41,6 +46,7 @@ class MergeManagerImpl : public MergeManager {
meta::MetaPtr meta_ptr_;
DBOptions options_;
MergeStrategyType strategy_type_ = MergeStrategyType::SIMPLE;
MergeStrategyPtr strategy_;
}; // MergeManagerImpl
......
......@@ -13,6 +13,7 @@
#include <cstddef>
#include <memory>
#include <set>
#include <string>
#include <vector>
......@@ -124,6 +125,10 @@ class Meta {
virtual Status
FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) = 0;
virtual Status
FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
FilesHolder& files_holder) = 0;
virtual Status
FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) = 0;
......
......@@ -1692,6 +1692,116 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file
}
}
Status
MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
FilesHolder& files_holder) {
try {
server::MetricCollector metric;
// get root collection information
CollectionSchema collection_schema;
collection_schema.collection_id_ = root_collection;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
// distribute id array to batchs
const int64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group = {root_collection};
int64_t count = 1;
for (auto& id : partition_id_array) {
temp_group.push_back(id);
count++;
if (count >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
count = 0;
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
// perform query batch by batch
int64_t files_count = 0;
Status ret;
for (auto group : id_groups) {
mysqlpp::StoreQueryResult res;
{
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
if (is_null_connection) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement
<< "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
<< " FROM " << META_TABLEFILES << " WHERE table_id in (";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {
statement << ",";
}
}
statement << ")";
// End
statement << " AND"
<< " (file_type = " << std::to_string(SegmentSchema::RAW)
<< " OR file_type = " << std::to_string(SegmentSchema::TO_INDEX)
<< " OR file_type = " << std::to_string(SegmentSchema::INDEX) << ");";
LOG_ENGINE_DEBUG_ << "FilesToSearch: " << statement.str();
res = statement.store();
} // Scoped Connection
for (auto& resRow : res) {
SegmentSchema collection_file;
collection_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.engine_type_ = resRow["engine_type"];
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.dimension_ = collection_schema.dimension_;
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
ret = status;
continue;
}
files_holder.MarkFile(collection_file);
files_count++;
}
}
if (files_count == 0) {
LOG_ENGINE_DEBUG_ << "No file to search for collection: " << root_collection;
} else {
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << root_collection;
}
return ret;
} catch (std::exception& e) {
return HandleException("Failed to get files to search", e.what());
}
}
Status
MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) {
try {
......
......@@ -15,6 +15,7 @@
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
......@@ -109,6 +110,10 @@ class MySQLMetaImpl : public Meta {
Status
FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;
Status
FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
FilesHolder& files_holder) override;
Status
FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;
......
......@@ -286,13 +286,14 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
decltype(ConnectorPtr->select(select_columns)) selected;
if (is_root) {
selected = ConnectorPtr->select(select_columns,
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
and c(&CollectionSchema::owner_collection_) == ""));
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
and c(&CollectionSchema::owner_collection_) == ""));
} else {
selected = ConnectorPtr->select(select_columns,
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_)
!= (int)CollectionSchema::TO_DELETE));
}
if (selected.size() == 1) {
......@@ -1118,6 +1119,99 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
}
}
Status
SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
const std::set<std::string>& partition_id_array,
FilesHolder& files_holder) {
try {
server::MetricCollector metric;
// get root collection information
CollectionSchema collection_schema;
collection_schema.collection_id_ = root_collection;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
// distribute id array to batchs
const int64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group = {root_collection};
int64_t count = 1;
for (auto& id : partition_id_array) {
temp_group.push_back(id);
count++;
if (count >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
count = 0;
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
// perform query batch by batch
int64_t files_count = 0;
Status ret;
for (auto group : id_groups) {
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
auto match_collectionid = in(&SegmentSchema::collection_id_, group);
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
auto match_type = in(&SegmentSchema::file_type_, file_types);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto filter = where(match_collectionid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
}
for (auto& file : selected) {
SegmentSchema collection_file;
collection_file.id_ = std::get<0>(file);
collection_file.collection_id_ = std::get<1>(file);
collection_file.segment_id_ = std::get<2>(file);
collection_file.file_id_ = std::get<3>(file);
collection_file.file_type_ = std::get<4>(file);
collection_file.file_size_ = std::get<5>(file);
collection_file.row_count_ = std::get<6>(file);
collection_file.date_ = std::get<7>(file);
collection_file.engine_type_ = std::get<8>(file);
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
ret = status;
continue;
}
files_holder.MarkFile(collection_file);
files_count++;
}
}
if (files_count == 0) {
LOG_ENGINE_DEBUG_ << "No file to search for collection: " << root_collection;
} else {
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << root_collection;
}
return ret;
} catch (std::exception& e) {
return HandleException("Encounter exception when iterate index files", e.what());
}
}
Status
SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) {
try {
......@@ -1315,29 +1409,21 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
file_schema.metric_type_ = collection_schema.metric_type_;
switch (file_schema.file_type_) {
case (int)SegmentSchema::RAW:
++raw_count;
case (int)SegmentSchema::RAW:++raw_count;
break;
case (int)SegmentSchema::NEW:
++new_count;
case (int)SegmentSchema::NEW:++new_count;
break;
case (int)SegmentSchema::NEW_MERGE:
++new_merge_count;
case (int)SegmentSchema::NEW_MERGE:++new_merge_count;
break;
case (int)SegmentSchema::NEW_INDEX:
++new_index_count;
case (int)SegmentSchema::NEW_INDEX:++new_index_count;
break;
case (int)SegmentSchema::TO_INDEX:
++to_index_count;
case (int)SegmentSchema::TO_INDEX:++to_index_count;
break;
case (int)SegmentSchema::INDEX:
++index_count;
case (int)SegmentSchema::INDEX:++index_count;
break;
case (int)SegmentSchema::BACKUP:
++backup_count;
break;
default:
case (int)SegmentSchema::BACKUP:++backup_count;
break;
default:break;
}
auto status = utils::GetCollectionFilePath(options_, file_schema);
......@@ -1351,11 +1437,9 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
std::string msg = "Get collection files by type.";
for (int file_type : file_types) {
switch (file_type) {
case (int)SegmentSchema::RAW:
msg = msg + " raw files:" + std::to_string(raw_count);
case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count);
break;
case (int)SegmentSchema::NEW:
msg = msg + " new files:" + std::to_string(new_count);
case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count);
break;
case (int)SegmentSchema::NEW_MERGE:
msg = msg + " new_merge files:" + std::to_string(new_merge_count);
......@@ -1363,17 +1447,13 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
case (int)SegmentSchema::NEW_INDEX:
msg = msg + " new_index files:" + std::to_string(new_index_count);
break;
case (int)SegmentSchema::TO_INDEX:
msg = msg + " to_index files:" + std::to_string(to_index_count);
break;
case (int)SegmentSchema::INDEX:
msg = msg + " index files:" + std::to_string(index_count);
case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count);
break;
case (int)SegmentSchema::BACKUP:
msg = msg + " backup files:" + std::to_string(backup_count);
case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count);
break;
default:
case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count);
break;
default:break;
}
}
LOG_ENGINE_DEBUG_ << msg;
......
......@@ -12,6 +12,7 @@
#pragma once
#include <mutex>
#include <set>
#include <string>
#include <vector>
......@@ -111,6 +112,10 @@ class SqliteMetaImpl : public Meta {
Status
FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;
Status
FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
FilesHolder& files_holder) override;
Status
FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册