diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index e96b921fb7e397c2d12e83bbe85347de657279ef..c88936a08edf333442a2dad85f343ce0f42f9b09 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -791,18 +791,17 @@ DBImpl::BackgroundCompaction(std::set table_ids) { meta_ptr_->Archive(); - meta::Table2FileIDs ignore_files = ongoing_files_checker_.GetOngoingFiles(); { - uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds - meta_ptr_->CleanUpCacheWithTTL(ttl, ignore_files); + uint64_t ttl = 1 * meta::SECOND; // default: file data will be erase from cache after few seconds + meta_ptr_->CleanUpCacheWithTTL(ttl, &ongoing_files_checker_); } { - uint64_t ttl = 20 * meta::SECOND; // default: file will be deleted after few seconds + uint64_t ttl = 1 * meta::SECOND; // default: file will be deleted after few seconds if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { ttl = meta::H_SEC; } - meta_ptr_->CleanUpFilesWithTTL(ttl, ignore_files); + meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_); } // ENGINE_LOG_TRACE << " Background compaction thread exit"; diff --git a/core/src/db/IndexFailedChecker.cpp b/core/src/db/IndexFailedChecker.cpp index d1bb753294b659afa1ea4244488ef12531868c60..bdd152dc4ba5302df3fb9a1f7dd24e2620a48ae9 100644 --- a/core/src/db/IndexFailedChecker.cpp +++ b/core/src/db/IndexFailedChecker.cpp @@ -38,7 +38,7 @@ IndexFailedChecker::GetFailedIndexFileOfTable(const std::string& table_id, std:: std::lock_guard lck(mutex_); auto iter = index_failed_files_.find(table_id); if (iter != index_failed_files_.end()) { - FileID2FailedTimes& failed_map = iter->second; + meta::File2RefCount& failed_map = iter->second; for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { failed_files.push_back(it_file->first); } @@ -53,7 +53,7 @@ IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file) { auto iter = index_failed_files_.find(file.table_id_); if (iter == index_failed_files_.end()) { - FileID2FailedTimes failed_files; + meta::File2RefCount failed_files; failed_files.insert(std::make_pair(file.file_id_, 1)); index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); } else { diff --git a/core/src/db/IndexFailedChecker.h b/core/src/db/IndexFailedChecker.h index cdcc34a76dce0cda7497a12c901a3de4ed75f329..edd6a114bfe7881eb2a35eb7e1e2b7335a32bf8f 100644 --- a/core/src/db/IndexFailedChecker.h +++ b/core/src/db/IndexFailedChecker.h @@ -47,9 +47,7 @@ class IndexFailedChecker { private: std::mutex mutex_; - using FileID2FailedTimes = std::map; - using Table2FailedFiles = std::map; - Table2FailedFiles index_failed_files_; // file id mapping to failed times + meta::Table2Files index_failed_files_; // table id mapping to (file id mapping to failed times) }; } // namespace engine diff --git a/core/src/db/OngoingFileChecker.cpp b/core/src/db/OngoingFileChecker.cpp index 3462621d3c1e1322218549b103e51371befe4d39..daaf6a2a4fa49e617a20ef09ae8be42847fde22c 100644 --- a/core/src/db/OngoingFileChecker.cpp +++ b/core/src/db/OngoingFileChecker.cpp @@ -16,6 +16,7 @@ // under the License. #include "db/OngoingFileChecker.h" +#include "utils/Log.h" #include @@ -56,11 +57,21 @@ OngoingFileChecker::UnmarkOngoingFiles(const meta::TableFilesSchema& table_files return Status::OK(); } -meta::Table2FileIDs -OngoingFileChecker::GetOngoingFiles() { - // return copy - // don't return reference(avoid multi-threads conflict) - return ongoing_files_; +bool +OngoingFileChecker::IsIgnored(const meta::TableFileSchema& schema) { + std::lock_guard lck(mutex_); + + auto iter = ongoing_files_.find(schema.table_id_); + if (iter == ongoing_files_.end()) { + return false; + } else { + auto it_file = iter->second.find(schema.file_id_); + if (it_file == iter->second.end()) { + return false; + } else { + return (it_file->second > 0); + } + } } Status @@ -71,12 +82,21 @@ OngoingFileChecker::MarkOngoingFileNoLock(const meta::TableFileSchema& table_fil auto iter = ongoing_files_.find(table_file.table_id_); if (iter == ongoing_files_.end()) { - meta::FileIDArray file_ids = {table_file.file_id_}; - ongoing_files_.insert(std::make_pair(table_file.table_id_, file_ids)); + meta::File2RefCount files_refcount; + files_refcount.insert(std::make_pair(table_file.file_id_, 1)); + ongoing_files_.insert(std::make_pair(table_file.table_id_, files_refcount)); } else { - iter->second.insert(table_file.file_id_); + auto it_file = iter->second.find(table_file.file_id_); + if (it_file == iter->second.end()) { + iter->second[table_file.file_id_] = 1; + } else { + it_file->second++; + } } + ENGINE_LOG_DEBUG << "Mark ongoing file:" << table_file.file_id_ + << " refcount:" << ongoing_files_[table_file.table_id_][table_file.file_id_]; + return Status::OK(); } @@ -94,6 +114,9 @@ OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_f } } + ENGINE_LOG_DEBUG << "Mark ongoing file:" << table_file.file_id_ + << " refcount:" << ongoing_files_[table_file.table_id_][table_file.file_id_]; + return Status::OK(); } diff --git a/core/src/db/OngoingFileChecker.h b/core/src/db/OngoingFileChecker.h index 8f21a4504589f3c844dbfd417d19b34237f70349..c7995e425ae7673effae9f2b7e6fb36665584d73 100644 --- a/core/src/db/OngoingFileChecker.h +++ b/core/src/db/OngoingFileChecker.h @@ -28,7 +28,7 @@ namespace milvus { namespace engine { -class OngoingFileChecker { +class OngoingFileChecker : public meta::Meta::CleanUpFilter { public: Status MarkOngoingFile(const meta::TableFileSchema& table_file); @@ -42,8 +42,8 @@ class OngoingFileChecker { Status UnmarkOngoingFiles(const meta::TableFilesSchema& table_files); - meta::Table2FileIDs - GetOngoingFiles(); + bool + IsIgnored(const meta::TableFileSchema& schema) override; private: Status @@ -54,7 +54,7 @@ class OngoingFileChecker { private: std::mutex mutex_; - meta::Table2FileIDs ongoing_files_; + meta::Table2Files ongoing_files_; // table id mapping to (file id mapping to ongoing ref-count) }; } // namespace engine diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index f620087ad5b5fbad2ca5e68b6dce7e2ddbb0c895..8dabb58203392217257b0eb5aad5a89c998d0f45 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -35,6 +35,13 @@ static const char* META_TABLES = "Tables"; static const char* META_TABLEFILES = "TableFiles"; class Meta { + public: + class CleanUpFilter { + public: + virtual bool + IsIgnored(const TableFileSchema& schema) = 0; + }; + public: virtual ~Meta() = default; @@ -121,10 +128,10 @@ class Meta { CleanUpShadowFiles() = 0; virtual Status - CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) = 0; + CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) = 0; virtual Status - CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) = 0; + CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) = 0; virtual Status DropAll() = 0; diff --git a/core/src/db/meta/MetaTypes.h b/core/src/db/meta/MetaTypes.h index 3e286589834dc8dc22fa6e60d1f81145c7991a7f..f8c082032e1c9dc2b368f594221704b08ac68d5a 100644 --- a/core/src/db/meta/MetaTypes.h +++ b/core/src/db/meta/MetaTypes.h @@ -98,8 +98,8 @@ using TableFileSchemaPtr = std::shared_ptr; using TableFilesSchema = std::vector; using DatePartionedTableFilesSchema = std::map; -using FileIDArray = std::set; -using Table2FileIDs = std::map; +using File2RefCount = std::map; +using Table2Files = std::map; } // namespace meta } // namespace engine diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 4e2f3b47e171620d37ea7f03636d909d31ea23e0..1fab7a0f776c0bbb743d63b0d271828a32edca27 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -1783,7 +1783,7 @@ MySQLMetaImpl::CleanUpShadowFiles() { } Status -MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) { +MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter) { auto now = utils::GetMicroSecTimeStamp(); // erase deleted/backup files from cache @@ -1813,14 +1813,11 @@ MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore resRow["file_id"].to_string(table_file.file_id_); table_file.date_ = resRow["date"]; - // check if the file can be deleted - auto iter = ignore_files.find(table_file.table_id_); - if (iter != ignore_files.end()) { - if (iter->second.find(table_file.file_id_) != iter->second.end()) { - ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ - << " currently is in use, not able to erase from cache now"; - continue; // ignore this file, don't delete it - } + // check if the file can be erased + if (filter && filter->IsIgnored(table_file)) { + ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ + << " currently is in use, not able to erase from cache now"; + continue; // ignore this file, don't erase it } // erase file data from cache @@ -1835,7 +1832,7 @@ MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore } Status -MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) { +MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; @@ -1869,13 +1866,10 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore table_file.date_ = resRow["date"]; // check if the file can be deleted - auto iter = ignore_files.find(table_file.table_id_); - if (iter != ignore_files.end()) { - if (iter->second.find(table_file.file_id_) != iter->second.end()) { - ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ - << " currently is in use, not able to delete now"; - continue; // ignore this file, don't delete it - } + if (filter && filter->IsIgnored(table_file)) { + ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ + << " currently is in use, not able to delete now"; + continue; // ignore this file, don't delete it } // delete file from disk storage diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index d13ed7e0438ae7cca3457eca0229a6db73d28648..d2a94f6cd37e77ad6a9be8d30c2f5017d30264b7 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -120,10 +120,10 @@ class MySQLMetaImpl : public Meta { CleanUpShadowFiles() override; Status - CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override; + CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override; Status - CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override; + CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override; Status DropAll() override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index b2f4102dde084c38cc0f3c80cb47f0840a3e5c61..9b63f122da53278bad913e383de5203c998765f7 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -1294,7 +1294,7 @@ SqliteMetaImpl::CleanUpShadowFiles() { } Status -SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) { +SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter) { auto now = utils::GetMicroSecTimeStamp(); // erase deleted/backup files from cache @@ -1327,14 +1327,11 @@ SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignor table_file.file_id_ = std::get<2>(file); table_file.date_ = std::get<3>(file); - // check if the file can be deleted - auto iter = ignore_files.find(table_file.table_id_); - if (iter != ignore_files.end()) { - if (iter->second.find(table_file.file_id_) != iter->second.end()) { - ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ - << " currently is in use, not able to erase from cache now"; - continue; // ignore this file, don't delete it - } + // check if the file can be erased + if (filter && filter->IsIgnored(table_file)) { + ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ + << " currently is in use, not able to erase from cache now"; + continue; // ignore this file, don't erase it } // erase file data from cache @@ -1350,7 +1347,7 @@ SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignor } Status -SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) { +SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; @@ -1382,13 +1379,10 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignor table_file.date_ = std::get<3>(file); // check if the file can be deleted - auto iter = ignore_files.find(table_file.table_id_); - if (iter != ignore_files.end()) { - if (iter->second.find(table_file.file_id_) != iter->second.end()) { - ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ - << " currently is in use, not able to delete now"; - continue; // ignore this file, don't delete it - } + if (filter && filter->IsIgnored(table_file)) { + ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ + << " currently is in use, not able to delete now"; + continue; // ignore this file, don't delete it } // delete file from meta diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index 4ef45420225c9651d6d364e45e23b5238a12d30d..16bbc0a2054469c4f2b207b72418cc226974f45c 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -120,10 +120,10 @@ class SqliteMetaImpl : public Meta { CleanUpShadowFiles() override; Status - CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override; + CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override; Status - CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override; + CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override; Status DropAll() override; diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp index c2a9f8e8bc0d6ef74c391cd778ed6ca84c7c0e6b..b89c73c29675b2c359210fda126e140e5989fbcd 100644 --- a/core/unittest/db/test_meta.cpp +++ b/core/unittest/db/test_meta.cpp @@ -335,8 +335,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { status = impl_->DropTable(table_id); ASSERT_TRUE(status.ok()); - milvus::engine::meta::Table2FileIDs ignore_files; - status = impl_->CleanUpFilesWithTTL(1UL, ignore_files); + status = impl_->CleanUpFilesWithTTL(1UL); ASSERT_TRUE(status.ok()); } diff --git a/core/unittest/db/test_meta_mysql.cpp b/core/unittest/db/test_meta_mysql.cpp index cdd8aa32a816fdf2c31b36c4b7d37cd870ade4d7..9a52a01b7bad36eeaef1afdaa4ee1129b3dc2462 100644 --- a/core/unittest/db/test_meta_mysql.cpp +++ b/core/unittest/db/test_meta_mysql.cpp @@ -349,8 +349,7 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) { status = impl_->DropTable(table_id); ASSERT_TRUE(status.ok()); - milvus::engine::meta::Table2FileIDs ignore_files; - status = impl_->CleanUpFilesWithTTL(0UL, ignore_files); + status = impl_->CleanUpFilesWithTTL(0UL); ASSERT_TRUE(status.ok()); } diff --git a/core/unittest/db/test_misc.cpp b/core/unittest/db/test_misc.cpp index 1ed02afb53161ab8beae10dbcfa53d297cccdd10..3e9d597c3fefbbabf52392720f06ca32339d4607 100644 --- a/core/unittest/db/test_misc.cpp +++ b/core/unittest/db/test_misc.cpp @@ -20,6 +20,7 @@ #include "db/Options.h" #include "db/Utils.h" #include "db/engine/EngineFactory.h" +#include "db/meta/MetaTypes.h" #include "db/meta/SqliteMetaImpl.h" #include "utils/Exception.h" #include "utils/Status.h" @@ -161,25 +162,21 @@ TEST(DBMiscTest, CHECKER_TEST) { schema.file_id_ = "5000"; checker.MarkOngoingFile(schema); - auto ongoing_files = checker.GetOngoingFiles(); - ASSERT_EQ(ongoing_files.size(), 1UL); + ASSERT_TRUE(checker.IsIgnored(schema)); schema.table_id_ = "bbb"; schema.file_id_ = "5001"; milvus::engine::meta::TableFilesSchema table_files = {schema}; checker.MarkOngoingFiles(table_files); - ongoing_files = checker.GetOngoingFiles(); - ASSERT_EQ(ongoing_files.size(), 2UL); + ASSERT_TRUE(checker.IsIgnored(schema)); checker.UnmarkOngoingFile(schema); - ongoing_files = checker.GetOngoingFiles(); - ASSERT_EQ(ongoing_files.size(), 1UL); + ASSERT_FALSE(checker.IsIgnored(schema)); schema.table_id_ = "aaa"; schema.file_id_ = "5000"; checker.UnmarkOngoingFile(schema); - ongoing_files = checker.GetOngoingFiles(); - ASSERT_EQ(ongoing_files.size(), 0UL); + ASSERT_FALSE(checker.IsIgnored(schema)); } }