From ef8171b90cfd143d8f3c99cf2556273a077ddfea Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Sun, 5 Jul 2020 00:02:12 +0800 Subject: [PATCH] (db/snapshot): Bug fix in operations and unittest (#2744) * (db/snapshot): fix bugs and update UT Signed-off-by: peng.xu * (db/snapshot): update Snapshot ToString Signed-off-by: peng.xu * (db/snapshot): add DropIndex API Signed-off-by: peng.xu * (db/snapshot): Add DropIndex API UT Signed-off-by: peng.xu --- core/src/db/SSDBImpl.cpp | 31 +++ core/src/db/SSDBImpl.h | 3 + core/src/db/snapshot/CompoundOperations.cpp | 27 +- core/src/db/snapshot/Context.h | 5 + core/src/db/snapshot/ResourceOperations.cpp | 87 ++++++- core/src/db/snapshot/ResourceOperations.h | 27 ++ core/src/db/snapshot/Snapshot.cpp | 18 ++ core/src/db/snapshot/Store.h | 6 +- core/unittest/ssdb/test_db.cpp | 122 +++++---- core/unittest/ssdb/test_snapshot.cpp | 274 ++++---------------- core/unittest/ssdb/utils.h | 220 +++++++++++++++- 11 files changed, 540 insertions(+), 280 deletions(-) diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index c2b0deeaf..2052eb230 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -209,6 +209,37 @@ SSDBImpl::ShowPartitions(const std::string& collection_name, std::vectorGetCollectionCommit()->GetLsn(); + auto field_element_id = ss->GetFieldElementId(field_name, field_element_name); + if (field_element_id == 0) { + std::stringstream emsg; + emsg << "Invalid field name: \"" << field_name; + emsg << "\" or field element name: \"" << field_element_name << "\""; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); + } + context.stale_field_element = ss->GetResource(field_element_id); + auto op = std::make_shared(context, ss); + STATUS_CHECK(op->Push()); + + // SS TODO: Start merge task needed? + /* std::set merge_collection_ids = {collection_id}; */ + /* StartMergeTask(merge_collection_ids, true); */ + return Status::OK(); +} + Status SSDBImpl::PreloadCollection(const std::shared_ptr& context, const std::string& collection_name, bool force) { diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index d9bb02149..f85dde7c7 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -73,6 +73,9 @@ class SSDBImpl { Status ShowPartitions(const std::string& collection_name, std::vector& partition_names); + Status + DropIndex(const std::string& collection_name, const std::string& field_name, const std::string& field_element_name); + private: void InternalFlush(const std::string& collection_id = ""); diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index 0740c7512..d4c7c378c 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -75,6 +75,7 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF emsg << GetRepr() << ". Invalid segment " << context.segment_id << " in context"; return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); } + auto ctx = context; ctx.partition_id = segment->GetPartitionId(); auto new_sf_op = std::make_shared(ctx, GetStartedSS()); @@ -112,8 +113,31 @@ Status DropAllIndexOperation::DoExecute(Store& store) { auto& segment_files = GetAdjustedSS()->GetResources(); - std::map> p_sc_map; + OperationContext cc_context; + { + auto context = context_; + context.stale_field_elements.push_back(context.stale_field_element); + + FieldCommitOperation fc_op(context, GetAdjustedSS()); + STATUS_CHECK(fc_op(store)); + FieldCommitPtr new_field_commit; + STATUS_CHECK(fc_op.GetResource(new_field_commit)); + AddStepWithLsn(*new_field_commit, context.lsn); + context.new_field_commits.push_back(new_field_commit); + for (auto& kv : GetAdjustedSS()->GetResources()) { + if (kv.second->GetFieldId() == new_field_commit->GetFieldId()) { + context.stale_field_commits.push_back(kv.second.Get()); + } + } + + SchemaCommitOperation sc_op(context, GetAdjustedSS()); + STATUS_CHECK(sc_op(store)); + STATUS_CHECK(sc_op.GetResource(cc_context.new_schema_commit)); + AddStepWithLsn(*cc_context.new_schema_commit, context.lsn); + } + + std::map> p_sc_map; for (auto& kv : segment_files) { if (kv.second->GetFieldElementId() != context_.stale_field_element->GetID()) { continue; @@ -128,7 +152,6 @@ DropAllIndexOperation::DoExecute(Store& store) { p_sc_map[context.new_segment_commit->GetPartitionId()].push_back(context.new_segment_commit); } - OperationContext cc_context; for (auto& kv : p_sc_map) { auto& partition_id = kv.first; auto context = context_; diff --git a/core/src/db/snapshot/Context.h b/core/src/db/snapshot/Context.h index 58c94f7a3..23784e92f 100644 --- a/core/src/db/snapshot/Context.h +++ b/core/src/db/snapshot/Context.h @@ -68,6 +68,11 @@ struct OperationContext { FieldPtr prev_field = nullptr; FieldElementPtr prev_field_element = nullptr; FieldElementPtr stale_field_element = nullptr; + std::vector new_field_elements; + std::vector stale_field_elements; + + std::vector new_field_commits; + std::vector stale_field_commits; SegmentPtr prev_segment = nullptr; SegmentCommitPtr prev_segment_commit = nullptr; diff --git a/core/src/db/snapshot/ResourceOperations.cpp b/core/src/db/snapshot/ResourceOperations.cpp index bdc3d90b6..4f7282231 100644 --- a/core/src/db/snapshot/ResourceOperations.cpp +++ b/core/src/db/snapshot/ResourceOperations.cpp @@ -42,7 +42,8 @@ CollectionCommitOperation::DoExecute(Store& store) { for (auto& pc : context_.new_partition_commits) { handle_new_pc(pc); } - } else if (context_.new_schema_commit) { + } + if (context_.new_schema_commit) { resource_->SetSchemaId(context_.new_schema_commit->GetID()); } resource_->SetID(0); @@ -220,6 +221,84 @@ SegmentCommitOperation::PreCheck() { return Status::OK(); } +FieldCommitOperation::FieldCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss) + : BaseT(context, prev_ss) { +} + +FieldCommit::Ptr +FieldCommitOperation::GetPrevResource() const { + auto get_resource = [&](FieldElementPtr fe) -> FieldCommitPtr { + auto& field_commits = GetStartedSS()->GetResources(); + for (auto& kv : field_commits) { + if (kv.second->GetFieldId() == fe->GetFieldId()) { + return kv.second.Get(); + } + } + return nullptr; + }; + + if (context_.new_field_elements.size() > 0) { + return get_resource(context_.new_field_elements[0]); + } else if (context_.stale_field_elements.size() > 0) { + return get_resource(context_.stale_field_elements[0]); + } + return nullptr; +} + +Status +FieldCommitOperation::DoExecute(Store& store) { + auto prev_resource = GetPrevResource(); + + if (prev_resource) { + resource_ = std::make_shared(*prev_resource); + resource_->SetID(0); + resource_->ResetStatus(); + for (auto& fe : context_.stale_field_elements) { + resource_->GetMappings().erase(fe->GetID()); + } + } else { + // TODO + } + + for (auto& fe : context_.new_field_elements) { + resource_->GetMappings().insert(fe->GetID()); + } + + AddStep(*resource_, false); + return Status::OK(); +} + +SchemaCommitOperation::SchemaCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss) + : BaseT(context, prev_ss) { +} + +SchemaCommit::Ptr +SchemaCommitOperation::GetPrevResource() const { + return GetStartedSS()->GetSchemaCommit(); +} + +Status +SchemaCommitOperation::DoExecute(Store& store) { + auto prev_resource = GetPrevResource(); + if (!prev_resource) { + return Status(SS_INVALID_CONTEX_ERROR, "Cannot get schema commit"); + } + + resource_ = std::make_shared(*prev_resource); + resource_->SetID(0); + resource_->ResetStatus(); + for (auto& fc : context_.stale_field_commits) { + resource_->GetMappings().erase(fc->GetID()); + } + + for (auto& fc : context_.new_field_commits) { + resource_->GetMappings().insert(fc->GetID()); + } + + AddStep(*resource_, false); + return Status::OK(); +} + SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedSnapshotT prev_ss) : BaseT(OperationContext(), prev_ss), context_(sc) { } @@ -227,6 +306,12 @@ SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedS Status SegmentFileOperation::DoExecute(Store& store) { auto field_element_id = GetStartedSS()->GetFieldElementId(context_.field_name, context_.field_element_name); + if (field_element_id == 0) { + std::stringstream emsg; + emsg << GetRepr() << ". Invalid field name: \"" << context_.field_name; + emsg << "\" or field element name: \"" << context_.field_element_name << "\""; + return Status(SS_INVALID_CONTEX_ERROR, emsg.str()); + } resource_ = std::make_shared(context_.collection_id, context_.partition_id, context_.segment_id, field_element_id); AddStep(*resource_, false); diff --git a/core/src/db/snapshot/ResourceOperations.h b/core/src/db/snapshot/ResourceOperations.h index 924ab66d0..9905188c0 100644 --- a/core/src/db/snapshot/ResourceOperations.h +++ b/core/src/db/snapshot/ResourceOperations.h @@ -101,6 +101,33 @@ class SegmentFileOperation : public CommitOperation { SegmentFileContext context_; }; +class FieldCommitOperation : public CommitOperation { + public: + using BaseT = CommitOperation; + FieldCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss); + + FieldCommit::Ptr + GetPrevResource() const override; + + Status + DoExecute(Store&) override; + + /* Status */ + /* PreCheck() override; */ +}; + +class SchemaCommitOperation : public CommitOperation { + public: + using BaseT = CommitOperation; + SchemaCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss); + + SchemaCommit::Ptr + GetPrevResource() const override; + + Status + DoExecute(Store&) override; +}; + template <> class LoadOperation : public Operations { public: diff --git a/core/src/db/snapshot/Snapshot.cpp b/core/src/db/snapshot/Snapshot.cpp index 0ac5ea01d..1bd5458c2 100644 --- a/core/src/db/snapshot/Snapshot.cpp +++ b/core/src/db/snapshot/Snapshot.cpp @@ -162,6 +162,24 @@ Snapshot::ToString() const { ss << ",mappings="; auto& cc_m = GetCollectionCommit()->GetMappings(); ss << to_matrix_string(cc_m, row_element_size, 2); + + auto& schema_m = GetSchemaCommit()->GetMappings(); + ss << "\nSchemaCommit: id=" << GetSchemaCommit()->GetID() << ",mappings="; + ss << to_matrix_string(schema_m, row_element_size, 2); + for (auto& fc_id : schema_m) { + auto fc = GetResource(fc_id); + auto f = GetResource(fc->GetFieldId()); + ss << "\n Field: id=" << f->GetID() << ",name=\"" << f->GetName() << "\""; + ss << ", FieldCommit: id=" << fc->GetID(); + ss << ",mappings="; + auto& fc_m = fc->GetMappings(); + ss << to_matrix_string(fc_m, row_element_size, 2); + for (auto& fe_id : fc_m) { + auto fe = GetResource(fe_id); + ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName(); + } + } + for (auto& p_c_id : cc_m) { auto p_c = GetResource(p_c_id); auto p = GetResource(p_c->GetPartitionId()); diff --git a/core/src/db/snapshot/Store.h b/core/src/db/snapshot/Store.h index cf0151836..bc40556f4 100644 --- a/core/src/db/snapshot/Store.h +++ b/core/src/db/snapshot/Store.h @@ -241,6 +241,8 @@ class Store { resources[res->GetID()] = res; lock.unlock(); GetResource(res->GetID(), return_v); + /* std::cout << ">>> [Update] " << ResourceT::Name << " " << id; */ + /* std::cout << " " << std::boolalpha << res->IsActive() << std::endl; */ return Status::OK(); } @@ -415,7 +417,7 @@ class Store { auto random_elements = rand_r(&seed) % 2 + 2; for (auto fei = 1; fei <= random_elements; ++fei) { std::stringstream fename; - fename << "fe_" << fei << "_"; + fename << "fe_" << field->GetID() << "_" << fei << "_"; fename << std::get::value>(ids_) + 1; FieldElementPtr element; @@ -458,7 +460,7 @@ class Store { for (auto field_element_id : f_c_m) { SegmentFilePtr sf; CreateResource( - SegmentFile(c->GetID(), p->GetID(), s->GetID(), field_commit_id, 0, 0, 0, 0, ACTIVE), + SegmentFile(c->GetID(), p->GetID(), s->GetID(), field_element_id, 0, 0, 0, 0, ACTIVE), sf); all_records.push_back(sf); diff --git a/core/unittest/ssdb/test_db.cpp b/core/unittest/ssdb/test_db.cpp index 68b96b648..3f15242dc 100644 --- a/core/unittest/ssdb/test_db.cpp +++ b/core/unittest/ssdb/test_db.cpp @@ -18,55 +18,6 @@ #include #include "ssdb/utils.h" -#include "db/snapshot/CompoundOperations.h" -#include "db/snapshot/Context.h" -#include "db/snapshot/EventExecutor.h" -#include "db/snapshot/OperationExecutor.h" -#include "db/snapshot/ReferenceProxy.h" -#include "db/snapshot/ResourceHolders.h" -#include "db/snapshot/ScopedResource.h" -#include "db/snapshot/Snapshots.h" -#include "db/snapshot/Store.h" -#include "db/snapshot/WrappedTypes.h" - -using ID_TYPE = milvus::engine::snapshot::ID_TYPE; -using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE; -using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE; -using MappingT = milvus::engine::snapshot::MappingT; -using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext; -using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext; -using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext; -using OperationContext = milvus::engine::snapshot::OperationContext; -using PartitionContext = milvus::engine::snapshot::PartitionContext; -using BuildOperation = milvus::engine::snapshot::BuildOperation; -using MergeOperation = milvus::engine::snapshot::MergeOperation; -using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation; -using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation; -using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation; -using CreatePartitionOperation = milvus::engine::snapshot::CreatePartitionOperation; -using DropCollectionOperation = milvus::engine::snapshot::DropCollectionOperation; -using CollectionCommitsHolder = milvus::engine::snapshot::CollectionCommitsHolder; -using CollectionsHolder = milvus::engine::snapshot::CollectionsHolder; -using CollectionScopedT = milvus::engine::snapshot::CollectionScopedT; -using Collection = milvus::engine::snapshot::Collection; -using CollectionPtr = milvus::engine::snapshot::CollectionPtr; -using Partition = milvus::engine::snapshot::Partition; -using PartitionPtr = milvus::engine::snapshot::PartitionPtr; -using Segment = milvus::engine::snapshot::Segment; -using SegmentPtr = milvus::engine::snapshot::SegmentPtr; -using SegmentFile = milvus::engine::snapshot::SegmentFile; -using SegmentFilePtr = milvus::engine::snapshot::SegmentFilePtr; -using Field = milvus::engine::snapshot::Field; -using FieldElement = milvus::engine::snapshot::FieldElement; -using Snapshots = milvus::engine::snapshot::Snapshots; -using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT; -using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy; -using Queue = milvus::BlockingQueue; -using TQueue = milvus::BlockingQueue>; -using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation; -using ParamsField = milvus::engine::snapshot::ParamsField; -using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler; -using SSDBImpl = milvus::engine::SSDBImpl; milvus::Status CreateCollection(std::shared_ptr db, const std::string& collection_name, const LSN_TYPE& lsn) { @@ -180,3 +131,76 @@ TEST_F(SSDBTest, PartitionTest) { ASSERT_TRUE(status.ok()); ASSERT_EQ(partition_names.size(), 1); } + +TEST_F(SSDBTest, IndexTest) { + LSN_TYPE lsn = 0; + auto next_lsn = [&]() -> decltype(lsn) { + return ++lsn; + }; + + std::string c1 = "c1"; + auto status = CreateCollection(db_, c1, next_lsn()); + ASSERT_TRUE(status.ok()); + + std::stringstream p_name; + auto num = RandomInt(3, 5); + for (auto i = 0; i < num; ++i) { + p_name.str(""); + p_name << "partition_" << i; + status = db_->CreatePartition(c1, p_name.str()); + ASSERT_TRUE(status.ok()); + } + + ScopedSnapshotT ss; + status = Snapshots::GetInstance().GetSnapshot(ss, c1); + ASSERT_TRUE(status.ok()); + + SegmentFileContext sf_context; + SFContextBuilder(sf_context, ss); + + auto new_total = 0; + auto& partitions = ss->GetResources(); + for (auto& kv : partitions) { + num = RandomInt(2, 5); + for (auto i = 0; i < num; ++i) { + ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok()); + } + new_total += num; + } + + auto field_element_id = ss->GetFieldElementId(sf_context.field_name, sf_context.field_element_name); + ASSERT_NE(field_element_id, 0); + + auto filter1 = [&](SegmentFile::Ptr segment_file) -> bool { + if (segment_file->GetFieldElementId() == field_element_id) { + return true; + } + return false; + }; + + status = Snapshots::GetInstance().GetSnapshot(ss, c1); + ASSERT_TRUE(status.ok()); + auto sf_collector = std::make_shared(ss, filter1); + sf_collector->Iterate(); + std::cout << "Total " << sf_collector->segment_files_.size() << " of field_element_id "; + std::cout << field_element_id << std::endl; + ASSERT_EQ(new_total, sf_collector->segment_files_.size()); + + status = db_->DropIndex(c1, sf_context.field_name, sf_context.field_element_name); + ASSERT_TRUE(status.ok()); + + status = Snapshots::GetInstance().GetSnapshot(ss, c1); + ASSERT_TRUE(status.ok()); + sf_collector = std::make_shared(ss, filter1); + sf_collector->Iterate(); + std::cout << "Total " << sf_collector->segment_files_.size() << " of field_element_id "; + std::cout << field_element_id << std::endl; + ASSERT_EQ(0, sf_collector->segment_files_.size()); + + { + auto& field_elements = ss->GetResources(); + for (auto& kv : field_elements) { + ASSERT_NE(kv.second->GetID(), field_element_id); + } + } +} diff --git a/core/unittest/ssdb/test_snapshot.cpp b/core/unittest/ssdb/test_snapshot.cpp index 1231005fc..65905758a 100644 --- a/core/unittest/ssdb/test_snapshot.cpp +++ b/core/unittest/ssdb/test_snapshot.cpp @@ -18,183 +18,6 @@ #include #include "ssdb/utils.h" -#include "db/snapshot/CompoundOperations.h" -#include "db/snapshot/Context.h" -#include "db/snapshot/EventExecutor.h" -#include "db/snapshot/OperationExecutor.h" -#include "db/snapshot/ReferenceProxy.h" -#include "db/snapshot/ResourceHolders.h" -#include "db/snapshot/ScopedResource.h" -#include "db/snapshot/Snapshots.h" -#include "db/snapshot/Store.h" -#include "db/snapshot/WrappedTypes.h" - -using ID_TYPE = milvus::engine::snapshot::ID_TYPE; -using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE; -using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE; -using MappingT = milvus::engine::snapshot::MappingT; -using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext; -using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext; -using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext; -using OperationContext = milvus::engine::snapshot::OperationContext; -using PartitionContext = milvus::engine::snapshot::PartitionContext; -using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation; -using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation; -using BuildOperation = milvus::engine::snapshot::BuildOperation; -using MergeOperation = milvus::engine::snapshot::MergeOperation; -using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation; -using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation; -using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation; -using CreatePartitionOperation = milvus::engine::snapshot::CreatePartitionOperation; -using DropCollectionOperation = milvus::engine::snapshot::DropCollectionOperation; -using CollectionCommitsHolder = milvus::engine::snapshot::CollectionCommitsHolder; -using CollectionsHolder = milvus::engine::snapshot::CollectionsHolder; -using CollectionScopedT = milvus::engine::snapshot::CollectionScopedT; -using Collection = milvus::engine::snapshot::Collection; -using CollectionPtr = milvus::engine::snapshot::CollectionPtr; -using Partition = milvus::engine::snapshot::Partition; -using PartitionPtr = milvus::engine::snapshot::PartitionPtr; -using Segment = milvus::engine::snapshot::Segment; -using SegmentPtr = milvus::engine::snapshot::SegmentPtr; -using SegmentFile = milvus::engine::snapshot::SegmentFile; -using SegmentFilePtr = milvus::engine::snapshot::SegmentFilePtr; -using Field = milvus::engine::snapshot::Field; -using FieldElement = milvus::engine::snapshot::FieldElement; -using Snapshots = milvus::engine::snapshot::Snapshots; -using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT; -using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy; -using Queue = milvus::BlockingQueue; -using TQueue = milvus::BlockingQueue>; -using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation; -using ParamsField = milvus::engine::snapshot::ParamsField; -using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler; -using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler; - -struct PartitionCollector : public IteratePartitionHandler { - using ResourceT = Partition; - using BaseT = IteratePartitionHandler; - explicit PartitionCollector(ScopedSnapshotT ss) : BaseT(ss) {} - - milvus::Status - PreIterate() override { - partition_names_.clear(); - return milvus::Status::OK(); - } - - milvus::Status - Handle(const typename ResourceT::Ptr& partition) override { - partition_names_.push_back(partition->GetName()); - return milvus::Status::OK(); - } - - std::vector partition_names_; -}; - -using FilterT = std::function; -struct SegmentFileCollector : public IterateSegmentFileHandler { - using ResourceT = SegmentFile; - using BaseT = IterateSegmentFileHandler; - explicit SegmentFileCollector(ScopedSnapshotT ss, const FilterT& filter) - : filter_(filter), BaseT(ss) {} - - milvus::Status - PreIterate() override { - segment_files_.clear(); - return milvus::Status::OK(); - } - - milvus::Status - Handle(const typename ResourceT::Ptr& segment_file) override { - if (!filter_(segment_file)) { - return milvus::Status::OK(); - } - segment_files_.insert(segment_file->GetID()); - return milvus::Status::OK(); - } - - FilterT filter_; - std::set segment_files_; -}; - -struct WaitableObj { - bool notified_ = false; - std::mutex mutex_; - std::condition_variable cv_; - - void - Wait() { - std::unique_lock lck(mutex_); - if (!notified_) { - cv_.wait(lck); - } - notified_ = false; - } - - void - Notify() { - std::unique_lock lck(mutex_); - notified_ = true; - lck.unlock(); - cv_.notify_one(); - } -}; - -ScopedSnapshotT -CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) { - CreateCollectionContext context; - context.lsn = lsn; - auto collection_schema = std::make_shared(collection_name); - context.collection = collection_schema; - auto vector_field = std::make_shared("vector", 0, - milvus::engine::snapshot::FieldType::VECTOR); - auto vector_field_element = std::make_shared(0, 0, "ivfsq8", - milvus::engine::snapshot::FieldElementType::IVFSQ8); - auto int_field = std::make_shared("int", 0, - milvus::engine::snapshot::FieldType::INT32); - context.fields_schema[vector_field] = {vector_field_element}; - context.fields_schema[int_field] = {}; - - auto op = std::make_shared(context); - op->Push(); - ScopedSnapshotT ss; - auto status = op->GetSnapshot(ss); - return ss; -} - -ScopedSnapshotT -CreatePartition(const std::string& collection_name, const PartitionContext& p_context, const LSN_TYPE& lsn) { - ScopedSnapshotT curr_ss; - ScopedSnapshotT ss; - auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - std::cout << status.ToString() << std::endl; - return curr_ss; - } - - OperationContext context; - context.lsn = lsn; - auto op = std::make_shared(context, ss); - - PartitionPtr partition; - status = op->CommitNewPartition(p_context, partition); - if (!status.ok()) { - std::cout << status.ToString() << std::endl; - return curr_ss; - } - - status = op->Push(); - if (!status.ok()) { - std::cout << status.ToString() << std::endl; - return curr_ss; - } - - status = op->GetSnapshot(curr_ss); - if (!status.ok()) { - std::cout << status.ToString() << std::endl; - return curr_ss; - } - return curr_ss; -} TEST_F(SnapshotTest, ResourcesTest) { int nprobe = 16; @@ -452,7 +275,7 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) { ID_TYPE stale_ss_id; auto worker1 = [&]() { - milvus::Status status; + Status status; auto ss = CreateCollection(collection_name, ++lsn); ASSERT_TRUE(ss); ASSERT_EQ(ss->GetName(), collection_name); @@ -628,24 +451,19 @@ TEST_F(SnapshotTest, IndexTest) { return ++lsn; }; - SegmentFileContext sf_context; - sf_context.field_name = "f_1_1"; - sf_context.field_element_name = "fe_1_1"; - sf_context.segment_id = 1; - sf_context.partition_id = 1; - ScopedSnapshotT ss; auto status = Snapshots::GetInstance().GetSnapshot(ss, 1); - std::cout << status.ToString() << std::endl; ASSERT_TRUE(status.ok()); + SegmentFileContext sf_context; + SFContextBuilder(sf_context, ss); + OperationContext context; context.lsn = next_lsn(); - context.prev_partition = ss->GetResource(1); + context.prev_partition = ss->GetResource(sf_context.partition_id); auto build_op = std::make_shared(context, ss); SegmentFilePtr seg_file; status = build_op->CommitNewSegmentFile(sf_context, seg_file); - std::cout << status.ToString() << std::endl; ASSERT_TRUE(status.ok()); ASSERT_TRUE(seg_file); @@ -670,10 +488,6 @@ TEST_F(SnapshotTest, IndexTest) { auto it_found = sf_collector->segment_files_.find(seg_file->GetID()); ASSERT_NE(it_found, sf_collector->segment_files_.end()); - /* for (auto& i : sf_collector->segment_files_) { */ - /* std::cout << i << std::endl; */ - /* } */ - /* std::cout << "XX " << seg_file->GetID() << std::endl; */ status = Snapshots::GetInstance().GetSnapshot(ss, 1); ASSERT_TRUE(status.ok()); @@ -683,7 +497,6 @@ TEST_F(SnapshotTest, IndexTest) { drop_ctx.stale_segment_file = seg_file; auto drop_op = std::make_shared(drop_ctx, ss); status = drop_op->Push(); - std::cout << status.ToString() << std::endl; ASSERT_TRUE(status.ok()); status = drop_op->GetSnapshot(ss); @@ -712,38 +525,19 @@ TEST_F(SnapshotTest, IndexTest) { sf_collector->Iterate(); auto prev_total = sf_collector->segment_files_.size(); - auto create_segment = [&](ID_TYPE partition_id) { - OperationContext context; - context.lsn = next_lsn(); - context.prev_partition = ss->GetResource(partition_id); - auto op = std::make_shared(context, ss); - SegmentPtr new_seg; - status = op->CommitNewSegment(new_seg); - ASSERT_TRUE(status.ok()); - ASSERT_FALSE(new_seg->ToString().empty()); - SegmentFilePtr seg_file; - auto nsf_context = sf_context; - nsf_context.segment_id = new_seg->GetID(); - nsf_context.partition_id = new_seg->GetPartitionId(); - status = op->CommitNewSegmentFile(nsf_context, seg_file); - ASSERT_TRUE(status.ok()); - status = op->Push(); - ASSERT_TRUE(status.ok()); - - status = op->GetSnapshot(ss); - ASSERT_TRUE(status.ok()); - }; - auto new_total = 0; auto partitions = ss->GetResources(); for (auto& kv : partitions) { num = RandomInt(2, 5); for (auto i = 0; i < num; ++i) { - create_segment(kv.first); + ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok()); } new_total += num; } + status = Snapshots::GetInstance().GetSnapshot(ss, ss->GetName()); + ASSERT_TRUE(status.ok()); + sf_collector = std::make_shared(ss, filter2); sf_collector->Iterate(); auto total = sf_collector->segment_files_.size(); @@ -753,31 +547,58 @@ TEST_F(SnapshotTest, IndexTest) { sf_context.field_element_name); ASSERT_NE(field_element_id, 0); + auto filter3 = [&](SegmentFile::Ptr segment_file) -> bool { + if (segment_file->GetFieldElementId() == field_element_id) { + return true; + } + return false; + }; + sf_collector = std::make_shared(ss, filter3); + sf_collector->Iterate(); + auto specified_segment_files_cnt = sf_collector->segment_files_.size(); + OperationContext d_a_i_ctx; d_a_i_ctx.lsn = next_lsn(); d_a_i_ctx.stale_field_element = ss->GetResource(field_element_id); auto drop_all_index_op = std::make_shared(d_a_i_ctx, ss); status = drop_all_index_op->Push(); - std::cout << status.ToString() << std::endl; ASSERT_TRUE(status.ok()); status = drop_all_index_op->GetSnapshot(ss); ASSERT_TRUE(status.ok()); + /* { */ + /* auto& fields = ss->GetResources(); */ + /* for (auto field_kv : fields) { */ + /* auto field = field_kv.second; */ + /* std::cout << "field " << field->GetID() << " " << field->GetName() << std::endl; */ + /* auto& field_elements = ss->GetResources(); */ + /* for (auto element_kv : field_elements) { */ + /* auto element = element_kv.second; */ + /* if (element->GetFieldId() != field->GetID()) { */ + /* continue; */ + /* } */ + /* std::cout << "\tfield_element " << element->GetID() << " " << element->GetName() << std::endl; */ + /* } */ + /* } */ + /* } */ + sf_collector = std::make_shared(ss, filter2); sf_collector->Iterate(); - ASSERT_EQ(sf_collector->segment_files_.size(), total - new_total); + ASSERT_EQ(sf_collector->segment_files_.size(), total - specified_segment_files_cnt); + + { + auto& field_elements = ss->GetResources(); + for (auto& kv : field_elements) { + ASSERT_NE(kv.second->GetID(), field_element_id); + } + } } TEST_F(SnapshotTest, OperationTest) { - milvus::Status status; + Status status; std::string to_string; LSN_TYPE lsn; - SegmentFileContext sf_context; - sf_context.field_name = "f_1_1"; - sf_context.field_element_name = "fe_1_1"; - sf_context.segment_id = 1; - sf_context.partition_id = 1; ScopedSnapshotT ss; status = Snapshots::GetInstance().GetSnapshot(ss, 1); @@ -794,6 +615,9 @@ TEST_F(SnapshotTest, OperationTest) { ASSERT_TRUE(collection_commit->ToString().empty()); } + SegmentFileContext sf_context; + SFContextBuilder(sf_context, ss); + OperationContext merge_ctx; std::set stale_segment_commit_ids; @@ -949,7 +773,7 @@ TEST_F(SnapshotTest, OperationTest) { } TEST_F(SnapshotTest, CompoundTest1) { - milvus::Status status; + Status status; std::atomic lsn = 0; auto next_lsn = [&]() -> decltype(lsn) { return ++lsn; @@ -1290,7 +1114,7 @@ TEST_F(SnapshotTest, CompoundTest1) { TEST_F(SnapshotTest, CompoundTest2) { - milvus::Status status; + Status status; LSN_TYPE lsn = 0; auto next_lsn = [&]() -> LSN_TYPE& { return ++lsn; diff --git a/core/unittest/ssdb/utils.h b/core/unittest/ssdb/utils.h index 6415048cc..242e99ef9 100644 --- a/core/unittest/ssdb/utils.h +++ b/core/unittest/ssdb/utils.h @@ -14,8 +14,65 @@ #include #include #include +#include +#include +#include +#include #include "db/SSDBImpl.h" +#include "db/snapshot/CompoundOperations.h" +#include "db/snapshot/Context.h" +#include "db/snapshot/EventExecutor.h" +#include "db/snapshot/OperationExecutor.h" +#include "db/snapshot/ReferenceProxy.h" +#include "db/snapshot/ResourceHolders.h" +#include "db/snapshot/ScopedResource.h" +#include "db/snapshot/Snapshots.h" +#include "db/snapshot/Store.h" +#include "db/snapshot/WrappedTypes.h" + +using ID_TYPE = milvus::engine::snapshot::ID_TYPE; +using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE; +using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE; +using MappingT = milvus::engine::snapshot::MappingT; +using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext; +using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext; +using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext; +using OperationContext = milvus::engine::snapshot::OperationContext; +using PartitionContext = milvus::engine::snapshot::PartitionContext; +using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation; +using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation; +using BuildOperation = milvus::engine::snapshot::BuildOperation; +using MergeOperation = milvus::engine::snapshot::MergeOperation; +using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation; +using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation; +using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation; +using CreatePartitionOperation = milvus::engine::snapshot::CreatePartitionOperation; +using DropCollectionOperation = milvus::engine::snapshot::DropCollectionOperation; +using CollectionCommitsHolder = milvus::engine::snapshot::CollectionCommitsHolder; +using CollectionsHolder = milvus::engine::snapshot::CollectionsHolder; +using CollectionScopedT = milvus::engine::snapshot::CollectionScopedT; +using Collection = milvus::engine::snapshot::Collection; +using CollectionPtr = milvus::engine::snapshot::CollectionPtr; +using Partition = milvus::engine::snapshot::Partition; +using PartitionPtr = milvus::engine::snapshot::PartitionPtr; +using Segment = milvus::engine::snapshot::Segment; +using SegmentPtr = milvus::engine::snapshot::SegmentPtr; +using SegmentFile = milvus::engine::snapshot::SegmentFile; +using SegmentFilePtr = milvus::engine::snapshot::SegmentFilePtr; +using Field = milvus::engine::snapshot::Field; +using FieldElement = milvus::engine::snapshot::FieldElement; +using Snapshots = milvus::engine::snapshot::Snapshots; +using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT; +using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy; +using Queue = milvus::BlockingQueue; +using TQueue = milvus::BlockingQueue>; +using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation; +using ParamsField = milvus::engine::snapshot::ParamsField; +using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler; +using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler; +using SSDBImpl = milvus::engine::SSDBImpl; +using Status = milvus::Status; inline int RandomInt(int start, int end) { @@ -25,6 +82,167 @@ RandomInt(int start, int end) { return dist(rng); } +inline void +SFContextBuilder(SegmentFileContext& ctx, ScopedSnapshotT sss) { + auto field = sss->GetResources().begin()->second; + ctx.field_name = field->GetName(); + for (auto& kv : sss->GetResources()) { + ctx.field_element_name = kv.second->GetName(); + break; + } + auto& segments = sss->GetResources(); + if (segments.size() == 0) { + return; + } + + ctx.segment_id = sss->GetResources().begin()->second->GetID(); + ctx.partition_id = sss->GetResources().begin()->second->GetPartitionId(); +} + +struct PartitionCollector : public IteratePartitionHandler { + using ResourceT = Partition; + using BaseT = IteratePartitionHandler; + explicit PartitionCollector(ScopedSnapshotT ss) : BaseT(ss) {} + + Status + PreIterate() override { + partition_names_.clear(); + return Status::OK(); + } + + Status + Handle(const typename ResourceT::Ptr& partition) override { + partition_names_.push_back(partition->GetName()); + return Status::OK(); + } + + std::vector partition_names_; +}; + +using FilterT = std::function; +struct SegmentFileCollector : public IterateSegmentFileHandler { + using ResourceT = SegmentFile; + using BaseT = IterateSegmentFileHandler; + explicit SegmentFileCollector(ScopedSnapshotT ss, const FilterT& filter) + : filter_(filter), BaseT(ss) {} + + Status + PreIterate() override { + segment_files_.clear(); + return Status::OK(); + } + + Status + Handle(const typename ResourceT::Ptr& segment_file) override { + if (!filter_(segment_file)) { + return Status::OK(); + } + segment_files_.insert(segment_file->GetID()); + return Status::OK(); + } + + FilterT filter_; + std::set segment_files_; +}; + +struct WaitableObj { + bool notified_ = false; + std::mutex mutex_; + std::condition_variable cv_; + + void + Wait() { + std::unique_lock lck(mutex_); + if (!notified_) { + cv_.wait(lck); + } + notified_ = false; + } + + void + Notify() { + std::unique_lock lck(mutex_); + notified_ = true; + lck.unlock(); + cv_.notify_one(); + } +}; + +inline ScopedSnapshotT +CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) { + CreateCollectionContext context; + context.lsn = lsn; + auto collection_schema = std::make_shared(collection_name); + context.collection = collection_schema; + auto vector_field = std::make_shared("vector", 0, + milvus::engine::snapshot::FieldType::VECTOR); + auto vector_field_element = std::make_shared(0, 0, "ivfsq8", + milvus::engine::snapshot::FieldElementType::IVFSQ8); + auto int_field = std::make_shared("int", 0, + milvus::engine::snapshot::FieldType::INT32); + context.fields_schema[vector_field] = {vector_field_element}; + context.fields_schema[int_field] = {}; + + auto op = std::make_shared(context); + op->Push(); + ScopedSnapshotT ss; + auto status = op->GetSnapshot(ss); + return ss; +} + +inline ScopedSnapshotT +CreatePartition(const std::string& collection_name, const PartitionContext& p_context, const LSN_TYPE& lsn) { + ScopedSnapshotT curr_ss; + ScopedSnapshotT ss; + auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + return curr_ss; + } + + OperationContext context; + context.lsn = lsn; + auto op = std::make_shared(context, ss); + + PartitionPtr partition; + status = op->CommitNewPartition(p_context, partition); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + return curr_ss; + } + + status = op->Push(); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + return curr_ss; + } + + status = op->GetSnapshot(curr_ss); + if (!status.ok()) { + std::cout << status.ToString() << std::endl; + return curr_ss; + } + return curr_ss; +} + +inline Status +CreateSegment(ScopedSnapshotT ss, ID_TYPE partition_id, LSN_TYPE lsn, const SegmentFileContext& sf_context) { + OperationContext context; + context.lsn = lsn; + context.prev_partition = ss->GetResource(partition_id); + auto op = std::make_shared(context, ss); + SegmentPtr new_seg; + STATUS_CHECK(op->CommitNewSegment(new_seg)); + SegmentFilePtr seg_file; + auto nsf_context = sf_context; + nsf_context.segment_id = new_seg->GetID(); + nsf_context.partition_id = new_seg->GetPartitionId(); + STATUS_CHECK(op->CommitNewSegmentFile(nsf_context, seg_file)); + STATUS_CHECK(op->Push()); + + return op->GetSnapshot(ss); +} + class BaseTest : public ::testing::Test { protected: void @@ -46,7 +264,7 @@ class SnapshotTest : public BaseTest { class SSDBTest : public BaseTest { protected: - std::shared_ptr db_; + std::shared_ptr db_; void SetUp() override; -- GitLab