未验证 提交 7ee597bd 编写于 作者: G groot 提交者: GitHub

add wal unittest (#3367)

* add wal unittest
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* fix test fail
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 4d491d08
......@@ -51,7 +51,7 @@ class DB {
CreateCollection(const snapshot::CreateCollectionContext& context) = 0;
virtual Status
DropCollection(const std::string& name) = 0;
DropCollection(const std::string& collection_name) = 0;
virtual Status
HasCollection(const std::string& collection_name, bool& has_or_not) = 0;
......
......@@ -197,14 +197,14 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
}
Status
DBImpl::DropCollection(const std::string& name) {
DBImpl::DropCollection(const std::string& collection_name) {
CHECK_INITIALIZED;
LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << name;
LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << collection_name;
snapshot::ScopedSnapshotT ss;
auto& snapshots = snapshot::Snapshots::GetInstance();
STATUS_CHECK(snapshots.GetSnapshot(ss, name));
STATUS_CHECK(snapshots.GetSnapshot(ss, collection_name));
mem_mgr_->EraseMem(ss->GetCollectionId()); // not allow insert
......
......@@ -45,7 +45,7 @@ class DBImpl : public DB, public ConfigObserver {
CreateCollection(const snapshot::CreateCollectionContext& context) override;
Status
DropCollection(const std::string& name) override;
DropCollection(const std::string& collection_name) override;
Status
HasCollection(const std::string& collection_name, bool& has_or_not) override;
......
......@@ -41,9 +41,9 @@ DBProxy::CreateCollection(const snapshot::CreateCollectionContext& context) {
}
Status
DBProxy::DropCollection(const std::string& name) {
DBProxy::DropCollection(const std::string& collection_name) {
DB_CHECK
return db_->DropCollection(name);
return db_->DropCollection(collection_name);
}
Status
......
......@@ -34,7 +34,7 @@ class DBProxy : public DB {
CreateCollection(const snapshot::CreateCollectionContext& context) override;
Status
DropCollection(const std::string& name) override;
DropCollection(const std::string& collection_name) override;
Status
HasCollection(const std::string& collection_name, bool& has_or_not) override;
......
......@@ -68,8 +68,8 @@ TranscriptProxy::CreateCollection(const snapshot::CreateCollectionContext& conte
}
Status
TranscriptProxy::DropCollection(const std::string& name) {
return db_->DropCollection(name);
TranscriptProxy::DropCollection(const std::string& collection_name) {
return db_->DropCollection(collection_name);
}
Status
......
......@@ -34,7 +34,7 @@ class TranscriptProxy : public DBProxy {
CreateCollection(const snapshot::CreateCollectionContext& context) override;
Status
DropCollection(const std::string& name) override;
DropCollection(const std::string& collection_name) override;
Status
HasCollection(const std::string& collection_name, bool& has_or_not) override;
......
......@@ -27,7 +27,20 @@ WalFile::OpenFile(const std::string& path, OpenMode mode) {
CloseFile();
try {
std::string str_mode = (mode == OpenMode::READ) ? "rb" : "awb";
std::string str_mode;
switch (mode) {
case OpenMode::READ:
str_mode = "rb";
break;
case OpenMode::APPEND_WRITE:
str_mode = "awb";
break;
case OpenMode::OVER_WRITE:
str_mode = "wb";
break;
default:
return Status(DB_ERROR, "Unsupported file mode");
}
file_ = fopen(path.c_str(), str_mode.c_str());
if (file_ == nullptr) {
std::string msg = "Failed to create wal file: " + path;
......
......@@ -14,7 +14,6 @@
#include "db/wal/WalOperationCodec.h"
#include "utils/CommonUtil.h"
#include <limits>
#include <map>
#include <memory>
#include <utility>
......@@ -24,7 +23,8 @@
namespace milvus {
namespace engine {
const char* MAX_OP_ID_FILE_NAME = "max_op";
const char* WAL_MAX_OP_FILE_NAME = "max_op";
const char* WAL_DEL_FILE_NAME = "del";
WalManager::WalManager() : cleanup_thread_pool_(1, 1) {
}
......@@ -43,7 +43,7 @@ WalManager::Start(const DBOptions& options) {
CommonUtil::CreateDirectory(wal_path_);
auto status = ReadMaxOpId();
auto status = Init();
if (!status.ok()) {
return status;
}
......@@ -53,11 +53,28 @@ WalManager::Start(const DBOptions& options) {
Status
WalManager::Stop() {
std::lock_guard<std::mutex> lck(cleanup_thread_mutex_);
for (auto& iter : cleanup_thread_results_) {
iter.wait();
{
std::lock_guard<std::mutex> lock(file_map_mutex_);
file_map_.clear();
}
WaitCleanupFinish();
return Status::OK();
}
Status
WalManager::DropCollection(const std::string& collection_name) {
// write a placeholder file 'del' under collection folder, let cleanup thread remove this folder
std::string path = ConstructFilePath(collection_name, WAL_DEL_FILE_NAME);
WalFile file;
file.OpenFile(path, WalFile::OVER_WRITE);
bool del = true;
file.Write<bool>(&del);
AddCleanupTask(collection_name);
StartCleanupThread();
return Status::OK();
}
......@@ -106,7 +123,7 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) {
start_clecnup = true;
// write max op id to disk
std::string path = ConstructFilePath(collection_name, MAX_OP_ID_FILE_NAME);
std::string path = ConstructFilePath(collection_name, WAL_MAX_OP_FILE_NAME);
WalFile file;
file.OpenFile(path, WalFile::OVER_WRITE);
file.Write<idx_t>(&op_id);
......@@ -114,7 +131,8 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) {
}
if (start_clecnup) {
StartCleanupThread(collection_name);
AddCleanupTask(collection_name);
StartCleanupThread();
}
return Status::OK();
......@@ -122,6 +140,8 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) {
Status
WalManager::Recovery(const DBPtr& db) {
WaitCleanupFinish();
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter_outer(wal_path_);
DirectoryIterator end_outer;
......@@ -140,7 +160,7 @@ WalManager::Recovery(const DBPtr& db) {
for (; iter_inner != end_inner; ++iter_inner) {
auto path_inner = (*iter_inner).path();
std::string file_name = path_inner.filename().c_str();
if (file_name == MAX_OP_ID_FILE_NAME) {
if (file_name == WAL_MAX_OP_FILE_NAME) {
continue;
}
idx_t op_id = std::stol(file_name);
......@@ -148,7 +168,7 @@ WalManager::Recovery(const DBPtr& db) {
}
// the max operation id
idx_t max_op_id = std::numeric_limits<idx_t>::max();
idx_t max_op_id = 0;
{
std::lock_guard<std::mutex> lock(max_op_mutex_);
if (max_op_id_map_.find(collection_name) != max_op_id_map_.end()) {
......@@ -168,12 +188,15 @@ WalManager::Recovery(const DBPtr& db) {
continue; // skip and delete this file since all its operations already done
}
// read operation and execute
Status status = Status::OK();
while (status.ok()) {
WalOperationPtr operation;
operation->collection_name_ = collection_name;
status = WalOperationCodec::IterateOperation(file, operation, max_op_id);
PerformOperation(operation, db);
if (operation) {
operation->collection_name_ = collection_name;
PerformOperation(operation, db);
}
}
}
}
......@@ -182,7 +205,7 @@ WalManager::Recovery(const DBPtr& db) {
}
Status
WalManager::ReadMaxOpId() {
WalManager::Init() {
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter(wal_path_);
DirectoryIterator end;
......@@ -190,21 +213,30 @@ WalManager::ReadMaxOpId() {
auto path = (*iter).path();
if (std::experimental::filesystem::is_directory(path)) {
std::string collection_name = path.filename().c_str();
path.append(MAX_OP_ID_FILE_NAME);
if (!std::experimental::filesystem::is_regular_file(path)) {
continue; // ignore?
}
WalFile file;
file.OpenFile(path.c_str(), WalFile::READ);
idx_t max_op = 0;
file.Read(&max_op);
// read max op id
std::experimental::filesystem::path file_path = path;
file_path.append(WAL_MAX_OP_FILE_NAME);
if (std::experimental::filesystem::is_regular_file(file_path)) {
WalFile file;
file.OpenFile(path.c_str(), WalFile::READ);
idx_t max_op = 0;
file.Read(&max_op);
std::lock_guard<std::mutex> lock(max_op_mutex_);
max_op_id_map_.insert(std::make_pair(collection_name, max_op));
}
std::lock_guard<std::mutex> lock(max_op_mutex_);
max_op_id_map_.insert(std::make_pair(collection_name, max_op));
// this collection has been deleted?
file_path = path;
file_path.append(WAL_DEL_FILE_NAME);
if (std::experimental::filesystem::is_regular_file(file_path)) {
AddCleanupTask(collection_name);
}
}
}
StartCleanupThread(); // do cleanup
return Status::OK();
}
......@@ -245,9 +277,11 @@ WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, con
}
// insert action to db
status = db->Insert(operation->collection_name_, operation->partition_name, operation->data_chunk_, op_id);
if (!status.ok()) {
return status;
if (db) {
status = db->Insert(operation->collection_name_, operation->partition_name, operation->data_chunk_, op_id);
if (!status.ok()) {
return status;
}
}
}
......@@ -296,7 +330,11 @@ WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, con
}
// delete action to db
return db->DeleteEntityByID(operation->collection_name_, operation->entity_ids_, op_id);
if (db) {
return db->DeleteEntityByID(operation->collection_name_, operation->entity_ids_, op_id);
}
return Status::OK();
}
std::string
......@@ -305,100 +343,171 @@ WalManager::ConstructFilePath(const std::string& collection_name, const std::str
std::experimental::filesystem::create_directory(full_path);
full_path.append(collection_name);
std::experimental::filesystem::create_directory(full_path);
full_path.append(file_name);
if (!file_name.empty()) {
full_path.append(file_name);
}
std::string path(full_path.c_str());
return path;
}
void
WalManager::StartCleanupThread(const std::string& collection_name) {
WalManager::AddCleanupTask(const std::string& collection_name) {
std::lock_guard<std::mutex> lck(cleanup_task_mutex_);
if (cleanup_tasks_.empty()) {
cleanup_tasks_.push_back(collection_name);
} else {
// no need to add duplicate name
std::string back = cleanup_tasks_.back();
if (back != collection_name) {
cleanup_tasks_.push_back(collection_name);
}
}
}
void
WalManager::TakeCleanupTask(std::string& collection_name) {
collection_name = "";
std::lock_guard<std::mutex> lck(cleanup_task_mutex_);
if (cleanup_tasks_.empty()) {
return;
}
collection_name = cleanup_tasks_.front();
cleanup_tasks_.pop_front();
}
void
WalManager::StartCleanupThread() {
// the previous thread finished?
std::lock_guard<std::mutex> lck(cleanup_thread_mutex_);
if (cleanup_thread_results_.empty()) {
// start a new cleanup thread
cleanup_thread_results_.push_back(
cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this, collection_name));
cleanup_thread_results_.push_back(cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this));
} else {
std::chrono::milliseconds span(1);
if (cleanup_thread_results_.back().wait_for(span) == std::future_status::ready) {
cleanup_thread_results_.pop_back();
// start a new cleanup thread
cleanup_thread_results_.push_back(
cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this, collection_name));
cleanup_thread_results_.push_back(cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this));
}
}
}
void
WalManager::CleanupThread(std::string collection_name) {
WalManager::WaitCleanupFinish() {
std::lock_guard<std::mutex> lck(cleanup_thread_mutex_);
for (auto& iter : cleanup_thread_results_) {
iter.wait();
}
}
void
WalManager::CleanupThread() {
SetThreadName("wal_clean");
std::string target_collection;
TakeCleanupTask(target_collection);
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter_outer(wal_path_);
DirectoryIterator end_outer;
for (; iter_outer != end_outer; ++iter_outer) {
auto path_outer = (*iter_outer).path();
if (!std::experimental::filesystem::is_directory(path_outer)) {
while (!target_collection.empty()) {
std::string path = ConstructFilePath(target_collection, "");
std::experimental::filesystem::path collection_path = path;
// not a folder
if (!std::experimental::filesystem::is_directory(path)) {
TakeCleanupTask(target_collection);
continue;
}
// get max operation id
std::string file_name = path_outer.filename().c_str();
if (file_name != collection_name) {
// collection already deleted
std::experimental::filesystem::path file_path = collection_path;
file_path.append(WAL_DEL_FILE_NAME);
if (std::experimental::filesystem::is_regular_file(file_path)) {
// clean max operation id
{
std::lock_guard<std::mutex> lock(max_op_mutex_);
max_op_id_map_.erase(target_collection);
}
// clean opened file in buffer
{
std::lock_guard<std::mutex> lock(file_map_mutex_);
file_map_.erase(target_collection);
}
// remove collection folder
std::experimental::filesystem::remove_all(collection_path);
TakeCleanupTask(target_collection);
continue;
}
idx_t max_op = std::numeric_limits<idx_t>::max();
// get max operation id
idx_t max_op = 0;
{
std::lock_guard<std::mutex> lock(max_op_mutex_);
if (max_op_id_map_.find(collection_name) != max_op_id_map_.end()) {
max_op = max_op_id_map_[collection_name];
if (max_op_id_map_.find(target_collection) != max_op_id_map_.end()) {
max_op = max_op_id_map_[target_collection];
}
}
// iterate files
std::map<idx_t, std::experimental::filesystem::path> id_files;
DirectoryIterator iter_inner(path_outer);
DirectoryIterator end_inner;
for (; iter_inner != end_inner; ++iter_inner) {
auto path_inner = (*iter_inner).path();
std::string file_name = path_inner.filename().c_str();
if (file_name == MAX_OP_ID_FILE_NAME) {
std::map<idx_t, std::experimental::filesystem::path> wal_files;
DirectoryIterator file_iter(collection_path);
DirectoryIterator end_iter;
for (; file_iter != end_iter; ++file_iter) {
auto file_path = (*file_iter).path();
std::string file_name = file_path.filename().c_str();
if (file_name == WAL_MAX_OP_FILE_NAME) {
continue;
}
idx_t op_id = std::stol(file_name);
id_files.insert(std::make_pair(op_id, path_inner));
wal_files.insert(std::make_pair(op_id, file_path));
}
if (id_files.empty()) {
// no wal file
if (wal_files.empty()) {
TakeCleanupTask(target_collection);
continue;
}
// remove unused files
// the std::map arrange id in assendent, direct delete files except the last one
idx_t max_id = id_files.rbegin()->first;
std::experimental::filesystem::path max_file = id_files.rbegin()->second;
id_files.erase(max_id);
for (auto& pair : id_files) {
std::experimental::filesystem::remove(pair.second);
}
// the std::map arrange id in assendent
// if the last id < max_op, delete the wal file
for (auto& pair : wal_files) {
WalFile file;
file.OpenFile(pair.second.c_str(), WalFile::READ);
idx_t last_id = 0;
file.ReadLastOpId(last_id);
if (last_id <= max_op) {
file.CloseFile();
// makesure wal file is closed
{
std::lock_guard<std::mutex> lock(file_map_mutex_);
WalFilePtr file = file_map_[target_collection];
if (file) {
if (file->Path() == pair.second) {
file->CloseFile();
file_map_.erase(target_collection);
}
}
}
// the last wal file need to be deleted?
WalFile file;
file.OpenFile(max_file.c_str(), WalFile::READ);
idx_t last_id = 0;
file.ReadLastOpId(last_id);
if (last_id <= max_op) {
file.CloseFile();
std::experimental::filesystem::remove(max_file);
std::experimental::filesystem::remove(pair.second);
}
}
TakeCleanupTask(target_collection);
}
}
Status
WalManager::PerformOperation(const WalOperationPtr& operation, const DBPtr& db) {
if (operation == nullptr || db == nullptr) {
return Status(DB_ERROR, "null pointer");
}
Status status;
switch (operation->Type()) {
case WalOperationType::INSERT_ENTITY: {
......
......@@ -29,6 +29,9 @@
namespace milvus {
namespace engine {
extern const char* WAL_MAX_OP_FILE_NAME;
extern const char* WAL_DEL_FILE_NAME;
class WalManager {
public:
WalManager();
......@@ -42,6 +45,9 @@ class WalManager {
Status
Stop();
Status
DropCollection(const std::string& collection_name);
Status
RecordOperation(const WalOperationPtr& operation, const DBPtr& db);
......@@ -53,7 +59,7 @@ class WalManager {
private:
Status
ReadMaxOpId();
Init();
Status
RecordInsertOperation(const InsertEntityOperationPtr& operation, const DBPtr& db);
......@@ -68,10 +74,19 @@ class WalManager {
ConstructFilePath(const std::string& collection_name, const std::string& file_name);
void
StartCleanupThread(const std::string& collection_name);
AddCleanupTask(const std::string& collection_name);
void
TakeCleanupTask(std::string& collection_name);
void
CleanupThread(std::string collection_name);
StartCleanupThread();
void
WaitCleanupFinish();
void
CleanupThread();
Status
PerformOperation(const WalOperationPtr& operation, const DBPtr& db);
......@@ -87,13 +102,16 @@ class WalManager {
WalFileMap file_map_; // mapping collection name to file
std::mutex file_map_mutex_;
using MaxOpIdMap = std::unordered_map<std::string, id_t>;
using MaxOpIdMap = std::unordered_map<std::string, idx_t>;
MaxOpIdMap max_op_id_map_; // mapping collection name to max operation id
std::mutex max_op_mutex_;
ThreadPool cleanup_thread_pool_;
std::mutex cleanup_thread_mutex_;
std::list<std::future<void>> cleanup_thread_results_;
std::list<std::string> cleanup_tasks_; // cleanup target collections
std::mutex cleanup_task_mutex_;
};
} // namespace engine
......
......@@ -90,8 +90,6 @@ WalOperationCodec::WriteInsertOperation(const WalFilePtr& file, const std::strin
if (total_bytes != calculate_total_bytes) {
LOG_ENGINE_ERROR_ << "wal serialize(insert) bytes " << total_bytes << " not equal "
<< calculate_total_bytes;
} else {
LOG_ENGINE_DEBUG_ << "Wal serialize(insert) " << total_bytes << " bytes";
}
} catch (std::exception& ex) {
std::string msg = "Failed to write insert operation, reason: " + std::string(ex.what());
......@@ -144,8 +142,6 @@ WalOperationCodec::WriteDeleteOperation(const WalFilePtr& file, const IDNumbers&
if (total_bytes != calculate_total_bytes) {
LOG_ENGINE_ERROR_ << "wal serialize(delete) bytes " << total_bytes << " not equal "
<< calculate_total_bytes;
} else {
LOG_ENGINE_DEBUG_ << "Wal serialize(delete) " << total_bytes << " bytes";
}
} catch (std::exception& ex) {
std::string msg = "Failed to write insert operation, reason: " + std::string(ex.what());
......@@ -192,15 +188,13 @@ WalOperationCodec::IterateOperation(const WalFilePtr& file, WalOperationPtr& ope
if (type == WalOperationType::INSERT_ENTITY) {
// read partition name
int32_t part_name_length = 0;
read_bytes = file->Read<int32_t>(&part_name_length);
if (read_bytes <= 0) {
return Status(DB_ERROR, "End of file");
}
std::string partition_name;
read_bytes = file->ReadStr(partition_name, part_name_length);
if (read_bytes <= 0) {
return Status(DB_ERROR, "End of file");
read_bytes = file->Read<int32_t>(&part_name_length);
if (part_name_length > 0) {
read_bytes = file->ReadStr(partition_name, part_name_length);
if (read_bytes <= 0) {
return Status(DB_ERROR, "End of file");
}
}
// read fixed data
......
......@@ -52,6 +52,17 @@ WalProxy::Stop() {
return status;
}
Status
WalProxy::DropCollection(const std::string& collection_name) {
auto status = db_->DropCollection(collection_name);
if (!status.ok()) {
return status;
}
WalManager::GetInstance().DropCollection(collection_name);
return status;
}
Status
WalProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
idx_t op_id) {
......
......@@ -30,6 +30,9 @@ class WalProxy : public DBProxy {
Status
Stop() override;
Status
DropCollection(const std::string& collection_name) override;
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
idx_t op_id) override;
......
......@@ -34,6 +34,8 @@ using WalOperation = milvus::engine::WalOperation;
using WalOperationPtr = milvus::engine::WalOperationPtr;
using WalOperationType = milvus::engine::WalOperationType;
using WalOperationCodec = milvus::engine::WalOperationCodec;
using InsertEntityOperation = milvus::engine::InsertEntityOperation;
using DeleteEntityOperation = milvus::engine::DeleteEntityOperation;
using WalProxy = milvus::engine::WalProxy;
void CreateChunk(DataChunkPtr& chunk, int64_t row_count, int64_t& chunk_size) {
......@@ -71,11 +73,16 @@ void CreateChunk(DataChunkPtr& chunk, int64_t row_count, int64_t& chunk_size) {
class DummyDB : public DBProxy {
public:
DummyDB(const DBOptions& options)
: DBProxy(nullptr, options) {
}
Status
Insert(const std::string& collection_name,
const std::string& partition_name,
DataChunkPtr& data_chunk,
idx_t op_id) override {
insert_count_++;
WalManager::GetInstance().OperationDone(collection_name, op_id);
return Status::OK();
}
......@@ -84,12 +91,21 @@ class DummyDB : public DBProxy {
DeleteEntityByID(const std::string& collection_name,
const IDNumbers& entity_ids,
idx_t op_id) override {
delete_count_++;
WalManager::GetInstance().OperationDone(collection_name, op_id);
return Status::OK();
}
int64_t InsertCount() const { return insert_count_; }
int64_t DeleteCount() const { return delete_count_; }
private:
int64_t insert_count_ = 0;
int64_t delete_count_ = 0;
};
using DummyDBPtr = std::shared_ptr<DummyDB>;
} // namespace
TEST_F(WalTest, WalFileTest) {
......@@ -257,27 +273,167 @@ TEST_F(WalTest, WalProxyTest) {
std::string collection_name = "col_1";
std::string partition_name = "part_1";
// write over more than 400MB data
// write over more than 400MB data, 2 wal files
for (int64_t i = 1; i <= 1000; i++) {
idx_t op_id = i;
if (i % 10 == 0) {
IDNumbers ids = {1, 2, 3};
auto status = db_->DeleteEntityByID(collection_name, ids, op_id);
auto status = db_->DeleteEntityByID(collection_name, ids, 0);
ASSERT_TRUE(status.ok());
} else {
DataChunkPtr chunk;
int64_t chunk_size = 0;
CreateChunk(chunk, 1000, chunk_size);
auto status = db_->Insert(collection_name, partition_name, chunk, op_id);
auto status = db_->Insert(collection_name, partition_name, chunk, 0);
ASSERT_TRUE(status.ok());
}
}
// find out the wal files
DBOptions opt = GetOptions();
std::experimental::filesystem::path collection_path = opt.meta_.path_;
collection_path.append(collection_name);
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
std::set<idx_t> op_ids;
{
DirectoryIterator file_iter(collection_path);
DirectoryIterator end_iter;
for (; file_iter != end_iter; ++file_iter) {
auto file_path = (*file_iter).path();
std::string file_name = file_path.filename().c_str();
if (file_name == milvus::engine::WAL_MAX_OP_FILE_NAME) {
continue;
}
// read all operation ids
auto file = std::make_shared<WalFile>();
auto status = file->OpenFile(file_path, WalFile::READ);
ASSERT_TRUE(status.ok());
Status iter_status;
while (iter_status.ok()) {
WalOperationPtr operation;
iter_status = WalOperationCodec::IterateOperation(file, operation, 0);
if (operation != nullptr) {
op_ids.insert(operation->ID());
}
}
}
}
// notify operation done, the wal files will be removed after all operations done
for (auto id : op_ids) {
auto status = WalManager::GetInstance().OperationDone(collection_name, id);
ASSERT_TRUE(status.ok());
}
// wait cleanup thread finish
WalManager::GetInstance().Stop();
// check the wal files
{
DirectoryIterator file_iter(collection_path);
DirectoryIterator end_iter;
for (; file_iter != end_iter; ++file_iter) {
auto file_path = (*file_iter).path();
std::string file_name = file_path.filename().c_str();
if (file_name == milvus::engine::WAL_MAX_OP_FILE_NAME) {
continue;
}
// wal file not deleted?
ASSERT_TRUE(false);
}
}
}
TEST(WalManagerTest, WalManagerTest) {
std::string path = "/tmp/milvus_wal/test_file";
TEST_F(WalTest, WalManagerTest) {
std::string collection_name = "collection";
// construct mock db
DBOptions options;
options.meta_.path_ = "/tmp/milvus_wal";
options.wal_enable_ = true;
DummyDBPtr db_1 = std::make_shared<DummyDB>(options);
// prepare wal manager
WalManager::GetInstance().Stop();
WalManager::GetInstance().Start(options);
// write over more than 400MB data, 2 wal files
int64_t insert_count = 0;
int64_t delete_count = 0;
for (int64_t i = 1; i <= 1000; i++) {
if (i % 100 == 0) {
auto status = WalManager::GetInstance().DropCollection(collection_name);
ASSERT_TRUE(status.ok());
} else if (i % 10 == 0) {
IDNumbers ids = {1, 2, 3};
auto op = std::make_shared<DeleteEntityOperation>();
op->collection_name_ = collection_name;
op->entity_ids_ = ids;
auto status = WalManager::GetInstance().RecordOperation(op, db_1);
ASSERT_TRUE(status.ok());
delete_count++;
} else {
DataChunkPtr chunk;
int64_t chunk_size = 0;
CreateChunk(chunk, 1000, chunk_size);
auto op = std::make_shared<InsertEntityOperation>();
op->collection_name_ = collection_name;
op->partition_name = "";
op->data_chunk_ = chunk;
auto status = WalManager::GetInstance().RecordOperation(op, db_1);
ASSERT_TRUE(status.ok());
insert_count++;
}
}
ASSERT_EQ(db_1->InsertCount(), insert_count);
ASSERT_EQ(db_1->DeleteCount(), delete_count);
// test recovery
insert_count = 0;
delete_count = 0;
for (int64_t i = 1; i <= 1000; i++) {
if (i % 10 == 0) {
IDNumbers ids = {1, 2, 3};
auto op = std::make_shared<DeleteEntityOperation>();
op->collection_name_ = collection_name;
op->entity_ids_ = ids;
auto status = WalManager::GetInstance().RecordOperation(op, nullptr);
ASSERT_TRUE(status.ok());
delete_count++;
} else {
DataChunkPtr chunk;
int64_t chunk_size = 0;
CreateChunk(chunk, 1000, chunk_size);
auto op = std::make_shared<InsertEntityOperation>();
op->collection_name_ = collection_name;
op->partition_name = "";
op->data_chunk_ = chunk;
auto status = WalManager::GetInstance().RecordOperation(op, nullptr);
ASSERT_TRUE(status.ok());
insert_count++;
}
}
// WalManager::GetInstance().Start(options_);
// WalManager::GetInstance().Recovery(db_);
DummyDBPtr db_2 = std::make_shared<DummyDB>(options);
WalManager::GetInstance().Recovery(db_2);
ASSERT_EQ(db_2->InsertCount(), insert_count);
ASSERT_EQ(db_2->DeleteCount(), delete_count);
}
\ No newline at end of file
......@@ -81,7 +81,7 @@ BaseTest::InitLog() {
}
void
BaseTest::SnapshotStart(bool mock_store, milvus::engine::DBOptions options) {
BaseTest::SnapshotStart(bool mock_store, DBOptions options) {
auto store = Store::Build(options.meta_.backend_uri_, options.meta_.path_,
milvus::codec::Codec::instance().GetSuffixSet());
......@@ -136,7 +136,7 @@ BaseTest::TearDown() {
void
SnapshotTest::SetUp() {
BaseTest::SetUp();
milvus::engine::DBOptions options;
DBOptions options;
options.meta_.path_ = "/tmp/milvus_ss";
options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false;
......@@ -150,11 +150,11 @@ SnapshotTest::TearDown() {
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
milvus::engine::DBOptions
DBOptions
DBTest::GetOptions() {
milvus::cache::CpuCacheMgr::GetInstance().SetCapacity(256 * milvus::engine::MB);
auto options = milvus::engine::DBOptions();
auto options = DBOptions();
options.meta_.path_ = "/tmp/milvus_ss";
options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false;
......@@ -216,7 +216,7 @@ DBTest::TearDown() {
void
SegmentTest::SetUp() {
BaseTest::SetUp();
milvus::engine::DBOptions options;
DBOptions options;
options.meta_.path_ = "/tmp/milvus_ss";
options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false;
......@@ -251,7 +251,7 @@ MetaTest::TearDown() {
void
SchedulerTest::SetUp() {
BaseTest::SetUp();
milvus::engine::DBOptions options;
DBOptions options;
options.meta_.path_ = "/tmp/milvus_ss";
options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false;
......@@ -303,9 +303,9 @@ EventTest::TearDown() {
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
milvus::engine::DBOptions
DBOptions
WalTest::GetOptions() {
milvus::engine::DBOptions options;
DBOptions options;
options.meta_.path_ = "/tmp/milvus_wal";
options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = true;
......
......@@ -88,6 +88,7 @@ using StorePtr = milvus::engine::snapshot::Store::Ptr;
using MetaAdapterPtr = milvus::engine::meta::MetaAdapterPtr;
using DB = milvus::engine::DB;
using DBOptions = milvus::engine::DBOptions;
using Status = milvus::Status;
using idx_t = milvus::engine::idx_t;
using IDNumbers = milvus::engine::IDNumbers;
......@@ -327,7 +328,7 @@ class BaseTest : public ::testing::Test {
void
InitLog();
void
SnapshotStart(bool mock_store, milvus::engine::DBOptions);
SnapshotStart(bool mock_store, DBOptions);
void
SnapshotStop();
......@@ -351,7 +352,7 @@ class DBTest : public BaseTest {
protected:
std::shared_ptr<DB> db_;
milvus::engine::DBOptions
DBOptions
GetOptions();
void
......@@ -414,7 +415,7 @@ class WalTest : public ::testing::Test {
protected:
std::shared_ptr<DB> db_;
milvus::engine::DBOptions
DBOptions
GetOptions();
void
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册