提交 640e9141 编写于 作者: S starlord

MS-275 Avoid sqlite logic error excetion


Former-commit-id: d6ecaf2f32ed6c75370e6ed2bd24779cc18e76ff
上级 d7cbf940
...@@ -37,6 +37,8 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -37,6 +37,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-260 - Refine log - MS-260 - Refine log
- MS-249 - Check machine hardware during initialize - MS-249 - Check machine hardware during initialize
- MS-261 - Update faiss version to 1.5.3 and add BUILD_FAISS_WITH_MKL as an option - MS-261 - Update faiss version to 1.5.3 and add BUILD_FAISS_WITH_MKL as an option
- MS-266 - Improve topk reduce time by using multi-threads
- MS-275 - Avoid sqlite logic error excetion
## New Feature ## New Feature
- MS-180 - Add new mem manager - MS-180 - Add new mem manager
......
...@@ -109,7 +109,7 @@ Status DBMetaImpl::Initialize() { ...@@ -109,7 +109,7 @@ Status DBMetaImpl::Initialize() {
auto ret = boost::filesystem::create_directory(options_.path); auto ret = boost::filesystem::create_directory(options_.path);
if (!ret) { if (!ret) {
ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path; ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
return Status::DBTransactionError("Failed to create db directory", options_.path); return Status::InvalidDBPath("Failed to create db directory", options_.path);
} }
} }
...@@ -147,6 +147,9 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id, ...@@ -147,6 +147,9 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
} }
} }
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all( ConnectorPtr->update_all(
set( set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
...@@ -167,6 +170,9 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { ...@@ -167,6 +170,9 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
try { try {
MetricCollector metric; MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
if (table_schema.table_id_ == "") { if (table_schema.table_id_ == "") {
NextTableId(table_schema.table_id_); NextTableId(table_schema.table_id_);
} else { } else {
...@@ -190,6 +196,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { ...@@ -190,6 +196,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
auto id = ConnectorPtr->insert(table_schema); auto id = ConnectorPtr->insert(table_schema);
table_schema.id_ = id; table_schema.id_ = id;
} catch (...) { } catch (...) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Add Table Error"); return Status::DBTransactionError("Add Table Error");
} }
...@@ -206,6 +213,9 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) { ...@@ -206,6 +213,9 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) {
try { try {
MetricCollector metric; MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
//soft delete table //soft delete table
auto tables = ConnectorPtr->select(columns(&TableSchema::id_, auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::files_cnt_, &TableSchema::files_cnt_,
...@@ -238,6 +248,9 @@ Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) { ...@@ -238,6 +248,9 @@ Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) {
try { try {
MetricCollector metric; MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
//soft delete table files //soft delete table files
ConnectorPtr->update_all( ConnectorPtr->update_all(
set( set(
...@@ -383,6 +396,9 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) { ...@@ -383,6 +396,9 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = file_schema.created_on_; file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_; file_schema.engine_type_ = table_schema.engine_type_;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto id = ConnectorPtr->insert(file_schema); auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id; file_schema.id_ = id;
...@@ -649,6 +665,9 @@ Status DBMetaImpl::Archive() { ...@@ -649,6 +665,9 @@ Status DBMetaImpl::Archive() {
long usecs = limit * D_SEC * US_PS; long usecs = limit * D_SEC * US_PS;
long now = utils::GetMicroSecTimeStamp(); long now = utils::GetMicroSecTimeStamp();
try { try {
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all( ConnectorPtr->update_all(
set( set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
...@@ -710,6 +729,9 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) { ...@@ -710,6 +729,9 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
try { try {
MetricCollector metric; MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto commited = ConnectorPtr->transaction([&]() mutable { auto commited = ConnectorPtr->transaction([&]() mutable {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::size_), &TableFileSchema::size_),
...@@ -748,6 +770,7 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) { ...@@ -748,6 +770,7 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
}); });
if (!commited) { if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Update table file error"); return Status::DBTransactionError("Update table file error");
} }
...@@ -763,6 +786,9 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { ...@@ -763,6 +786,9 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
try { try {
MetricCollector metric; MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto tables = ConnectorPtr->select(columns(&TableSchema::state_), auto tables = ConnectorPtr->select(columns(&TableSchema::state_),
where(c(&TableSchema::table_id_) == file_schema.table_id_)); where(c(&TableSchema::table_id_) == file_schema.table_id_));
...@@ -784,6 +810,11 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { ...@@ -784,6 +810,11 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try { try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all( ConnectorPtr->update_all(
set( set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX
...@@ -803,6 +834,9 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) { ...@@ -803,6 +834,9 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try { try {
MetricCollector metric; MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
std::map<std::string, bool> has_tables; std::map<std::string, bool> has_tables;
for (auto &file : files) { for (auto &file : files) {
if(has_tables.find(file.table_id_) != has_tables.end()) { if(has_tables.find(file.table_id_) != has_tables.end()) {
...@@ -831,6 +865,7 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) { ...@@ -831,6 +865,7 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
}); });
if (!commited) { if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Update table files error"); return Status::DBTransactionError("Update table files error");
} }
...@@ -845,6 +880,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -845,6 +880,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
try { try {
MetricCollector metric; MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_, &TableFileSchema::table_id_,
&TableFileSchema::file_id_, &TableFileSchema::file_id_,
...@@ -873,6 +911,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -873,6 +911,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
}); });
if (!commited) { if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error"); return Status::DBTransactionError("Clean files error");
} }
...@@ -883,6 +922,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -883,6 +922,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
try { try {
MetricCollector metric; MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto tables = ConnectorPtr->select(columns(&TableSchema::id_, auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_), &TableSchema::table_id_),
where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE)); where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));
...@@ -897,6 +939,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -897,6 +939,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
}); });
if (!commited) { if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error"); return Status::DBTransactionError("Clean files error");
} }
...@@ -909,6 +952,11 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -909,6 +952,11 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status DBMetaImpl::CleanUp() { Status DBMetaImpl::CleanUp() {
try { try {
MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_), auto files = ConnectorPtr->select(columns(&TableFileSchema::id_),
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW)); where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));
...@@ -921,6 +969,7 @@ Status DBMetaImpl::CleanUp() { ...@@ -921,6 +969,7 @@ Status DBMetaImpl::CleanUp() {
}); });
if (!commited) { if (!commited) {
ENGINE_LOG_ERROR << "sqlite transaction failed";
return Status::DBTransactionError("Clean files error"); return Status::DBTransactionError("Clean files error");
} }
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "Meta.h" #include "Meta.h"
#include "Options.h" #include "Options.h"
#include <mutex>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -94,6 +95,8 @@ class DBMetaImpl : public Meta { ...@@ -94,6 +95,8 @@ class DBMetaImpl : public Meta {
Status Initialize(); Status Initialize();
const DBMetaOptions options_; const DBMetaOptions options_;
std::mutex meta_mutex_;
}; // DBMetaImpl }; // DBMetaImpl
} // namespace meta } // namespace meta
......
...@@ -77,10 +77,10 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp ...@@ -77,10 +77,10 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
if (dialect.find("mysql") != std::string::npos) { if (dialect.find("mysql") != std::string::npos) {
ENGINE_LOG_INFO << "Using MySQL"; ENGINE_LOG_INFO << "Using MySQL";
return std::make_shared<meta::MySQLMetaImpl>(meta::MySQLMetaImpl(metaOptions, mode)); return std::make_shared<meta::MySQLMetaImpl>(metaOptions, mode);
} else if (dialect.find("sqlite") != std::string::npos) { } else if (dialect.find("sqlite") != std::string::npos) {
ENGINE_LOG_INFO << "Using SQLite"; ENGINE_LOG_INFO << "Using SQLite";
return std::make_shared<meta::DBMetaImpl>(meta::DBMetaImpl(metaOptions)); return std::make_shared<meta::DBMetaImpl>(metaOptions);
} else { } else {
ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect;
throw InvalidArgumentException("URI dialect is not mysql / sqlite"); throw InvalidArgumentException("URI dialect is not mysql / sqlite");
......
...@@ -113,7 +113,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { ...@@ -113,7 +113,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
ss << "days:" << days_num; ss << "days:" << days_num;
options.archive_conf = ArchiveConf("delete", ss.str()); options.archive_conf = ArchiveConf("delete", ss.str());
auto impl = meta::DBMetaImpl(options); meta::DBMetaImpl impl(options);
auto table_id = "meta_test_table"; auto table_id = "meta_test_table";
meta::TableSchema table; meta::TableSchema table;
...@@ -163,7 +163,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) { ...@@ -163,7 +163,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
options.path = "/tmp/milvus_test"; options.path = "/tmp/milvus_test";
options.archive_conf = ArchiveConf("delete", "disk:11"); options.archive_conf = ArchiveConf("delete", "disk:11");
auto impl = meta::DBMetaImpl(options); meta::DBMetaImpl impl(options);
auto table_id = "meta_test_group"; auto table_id = "meta_test_group";
meta::TableSchema table; meta::TableSchema table;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册