From 6e6d06ca4a6dd88c764c706fd457012cc8c6144d Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 11 Mar 2020 09:30:39 +0800 Subject: [PATCH] #1598 Server down during mixed operations Signed-off-by: groot --- core/src/db/DBImpl.cpp | 10 +++---- core/src/db/DBImpl.h | 2 +- core/src/db/insert/MemTable.cpp | 2 +- core/src/db/meta/Meta.h | 3 ++ core/src/db/meta/MySQLMetaImpl.cpp | 40 +++++++++++++++++++++++++++ core/src/db/meta/MySQLMetaImpl.h | 3 ++ core/src/db/meta/SqliteMetaImpl.cpp | 20 ++++++++++++++ core/src/db/meta/SqliteMetaImpl.h | 3 ++ core/unittest/db/test_db.cpp | 8 +++--- core/unittest/db/test_meta.cpp | 41 ++++++++++++++++++++++++++++ core/unittest/db/test_meta_mysql.cpp | 41 ++++++++++++++++++++++++++++ 11 files changed, 162 insertions(+), 11 deletions(-) diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index bab1f8361..5bc7ae50b 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -1058,7 +1058,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { // step 4: wait and build index status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id); - status = BuildTableIndexRecursively(table_id, index); + status = WaitTableIndexRecursively(table_id, index); return status; } @@ -1738,7 +1738,7 @@ DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableInde } Status -DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) { +DBImpl::WaitTableIndexRecursively(const std::string& table_id, const TableIndex& index) { // for IDMAP type, only wait all NEW file converted to RAW file // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files std::vector file_types; @@ -1779,8 +1779,8 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex std::vector partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { - status = BuildTableIndexRecursively(schema.table_id_, index); - fiu_do_on("DBImpl.BuildTableIndexRecursively.fail_build_table_Index_for_partition", + status = WaitTableIndexRecursively(schema.table_id_, index); + fiu_do_on("DBImpl.WaitTableIndexRecursively.fail_build_table_Index_for_partition", status = Status(DB_ERROR, "")); if (!status.ok()) { return status; @@ -1790,7 +1790,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex // failed to build index for some files, return error std::string err_msg; index_failed_checker_.GetErrMsgForTable(table_id, err_msg); - fiu_do_on("DBImpl.BuildTableIndexRecursively.not_empty_err_msg", err_msg.append("fiu")); + fiu_do_on("DBImpl.WaitTableIndexRecursively.not_empty_err_msg", err_msg.append("fiu")); if (!err_msg.empty()) { return Status(DB_ERROR, err_msg); } diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 79c7e75a8..bdec4b349 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -216,7 +216,7 @@ class DBImpl : public DB, public server::CacheConfigHandler { UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index); Status - BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index); + WaitTableIndexRecursively(const std::string& table_id, const TableIndex& index); Status DropTableIndexRecursively(const std::string& table_id); diff --git a/core/src/db/insert/MemTable.cpp b/core/src/db/insert/MemTable.cpp index f91efbabe..f0bf8c94d 100644 --- a/core/src/db/insert/MemTable.cpp +++ b/core/src/db/insert/MemTable.cpp @@ -364,7 +364,7 @@ MemTable::ApplyDeletes() { } } - status = meta_->UpdateTableFiles(table_files_to_update); + status = meta_->UpdateTableFilesRowCount(table_files_to_update); if (!status.ok()) { std::string err_msg = "Failed to apply deletes: " + status.ToString(); diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index b7cba78ca..8c2f237e2 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -87,6 +87,9 @@ class Meta { virtual Status UpdateTableFiles(TableFilesSchema& files) = 0; + virtual Status + UpdateTableFilesRowCount(TableFilesSchema& files) = 0; + virtual Status UpdateTableIndex(const std::string& table_id, const TableIndex& index) = 0; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index ca59d8279..3539f1f1b 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -1240,6 +1240,46 @@ MySQLMetaImpl::UpdateTableFiles(TableFilesSchema& files) { return Status::OK(); } +Status +MySQLMetaImpl::UpdateTableFilesRowCount(TableFilesSchema& files) { + try { + server::MetricCollector metric; + { + mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); + + bool is_null_connection = (connectionPtr == nullptr); + if (is_null_connection) { + return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); + } + + mysqlpp::Query updateTableFilesQuery = connectionPtr->query(); + + for (auto& file : files) { + std::string row_count = std::to_string(file.row_count_); + std::string updated_time = std::to_string(utils::GetMicroSecTimeStamp()); + + updateTableFilesQuery << "UPDATE " << META_TABLEFILES << " SET row_count = " << row_count + << " , updated_time = " << updated_time << " WHERE file_id = " << file.file_id_ + << ";"; + + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesRowCount: " << updateTableFilesQuery.str(); + + if (!updateTableFilesQuery.exec()) { + return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error()); + } + + ENGINE_LOG_DEBUG << "Update file " << file.file_id_ << " row count to " << file.row_count_; + } + } // Scoped Connection + + ENGINE_LOG_DEBUG << "Update " << files.size() << " table files"; + } catch (std::exception& e) { + return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES ROW COUNT", e.what()); + } + + return Status::OK(); +} + Status MySQLMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) { try { diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index 452e04f36..5f6535165 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -82,6 +82,9 @@ class MySQLMetaImpl : public Meta { Status UpdateTableFiles(TableFilesSchema& files) override; + Status + UpdateTableFilesRowCount(TableFilesSchema& files) override; + Status DescribeTableIndex(const std::string& table_id, TableIndex& index) override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 622d36175..5bf44f977 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -685,6 +685,26 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) { return Status::OK(); } +Status +SqliteMetaImpl::UpdateTableFilesRowCount(TableFilesSchema& files) { + try { + server::MetricCollector metric; + + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + + for (auto& file : files) { + ConnectorPtr->update_all(set(c(&TableFileSchema::row_count_) = file.row_count_, + c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()), + where(c(&TableFileSchema::file_id_) == file.file_id_)); + ENGINE_LOG_DEBUG << "Update file " << file.file_id_ << " row count to " << file.row_count_; + } + } catch (std::exception& e) { + return HandleException("Encounter exception when update table files row count", e.what()); + } + return Status::OK(); +} + Status SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex& index) { try { diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index e0b596da6..0b65525e6 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -81,6 +81,9 @@ class SqliteMetaImpl : public Meta { Status UpdateTableFiles(TableFilesSchema& files) override; + Status + UpdateTableFilesRowCount(TableFilesSchema& files) override; + Status DescribeTableIndex(const std::string& table_id, TableIndex& index) override; diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 459fa24ae..41946003a 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -836,15 +836,15 @@ TEST_F(DBTest, PARTITION_TEST) { ASSERT_TRUE(stat.ok()); fiu_init(0); - FIU_ENABLE_FIU("DBImpl.BuildTableIndexRecursively.fail_build_table_Index_for_partition"); + FIU_ENABLE_FIU("DBImpl.WaitTableIndexRecursively.fail_build_table_Index_for_partition"); stat = db_->CreateIndex(table_info.table_id_, index); ASSERT_FALSE(stat.ok()); - fiu_disable("DBImpl.BuildTableIndexRecursively.fail_build_table_Index_for_partition"); + fiu_disable("DBImpl.WaitTableIndexRecursively.fail_build_table_Index_for_partition"); - FIU_ENABLE_FIU("DBImpl.BuildTableIndexRecursively.not_empty_err_msg"); + FIU_ENABLE_FIU("DBImpl.WaitTableIndexRecursively.not_empty_err_msg"); stat = db_->CreateIndex(table_info.table_id_, index); ASSERT_FALSE(stat.ok()); - fiu_disable("DBImpl.BuildTableIndexRecursively.not_empty_err_msg"); + fiu_disable("DBImpl.WaitTableIndexRecursively.not_empty_err_msg"); uint64_t row_count = 0; stat = db_->GetTableRowCount(TABLE_NAME, row_count); diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp index 820e45967..a0a81f2e0 100644 --- a/core/unittest/db/test_meta.cpp +++ b/core/unittest/db/test_meta.cpp @@ -383,6 +383,47 @@ TEST_F(MetaTest, TABLE_FILE_TEST) { ASSERT_EQ(table_file.file_type_, new_file_type); } +TEST_F(MetaTest, TABLE_FILE_ROW_COUNT_TEST) { + auto table_id = "row_count_test_table"; + + milvus::engine::meta::TableSchema table; + table.table_id_ = table_id; + table.dimension_ = 256; + auto status = impl_->CreateTable(table); + + milvus::engine::meta::TableFileSchema table_file; + table_file.row_count_ = 100; + table_file.table_id_ = table.table_id_; + table_file.file_type_ = 1; + status = impl_->CreateTableFile(table_file); + + uint64_t cnt = 0; + status = impl_->Count(table_id, cnt); + ASSERT_EQ(table_file.row_count_, cnt); + + table_file.row_count_ = 99999; + milvus::engine::meta::TableFilesSchema table_files = {table_file}; + status = impl_->UpdateTableFilesRowCount(table_files); + ASSERT_TRUE(status.ok()); + + cnt = 0; + status = impl_->Count(table_id, cnt); + ASSERT_EQ(table_file.row_count_, cnt); + + std::vector ids = {table_file.id_}; + milvus::engine::meta::TableFilesSchema schemas; + status = impl_->GetTableFiles(table_id, ids, schemas); + ASSERT_EQ(schemas.size(), 1UL); + ASSERT_EQ(table_file.row_count_, schemas[0].row_count_); + ASSERT_EQ(table_file.file_id_, schemas[0].file_id_); + ASSERT_EQ(table_file.file_type_, schemas[0].file_type_); + ASSERT_EQ(table_file.segment_id_, schemas[0].segment_id_); + ASSERT_EQ(table_file.table_id_, schemas[0].table_id_); + ASSERT_EQ(table_file.engine_type_, schemas[0].engine_type_); + ASSERT_EQ(table_file.dimension_, schemas[0].dimension_); + ASSERT_EQ(table_file.flush_lsn_, schemas[0].flush_lsn_); +} + TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { srand(time(0)); milvus::engine::DBMetaOptions options; diff --git a/core/unittest/db/test_meta_mysql.cpp b/core/unittest/db/test_meta_mysql.cpp index 75937e245..6411eee53 100644 --- a/core/unittest/db/test_meta_mysql.cpp +++ b/core/unittest/db/test_meta_mysql.cpp @@ -271,6 +271,47 @@ TEST_F(MySqlMetaTest, TABLE_FILE_TEST) { ASSERT_TRUE(status.ok()); } +TEST_F(MySqlMetaTest, TABLE_FILE_ROW_COUNT_TEST) { + auto table_id = "row_count_test_table"; + + milvus::engine::meta::TableSchema table; + table.table_id_ = table_id; + table.dimension_ = 256; + auto status = impl_->CreateTable(table); + + milvus::engine::meta::TableFileSchema table_file; + table_file.row_count_ = 100; + table_file.table_id_ = table.table_id_; + table_file.file_type_ = 1; + status = impl_->CreateTableFile(table_file); + + uint64_t cnt = 0; + status = impl_->Count(table_id, cnt); + ASSERT_EQ(table_file.row_count_, cnt); + + table_file.row_count_ = 99999; + milvus::engine::meta::TableFilesSchema table_files = {table_file}; + status = impl_->UpdateTableFilesRowCount(table_files); + ASSERT_TRUE(status.ok()); + + cnt = 0; + status = impl_->Count(table_id, cnt); + ASSERT_EQ(table_file.row_count_, cnt); + + std::vector ids = {table_file.id_}; + milvus::engine::meta::TableFilesSchema schemas; + status = impl_->GetTableFiles(table_id, ids, schemas); + ASSERT_EQ(schemas.size(), 1UL); + ASSERT_EQ(table_file.row_count_, schemas[0].row_count_); + ASSERT_EQ(table_file.file_id_, schemas[0].file_id_); + ASSERT_EQ(table_file.file_type_, schemas[0].file_type_); + ASSERT_EQ(table_file.segment_id_, schemas[0].segment_id_); + ASSERT_EQ(table_file.table_id_, schemas[0].table_id_); + ASSERT_EQ(table_file.engine_type_, schemas[0].engine_type_); + ASSERT_EQ(table_file.dimension_, schemas[0].dimension_); + ASSERT_EQ(table_file.flush_lsn_, schemas[0].flush_lsn_); +} + TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) { fiu_init(0); -- GitLab