未验证 提交 75868b20 编写于 作者: G groot 提交者: GitHub

fix wal test case issue (#3380)

* prepare change memmanager for wal
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* fix wal test case
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 03c0415e
......@@ -26,6 +26,7 @@
namespace milvus {
namespace engine {
const char* WAL_DATA_FOLDER = "wal";
const char* WAL_MAX_OP_FILE_NAME = "max_op";
const char* WAL_DEL_FILE_NAME = "del";
......@@ -41,9 +42,11 @@ WalManager::GetInstance() {
Status
WalManager::Start(const DBOptions& options) {
enable_ = options.wal_enable_;
wal_path_ = options.meta_.path_;
insert_buffer_size_ = options.insert_buffer_size_;
std::experimental::filesystem::path wal_path(options.meta_.path_);
wal_path.append((WAL_DATA_FOLDER));
wal_path_ = wal_path.c_str();
CommonUtil::CreateDirectory(wal_path_);
auto status = Init();
......@@ -149,63 +152,68 @@ Status
WalManager::Recovery(const DBPtr& db) {
WaitCleanupFinish();
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter_outer(wal_path_);
DirectoryIterator end_outer;
for (; iter_outer != end_outer; ++iter_outer) {
auto path_outer = (*iter_outer).path();
if (!std::experimental::filesystem::is_directory(path_outer)) {
continue;
}
std::string collection_name = path_outer.filename().c_str();
// iterate files
std::map<idx_t, std::experimental::filesystem::path> id_files;
DirectoryIterator iter_inner(path_outer);
DirectoryIterator end_inner;
for (; iter_inner != end_inner; ++iter_inner) {
auto path_inner = (*iter_inner).path();
std::string file_name = path_inner.filename().c_str();
if (file_name == WAL_MAX_OP_FILE_NAME) {
try {
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter_outer(wal_path_);
DirectoryIterator end_outer;
for (; iter_outer != end_outer; ++iter_outer) {
auto path_outer = (*iter_outer).path();
if (!std::experimental::filesystem::is_directory(path_outer)) {
continue;
}
idx_t op_id = std::stol(file_name);
id_files.insert(std::make_pair(op_id, path_inner));
}
// the max operation id
idx_t max_op_id = 0;
{
std::lock_guard<std::mutex> lock(max_op_mutex_);
if (max_op_id_map_.find(collection_name) != max_op_id_map_.end()) {
max_op_id = max_op_id_map_[collection_name];
std::string collection_name = path_outer.filename().c_str();
// iterate files
std::map<idx_t, std::experimental::filesystem::path> id_files;
DirectoryIterator iter_inner(path_outer);
DirectoryIterator end_inner;
for (; iter_inner != end_inner; ++iter_inner) {
auto path_inner = (*iter_inner).path();
std::string file_name = path_inner.filename().c_str();
if (file_name == WAL_MAX_OP_FILE_NAME) {
continue;
}
idx_t op_id = std::stol(file_name);
id_files.insert(std::make_pair(op_id, path_inner));
}
}
// id_files arranges ids in ascending order, so we know which file should be read
for (auto& pair : id_files) {
WalFilePtr file = std::make_shared<WalFile>();
file->OpenFile(pair.second.c_str(), WalFile::READ);
idx_t last_id = 0;
file->ReadLastOpId(last_id);
if (last_id <= max_op_id) {
file->CloseFile();
std::experimental::filesystem::remove(pair.second);
continue; // skip and delete this file since all its operations already done
// the max operation id
idx_t max_op_id = 0;
{
std::lock_guard<std::mutex> lock(max_op_mutex_);
if (max_op_id_map_.find(collection_name) != max_op_id_map_.end()) {
max_op_id = max_op_id_map_[collection_name];
}
}
// read operation and execute
Status status = Status::OK();
while (status.ok()) {
WalOperationPtr operation;
status = WalOperationCodec::IterateOperation(file, operation, max_op_id);
if (operation) {
operation->collection_name_ = collection_name;
PerformOperation(operation, db);
// id_files arranges ids in ascending order, so we know which file should be read
for (auto& pair : id_files) {
WalFilePtr file = std::make_shared<WalFile>();
file->OpenFile(pair.second.c_str(), WalFile::READ);
idx_t last_id = 0;
file->ReadLastOpId(last_id);
if (last_id <= max_op_id) {
file->CloseFile();
std::experimental::filesystem::remove(pair.second);
continue; // skip and delete this file since all its operations already done
}
// read operation and execute
Status status = Status::OK();
while (status.ok()) {
WalOperationPtr operation;
status = WalOperationCodec::IterateOperation(file, operation, max_op_id);
if (operation) {
operation->collection_name_ = collection_name;
PerformOperation(operation, db);
}
}
}
}
} catch (std::exception& ex) {
std::string msg = "Failed to recovery wal, reason: " + std::string(ex.what());
return Status(DB_ERROR, msg);
}
return Status::OK();
......@@ -213,34 +221,39 @@ WalManager::Recovery(const DBPtr& db) {
Status
WalManager::Init() {
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter(wal_path_);
DirectoryIterator end;
for (; iter != end; ++iter) {
auto path = (*iter).path();
if (std::experimental::filesystem::is_directory(path)) {
std::string collection_name = path.filename().c_str();
// read max op id
std::experimental::filesystem::path file_path = path;
file_path.append(WAL_MAX_OP_FILE_NAME);
if (std::experimental::filesystem::is_regular_file(file_path)) {
WalFile file;
file.OpenFile(path.c_str(), WalFile::READ);
idx_t max_op = 0;
file.Read(&max_op);
std::lock_guard<std::mutex> lock(max_op_mutex_);
max_op_id_map_.insert(std::make_pair(collection_name, max_op));
}
try {
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter(wal_path_);
DirectoryIterator end;
for (; iter != end; ++iter) {
auto path = (*iter).path();
if (std::experimental::filesystem::is_directory(path)) {
std::string collection_name = path.filename().c_str();
// read max op id
std::experimental::filesystem::path file_path = path;
file_path.append(WAL_MAX_OP_FILE_NAME);
if (std::experimental::filesystem::is_regular_file(file_path)) {
WalFile file;
file.OpenFile(path.c_str(), WalFile::READ);
idx_t max_op = 0;
file.Read(&max_op);
std::lock_guard<std::mutex> lock(max_op_mutex_);
max_op_id_map_.insert(std::make_pair(collection_name, max_op));
}
// this collection has been deleted?
file_path = path;
file_path.append(WAL_DEL_FILE_NAME);
if (std::experimental::filesystem::is_regular_file(file_path)) {
AddCleanupTask(collection_name);
// this collection has been deleted?
file_path = path;
file_path.append(WAL_DEL_FILE_NAME);
if (std::experimental::filesystem::is_regular_file(file_path)) {
AddCleanupTask(collection_name);
}
}
}
} catch (std::exception& ex) {
std::string msg = "Failed to initial wal, reason: " + std::string(ex.what());
return Status(DB_ERROR, msg);
}
StartCleanupThread(); // do cleanup
......@@ -263,24 +276,29 @@ WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, con
DataChunkPtr& chunk = chunks[i];
int64_t chunk_size = utils::GetSizeOfChunk(chunk);
// open wal file
std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id));
if (!path.empty()) {
std::lock_guard<std::mutex> lock(file_map_mutex_);
WalFilePtr file = file_map_[operation->collection_name_];
if (file == nullptr) {
file = std::make_shared<WalFile>();
file_map_[operation->collection_name_] = file;
file->OpenFile(path, WalFile::APPEND_WRITE);
} else if (!file->IsOpened() || file->ExceedMaxSize(chunk_size)) {
file->OpenFile(path, WalFile::APPEND_WRITE);
}
try {
// open wal file
std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id));
if (!path.empty()) {
std::lock_guard<std::mutex> lock(file_map_mutex_);
WalFilePtr file = file_map_[operation->collection_name_];
if (file == nullptr) {
file = std::make_shared<WalFile>();
file_map_[operation->collection_name_] = file;
file->OpenFile(path, WalFile::APPEND_WRITE);
} else if (!file->IsOpened() || file->ExceedMaxSize(chunk_size)) {
file->OpenFile(path, WalFile::APPEND_WRITE);
}
// write to wal file
status = WalOperationCodec::WriteInsertOperation(file, operation->partition_name, chunk, op_id);
if (!status.ok()) {
return status;
// write to wal file
status = WalOperationCodec::WriteInsertOperation(file, operation->partition_name, chunk, op_id);
if (!status.ok()) {
return status;
}
}
} catch (std::exception& ex) {
std::string msg = "Failed to record insert operation, reason: " + std::string(ex.what());
return Status(DB_ERROR, msg);
}
// insert action to db
......@@ -317,23 +335,28 @@ WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, con
int64_t append_size = operation->entity_ids_.size() * sizeof(idx_t);
// open wal file
std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id));
if (!path.empty()) {
std::lock_guard<std::mutex> lock(file_map_mutex_);
WalFilePtr file = file_map_[operation->collection_name_];
if (file == nullptr) {
file = std::make_shared<WalFile>();
file_map_[operation->collection_name_] = file;
file->OpenFile(path, WalFile::APPEND_WRITE);
} else if (!file->IsOpened() || file->ExceedMaxSize(append_size)) {
file->OpenFile(path, WalFile::APPEND_WRITE);
}
try {
std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id));
if (!path.empty()) {
std::lock_guard<std::mutex> lock(file_map_mutex_);
WalFilePtr file = file_map_[operation->collection_name_];
if (file == nullptr) {
file = std::make_shared<WalFile>();
file_map_[operation->collection_name_] = file;
file->OpenFile(path, WalFile::APPEND_WRITE);
} else if (!file->IsOpened() || file->ExceedMaxSize(append_size)) {
file->OpenFile(path, WalFile::APPEND_WRITE);
}
// write to wal file
auto status = WalOperationCodec::WriteDeleteOperation(file, operation->entity_ids_, op_id);
if (!status.ok()) {
return status;
// write to wal file
auto status = WalOperationCodec::WriteDeleteOperation(file, operation->entity_ids_, op_id);
if (!status.ok()) {
return status;
}
}
} catch (std::exception& ex) {
std::string msg = "Failed to record delete operation, reason: " + std::string(ex.what());
return Status(DB_ERROR, msg);
}
// delete action to db
......
......@@ -29,6 +29,7 @@
namespace milvus {
namespace engine {
extern const char* WAL_DATA_FOLDER;
extern const char* WAL_MAX_OP_FILE_NAME;
extern const char* WAL_DEL_FILE_NAME;
......
......@@ -54,13 +54,8 @@ WalProxy::Stop() {
// Drops a collection by forwarding to db_->DropCollection(). As written, a
// failed status is returned immediately; on success WalManager's
// DropCollection() is also invoked (its result is not inspected) and the
// status from db_ is returned.
//
// NOTE(review): this text appears to come from a flattened diff in which the
// removed and added lines are interleaved — confirm which body is the
// intended one before relying on this description.
Status
WalProxy::DropCollection(const std::string& collection_name) {
auto status = db_->DropCollection(collection_name);
// Bail out early so WalManager is not touched when the db-level drop failed.
if (!status.ok()) {
return status;
}
WalManager::GetInstance().DropCollection(collection_name);
return status;
// NOTE(review): unreachable as written (follows an unconditional return).
// Presumably this single line is the post-change body of the function and
// the statements above are the pre-change body — TODO confirm.
return db_->DropCollection(collection_name);
}
Status
......
......@@ -292,6 +292,7 @@ TEST_F(WalTest, WalProxyTest) {
// find out the wal files
DBOptions opt = GetOptions();
std::experimental::filesystem::path collection_path = opt.meta_.path_;
collection_path.append(milvus::engine::WAL_DATA_FOLDER);
collection_path.append(collection_name);
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
......
......@@ -27,7 +27,7 @@
#include "cache/GpuCacheMgr.h"
#include "config/ServerConfig.h"
#include "codecs/Codec.h"
#include "db/DBImpl.h"
#include "db/DBFactory.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
......@@ -174,7 +174,8 @@ DBTest::SetUp() {
auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
dummy_context_->SetTraceContext(trace_context);
db_ = std::make_shared<milvus::engine::DBImpl>(GetOptions());
db_ = milvus::engine::DBFactory::BuildDB(GetOptions());
db_->Start();
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
res_mgr->Clear();
......@@ -196,6 +197,7 @@ DBTest::SetUp() {
void
DBTest::TearDown() {
db_->Stop();
db_ = nullptr; // db must be stopped before JobMgr and Snapshot
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
......@@ -222,12 +224,14 @@ SegmentTest::SetUp() {
options.wal_enable_ = false;
BaseTest::SnapshotStart(false, options);
db_ = std::make_shared<milvus::engine::DBImpl>(options);
db_ = milvus::engine::DBFactory::BuildDB(options);
db_->Start();
}
// Test-fixture teardown for SegmentTest: stops the snapshot machinery via
// BaseTest::SnapshotStop(), stops the database, drops the fixture's db_
// handle, and finally runs the base-class teardown.
void
SegmentTest::TearDown() {
// NOTE(review): other fixtures in this file carry the comment "db must be
// stopped before JobMgr and Snapshot"; here SnapshotStop() runs before
// db_->Stop() — confirm this ordering is intentional.
BaseTest::SnapshotStop();
db_->Stop();
// Release the fixture's handle to the database after it has been stopped.
db_ = nullptr;
BaseTest::TearDown();
}
......@@ -256,7 +260,8 @@ SchedulerTest::SetUp() {
options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false;
BaseTest::SnapshotStart(true, options);
db_ = std::make_shared<milvus::engine::DBImpl>(options);
db_ = milvus::engine::DBFactory::BuildDB(options);
db_->Start();
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
res_mgr->Clear();
......@@ -278,6 +283,7 @@ SchedulerTest::SetUp() {
void
SchedulerTest::TearDown() {
db_->Stop();
db_ = nullptr; // db must be stopped before JobMgr and Snapshot
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册