diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c6847b9240adc6cdd4d73718d19a74388ad8720..7083cc12eb590740e6a2dc3eabee75d5c9a004f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#486 - gpu no usage during index building - \#509 - IVF_PQ index build trapped into dead loop caused by invalid params - \#513 - Unittest DELETE_BY_RANGE sometimes failed +- \#523 - Erase file data from cache once the file is marked as deleted - \#527 - faiss benchmark not compatible with faiss 1.6.0 - \#530 - BuildIndex stop when do build index and search simultaneously - \#532 - assigin value to `table_name` from confest shell diff --git a/core/src/cache/Cache.inl b/core/src/cache/Cache.inl index 9ac7ff21e6a48b2ae7500bcc4a0e564afca2ae66..9ebec7cfdd7fdf547cf2e0d36d7e6431670717b6 100644 --- a/core/src/cache/Cache.inl +++ b/core/src/cache/Cache.inl @@ -99,8 +99,8 @@ Cache::insert(const std::string& key, const ItemObj& item) { std::lock_guard lock(mutex_); lru_.put(key, item); - SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->Size() << " bytes into cache, usage: " << usage_ - << " bytes"; + SERVER_LOG_DEBUG << "Insert " << key << " size: " << item->Size() << " bytes into cache, usage: " << usage_ + << " bytes," << " capacity: " << capacity_ << " bytes"; } } @@ -115,7 +115,8 @@ Cache::erase(const std::string& key) { const ItemObj& old_item = lru_.get(key); usage_ -= old_item->Size(); - SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size(); + SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size() << " bytes from cache, usage: " << usage_ + << " bytes," << " capacity: " << capacity_ << " bytes"; lru_.erase(key); } diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 1c18c224096d5ead5ce82c56457742247c1d3f47..50cf4614ca753f7586cf4ababea2d86cd3898841 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -112,7 +112,7 @@ DBImpl::Stop() { 
bg_timer_thread_.join(); if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) { - meta_ptr_->CleanUp(); + meta_ptr_->CleanUpShadowFiles(); } // ENGINE_LOG_TRACE << "DB service stop"; @@ -777,11 +777,18 @@ DBImpl::BackgroundCompaction(std::set table_ids) { meta_ptr_->Archive(); - int ttl = 5 * meta::M_SEC; // default: file will be deleted after 5 minutes - if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { - ttl = meta::D_SEC; + { + uint64_t ttl = 10 * meta::SECOND; // default: file data will be erased from cache after a few seconds + meta_ptr_->CleanUpCacheWithTTL(ttl); + } + + { + uint64_t ttl = 5 * meta::M_SEC; // default: file will be deleted after a few minutes + if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { + ttl = meta::D_SEC; + } + meta_ptr_->CleanUpFilesWithTTL(ttl); } - meta_ptr_->CleanUpFilesWithTTL(ttl); // ENGINE_LOG_TRACE << " Background compaction thread exit"; } diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index c0ab4e829e59839cb8d8baad6dbae473596dcf35..0f92b34eca3334db21913827ebdcff9b67eac41d 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -257,6 +257,11 @@ ExecutionEngineImpl::PhysicalSize() const { Status ExecutionEngineImpl::Serialize() { auto status = write_index(index_, location_); + + // here we reset the index size to the file size, + // since for some index types (such as SQ8) the data size becomes smaller after serialization + index_->set_size(PhysicalSize()); + return status; } diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index 52fe86fe6975cb2fa397776361f1db8602726d71..bf46f02fea5efe7b8ba41646373d8af576b362af 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -118,9 +118,13 @@ class Meta { Archive() = 0; virtual Status - CleanUp() = 0; + CleanUpShadowFiles() = 0; - virtual Status CleanUpFilesWithTTL(uint16_t) = 0; + virtual Status + CleanUpCacheWithTTL(uint64_t seconds) = 0; + + virtual 
Status + CleanUpFilesWithTTL(uint64_t seconds) = 0; virtual Status DropAll() = 0; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 6d13cad2487b015e2f1043898ab96014bb1e10ee..2fb5eb0f3c20c17aa96cf2bcde2045e5bc286267 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -20,6 +20,7 @@ #include "db/IDGenerator.h" #include "db/Utils.h" #include "metrics/Metrics.h" +#include "utils/CommonUtil.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/StringHelpFunctions.h" @@ -292,7 +293,7 @@ MySQLMetaImpl::Initialize() { // step 5: create meta tables try { if (mode_ != DBOptions::MODE::CLUSTER_READONLY) { - CleanUp(); + CleanUpShadowFiles(); } { @@ -1710,7 +1711,7 @@ MySQLMetaImpl::Size(uint64_t& result) { } Status -MySQLMetaImpl::CleanUp() { +MySQLMetaImpl::CleanUpShadowFiles() { try { mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); @@ -1752,7 +1753,49 @@ MySQLMetaImpl::CleanUp() { } Status -MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { +MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { + auto now = utils::GetMicroSecTimeStamp(); + + // erase deleted/backup files from cache + try { + server::MetricCollector metric; + + mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); + + if (connectionPtr == nullptr) { + return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); + } + + mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date" + << " FROM " << META_TABLEFILES << " WHERE file_type IN (" + << std::to_string(TableFileSchema::TO_DELETE) << "," + << std::to_string(TableFileSchema::BACKUP) << ")" + << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + + mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + + TableFileSchema table_file; + std::vector idsToDelete; + + for (auto& resRow : res) { + 
table_file.id_ = resRow["id"]; // implicit conversion + resRow["table_id"].to_string(table_file.table_id_); + resRow["file_id"].to_string(table_file.file_id_); + table_file.date_ = resRow["date"]; + + utils::GetTableFilePath(options_, table_file); + server::CommonUtil::EraseFromCache(table_file.location_); + } + } catch (std::exception& e) { + return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what()); + } + + return Status::OK(); +} + +Status +MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index dd882fca2e04d1017b70bd3ab9234d7d7adba6f4..e7697316af8653a902b04bdadb2e0ac224b4c1cd 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -117,10 +117,13 @@ class MySQLMetaImpl : public Meta { Size(uint64_t& result) override; Status - CleanUp() override; + CleanUpShadowFiles() override; Status - CleanUpFilesWithTTL(uint16_t seconds) override; + CleanUpCacheWithTTL(uint64_t seconds) override; + + Status + CleanUpFilesWithTTL(uint64_t seconds) override; Status DropAll() override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 74460c1b4db0d3de4c30540ba266642e1383b4fa..24d5d78bad08f4d636ed114c2c0c665b94fafa65 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -20,6 +20,7 @@ #include "db/IDGenerator.h" #include "db/Utils.h" #include "metrics/Metrics.h" +#include "utils/CommonUtil.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/StringHelpFunctions.h" @@ -154,7 +155,7 @@ SqliteMetaImpl::Initialize() { ConnectorPtr->open_forever(); // thread safe option ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log - CleanUp(); + CleanUpShadowFiles(); return Status::OK(); } @@ -1231,7 +1232,7 @@ SqliteMetaImpl::Size(uint64_t& result) { } Status 
-SqliteMetaImpl::CleanUp() { +SqliteMetaImpl::CleanUpShadowFiles() { try { server::MetricCollector metric; @@ -1269,7 +1270,51 @@ SqliteMetaImpl::CleanUp() { } Status -SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { +SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { + auto now = utils::GetMicroSecTimeStamp(); + + // erase deleted/backup files from cache + try { + server::MetricCollector metric; + + // concurrent sqlite updates from multiple threads may throw exceptions ('bad logic', etc.), so we take a lock here + std::lock_guard meta_lock(meta_mutex_); + + std::vector file_types = { + (int)TableFileSchema::TO_DELETE, + (int)TableFileSchema::BACKUP, + }; + + auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, + &TableFileSchema::table_id_, + &TableFileSchema::file_id_, + &TableFileSchema::date_), + where( + in(&TableFileSchema::file_type_, file_types) + and + c(&TableFileSchema::updated_time_) + < now - seconds * US_PS)); + + for (auto& file : files) { + TableFileSchema table_file; + table_file.id_ = std::get<0>(file); + table_file.table_id_ = std::get<1>(file); + table_file.file_id_ = std::get<2>(file); + table_file.date_ = std::get<3>(file); + + utils::GetTableFilePath(options_, table_file); + server::CommonUtil::EraseFromCache(table_file.location_); + } + + } catch (std::exception& e) { + return HandleException("Encounter exception when clean cache", e.what()); + } + + return Status::OK(); +} + +Status +SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index 8e821d81de6ca71ecd8c03056f07c02dd00dfa8c..5581efe361257ec11c775a72ec4f7676a9fb5ef2 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -117,10 +117,13 @@ class SqliteMetaImpl : public Meta { Archive() override; Status - CleanUp() override; + CleanUpShadowFiles() override; Status - CleanUpFilesWithTTL(uint16_t 
seconds) override; + CleanUpCacheWithTTL(uint64_t seconds) override; + + Status + CleanUpFilesWithTTL(uint64_t seconds) override; Status DropAll() override; diff --git a/core/src/utils/CommonUtil.cpp b/core/src/utils/CommonUtil.cpp index 26e43619fb405136b5d9ac706a8d3375d621cee0..cdfae8f1e5db7b461cf8ccaa97ae707d1f12f64a 100644 --- a/core/src/utils/CommonUtil.cpp +++ b/core/src/utils/CommonUtil.cpp @@ -16,6 +16,9 @@ // under the License. #include "utils/CommonUtil.h" +#include "cache/CpuCacheMgr.h" +#include "cache/GpuCacheMgr.h" +#include "server/Config.h" #include "utils/Log.h" #include @@ -27,6 +30,7 @@ #include #include #include +#include #include "boost/filesystem.hpp" @@ -222,5 +226,24 @@ CommonUtil::ConvertTime(tm time_struct, time_t& time_integer) { time_integer = mktime(&time_struct); } +void +CommonUtil::EraseFromCache(const std::string& item_key) { + if (item_key.empty()) { + SERVER_LOG_ERROR << "Empty key cannot be erased from cache"; + return; + } + + cache::CpuCacheMgr::GetInstance()->EraseItem(item_key); + +#ifdef MILVUS_GPU_VERSION + server::Config& config = server::Config::GetInstance(); + std::vector gpus; + Status s = config.GetGpuResourceConfigSearchResources(gpus); + for (auto& gpu : gpus) { + cache::GpuCacheMgr::GetInstance(gpu)->EraseItem(item_key); + } +#endif +} + } // namespace server } // namespace milvus diff --git a/core/src/utils/CommonUtil.h b/core/src/utils/CommonUtil.h index 121196986a31797654da4a256335be3db0232e09..39b553d830f39e408bc4b9219f5fce4fce7af5eb 100644 --- a/core/src/utils/CommonUtil.h +++ b/core/src/utils/CommonUtil.h @@ -56,6 +56,9 @@ class CommonUtil { ConvertTime(time_t time_integer, tm& time_struct); static void ConvertTime(tm time_struct, time_t& time_integer); + + static void + EraseFromCache(const std::string& item_key); }; } // namespace server diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp index 143bf3938385f2bc63c233cbaa81a6656cbe36be..b89c73c29675b2c359210fda126e140e5989fbcd 100644 
--- a/core/unittest/db/test_meta.cpp +++ b/core/unittest/db/test_meta.cpp @@ -329,7 +329,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { status = impl_->CreateTableFile(table_file); table_file.file_type_ = milvus::engine::meta::TableFileSchema::NEW; status = impl_->UpdateTableFile(table_file); - status = impl_->CleanUp(); + status = impl_->CleanUpShadowFiles(); ASSERT_TRUE(status.ok()); status = impl_->DropTable(table_id);