未验证 提交 fc393b3a 编写于 作者: B BossZou 提交者: GitHub

(scalar) Debug delete request (#3107)

* Debug delete request
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Chang previous bloom filter and deleted docs file to stale during ApplyDeletes
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Add delete case
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Add delete ut
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Add Delete test
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>

* Format
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>
Co-authored-by: NWang Xiangyu <xy.wang@zilliz.com>
上级 aad8f82d
......@@ -100,7 +100,7 @@ class DB {
DataChunkPtr& data_chunk) = 0;
virtual Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) = 0;
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) = 0;
virtual Status
ListIDInSegment(const std::string& collection_id, int64_t segment_id, IDNumbers& entity_ids) = 0;
......
......@@ -576,7 +576,7 @@ DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_ar
}
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) {
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
CHECK_INITIALIZED;
Status status;
......
......@@ -95,7 +95,7 @@ class DBImpl : public DB, public ConfigObserver {
DataChunkPtr& data_chunk) override;
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) override;
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override;
Status
Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) override;
......
......@@ -11,11 +11,17 @@
#include "db/insert/MemCollection.h"
#include <unistd.h>
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <memory>
#include <string>
#include <fiu-local.h>
#include "config/ServerConfig.h"
#include "db/Utils.h"
#include "db/snapshot/CompoundOperations.h"
......@@ -101,6 +107,22 @@ Status
MemCollection::Serialize(uint64_t wal_lsn) {
TimeRecorder recorder("MemCollection::Serialize collection " + collection_id_);
if (!doc_ids_to_delete_.empty()) {
while (true) {
auto status = ApplyDeletes();
if (status.ok()) {
break;
} else if (status.code() == SS_STALE_ERROR) {
LOG_ENGINE_WARNING_ << "ApplyDeletes is stale, try again";
continue;
} else {
return status;
}
}
}
doc_ids_to_delete_.clear();
std::lock_guard<std::mutex> lock(mutex_);
for (auto& partition_segments : mem_segments_) {
MemSegmentList& segments = partition_segments.second;
......@@ -113,13 +135,6 @@ MemCollection::Serialize(uint64_t wal_lsn) {
}
}
if (!doc_ids_to_delete_.empty()) {
auto status = ApplyDeletes();
if (!status.ok()) {
return Status(DB_ERROR, status.message());
}
}
mem_segments_.clear();
recorder.RecordSection("Finished flushing");
......@@ -152,6 +167,7 @@ MemCollection::ApplyDeletes() {
// TODO: check stale segment files here
snapshot::OperationContext context;
context.lsn = lsn_;
auto segments_op = std::make_shared<snapshot::CompoundSegmentsOperation>(context, ss);
auto segment_executor = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* iterator) -> Status {
......@@ -161,6 +177,7 @@ MemCollection::ApplyDeletes() {
segment::IdBloomFilterPtr pre_bloom_filter;
STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter));
// Step 1: Check delete_id in mem
std::vector<segment::doc_id_t> delete_ids;
for (auto& id : doc_ids_to_delete_) {
if (pre_bloom_filter->Check(id)) {
......@@ -168,11 +185,11 @@ MemCollection::ApplyDeletes() {
}
}
// No entities to delete, skip
if (delete_ids.empty()) {
return Status::OK();
}
// Step 2: Load previous delete_id and merge into 'delete_ids'
segment::DeletedDocsPtr prev_del_docs;
STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs));
std::vector<segment::offset_t> pre_del_ids;
......@@ -190,12 +207,32 @@ MemCollection::ApplyDeletes() {
std::sort(delete_ids.begin(), delete_ids.end());
std::set<segment::doc_id_t> ids_to_check(delete_ids.begin(), delete_ids.end());
// write delete docs
// Step 3: Mark previous deleted docs file and bloom filter file stale
auto& field_visitors_map = seg_visitor->GetFieldVisitors();
auto uid_field_visitor = seg_visitor->GetFieldVisitor(engine::DEFAULT_UID_NAME);
auto del_doc_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS);
auto del_docs_element = del_doc_visitor->GetElement();
// TODO(yhz): Create a new delete doc file in snapshot and obtain a new SegmentFile Res
auto blm_filter_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
auto blm_filter_element = blm_filter_visitor->GetElement();
auto segment_file_executor = [&](const snapshot::SegmentFilePtr& segment_file,
snapshot::SegmentFileIterator* iterator) -> Status {
if (segment_file->GetSegmentId() != segment->GetID()) {
return Status::OK();
}
if (segment_file->GetFieldElementId() == del_docs_element->GetID() ||
segment_file->GetFieldElementId() == blm_filter_element->GetID()) {
segments_op->AddStaleSegmentFile(segment_file);
}
return Status::OK();
};
auto segment_file_iterator = std::make_shared<snapshot::SegmentFileIterator>(ss, segment_file_executor);
segment_file_iterator->Iterate();
STATUS_CHECK(segment_file_iterator->GetStatus());
// Step 4: Create new deleted docs file and bloom filter file
snapshot::SegmentFileContext del_file_context;
del_file_context.field_name = uid_field_visitor->GetField()->GetName();
del_file_context.field_element_name = del_docs_element->GetName();
......@@ -203,15 +240,11 @@ MemCollection::ApplyDeletes() {
del_file_context.partition_id = segment->GetPartitionId();
del_file_context.segment_id = segment->GetID();
snapshot::SegmentFilePtr delete_file;
segments_op->CommitNewSegmentFile(del_file_context, delete_file);
STATUS_CHECK(segments_op->CommitNewSegmentFile(del_file_context, delete_file));
auto segment_writer = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, seg_visitor);
std::string del_docs_path = snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, delete_file);
// write bloom filter
auto blm_filter_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
auto blm_filter_element = blm_filter_visitor->GetElement();
snapshot::SegmentFileContext bloom_file_context;
bloom_file_context.field_name = uid_field_visitor->GetField()->GetName();
bloom_file_context.field_element_name = blm_filter_element->GetName();
......@@ -220,14 +253,15 @@ MemCollection::ApplyDeletes() {
bloom_file_context.segment_id = segment->GetID();
engine::snapshot::SegmentFile::Ptr bloom_filter_file;
segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file);
STATUS_CHECK(segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file));
std::string bloom_filter_file_path =
snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, bloom_filter_file);
auto delete_docs = std::make_shared<segment::DeletedDocs>();
// Step 5: Write to file
segment::IdBloomFilterPtr bloom_filter;
segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter);
STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter));
auto delete_docs = std::make_shared<segment::DeletedDocs>();
std::vector<segment::doc_id_t> uids;
STATUS_CHECK(segment_reader->LoadUids(uids));
for (size_t i = 0; i < uids.size(); i++) {
......@@ -238,10 +272,11 @@ MemCollection::ApplyDeletes() {
}
}
segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true);
STATUS_CHECK(
segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true));
segment_writer->WriteDeletedDocs(del_docs_path, delete_docs);
segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter);
STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs));
STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter));
return Status::OK();
};
......@@ -250,6 +285,10 @@ MemCollection::ApplyDeletes() {
segment_iterator->Iterate();
STATUS_CHECK(segment_iterator->GetStatus());
fiu_do_on("MemCollection.ApplyDeletes.RandomSleep", {
std::srand(std::time(nullptr));
sleep(std::rand() % 3);
});
return segments_op->Push();
}
......
......@@ -87,6 +87,14 @@ CompoundSegmentsOperation::CommitNewSegmentFile(const SegmentFileContext& contex
return Status::OK();
}
Status
CompoundSegmentsOperation::AddStaleSegmentFile(const SegmentFilePtr& stale_segment_file) {
stale_segment_files_[stale_segment_file->GetSegmentId()].push_back(stale_segment_file);
modified_segments_.insert(stale_segment_file->GetSegmentId());
return Status::OK();
}
Status
CompoundSegmentsOperation::DoExecute(StorePtr store) {
if (!context_.new_segment && stale_segment_files_.size() == 0 && new_segment_files_.size() == 0) {
......
......@@ -93,6 +93,9 @@ class CompoundSegmentsOperation : public CompoundBaseOperation<CompoundSegmentsO
Status DoExecute(StorePtr) override;
Status
AddStaleSegmentFile(const SegmentFilePtr&);
Status
CommitNewSegment(const OperationContext& context, SegmentPtr&);
......
......@@ -227,10 +227,12 @@ SegmentCommitOperation::DoExecute(StorePtr store) {
resource_->GetMappings().erase(stale_segment_file->GetID());
size -= stale_segment_file->GetSize();
}
} else {
} else if (context_.new_segment && GetStartedSS()->GetResource<Partition>(context_.new_segment->GetPartitionId())) {
resource_ =
std::make_shared<SegmentCommit>(GetStartedSS()->GetLatestSchemaCommitId(),
context_.new_segment->GetPartitionId(), context_.new_segment->GetID());
} else {
return Status(SS_STALE_ERROR, "Stale Error");
}
for (auto& new_segment_file : context_.new_segment_files) {
resource_->GetMappings().insert(new_segment_file->GetID());
......
......@@ -212,7 +212,7 @@ SegmentWriter::CreateBloomFilter(const std::string& file_path, IdBloomFilterPtr&
try {
ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr);
} catch (std::exception& er) {
return Status(DB_ERROR, "Create a new bloom filter fail");
return Status(DB_ERROR, "Create a new bloom filter fail: " + std::string(er.what()));
}
return Status::OK();
......
......@@ -31,7 +31,7 @@ namespace {
const char* VECTOR_FIELD_NAME = "vector";
milvus::Status
CreateCollection(std::shared_ptr<DBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollection(const std::shared_ptr<DBImpl>& db, const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
......@@ -734,3 +734,153 @@ TEST_F(DBTest, StatsTest) {
std::string ss = json_stats.dump();
std::cout << ss << std::endl;
}
TEST_F(DBTest, DeleteEntitiesTest) {
std::string collection_name = "test_collection_delete_";
CreateCollection2(db_, collection_name, 0);
auto insert_entities = [&](const std::string& collection, const std::string& partition,
uint64_t count, uint64_t batch_index, milvus::engine::IDNumbers& ids) -> Status {
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(count, batch_index, data_chunk);
STATUS_CHECK(db_->Insert(collection, partition, data_chunk));
STATUS_CHECK(db_->Flush(collection));
auto iter = data_chunk->fixed_fields_.find(milvus::engine::DEFAULT_UID_NAME);
if (iter == data_chunk->fixed_fields_.end()) {
return Status(1, "Cannot find uid field");
}
auto& ids_buffer = iter->second;
ids.resize(data_chunk->count_);
memcpy(ids.data(), ids_buffer->data_.data(), ids_buffer->Size());
return Status::OK();
};
milvus::engine::IDNumbers entity_ids;
auto status = insert_entities(collection_name, "", 10000, 0, entity_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
milvus::engine::IDNumbers delete_ids = {entity_ids[0]};
status = db_->DeleteEntityByID(collection_name, delete_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
milvus::engine::IDNumbers whole_delete_ids;
fiu_init(0);
fiu_enable("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0);
for (size_t i = 0; i < 5; i++) {
std::string partition0 = collection_name + "p_" + std::to_string(i) + "_0";
std::string partition1 = collection_name + "p_" + std::to_string(i) + "_1";
status = db_->CreatePartition(collection_name, partition0);
ASSERT_TRUE(status.ok()) << status.ToString();
status = db_->CreatePartition(collection_name, partition1);
ASSERT_TRUE(status.ok()) << status.ToString();
milvus::engine::IDNumbers partition0_ids;
status = insert_entities(collection_name, partition0, 10000, 2 * i + 1, partition0_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
milvus::engine::IDNumbers partition1_ids;
status = insert_entities(collection_name, partition1, 10000, 2 * i + 2, partition1_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
milvus::engine::IDNumbers partition_delete_ids = {partition0_ids[0], partition1_ids[0]};
whole_delete_ids.insert(whole_delete_ids.begin(), partition_delete_ids.begin(), partition_delete_ids.end());
db_->DeleteEntityByID(collection_name, partition_delete_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
status = db_->DropPartition(collection_name, partition1);
ASSERT_TRUE(status.ok()) << status.ToString();
}
sleep(3);
fiu_disable("MemCollection.ApplyDeletes.RandomSleep");
std::vector<bool> valid_row;
milvus::engine::DataChunkPtr entity_data_chunk;
for (auto& id : whole_delete_ids) {
status = db_->GetEntityByID(collection_name, {id}, {}, valid_row, entity_data_chunk);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_EQ(entity_data_chunk->count_, 0);
}
}
TEST_F(DBTest, DeleteStaleTest) {
auto insert_entities = [&](const std::string& collection, const std::string& partition,
uint64_t count, uint64_t batch_index, milvus::engine::IDNumbers& ids) -> Status {
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(count, batch_index, data_chunk);
STATUS_CHECK(db_->Insert(collection, partition, data_chunk));
STATUS_CHECK(db_->Flush(collection));
auto iter = data_chunk->fixed_fields_.find(milvus::engine::DEFAULT_UID_NAME);
if (iter == data_chunk->fixed_fields_.end()) {
return Status(1, "Cannot find uid field");
}
auto& ids_buffer = iter->second;
ids.resize(data_chunk->count_);
memcpy(ids.data(), ids_buffer->data_.data(), ids_buffer->Size());
return Status::OK();
};
auto build_task = [&](const std::string& collection, const std::string& field) {
milvus::engine::CollectionIndex index;
index.index_name_ = "my_index1";
index.index_type_ = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
index.metric_name_ = milvus::knowhere::Metric::L2;
index.extra_params_["nlist"] = 2048;
auto status = db_->CreateIndex(dummy_context_, collection, field, index);
ASSERT_TRUE(status.ok()) << status.ToString();
};
auto delete_task = [&](const std::string& collection, const milvus::engine::IDNumbers& del_ids) {
auto status = Status::OK();
for (size_t i = 0; i < 5; i++) {
milvus::engine::IDNumbers ids = {del_ids[2 * i], del_ids[2 * i + 1]};
status = db_->DeleteEntityByID(collection, ids);
ASSERT_TRUE(status.ok()) << status.ToString();
sleep(1);
}
};
const std::string collection_name = "test_delete_stale_";
auto status = CreateCollection2(db_, collection_name, 0);
ASSERT_TRUE(status.ok()) << status.ToString();
milvus::engine::IDNumbers del_ids;
milvus::engine::IDNumbers entity_ids;
status = insert_entities(collection_name, "", 10000, 0, entity_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
status = db_->Flush(collection_name);
ASSERT_TRUE(status.ok()) << status.ToString();
milvus::engine::IDNumbers entity_ids2;
status = insert_entities(collection_name, "", 10000, 1, entity_ids2);
ASSERT_TRUE(status.ok()) << status.ToString();
status = db_->Flush(collection_name);
ASSERT_TRUE(status.ok()) << status.ToString();
for (size_t i = 0; i < 5; i ++) {
del_ids.push_back(entity_ids[i]);
del_ids.push_back(entity_ids2[i]);
}
fiu_init(0);
fiu_enable("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0);
auto build_thread = std::thread(build_task, collection_name, VECTOR_FIELD_NAME);
auto delete_thread = std::thread(delete_task, collection_name, del_ids);
build_thread.join();
delete_thread.join();
// sleep(15);
fiu_disable("MemCollection.ApplyDeletes.RandomSleep");
//
// std::vector<bool> valid_row;
// milvus::engine::DataChunkPtr entity_data_chunk;
// std::cout << "Get Entity" << std::endl;
// for (size_t j = 0; j < del_ids.size(); j++) {
// status = db_->GetEntityByID(collection_name, {del_ids[j]}, {}, valid_row, entity_data_chunk);
// ASSERT_TRUE(status.ok()) << status.ToString();
// ASSERT_EQ(entity_data_chunk->count_, 0) << "[" << j << "] Delete id " << del_ids[j] << " failed.";
// }
}
......@@ -114,7 +114,6 @@ class TestDeleteBase:
status = connect.delete_entity_by_id(collection, delete_ids)
assert status
@pytest.mark.level(2)
def test_insert_delete_A(self, connect, collection):
'''
target: test delete entity
......@@ -130,7 +129,6 @@ class TestDeleteBase:
res_count = connect.count_entities(collection)
assert res_count == nb - 1
@pytest.mark.level(2)
def test_insert_delete_B(self, connect, collection):
'''
target: test delete entity
......@@ -147,7 +145,6 @@ class TestDeleteBase:
res_count = connect.count_entities(collection)
assert res_count == 0
@pytest.mark.level(2)
def test_delete_exceed_limit(self, connect, collection):
'''
target: test delete entity
......@@ -163,7 +160,6 @@ class TestDeleteBase:
assert res_count == 0
# TODO
@pytest.mark.level(2)
def test_flush_after_delete(self, connect, collection):
'''
target: test delete entity
......@@ -197,11 +193,11 @@ class TestDeleteBase:
assert res_count == nb - len(delete_ids)
# TODO
@pytest.mark.level(2)
def test_insert_same_ids_after_delete(self, connect, collection):
'''
method: add entities and delete
expected: status DELETED
note: Not flush after delete
'''
insert_ids = [i for i in range(nb)]
ids = connect.insert(collection, entities, insert_ids)
......@@ -216,7 +212,6 @@ class TestDeleteBase:
assert res_count == nb - 1
# TODO
@pytest.mark.level(2)
def test_insert_same_ids_after_delete_binary(self, connect, binary_collection):
'''
method: add entities, with the same id and delete the ids
......@@ -356,7 +351,6 @@ class TestDeleteBase:
assert status
# TODO:
@pytest.mark.level(2)
def test_insert_tags_delete(self, connect, collection):
'''
method: add entitys with given two tags, delete entities with the return ids
......@@ -422,13 +416,11 @@ class TestDeleteInvalid(object):
with pytest.raises(Exception) as e:
status = connect.delete_entity_by_id(collection, [invalid_id])
@pytest.mark.level(2)
def test_delete_entity_ids_invalid(self, connect, collection, gen_entity_id):
invalid_id = gen_entity_id
with pytest.raises(Exception) as e:
status = connect.delete_entity_by_id(collection, [1, invalid_id])
@pytest.mark.level(2)
def test_delete_entity_with_invalid_collection_name(self, connect, get_collection_name):
collection_name = get_collection_name
with pytest.raises(Exception) as e:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册