diff --git a/core/src/db/DB.h b/core/src/db/DB.h index 6a25e8e8321de674434fe2464724b11b6aefbe57..ad78aa6c6b5cbdbf39034ee2d5a311f9c51542da 100644 --- a/core/src/db/DB.h +++ b/core/src/db/DB.h @@ -51,7 +51,7 @@ class DB { CreateCollection(const snapshot::CreateCollectionContext& context) = 0; virtual Status - DropCollection(const std::string& name) = 0; + DropCollection(const std::string& collection_name) = 0; virtual Status HasCollection(const std::string& collection_name, bool& has_or_not) = 0; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 0ce4c1129748fb543052e1f1c6b9db9e27ac21d0..c4f60b198bcf07a5a11ba00f66294d09444eb78f 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -197,14 +197,14 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) { } Status -DBImpl::DropCollection(const std::string& name) { +DBImpl::DropCollection(const std::string& collection_name) { CHECK_INITIALIZED; - LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << name; + LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << collection_name; snapshot::ScopedSnapshotT ss; auto& snapshots = snapshot::Snapshots::GetInstance(); - STATUS_CHECK(snapshots.GetSnapshot(ss, name)); + STATUS_CHECK(snapshots.GetSnapshot(ss, collection_name)); mem_mgr_->EraseMem(ss->GetCollectionId()); // not allow insert diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 0c33a171d7033d07b2bdab1a519cf50e5f43a6f9..458556c166b3953100eb5a3f28dec0d3b3663ee3 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -45,7 +45,7 @@ class DBImpl : public DB, public ConfigObserver { CreateCollection(const snapshot::CreateCollectionContext& context) override; Status - DropCollection(const std::string& name) override; + DropCollection(const std::string& collection_name) override; Status HasCollection(const std::string& collection_name, bool& has_or_not) override; diff --git a/core/src/db/DBProxy.cpp b/core/src/db/DBProxy.cpp index 9d76cfb76308da53ed0b8c524e82e10d90727dca..87148a0b3ce303c521dbfe8202465d404c607eab 100644 --- a/core/src/db/DBProxy.cpp +++ b/core/src/db/DBProxy.cpp @@ -41,9 +41,9 @@ DBProxy::CreateCollection(const snapshot::CreateCollectionContext& context) { } Status -DBProxy::DropCollection(const std::string& name) { +DBProxy::DropCollection(const std::string& collection_name) { DB_CHECK - return db_->DropCollection(name); + return db_->DropCollection(collection_name); } Status diff --git a/core/src/db/DBProxy.h b/core/src/db/DBProxy.h index ca03a4ee08a21bc630c6b68f723dcaf00098c48a..c4b04e98b7415c33db429d3080c421b435e10fce 100644 --- a/core/src/db/DBProxy.h +++ b/core/src/db/DBProxy.h @@ -34,7 +34,7 @@ class DBProxy : public DB { CreateCollection(const snapshot::CreateCollectionContext& context) override; Status - DropCollection(const std::string& name) override; + DropCollection(const std::string& collection_name) override; Status HasCollection(const std::string& collection_name, bool& has_or_not) override; diff --git a/core/src/db/transcript/TranscriptProxy.cpp b/core/src/db/transcript/TranscriptProxy.cpp index 9c0e2d6efdc6eaa270f0ff7ec425776a2a718d34..17b2bfd3c343fe9e0331fe44acbaf7920fb1b377 100644 --- a/core/src/db/transcript/TranscriptProxy.cpp +++ b/core/src/db/transcript/TranscriptProxy.cpp @@ -68,8 +68,8 @@ TranscriptProxy::CreateCollection(const snapshot::CreateCollectionContext& conte } Status -TranscriptProxy::DropCollection(const std::string& name) { - return db_->DropCollection(name); +TranscriptProxy::DropCollection(const std::string& collection_name) { + return db_->DropCollection(collection_name); } Status diff --git a/core/src/db/transcript/TranscriptProxy.h b/core/src/db/transcript/TranscriptProxy.h index 1611f9ead97b1efdecfa4561f20a071f13bf6dac..78c997a84356174acb0f479f6c3cf2cbf61131dd 100644 --- a/core/src/db/transcript/TranscriptProxy.h +++ b/core/src/db/transcript/TranscriptProxy.h @@ -34,7 +34,7 @@ class TranscriptProxy : public DBProxy { CreateCollection(const snapshot::CreateCollectionContext& context) override; Status - DropCollection(const std::string& name) override; + DropCollection(const std::string& collection_name) override; Status HasCollection(const std::string& collection_name, bool& has_or_not) override; diff --git a/core/src/db/wal/WalFile.cpp b/core/src/db/wal/WalFile.cpp index e4a487b682fcec4e6412bcbb85af4d05f36dc1d6..e3dbb368b26b35c4682f3ec7a61e3fca134a615d 100644 --- a/core/src/db/wal/WalFile.cpp +++ b/core/src/db/wal/WalFile.cpp @@ -27,7 +27,20 @@ WalFile::OpenFile(const std::string& path, OpenMode mode) { CloseFile(); try { - std::string str_mode = (mode == OpenMode::READ) ? "rb" : "awb"; + std::string str_mode; + switch (mode) { + case OpenMode::READ: + str_mode = "rb"; + break; + case OpenMode::APPEND_WRITE: + str_mode = "awb"; + break; + case OpenMode::OVER_WRITE: + str_mode = "wb"; + break; + default: + return Status(DB_ERROR, "Unsupported file mode"); + } file_ = fopen(path.c_str(), str_mode.c_str()); if (file_ == nullptr) { std::string msg = "Failed to create wal file: " + path; diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index 86003c60765dd8b802ff9dc5b8a82731a8a2d947..9d94e53c413f150f79cc492d3fecaa2363d415ae 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -14,7 +14,6 @@ #include "db/wal/WalOperationCodec.h" #include "utils/CommonUtil.h" -#include #include #include #include @@ -24,7 +23,8 @@ namespace milvus { namespace engine { -const char* MAX_OP_ID_FILE_NAME = "max_op"; +const char* WAL_MAX_OP_FILE_NAME = "max_op"; +const char* WAL_DEL_FILE_NAME = "del"; WalManager::WalManager() : cleanup_thread_pool_(1, 1) { } @@ -43,7 +43,7 @@ WalManager::Start(const DBOptions& options) { CommonUtil::CreateDirectory(wal_path_); - auto status = ReadMaxOpId(); + auto status = Init(); if (!status.ok()) { return status; } @@ -53,11 +53,28 @@ WalManager::Start(const DBOptions& options) { Status WalManager::Stop() { - std::lock_guard lck(cleanup_thread_mutex_); - for (auto& iter : cleanup_thread_results_) { - iter.wait(); + { + std::lock_guard lock(file_map_mutex_); + file_map_.clear(); } + WaitCleanupFinish(); + + return Status::OK(); +} + +Status +WalManager::DropCollection(const std::string& collection_name) { + // write a placeholder file 'del' under collection folder, let cleanup thread remove this folder + std::string path = ConstructFilePath(collection_name, WAL_DEL_FILE_NAME); + WalFile file; + file.OpenFile(path, WalFile::OVER_WRITE); + bool del = true; + file.Write(&del); + + AddCleanupTask(collection_name); + StartCleanupThread(); + return Status::OK(); } @@ -106,7 +123,7 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) { start_clecnup = true; // write max op id to disk - std::string path = ConstructFilePath(collection_name, MAX_OP_ID_FILE_NAME); + std::string path = ConstructFilePath(collection_name, WAL_MAX_OP_FILE_NAME); WalFile file; file.OpenFile(path, WalFile::OVER_WRITE); file.Write(&op_id); @@ -114,7 +131,8 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) { } if (start_clecnup) { - StartCleanupThread(collection_name); + AddCleanupTask(collection_name); + StartCleanupThread(); } return Status::OK(); @@ -122,6 +140,8 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) { Status WalManager::Recovery(const DBPtr& db) { + WaitCleanupFinish(); + using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; DirectoryIterator iter_outer(wal_path_); DirectoryIterator end_outer; @@ -140,7 +160,7 @@ WalManager::Recovery(const DBPtr& db) { for (; iter_inner != end_inner; ++iter_inner) { auto path_inner = (*iter_inner).path(); std::string file_name = path_inner.filename().c_str(); - if (file_name == MAX_OP_ID_FILE_NAME) { + if (file_name == WAL_MAX_OP_FILE_NAME) { continue; } idx_t op_id = std::stol(file_name); @@ -148,7 +168,7 @@ WalManager::Recovery(const DBPtr& db) { } // the max operation id - idx_t max_op_id = std::numeric_limits::max(); + idx_t max_op_id = 0; { std::lock_guard lock(max_op_mutex_); if (max_op_id_map_.find(collection_name) != max_op_id_map_.end()) { @@ -168,12 +188,15 @@ WalManager::Recovery(const DBPtr& db) { continue; // skip and delete this file since all its operations already done } + // read operation and execute Status status = Status::OK(); while (status.ok()) { WalOperationPtr operation; - operation->collection_name_ = collection_name; status = WalOperationCodec::IterateOperation(file, operation, max_op_id); - PerformOperation(operation, db); + if (operation) { + operation->collection_name_ = collection_name; + PerformOperation(operation, db); + } } } } @@ -182,7 +205,7 @@ WalManager::Recovery(const DBPtr& db) { } Status -WalManager::ReadMaxOpId() { +WalManager::Init() { using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; DirectoryIterator iter(wal_path_); DirectoryIterator end; @@ -190,21 +213,30 @@ WalManager::ReadMaxOpId() { auto path = (*iter).path(); if (std::experimental::filesystem::is_directory(path)) { std::string collection_name = path.filename().c_str(); - path.append(MAX_OP_ID_FILE_NAME); - if (!std::experimental::filesystem::is_regular_file(path)) { - continue; // ignore? - } - WalFile file; - file.OpenFile(path.c_str(), WalFile::READ); - idx_t max_op = 0; - file.Read(&max_op); + // read max op id + std::experimental::filesystem::path file_path = path; + file_path.append(WAL_MAX_OP_FILE_NAME); + if (std::experimental::filesystem::is_regular_file(file_path)) { + WalFile file; + file.OpenFile(path.c_str(), WalFile::READ); + idx_t max_op = 0; + file.Read(&max_op); + + std::lock_guard lock(max_op_mutex_); + max_op_id_map_.insert(std::make_pair(collection_name, max_op)); + } - std::lock_guard lock(max_op_mutex_); - max_op_id_map_.insert(std::make_pair(collection_name, max_op)); + // this collection has been deleted? + file_path = path; + file_path.append(WAL_DEL_FILE_NAME); + if (std::experimental::filesystem::is_regular_file(file_path)) { + AddCleanupTask(collection_name); + } } } + StartCleanupThread(); // do cleanup return Status::OK(); } @@ -245,9 +277,11 @@ WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, con } // insert action to db - status = db->Insert(operation->collection_name_, operation->partition_name, operation->data_chunk_, op_id); - if (!status.ok()) { - return status; + if (db) { + status = db->Insert(operation->collection_name_, operation->partition_name, operation->data_chunk_, op_id); + if (!status.ok()) { + return status; + } } } @@ -296,7 +330,11 @@ WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, con } // delete action to db - return db->DeleteEntityByID(operation->collection_name_, operation->entity_ids_, op_id); + if (db) { + return db->DeleteEntityByID(operation->collection_name_, operation->entity_ids_, op_id); + } + + return Status::OK(); } std::string @@ -305,100 +343,171 @@ WalManager::ConstructFilePath(const std::string& collection_name, const std::str std::experimental::filesystem::create_directory(full_path); full_path.append(collection_name); std::experimental::filesystem::create_directory(full_path); - full_path.append(file_name); + + if (!file_name.empty()) { + full_path.append(file_name); + } std::string path(full_path.c_str()); return path; } void -WalManager::StartCleanupThread(const std::string& collection_name) { +WalManager::AddCleanupTask(const std::string& collection_name) { + std::lock_guard lck(cleanup_task_mutex_); + if (cleanup_tasks_.empty()) { + cleanup_tasks_.push_back(collection_name); + } else { + // no need to add duplicate name + std::string back = cleanup_tasks_.back(); + if (back != collection_name) { + cleanup_tasks_.push_back(collection_name); + } + } +} + +void +WalManager::TakeCleanupTask(std::string& collection_name) { + collection_name = ""; + std::lock_guard lck(cleanup_task_mutex_); + if (cleanup_tasks_.empty()) { + return; + } + collection_name = cleanup_tasks_.front(); + cleanup_tasks_.pop_front(); +} + +void +WalManager::StartCleanupThread() { // the previous thread finished? std::lock_guard lck(cleanup_thread_mutex_); if (cleanup_thread_results_.empty()) { // start a new cleanup thread - cleanup_thread_results_.push_back( - cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this, collection_name)); + cleanup_thread_results_.push_back(cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this)); } else { std::chrono::milliseconds span(1); if (cleanup_thread_results_.back().wait_for(span) == std::future_status::ready) { cleanup_thread_results_.pop_back(); // start a new cleanup thread - cleanup_thread_results_.push_back( - cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this, collection_name)); + cleanup_thread_results_.push_back(cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this)); } } } void -WalManager::CleanupThread(std::string collection_name) { +WalManager::WaitCleanupFinish() { + std::lock_guard lck(cleanup_thread_mutex_); + for (auto& iter : cleanup_thread_results_) { + iter.wait(); + } +} + +void +WalManager::CleanupThread() { SetThreadName("wal_clean"); + std::string target_collection; + TakeCleanupTask(target_collection); + using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; - DirectoryIterator iter_outer(wal_path_); - DirectoryIterator end_outer; - for (; iter_outer != end_outer; ++iter_outer) { - auto path_outer = (*iter_outer).path(); - if (!std::experimental::filesystem::is_directory(path_outer)) { + while (!target_collection.empty()) { + std::string path = ConstructFilePath(target_collection, ""); + std::experimental::filesystem::path collection_path = path; + + // not a folder + if (!std::experimental::filesystem::is_directory(path)) { + TakeCleanupTask(target_collection); continue; } - // get max operation id - std::string file_name = path_outer.filename().c_str(); - if (file_name != collection_name) { + // collection already deleted + std::experimental::filesystem::path file_path = collection_path; + file_path.append(WAL_DEL_FILE_NAME); + if (std::experimental::filesystem::is_regular_file(file_path)) { + // clean max operation id + { + std::lock_guard lock(max_op_mutex_); + max_op_id_map_.erase(target_collection); + } + // clean opened file in buffer + { + std::lock_guard lock(file_map_mutex_); + file_map_.erase(target_collection); + } + + // remove collection folder + std::experimental::filesystem::remove_all(collection_path); + + TakeCleanupTask(target_collection); continue; } - idx_t max_op = std::numeric_limits::max(); + // get max operation id + idx_t max_op = 0; { std::lock_guard lock(max_op_mutex_); - if (max_op_id_map_.find(collection_name) != max_op_id_map_.end()) { - max_op = max_op_id_map_[collection_name]; + if (max_op_id_map_.find(target_collection) != max_op_id_map_.end()) { + max_op = max_op_id_map_[target_collection]; } } // iterate files - std::map id_files; - DirectoryIterator iter_inner(path_outer); - DirectoryIterator end_inner; - for (; iter_inner != end_inner; ++iter_inner) { - auto path_inner = (*iter_inner).path(); - std::string file_name = path_inner.filename().c_str(); - if (file_name == MAX_OP_ID_FILE_NAME) { + std::map wal_files; + DirectoryIterator file_iter(collection_path); + DirectoryIterator end_iter; + for (; file_iter != end_iter; ++file_iter) { + auto file_path = (*file_iter).path(); + std::string file_name = file_path.filename().c_str(); + if (file_name == WAL_MAX_OP_FILE_NAME) { continue; } idx_t op_id = std::stol(file_name); - id_files.insert(std::make_pair(op_id, path_inner)); + wal_files.insert(std::make_pair(op_id, file_path)); } - if (id_files.empty()) { + // no wal file + if (wal_files.empty()) { + TakeCleanupTask(target_collection); continue; } - // remove unused files - // the std::map arrange id in assendent, direct delete files except the last one - idx_t max_id = id_files.rbegin()->first; - std::experimental::filesystem::path max_file = id_files.rbegin()->second; - id_files.erase(max_id); - for (auto& pair : id_files) { - std::experimental::filesystem::remove(pair.second); - } + // the std::map arrange id in assendent + // if the last id < max_op, delete the wal file + for (auto& pair : wal_files) { + WalFile file; + file.OpenFile(pair.second.c_str(), WalFile::READ); + idx_t last_id = 0; + file.ReadLastOpId(last_id); + if (last_id <= max_op) { + file.CloseFile(); + + // makesure wal file is closed + { + std::lock_guard lock(file_map_mutex_); + WalFilePtr file = file_map_[target_collection]; + if (file) { + if (file->Path() == pair.second) { + file->CloseFile(); + file_map_.erase(target_collection); + } + } + } - // the last wal file need to be deleted? - WalFile file; - file.OpenFile(max_file.c_str(), WalFile::READ); - idx_t last_id = 0; - file.ReadLastOpId(last_id); - if (last_id <= max_op) { - file.CloseFile(); - std::experimental::filesystem::remove(max_file); + std::experimental::filesystem::remove(pair.second); + } } + + TakeCleanupTask(target_collection); } } Status WalManager::PerformOperation(const WalOperationPtr& operation, const DBPtr& db) { + if (operation == nullptr || db == nullptr) { + return Status(DB_ERROR, "null pointer"); + } + Status status; switch (operation->Type()) { case WalOperationType::INSERT_ENTITY: { diff --git a/core/src/db/wal/WalManager.h b/core/src/db/wal/WalManager.h index 58261aac3945c70bc4dd9d163d492145639a9c2f..73ea6a823ffe0313d5356b081ff3df6386e0fa0f 100644 --- a/core/src/db/wal/WalManager.h +++ b/core/src/db/wal/WalManager.h @@ -29,6 +29,9 @@ namespace milvus { namespace engine { +extern const char* WAL_MAX_OP_FILE_NAME; +extern const char* WAL_DEL_FILE_NAME; + class WalManager { public: WalManager(); @@ -42,6 +45,9 @@ class WalManager { Status Stop(); + Status + DropCollection(const std::string& collection_name); + Status RecordOperation(const WalOperationPtr& operation, const DBPtr& db); @@ -53,7 +59,7 @@ class WalManager { private: Status - ReadMaxOpId(); + Init(); Status RecordInsertOperation(const InsertEntityOperationPtr& operation, const DBPtr& db); @@ -68,10 +74,19 @@ class WalManager { ConstructFilePath(const std::string& collection_name, const std::string& file_name); void - StartCleanupThread(const std::string& collection_name); + AddCleanupTask(const std::string& collection_name); + + void + TakeCleanupTask(std::string& collection_name); void - CleanupThread(std::string collection_name); + StartCleanupThread(); + + void + WaitCleanupFinish(); + + void + CleanupThread(); Status PerformOperation(const WalOperationPtr& operation, const DBPtr& db); @@ -87,13 +102,16 @@ class WalManager { WalFileMap file_map_; // mapping collection name to file std::mutex file_map_mutex_; - using MaxOpIdMap = std::unordered_map; + using MaxOpIdMap = std::unordered_map; MaxOpIdMap max_op_id_map_; // mapping collection name to max operation id std::mutex max_op_mutex_; ThreadPool cleanup_thread_pool_; std::mutex cleanup_thread_mutex_; std::list> cleanup_thread_results_; + + std::list cleanup_tasks_; // cleanup target collections + std::mutex cleanup_task_mutex_; }; } // namespace engine diff --git a/core/src/db/wal/WalOperationCodec.cpp b/core/src/db/wal/WalOperationCodec.cpp index c4c9ef1cce33e79090ba1e6fd110910bf20175bd..11e2b19388fa7c2715ce403721084b32911da093 100644 --- a/core/src/db/wal/WalOperationCodec.cpp +++ b/core/src/db/wal/WalOperationCodec.cpp @@ -90,8 +90,6 @@ WalOperationCodec::WriteInsertOperation(const WalFilePtr& file, const std::strin if (total_bytes != calculate_total_bytes) { LOG_ENGINE_ERROR_ << "wal serialize(insert) bytes " << total_bytes << " not equal " << calculate_total_bytes; - } else { - LOG_ENGINE_DEBUG_ << "Wal serialize(insert) " << total_bytes << " bytes"; } } catch (std::exception& ex) { std::string msg = "Failed to write insert operation, reason: " + std::string(ex.what()); @@ -144,8 +142,6 @@ WalOperationCodec::WriteDeleteOperation(const WalFilePtr& file, const IDNumbers& if (total_bytes != calculate_total_bytes) { LOG_ENGINE_ERROR_ << "wal serialize(delete) bytes " << total_bytes << " not equal " << calculate_total_bytes; - } else { - LOG_ENGINE_DEBUG_ << "Wal serialize(delete) " << total_bytes << " bytes"; } } catch (std::exception& ex) { std::string msg = "Failed to write insert operation, reason: " + std::string(ex.what()); @@ -192,15 +188,13 @@ WalOperationCodec::IterateOperation(const WalFilePtr& file, WalOperationPtr& ope if (type == WalOperationType::INSERT_ENTITY) { // read partition name int32_t part_name_length = 0; - read_bytes = file->Read(&part_name_length); - if (read_bytes <= 0) { - return Status(DB_ERROR, "End of file"); - } - std::string partition_name; - read_bytes = file->ReadStr(partition_name, part_name_length); - if (read_bytes <= 0) { - return Status(DB_ERROR, "End of file"); + read_bytes = file->Read(&part_name_length); + if (part_name_length > 0) { + read_bytes = file->ReadStr(partition_name, part_name_length); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } } // read fixed data diff --git a/core/src/db/wal/WalProxy.cpp b/core/src/db/wal/WalProxy.cpp index 9171944eb9020959c78b5d6126262274f27bc40e..89ea8510c6bea6cb2b641a977387c3c14fc1f8da 100644 --- a/core/src/db/wal/WalProxy.cpp +++ b/core/src/db/wal/WalProxy.cpp @@ -52,6 +52,17 @@ WalProxy::Stop() { return status; } +Status +WalProxy::DropCollection(const std::string& collection_name) { + auto status = db_->DropCollection(collection_name); + if (!status.ok()) { + return status; + } + + WalManager::GetInstance().DropCollection(collection_name); + return status; +} + Status WalProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, idx_t op_id) { diff --git a/core/src/db/wal/WalProxy.h b/core/src/db/wal/WalProxy.h index d0d576f009e04fa68a8e5c4364748f136e437461..6e032a5f5f32dbdf16ae2a1e18fc272008c8098b 100644 --- a/core/src/db/wal/WalProxy.h +++ b/core/src/db/wal/WalProxy.h @@ -30,6 +30,9 @@ class WalProxy : public DBProxy { Status Stop() override; + Status + DropCollection(const std::string& collection_name) override; + Status Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, idx_t op_id) override; diff --git a/core/unittest/db/test_wal.cpp b/core/unittest/db/test_wal.cpp index 8131498967515a2988420b63597561dd174709e2..fe2ec881c66681be9205586f9842c1e50da562a9 100644 --- a/core/unittest/db/test_wal.cpp +++ b/core/unittest/db/test_wal.cpp @@ -34,6 +34,8 @@ using WalOperation = milvus::engine::WalOperation; using WalOperationPtr = milvus::engine::WalOperationPtr; using WalOperationType = milvus::engine::WalOperationType; using WalOperationCodec = milvus::engine::WalOperationCodec; +using InsertEntityOperation = milvus::engine::InsertEntityOperation; +using DeleteEntityOperation = milvus::engine::DeleteEntityOperation; using WalProxy = milvus::engine::WalProxy; void CreateChunk(DataChunkPtr& chunk, int64_t row_count, int64_t& chunk_size) { @@ -71,11 +73,16 @@ void CreateChunk(DataChunkPtr& chunk, int64_t row_count, int64_t& chunk_size) { class DummyDB : public DBProxy { public: + DummyDB(const DBOptions& options) + : DBProxy(nullptr, options) { + } + Status Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, idx_t op_id) override { + insert_count_++; WalManager::GetInstance().OperationDone(collection_name, op_id); return Status::OK(); } @@ -84,12 +91,21 @@ class DummyDB : public DBProxy { DeleteEntityByID(const std::string& collection_name, const IDNumbers& entity_ids, idx_t op_id) override { + delete_count_++; WalManager::GetInstance().OperationDone(collection_name, op_id); return Status::OK(); } + int64_t InsertCount() const { return insert_count_; } + int64_t DeleteCount() const { return delete_count_; } + + private: + int64_t insert_count_ = 0; + int64_t delete_count_ = 0; }; +using DummyDBPtr = std::shared_ptr; + } // namespace TEST_F(WalTest, WalFileTest) { @@ -257,27 +273,167 @@ TEST_F(WalTest, WalProxyTest) { std::string collection_name = "col_1"; std::string partition_name = "part_1"; - // write over more than 400MB data + // write over more than 400MB data, 2 wal files for (int64_t i = 1; i <= 1000; i++) { - idx_t op_id = i; if (i % 10 == 0) { IDNumbers ids = {1, 2, 3}; - auto status = db_->DeleteEntityByID(collection_name, ids, op_id); + auto status = db_->DeleteEntityByID(collection_name, ids, 0); ASSERT_TRUE(status.ok()); } else { DataChunkPtr chunk; int64_t chunk_size = 0; CreateChunk(chunk, 1000, chunk_size); - auto status = db_->Insert(collection_name, partition_name, chunk, op_id); + auto status = db_->Insert(collection_name, partition_name, chunk, 0); ASSERT_TRUE(status.ok()); } } + + // find out the wal files + DBOptions opt = GetOptions(); + std::experimental::filesystem::path collection_path = opt.meta_.path_; + collection_path.append(collection_name); + + using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; + std::set op_ids; + { + DirectoryIterator file_iter(collection_path); + DirectoryIterator end_iter; + for (; file_iter != end_iter; ++file_iter) { + auto file_path = (*file_iter).path(); + std::string file_name = file_path.filename().c_str(); + if (file_name == milvus::engine::WAL_MAX_OP_FILE_NAME) { + continue; + } + + // read all operation ids + auto file = std::make_shared(); + auto status = file->OpenFile(file_path, WalFile::READ); + ASSERT_TRUE(status.ok()); + + Status iter_status; + while (iter_status.ok()) { + WalOperationPtr operation; + iter_status = WalOperationCodec::IterateOperation(file, operation, 0); + if (operation != nullptr) { + op_ids.insert(operation->ID()); + } + } + } + } + + // notify operation done, the wal files will be removed after all operations done + for (auto id : op_ids) { + auto status = WalManager::GetInstance().OperationDone(collection_name, id); + ASSERT_TRUE(status.ok()); + } + + // wait cleanup thread finish + WalManager::GetInstance().Stop(); + + // check the wal files + { + DirectoryIterator file_iter(collection_path); + DirectoryIterator end_iter; + for (; file_iter != end_iter; ++file_iter) { + auto file_path = (*file_iter).path(); + std::string file_name = file_path.filename().c_str(); + if (file_name == milvus::engine::WAL_MAX_OP_FILE_NAME) { + continue; + } + + // wal file not deleted? + ASSERT_TRUE(false); + } + } } -TEST(WalManagerTest, WalManagerTest) { - std::string path = "/tmp/milvus_wal/test_file"; +TEST_F(WalTest, WalManagerTest) { + std::string collection_name = "collection"; + + // construct mock db + DBOptions options; + options.meta_.path_ = "/tmp/milvus_wal"; + options.wal_enable_ = true; + DummyDBPtr db_1 = std::make_shared(options); + + // prepare wal manager + WalManager::GetInstance().Stop(); + WalManager::GetInstance().Start(options); + + // write over more than 400MB data, 2 wal files + int64_t insert_count = 0; + int64_t delete_count = 0; + for (int64_t i = 1; i <= 1000; i++) { + if (i % 100 == 0) { + auto status = WalManager::GetInstance().DropCollection(collection_name); + ASSERT_TRUE(status.ok()); + } else if (i % 10 == 0) { + IDNumbers ids = {1, 2, 3}; + + auto op = std::make_shared(); + op->collection_name_ = collection_name; + op->entity_ids_ = ids; + + auto status = WalManager::GetInstance().RecordOperation(op, db_1); + ASSERT_TRUE(status.ok()); + + delete_count++; + } else { + DataChunkPtr chunk; + int64_t chunk_size = 0; + CreateChunk(chunk, 1000, chunk_size); + + auto op = std::make_shared(); + op->collection_name_ = collection_name; + op->partition_name = ""; + op->data_chunk_ = chunk; + + auto status = WalManager::GetInstance().RecordOperation(op, db_1); + ASSERT_TRUE(status.ok()); + + insert_count++; + } + } + + ASSERT_EQ(db_1->InsertCount(), insert_count); + ASSERT_EQ(db_1->DeleteCount(), delete_count); + + + // test recovery + insert_count = 0; + delete_count = 0; + for (int64_t i = 1; i <= 1000; i++) { + if (i % 10 == 0) { + IDNumbers ids = {1, 2, 3}; + + auto op = std::make_shared(); + op->collection_name_ = collection_name; + op->entity_ids_ = ids; + + auto status = WalManager::GetInstance().RecordOperation(op, nullptr); + ASSERT_TRUE(status.ok()); + + delete_count++; + } else { + DataChunkPtr chunk; + int64_t chunk_size = 0; + CreateChunk(chunk, 1000, chunk_size); + + auto op = std::make_shared(); + op->collection_name_ = collection_name; + op->partition_name = ""; + op->data_chunk_ = chunk; + + auto status = WalManager::GetInstance().RecordOperation(op, nullptr); + ASSERT_TRUE(status.ok()); + + insert_count++; + } + } -// WalManager::GetInstance().Start(options_); -// WalManager::GetInstance().Recovery(db_); + DummyDBPtr db_2 = std::make_shared(options); + WalManager::GetInstance().Recovery(db_2); + ASSERT_EQ(db_2->InsertCount(), insert_count); + ASSERT_EQ(db_2->DeleteCount(), delete_count); } \ No newline at end of file diff --git a/core/unittest/db/utils.cpp b/core/unittest/db/utils.cpp index 810a003a146dcaa9614376437e24f48e030ff10d..1fb4a40e11af6ff352ba839add15b08c1b707e99 100644 --- a/core/unittest/db/utils.cpp +++ b/core/unittest/db/utils.cpp @@ -81,7 +81,7 @@ BaseTest::InitLog() { } void -BaseTest::SnapshotStart(bool mock_store, milvus::engine::DBOptions options) { +BaseTest::SnapshotStart(bool mock_store, DBOptions options) { auto store = Store::Build(options.meta_.backend_uri_, options.meta_.path_, milvus::codec::Codec::instance().GetSuffixSet()); @@ -136,7 +136,7 @@ BaseTest::TearDown() { void SnapshotTest::SetUp() { BaseTest::SetUp(); - milvus::engine::DBOptions options; + DBOptions options; options.meta_.path_ = "/tmp/milvus_ss"; options.meta_.backend_uri_ = "mock://:@:/"; options.wal_enable_ = false; @@ -150,11 +150,11 @@ SnapshotTest::TearDown() { } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// -milvus::engine::DBOptions +DBOptions DBTest::GetOptions() { milvus::cache::CpuCacheMgr::GetInstance().SetCapacity(256 * milvus::engine::MB); - auto options = milvus::engine::DBOptions(); + auto options = DBOptions(); options.meta_.path_ = "/tmp/milvus_ss"; options.meta_.backend_uri_ = "mock://:@:/"; options.wal_enable_ = false; @@ -216,7 +216,7 @@ DBTest::TearDown() { void SegmentTest::SetUp() { BaseTest::SetUp(); - milvus::engine::DBOptions options; + DBOptions options; options.meta_.path_ = "/tmp/milvus_ss"; options.meta_.backend_uri_ = "mock://:@:/"; options.wal_enable_ = false; @@ -251,7 +251,7 @@ MetaTest::TearDown() { void SchedulerTest::SetUp() { BaseTest::SetUp(); - milvus::engine::DBOptions options; + DBOptions options; options.meta_.path_ = "/tmp/milvus_ss"; options.meta_.backend_uri_ = "mock://:@:/"; options.wal_enable_ = false; @@ -303,9 +303,9 @@ EventTest::TearDown() { } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// -milvus::engine::DBOptions +DBOptions WalTest::GetOptions() { - milvus::engine::DBOptions options; + DBOptions options; options.meta_.path_ = "/tmp/milvus_wal"; options.meta_.backend_uri_ = "mock://:@:/"; options.wal_enable_ = true; diff --git a/core/unittest/db/utils.h b/core/unittest/db/utils.h index 665e15550c9c007a31009811edb2b8dbf8493ae8..e56d77c1560d69a2b7ba2f3955f134e7bc6246b1 100644 --- a/core/unittest/db/utils.h +++ b/core/unittest/db/utils.h @@ -88,6 +88,7 @@ using StorePtr = milvus::engine::snapshot::Store::Ptr; using MetaAdapterPtr = milvus::engine::meta::MetaAdapterPtr; using DB = milvus::engine::DB; +using DBOptions = milvus::engine::DBOptions; using Status = milvus::Status; using idx_t = milvus::engine::idx_t; using IDNumbers = milvus::engine::IDNumbers; @@ -327,7 +328,7 @@ class BaseTest : public ::testing::Test { void InitLog(); void - SnapshotStart(bool mock_store, milvus::engine::DBOptions); + SnapshotStart(bool mock_store, DBOptions); void SnapshotStop(); @@ -351,7 +352,7 @@ class DBTest : public BaseTest { protected: std::shared_ptr db_; - milvus::engine::DBOptions + DBOptions GetOptions(); void @@ -414,7 +415,7 @@ class WalTest : public ::testing::Test { protected: std::shared_ptr db_; - milvus::engine::DBOptions + DBOptions GetOptions(); void