From 640e9141c6e0a08ccd1e8b35b621f93fe4932e54 Mon Sep 17 00:00:00 2001 From: starlord Date: Fri, 26 Jul 2019 10:54:49 +0800 Subject: [PATCH] MS-275 Avoid sqlite logic error excetion Former-commit-id: d6ecaf2f32ed6c75370e6ed2bd24779cc18e76ff --- cpp/CHANGELOG.md | 2 ++ cpp/src/db/DBMetaImpl.cpp | 51 +++++++++++++++++++++++++++++++++- cpp/src/db/DBMetaImpl.h | 3 ++ cpp/src/db/Factories.cpp | 4 +-- cpp/unittest/db/meta_tests.cpp | 4 +-- 5 files changed, 59 insertions(+), 5 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index b326293e..10fbc904 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -37,6 +37,8 @@ Please mark all change in change log and use the ticket from JIRA. - MS-260 - Refine log - MS-249 - Check machine hardware during initialize - MS-261 - Update faiss version to 1.5.3 and add BUILD_FAISS_WITH_MKL as an option +- MS-266 - Improve topk reduce time by using multi-threads +- MS-275 - Avoid sqlite logic error excetion ## New Feature - MS-180 - Add new mem manager diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 56f741c4..61492021 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -109,7 +109,7 @@ Status DBMetaImpl::Initialize() { auto ret = boost::filesystem::create_directory(options_.path); if (!ret) { ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path; - return Status::DBTransactionError("Failed to create db directory", options_.path); + return Status::InvalidDBPath("Failed to create db directory", options_.path); } } @@ -147,6 +147,9 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id, } } + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + ConnectorPtr->update_all( set( c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE @@ -167,6 +170,9 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + if (table_schema.table_id_ == "") { NextTableId(table_schema.table_id_); } else { @@ -190,6 +196,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { auto id = ConnectorPtr->insert(table_schema); table_schema.id_ = id; } catch (...) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Add Table Error"); } @@ -206,6 +213,9 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + //soft delete table auto tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::files_cnt_, @@ -238,6 +248,9 @@ Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + //soft delete table files ConnectorPtr->update_all( set( @@ -383,6 +396,9 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) { file_schema.updated_time_ = file_schema.created_on_; file_schema.engine_type_ = table_schema.engine_type_; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto id = ConnectorPtr->insert(file_schema); file_schema.id_ = id; @@ -649,6 +665,9 @@ Status DBMetaImpl::Archive() { long usecs = limit * D_SEC * US_PS; long now = utils::GetMicroSecTimeStamp(); try { + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + ConnectorPtr->update_all( set( c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE @@ -710,6 +729,9 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto commited = ConnectorPtr->transaction([&]() mutable { auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::size_), @@ -748,6 +770,7 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Update table file error"); } @@ -763,6 +786,9 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto tables = ConnectorPtr->select(columns(&TableSchema::state_), where(c(&TableSchema::table_id_) == file_schema.table_id_)); @@ -784,6 +810,11 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { try { + MetricCollector metric; + + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + ConnectorPtr->update_all( set( c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX @@ -803,6 +834,9 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + std::map has_tables; for (auto &file : files) { if(has_tables.find(file.table_id_) != has_tables.end()) { @@ -831,6 +865,7 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Update table files error"); } @@ -845,6 +880,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_, @@ -873,6 +911,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Clean files error"); } @@ -883,6 +922,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { try { MetricCollector metric; + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto tables = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_), where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE)); @@ -897,6 +939,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Clean files error"); } @@ -909,6 +952,11 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { Status DBMetaImpl::CleanUp() { try { + MetricCollector metric; + + //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto files = ConnectorPtr->select(columns(&TableFileSchema::id_), where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW)); @@ -921,6 +969,7 @@ Status DBMetaImpl::CleanUp() { }); if (!commited) { + ENGINE_LOG_ERROR << "sqlite transaction failed"; return Status::DBTransactionError("Clean files error"); } diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 6187ad7e..ada8a9a4 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -8,6 +8,7 @@ #include "Meta.h" #include "Options.h" +#include namespace zilliz { namespace milvus { @@ -94,6 +95,8 @@ class DBMetaImpl : public Meta { Status Initialize(); const DBMetaOptions options_; + + std::mutex meta_mutex_; }; // DBMetaImpl } // namespace meta diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index 442dca29..22d4760b 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -77,10 +77,10 @@ std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOp std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); if (dialect.find("mysql") != std::string::npos) { ENGINE_LOG_INFO << "Using MySQL"; - return std::make_shared(meta::MySQLMetaImpl(metaOptions, mode)); + return std::make_shared(metaOptions, mode); } else if (dialect.find("sqlite") != std::string::npos) { ENGINE_LOG_INFO << "Using SQLite"; - return std::make_shared(meta::DBMetaImpl(metaOptions)); + return std::make_shared(metaOptions); } else { ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; throw InvalidArgumentException("URI dialect is not mysql / sqlite"); diff --git a/cpp/unittest/db/meta_tests.cpp b/cpp/unittest/db/meta_tests.cpp index 5bce4058..5fe9ca43 100644 --- a/cpp/unittest/db/meta_tests.cpp +++ b/cpp/unittest/db/meta_tests.cpp @@ -113,7 +113,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { ss << "days:" << days_num; options.archive_conf = ArchiveConf("delete", ss.str()); - auto impl = meta::DBMetaImpl(options); + meta::DBMetaImpl impl(options); auto table_id = "meta_test_table"; meta::TableSchema table; @@ -163,7 +163,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) { options.path = "/tmp/milvus_test"; options.archive_conf = ArchiveConf("delete", "disk:11"); - auto impl = meta::DBMetaImpl(options); + meta::DBMetaImpl impl(options); auto table_id = "meta_test_group"; meta::TableSchema table; -- GitLab