提交 d98f0549 编写于 作者: J jinhai

Merge branch 'branch-0.3.1' into 'branch-0.3.1'

MS-275  Avoid sqlite logic error excetion

See merge request megasearch/milvus!278

Former-commit-id: 2bd20e7b00d755590f3ab3819067be88d687b7a9
......@@ -37,6 +37,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-260 - Refine log
- MS-249 - Check machine hardware during initialize
- MS-261 - Update faiss version to 1.5.3 and add BUILD_FAISS_WITH_MKL as an option
- MS-266 - Improve topk reduce time by using multi-threads
- MS-275 - Avoid sqlite logic error excetion
- MS-278 - add IndexStatsHelper
## New Feature
......
......@@ -109,7 +109,7 @@ Status DBMetaImpl::Initialize() {
auto ret = boost::filesystem::create_directory(options_.path);
if (!ret) {
ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
return Status::DBTransactionError("Failed to create db directory", options_.path);
return Status::InvalidDBPath("Failed to create db directory", options_.path);
}
}
......@@ -147,6 +147,9 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
}
}
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
......@@ -167,6 +170,9 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
if (table_schema.table_id_ == "") {
NextTableId(table_schema.table_id_);
} else {
......@@ -190,6 +196,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
auto id = ConnectorPtr->insert(table_schema);
table_schema.id_ = id;
} catch (...) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Add Table Error");
}
......@@ -206,6 +213,9 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
//soft delete table
auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::files_cnt_,
......@@ -238,6 +248,9 @@ Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
//soft delete table files
ConnectorPtr->update_all(
set(
......@@ -383,6 +396,9 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id;
......@@ -649,6 +665,9 @@ Status DBMetaImpl::Archive() {
long usecs = limit * D_SEC * US_PS;
long now = utils::GetMicroSecTimeStamp();
try {
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
......@@ -710,6 +729,9 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto commited = ConnectorPtr->transaction([&]() mutable {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::size_),
......@@ -748,6 +770,7 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Update table file error");
}
......@@ -763,6 +786,9 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto tables = ConnectorPtr->select(columns(&TableSchema::state_),
where(c(&TableSchema::table_id_) == file_schema.table_id_));
......@@ -784,6 +810,11 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX
......@@ -803,6 +834,9 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
std::map<std::string, bool> has_tables;
for (auto &file : files) {
if(has_tables.find(file.table_id_) != has_tables.end()) {
......@@ -831,6 +865,7 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Update table files error");
}
......@@ -845,6 +880,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
......@@ -873,6 +911,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error");
}
......@@ -883,6 +922,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_),
where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));
......@@ -897,6 +939,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error");
}
......@@ -909,6 +952,11 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status DBMetaImpl::CleanUp() {
try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_),
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));
......@@ -921,6 +969,7 @@ Status DBMetaImpl::CleanUp() {
});
if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error");
}
......
......@@ -8,6 +8,7 @@
#include "Meta.h"
#include "Options.h"
#include <mutex>
namespace zilliz {
namespace milvus {
......@@ -94,6 +95,8 @@ class DBMetaImpl : public Meta {
Status Initialize();
const DBMetaOptions options_;
std::mutex meta_mutex_;
}; // DBMetaImpl
} // namespace meta
......
......@@ -77,10 +77,10 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
if (dialect.find("mysql") != std::string::npos) {
ENGINE_LOG_INFO << "Using MySQL";
return std::make_shared<meta::MySQLMetaImpl>(meta::MySQLMetaImpl(metaOptions, mode));
return std::make_shared<meta::MySQLMetaImpl>(metaOptions, mode);
} else if (dialect.find("sqlite") != std::string::npos) {
ENGINE_LOG_INFO << "Using SQLite";
return std::make_shared<meta::DBMetaImpl>(meta::DBMetaImpl(metaOptions));
return std::make_shared<meta::DBMetaImpl>(metaOptions);
} else {
ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect;
throw InvalidArgumentException("URI dialect is not mysql / sqlite");
......
......@@ -217,7 +217,7 @@ TEST_F(DBTest, SEARCH_TEST) {
{//search by specify index file
engine::meta::DatesT dates;
std::vector<std::string> file_ids = {"1", "2", "3", "4"};
std::vector<std::string> file_ids = {"4", "5"};
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, file_ids, k, nq, xq.data(), dates, results);
ASSERT_STATS(stat);
......
......@@ -113,7 +113,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
ss << "days:" << days_num;
options.archive_conf = ArchiveConf("delete", ss.str());
auto impl = meta::DBMetaImpl(options);
meta::DBMetaImpl impl(options);
auto table_id = "meta_test_table";
meta::TableSchema table;
......@@ -163,7 +163,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
options.path = "/tmp/milvus_test";
options.archive_conf = ArchiveConf("delete", "disk:11");
auto impl = meta::DBMetaImpl(options);
meta::DBMetaImpl impl(options);
auto table_id = "meta_test_group";
meta::TableSchema table;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册