提交 49407f7c 编写于 作者: K kun yu

Merge remote-tracking branch 'upstream/branch-0.3.1' into without-grpc-vers

DFHN


Former-commit-id: 67e882809754fa6e6df0c5536978524830c0301e
......@@ -37,6 +37,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-260 - Refine log
- MS-249 - Check machine hardware during initialization
- MS-261 - Update faiss version to 1.5.3 and add BUILD_FAISS_WITH_MKL as an option
- MS-266 - Improve topk reduce time by using multi-threads
- MS-275 - Avoid sqlite logic error exception
- MS-278 - Add IndexStatsHelper
## New Feature
......
......@@ -217,6 +217,7 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
uint64_t k, uint64_t nq, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
auto start_time = METRICS_NOW_TIME;
server::TimeRecorder rc("");
//step 1: get files to search
......@@ -259,6 +260,11 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
results = context->GetResult();
rc.ElapseFromBegin("Engine query totally cost");
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return Status::OK();
}
......@@ -354,7 +360,7 @@ void DBImpl::StartCompactionTask() {
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
const meta::TableFilesSchema& files) {
ENGINE_LOG_DEBUG << "Merge files for table" << table_id;
ENGINE_LOG_DEBUG << "Merge files for table " << table_id;
meta::TableFileSchema table_file;
table_file.table_id_ = table_id;
......@@ -421,13 +427,15 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
bool has_merge = false;
for (auto& kv : raw_files) {
auto files = kv.second;
if (files.size() <= options_.merge_trigger_number) {
if (files.size() < options_.merge_trigger_number) {
ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
continue;
}
has_merge = true;
MergeFiles(table_id, kv.first, kv.second);
if (shutting_down_.load(std::memory_order_acquire)){
ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id;
break;
}
}
......@@ -445,6 +453,11 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
continue;//let other table get chance to merge
}
if (shutting_down_.load(std::memory_order_acquire)){
ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
break;
}
}
meta_ptr_->Archive();
......@@ -578,6 +591,11 @@ Status DBImpl::BuildIndexByTable(const std::string& table_id) {
return status;
}
ENGINE_LOG_DEBUG << "Sync building index for " << file.id_ << " passed";
if (shutting_down_.load(std::memory_order_acquire)){
ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action for table " << table_id;
break;
}
}
return status;
......@@ -598,6 +616,7 @@ void DBImpl::BackgroundBuildIndex() {
}
if (shutting_down_.load(std::memory_order_acquire)){
ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
break;
}
}
......
......@@ -109,7 +109,7 @@ Status DBMetaImpl::Initialize() {
auto ret = boost::filesystem::create_directory(options_.path);
if (!ret) {
ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
return Status::DBTransactionError("Failed to create db directory", options_.path);
return Status::InvalidDBPath("Failed to create db directory", options_.path);
}
}
......@@ -147,6 +147,9 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
}
}
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
......@@ -167,6 +170,9 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
if (table_schema.table_id_ == "") {
NextTableId(table_schema.table_id_);
} else {
......@@ -190,6 +196,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
auto id = ConnectorPtr->insert(table_schema);
table_schema.id_ = id;
} catch (...) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Add Table Error");
}
......@@ -206,6 +213,9 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
//soft delete table
auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::files_cnt_,
......@@ -238,6 +248,9 @@ Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
//soft delete table files
ConnectorPtr->update_all(
set(
......@@ -383,6 +396,9 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id;
......@@ -649,6 +665,9 @@ Status DBMetaImpl::Archive() {
long usecs = limit * D_SEC * US_PS;
long now = utils::GetMicroSecTimeStamp();
try {
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
......@@ -710,6 +729,9 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto commited = ConnectorPtr->transaction([&]() mutable {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::size_),
......@@ -748,6 +770,7 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Update table file error");
}
......@@ -763,6 +786,9 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto tables = ConnectorPtr->select(columns(&TableSchema::state_),
where(c(&TableSchema::table_id_) == file_schema.table_id_));
......@@ -784,6 +810,11 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX
......@@ -803,6 +834,9 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
std::map<std::string, bool> has_tables;
for (auto &file : files) {
if(has_tables.find(file.table_id_) != has_tables.end()) {
......@@ -831,6 +865,7 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Update table files error");
}
......@@ -845,6 +880,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
......@@ -873,6 +911,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error");
}
......@@ -883,6 +922,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_),
where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));
......@@ -897,6 +939,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error");
}
......@@ -909,6 +952,11 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status DBMetaImpl::CleanUp() {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_),
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));
......@@ -921,6 +969,7 @@ Status DBMetaImpl::CleanUp() {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error");
}
......
......@@ -8,6 +8,7 @@
#include "Meta.h"
#include "Options.h"
#include <mutex>
namespace zilliz {
namespace milvus {
......@@ -94,6 +95,8 @@ class DBMetaImpl : public Meta {
Status Initialize();
const DBMetaOptions options_;
std::mutex meta_mutex_;
}; // DBMetaImpl
} // namespace meta
......
......@@ -77,10 +77,10 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
if (dialect.find("mysql") != std::string::npos) {
ENGINE_LOG_INFO << "Using MySQL";
return std::make_shared<meta::MySQLMetaImpl>(meta::MySQLMetaImpl(metaOptions, mode));
return std::make_shared<meta::MySQLMetaImpl>(metaOptions, mode);
} else if (dialect.find("sqlite") != std::string::npos) {
ENGINE_LOG_INFO << "Using SQLite";
return std::make_shared<meta::DBMetaImpl>(meta::DBMetaImpl(metaOptions));
return std::make_shared<meta::DBMetaImpl>(metaOptions);
} else {
ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect;
throw InvalidArgumentException("URI dialect is not mysql / sqlite");
......
......@@ -217,7 +217,7 @@ TEST_F(DBTest, SEARCH_TEST) {
{//search by specify index file
engine::meta::DatesT dates;
std::vector<std::string> file_ids = {"1", "2", "3", "4"};
std::vector<std::string> file_ids = {"4", "5"};
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, file_ids, k, nq, xq.data(), dates, results);
ASSERT_STATS(stat);
......
......@@ -113,7 +113,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
ss << "days:" << days_num;
options.archive_conf = ArchiveConf("delete", ss.str());
auto impl = meta::DBMetaImpl(options);
meta::DBMetaImpl impl(options);
auto table_id = "meta_test_table";
meta::TableSchema table;
......@@ -163,7 +163,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
options.path = "/tmp/milvus_test";
options.archive_conf = ArchiveConf("delete", "disk:11");
auto impl = meta::DBMetaImpl(options);
meta::DBMetaImpl impl(options);
auto table_id = "meta_test_group";
meta::TableSchema table;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册