未验证 提交 fe8060f4 编写于 作者: G groot 提交者: GitHub

fix wal bug for delete entity (#3940)

* fix wal bug for delete entity
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* add unittest
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 cb753909
......@@ -92,6 +92,9 @@ MemCollection::Delete(const std::vector<idx_t>& ids, idx_t op_id) {
ids_to_delete_.insert(id);
}
// record max delete operation id here, for the case that no insert but only delete action performed
max_delete_op_id_ = (op_id > max_delete_op_id_) ? op_id : max_delete_op_id_;
// Add the id to mem segments so it can be applied during the next flush
std::lock_guard<std::mutex> lock(mem_mutex_);
for (auto& partition_segments : mem_segments_) {
......@@ -337,13 +340,17 @@ MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::Comp
Status
MemCollection::SerializeSegments() {
std::lock_guard<std::mutex> lock(mem_mutex_);
if (mem_segments_.empty()) {
return Status::OK();
}
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_));
if (mem_segments_.empty()) {
// for the case that no insert but only delete action performed
// notify wal manager the delete operations has been done
WalManager::GetInstance().OperationDone(ss->GetName(), max_delete_op_id_);
return Status::OK();
}
snapshot::OperationContext context;
auto operation = std::make_shared<snapshot::MultiSegmentsOperation>(context, ss);
......
......@@ -81,6 +81,7 @@ class MemCollection {
std::mutex mem_mutex_;
std::unordered_set<idx_t> ids_to_delete_;
idx_t max_delete_op_id_ = 0;
int64_t segment_row_count_ = 0;
};
......
......@@ -155,14 +155,14 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) {
return Status::OK();
}
bool start_clecnup = false;
bool start_cleanup = false;
{
// record max operation id for each collection
std::lock_guard<std::mutex> lock(max_op_mutex_);
idx_t last_id = max_op_id_map_[collection_name];
if (op_id > last_id) {
max_op_id_map_[collection_name] = op_id;
start_clecnup = true;
start_cleanup = true;
// write max op id to disk
std::string path = ConstructFilePath(collection_name, WAL_MAX_OP_FILE_NAME);
......@@ -174,7 +174,7 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) {
}
}
if (start_clecnup) {
if (start_cleanup) {
AddCleanupTask(collection_name);
StartCleanupThread();
}
......@@ -258,6 +258,17 @@ WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) {
}
}
idx_t
WalManager::GetMaxOperationID(const std::string& collection_name) {
std::lock_guard<std::mutex> lock(max_op_mutex_);
auto pair = max_op_id_map_.find(collection_name);
if (pair != max_op_id_map_.end()) {
return pair->second;
}
return 0;
}
Status
WalManager::Init() {
sync_mode_ = config.wal.sync_mode();
......@@ -304,6 +315,7 @@ WalManager::Init() {
Status
WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, const DBPtr& db) {
idx_t op_id = id_gen_.GetNextIDNumber();
operation->SetID(op_id);
DataChunkPtr& chunk = operation->data_chunk_;
int64_t chunk_size = utils::GetSizeOfChunk(chunk);
......@@ -347,6 +359,7 @@ WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, con
Status
WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, const DBPtr& db) {
idx_t op_id = id_gen_.GetNextIDNumber();
operation->SetID(op_id);
int64_t append_size = operation->entity_ids_.size() * sizeof(idx_t);
// open wal file
......
......@@ -59,6 +59,9 @@ class WalManager {
Status
Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids);
idx_t
GetMaxOperationID(const std::string& collection_name);
private:
WalManager();
......
......@@ -431,7 +431,7 @@ TEST_F(WalTest, WalProxyTest) {
}
}
TEST_F(WalTest, WalManagerTest) {
TEST_F(WalTest, WalManagerTest1) {
// construct mock db
DBOptions options;
options.wal_path_ = "/tmp/milvus_wal";
......@@ -459,6 +459,9 @@ TEST_F(WalTest, WalManagerTest) {
auto status = WalManager::GetInstance().RecordOperation(op, db_1);
ASSERT_TRUE(status.ok());
idx_t max_done_id = WalManager::GetInstance().GetMaxOperationID(COLLECTION_NAME);
ASSERT_EQ(max_done_id, op->ID());
delete_count++;
} else {
DataChunkPtr chunk;
......@@ -473,6 +476,9 @@ TEST_F(WalTest, WalManagerTest) {
auto status = WalManager::GetInstance().RecordOperation(op, db_1);
ASSERT_TRUE(status.ok());
idx_t max_done_id = WalManager::GetInstance().GetMaxOperationID(COLLECTION_NAME);
ASSERT_EQ(max_done_id, op->ID());
insert_count++;
}
}
......@@ -480,10 +486,11 @@ TEST_F(WalTest, WalManagerTest) {
ASSERT_EQ(db_1->InsertCount(), insert_count);
ASSERT_EQ(db_1->DeleteCount(), delete_count);
// test recovery
// firstly write some operations to wal, not apply to DB
insert_count = 0;
delete_count = 0;
idx_t max_op_id = 0;
for (int64_t i = 1; i <= 1000; i++) {
if (i % 10 == 0) {
IDNumbers ids = {1, 2, 3};
......@@ -495,6 +502,7 @@ TEST_F(WalTest, WalManagerTest) {
auto status = WalManager::GetInstance().RecordOperation(op, nullptr);
ASSERT_TRUE(status.ok());
max_op_id = (op->ID() > max_op_id) ? op->ID() : max_op_id;
delete_count++;
} else {
DataChunkPtr chunk;
......@@ -509,13 +517,56 @@ TEST_F(WalTest, WalManagerTest) {
auto status = WalManager::GetInstance().RecordOperation(op, nullptr);
ASSERT_TRUE(status.ok());
max_op_id = (op->ID() > max_op_id) ? op->ID() : max_op_id;
insert_count++;
}
}
// secondly call wal manager to recovery
DummyDBPtr db_2 = std::make_shared<DummyDB>(options);
milvus::engine::CollectionMaxOpIDMap max_op_ids;
WalManager::GetInstance().Recovery(db_2, max_op_ids);
ASSERT_EQ(db_2->InsertCount(), insert_count);
ASSERT_EQ(db_2->DeleteCount(), delete_count);
idx_t max_done_id = WalManager::GetInstance().GetMaxOperationID(COLLECTION_NAME);
ASSERT_EQ(max_done_id, max_op_id);
}
TEST_F(WalTest, WalManagerTest2) {
// construct mock db
DBOptions options;
options.wal_path_ = "/tmp/milvus_wal";
options.wal_enable_ = true;
// prepare wal manager
WalManager::GetInstance().Stop();
WalManager::GetInstance().Start(options);
// do some delete operations without insert, write the operations into wal, not apply to DB
int64_t delete_count = 0;
idx_t max_op_id = 0;
for (int64_t i = 1; i <= 100; i++) {
IDNumbers ids = {i};
auto op = std::make_shared<DeleteEntityOperation>();
op->collection_name_ = COLLECTION_NAME;
op->entity_ids_ = ids;
auto status = WalManager::GetInstance().RecordOperation(op, nullptr);
ASSERT_TRUE(status.ok());
max_op_id = (op->ID() > max_op_id) ? op->ID() : max_op_id;
delete_count++;
}
// test recovery the delete operations
DummyDBPtr db = std::make_shared<DummyDB>(options);
milvus::engine::CollectionMaxOpIDMap max_op_ids;
WalManager::GetInstance().Recovery(db, max_op_ids);
ASSERT_EQ(db->DeleteCount(), delete_count);
idx_t max_done_id = WalManager::GetInstance().GetMaxOperationID(COLLECTION_NAME);
ASSERT_EQ(max_done_id, max_op_id);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册