未验证 提交 ef8171b9 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Bug fix in operations and unittest (#2744)

* (db/snapshot): fix bugs and update UT
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update Snapshot ToString
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add DropIndex API
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add DropIndex API UT
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 36cb2479
......@@ -209,6 +209,37 @@ SSDBImpl::ShowPartitions(const std::string& collection_name, std::vector<std::st
return Status::OK();
}
Status
SSDBImpl::DropIndex(const std::string& collection_name, const std::string& field_name,
const std::string& field_element_name) {
CHECK_INITIALIZED;
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name;
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
// SS TODO: Check Index Type
snapshot::OperationContext context;
// SS TODO: no lsn for drop index
context.lsn = ss->GetCollectionCommit()->GetLsn();
auto field_element_id = ss->GetFieldElementId(field_name, field_element_name);
if (field_element_id == 0) {
std::stringstream emsg;
emsg << "Invalid field name: \"" << field_name;
emsg << "\" or field element name: \"" << field_element_name << "\"";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
context.stale_field_element = ss->GetResource<snapshot::FieldElement>(field_element_id);
auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
STATUS_CHECK(op->Push());
// SS TODO: Start merge task needed?
/* std::set<std::string> merge_collection_ids = {collection_id}; */
/* StartMergeTask(merge_collection_ids, true); */
return Status::OK();
}
Status
SSDBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
bool force) {
......
......@@ -73,6 +73,9 @@ class SSDBImpl {
Status
ShowPartitions(const std::string& collection_name, std::vector<std::string>& partition_names);
Status
DropIndex(const std::string& collection_name, const std::string& field_name, const std::string& field_element_name);
private:
void
InternalFlush(const std::string& collection_id = "");
......
......@@ -75,6 +75,7 @@ BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentF
emsg << GetRepr() << ". Invalid segment " << context.segment_id << " in context";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
auto ctx = context;
ctx.partition_id = segment->GetPartitionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
......@@ -112,8 +113,31 @@ Status
DropAllIndexOperation::DoExecute(Store& store) {
auto& segment_files = GetAdjustedSS()->GetResources<SegmentFile>();
std::map<ID_TYPE, std::vector<SegmentCommitPtr>> p_sc_map;
OperationContext cc_context;
{
auto context = context_;
context.stale_field_elements.push_back(context.stale_field_element);
FieldCommitOperation fc_op(context, GetAdjustedSS());
STATUS_CHECK(fc_op(store));
FieldCommitPtr new_field_commit;
STATUS_CHECK(fc_op.GetResource(new_field_commit));
AddStepWithLsn(*new_field_commit, context.lsn);
context.new_field_commits.push_back(new_field_commit);
for (auto& kv : GetAdjustedSS()->GetResources<FieldCommit>()) {
if (kv.second->GetFieldId() == new_field_commit->GetFieldId()) {
context.stale_field_commits.push_back(kv.second.Get());
}
}
SchemaCommitOperation sc_op(context, GetAdjustedSS());
STATUS_CHECK(sc_op(store));
STATUS_CHECK(sc_op.GetResource(cc_context.new_schema_commit));
AddStepWithLsn(*cc_context.new_schema_commit, context.lsn);
}
std::map<ID_TYPE, std::vector<SegmentCommitPtr>> p_sc_map;
for (auto& kv : segment_files) {
if (kv.second->GetFieldElementId() != context_.stale_field_element->GetID()) {
continue;
......@@ -128,7 +152,6 @@ DropAllIndexOperation::DoExecute(Store& store) {
p_sc_map[context.new_segment_commit->GetPartitionId()].push_back(context.new_segment_commit);
}
OperationContext cc_context;
for (auto& kv : p_sc_map) {
auto& partition_id = kv.first;
auto context = context_;
......
......@@ -68,6 +68,11 @@ struct OperationContext {
FieldPtr prev_field = nullptr;
FieldElementPtr prev_field_element = nullptr;
FieldElementPtr stale_field_element = nullptr;
std::vector<FieldElementPtr> new_field_elements;
std::vector<FieldElementPtr> stale_field_elements;
std::vector<FieldCommitPtr> new_field_commits;
std::vector<FieldCommitPtr> stale_field_commits;
SegmentPtr prev_segment = nullptr;
SegmentCommitPtr prev_segment_commit = nullptr;
......
......@@ -42,7 +42,8 @@ CollectionCommitOperation::DoExecute(Store& store) {
for (auto& pc : context_.new_partition_commits) {
handle_new_pc(pc);
}
} else if (context_.new_schema_commit) {
}
if (context_.new_schema_commit) {
resource_->SetSchemaId(context_.new_schema_commit->GetID());
}
resource_->SetID(0);
......@@ -220,6 +221,84 @@ SegmentCommitOperation::PreCheck() {
return Status::OK();
}
FieldCommitOperation::FieldCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}
FieldCommit::Ptr
FieldCommitOperation::GetPrevResource() const {
auto get_resource = [&](FieldElementPtr fe) -> FieldCommitPtr {
auto& field_commits = GetStartedSS()->GetResources<FieldCommit>();
for (auto& kv : field_commits) {
if (kv.second->GetFieldId() == fe->GetFieldId()) {
return kv.second.Get();
}
}
return nullptr;
};
if (context_.new_field_elements.size() > 0) {
return get_resource(context_.new_field_elements[0]);
} else if (context_.stale_field_elements.size() > 0) {
return get_resource(context_.stale_field_elements[0]);
}
return nullptr;
}
Status
FieldCommitOperation::DoExecute(Store& store) {
auto prev_resource = GetPrevResource();
if (prev_resource) {
resource_ = std::make_shared<FieldCommit>(*prev_resource);
resource_->SetID(0);
resource_->ResetStatus();
for (auto& fe : context_.stale_field_elements) {
resource_->GetMappings().erase(fe->GetID());
}
} else {
// TODO
}
for (auto& fe : context_.new_field_elements) {
resource_->GetMappings().insert(fe->GetID());
}
AddStep(*resource_, false);
return Status::OK();
}
SchemaCommitOperation::SchemaCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}
SchemaCommit::Ptr
SchemaCommitOperation::GetPrevResource() const {
return GetStartedSS()->GetSchemaCommit();
}
Status
SchemaCommitOperation::DoExecute(Store& store) {
auto prev_resource = GetPrevResource();
if (!prev_resource) {
return Status(SS_INVALID_CONTEX_ERROR, "Cannot get schema commit");
}
resource_ = std::make_shared<SchemaCommit>(*prev_resource);
resource_->SetID(0);
resource_->ResetStatus();
for (auto& fc : context_.stale_field_commits) {
resource_->GetMappings().erase(fc->GetID());
}
for (auto& fc : context_.new_field_commits) {
resource_->GetMappings().insert(fc->GetID());
}
AddStep(*resource_, false);
return Status::OK();
}
SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedSnapshotT prev_ss)
: BaseT(OperationContext(), prev_ss), context_(sc) {
}
......@@ -227,6 +306,12 @@ SegmentFileOperation::SegmentFileOperation(const SegmentFileContext& sc, ScopedS
Status
SegmentFileOperation::DoExecute(Store& store) {
auto field_element_id = GetStartedSS()->GetFieldElementId(context_.field_name, context_.field_element_name);
if (field_element_id == 0) {
std::stringstream emsg;
emsg << GetRepr() << ". Invalid field name: \"" << context_.field_name;
emsg << "\" or field element name: \"" << context_.field_element_name << "\"";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
resource_ = std::make_shared<SegmentFile>(context_.collection_id, context_.partition_id, context_.segment_id,
field_element_id);
AddStep(*resource_, false);
......
......@@ -101,6 +101,33 @@ class SegmentFileOperation : public CommitOperation<SegmentFile> {
SegmentFileContext context_;
};
class FieldCommitOperation : public CommitOperation<FieldCommit> {
public:
using BaseT = CommitOperation<FieldCommit>;
FieldCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
FieldCommit::Ptr
GetPrevResource() const override;
Status
DoExecute(Store&) override;
/* Status */
/* PreCheck() override; */
};
class SchemaCommitOperation : public CommitOperation<SchemaCommit> {
public:
using BaseT = CommitOperation<SchemaCommit>;
SchemaCommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
SchemaCommit::Ptr
GetPrevResource() const override;
Status
DoExecute(Store&) override;
};
template <>
class LoadOperation<Collection> : public Operations {
public:
......
......@@ -162,6 +162,24 @@ Snapshot::ToString() const {
ss << ",mappings=";
auto& cc_m = GetCollectionCommit()->GetMappings();
ss << to_matrix_string(cc_m, row_element_size, 2);
auto& schema_m = GetSchemaCommit()->GetMappings();
ss << "\nSchemaCommit: id=" << GetSchemaCommit()->GetID() << ",mappings=";
ss << to_matrix_string(schema_m, row_element_size, 2);
for (auto& fc_id : schema_m) {
auto fc = GetResource<FieldCommit>(fc_id);
auto f = GetResource<Field>(fc->GetFieldId());
ss << "\n Field: id=" << f->GetID() << ",name=\"" << f->GetName() << "\"";
ss << ", FieldCommit: id=" << fc->GetID();
ss << ",mappings=";
auto& fc_m = fc->GetMappings();
ss << to_matrix_string(fc_m, row_element_size, 2);
for (auto& fe_id : fc_m) {
auto fe = GetResource<FieldElement>(fe_id);
ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName();
}
}
for (auto& p_c_id : cc_m) {
auto p_c = GetResource<PartitionCommit>(p_c_id);
auto p = GetResource<Partition>(p_c->GetPartitionId());
......
......@@ -241,6 +241,8 @@ class Store {
resources[res->GetID()] = res;
lock.unlock();
GetResource<ResourceT>(res->GetID(), return_v);
/* std::cout << ">>> [Update] " << ResourceT::Name << " " << id; */
/* std::cout << " " << std::boolalpha << res->IsActive() << std::endl; */
return Status::OK();
}
......@@ -415,7 +417,7 @@ class Store {
auto random_elements = rand_r(&seed) % 2 + 2;
for (auto fei = 1; fei <= random_elements; ++fei) {
std::stringstream fename;
fename << "fe_" << fei << "_";
fename << "fe_" << field->GetID() << "_" << fei << "_";
fename << std::get<Index<FieldElement::MapT, MockResourcesT>::value>(ids_) + 1;
FieldElementPtr element;
......@@ -458,7 +460,7 @@ class Store {
for (auto field_element_id : f_c_m) {
SegmentFilePtr sf;
CreateResource<SegmentFile>(
SegmentFile(c->GetID(), p->GetID(), s->GetID(), field_commit_id, 0, 0, 0, 0, ACTIVE),
SegmentFile(c->GetID(), p->GetID(), s->GetID(), field_element_id, 0, 0, 0, 0, ACTIVE),
sf);
all_records.push_back(sf);
......
......@@ -18,55 +18,6 @@
#include <algorithm>
#include "ssdb/utils.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ReferenceProxy.h"
#include "db/snapshot/ResourceHolders.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/Snapshots.h"
#include "db/snapshot/Store.h"
#include "db/snapshot/WrappedTypes.h"
using ID_TYPE = milvus::engine::snapshot::ID_TYPE;
using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE;
using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE;
using MappingT = milvus::engine::snapshot::MappingT;
using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext;
using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext;
using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext;
using OperationContext = milvus::engine::snapshot::OperationContext;
using PartitionContext = milvus::engine::snapshot::PartitionContext;
using BuildOperation = milvus::engine::snapshot::BuildOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation;
using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation;
using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation;
using CreatePartitionOperation = milvus::engine::snapshot::CreatePartitionOperation;
using DropCollectionOperation = milvus::engine::snapshot::DropCollectionOperation;
using CollectionCommitsHolder = milvus::engine::snapshot::CollectionCommitsHolder;
using CollectionsHolder = milvus::engine::snapshot::CollectionsHolder;
using CollectionScopedT = milvus::engine::snapshot::CollectionScopedT;
using Collection = milvus::engine::snapshot::Collection;
using CollectionPtr = milvus::engine::snapshot::CollectionPtr;
using Partition = milvus::engine::snapshot::Partition;
using PartitionPtr = milvus::engine::snapshot::PartitionPtr;
using Segment = milvus::engine::snapshot::Segment;
using SegmentPtr = milvus::engine::snapshot::SegmentPtr;
using SegmentFile = milvus::engine::snapshot::SegmentFile;
using SegmentFilePtr = milvus::engine::snapshot::SegmentFilePtr;
using Field = milvus::engine::snapshot::Field;
using FieldElement = milvus::engine::snapshot::FieldElement;
using Snapshots = milvus::engine::snapshot::Snapshots;
using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT;
using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy;
using Queue = milvus::BlockingQueue<ID_TYPE>;
using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using SSDBImpl = milvus::engine::SSDBImpl;
milvus::Status
CreateCollection(std::shared_ptr<SSDBImpl> db, const std::string& collection_name, const LSN_TYPE& lsn) {
......@@ -180,3 +131,76 @@ TEST_F(SSDBTest, PartitionTest) {
ASSERT_TRUE(status.ok());
ASSERT_EQ(partition_names.size(), 1);
}
TEST_F(SSDBTest, IndexTest) {
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok());
std::stringstream p_name;
auto num = RandomInt(3, 5);
for (auto i = 0; i < num; ++i) {
p_name.str("");
p_name << "partition_" << i;
status = db_->CreatePartition(c1, p_name.str());
ASSERT_TRUE(status.ok());
}
ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
auto new_total = 0;
auto& partitions = ss->GetResources<Partition>();
for (auto& kv : partitions) {
num = RandomInt(2, 5);
for (auto i = 0; i < num; ++i) {
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok());
}
new_total += num;
}
auto field_element_id = ss->GetFieldElementId(sf_context.field_name, sf_context.field_element_name);
ASSERT_NE(field_element_id, 0);
auto filter1 = [&](SegmentFile::Ptr segment_file) -> bool {
if (segment_file->GetFieldElementId() == field_element_id) {
return true;
}
return false;
};
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
auto sf_collector = std::make_shared<SegmentFileCollector>(ss, filter1);
sf_collector->Iterate();
std::cout << "Total " << sf_collector->segment_files_.size() << " of field_element_id ";
std::cout << field_element_id << std::endl;
ASSERT_EQ(new_total, sf_collector->segment_files_.size());
status = db_->DropIndex(c1, sf_context.field_name, sf_context.field_element_name);
ASSERT_TRUE(status.ok());
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
sf_collector = std::make_shared<SegmentFileCollector>(ss, filter1);
sf_collector->Iterate();
std::cout << "Total " << sf_collector->segment_files_.size() << " of field_element_id ";
std::cout << field_element_id << std::endl;
ASSERT_EQ(0, sf_collector->segment_files_.size());
{
auto& field_elements = ss->GetResources<FieldElement>();
for (auto& kv : field_elements) {
ASSERT_NE(kv.second->GetID(), field_element_id);
}
}
}
......@@ -18,183 +18,6 @@
#include <algorithm>
#include "ssdb/utils.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ReferenceProxy.h"
#include "db/snapshot/ResourceHolders.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/Snapshots.h"
#include "db/snapshot/Store.h"
#include "db/snapshot/WrappedTypes.h"
using ID_TYPE = milvus::engine::snapshot::ID_TYPE;
using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE;
using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE;
using MappingT = milvus::engine::snapshot::MappingT;
using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext;
using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext;
using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext;
using OperationContext = milvus::engine::snapshot::OperationContext;
using PartitionContext = milvus::engine::snapshot::PartitionContext;
using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation;
using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation;
using BuildOperation = milvus::engine::snapshot::BuildOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation;
using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation;
using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation;
using CreatePartitionOperation = milvus::engine::snapshot::CreatePartitionOperation;
using DropCollectionOperation = milvus::engine::snapshot::DropCollectionOperation;
using CollectionCommitsHolder = milvus::engine::snapshot::CollectionCommitsHolder;
using CollectionsHolder = milvus::engine::snapshot::CollectionsHolder;
using CollectionScopedT = milvus::engine::snapshot::CollectionScopedT;
using Collection = milvus::engine::snapshot::Collection;
using CollectionPtr = milvus::engine::snapshot::CollectionPtr;
using Partition = milvus::engine::snapshot::Partition;
using PartitionPtr = milvus::engine::snapshot::PartitionPtr;
using Segment = milvus::engine::snapshot::Segment;
using SegmentPtr = milvus::engine::snapshot::SegmentPtr;
using SegmentFile = milvus::engine::snapshot::SegmentFile;
using SegmentFilePtr = milvus::engine::snapshot::SegmentFilePtr;
using Field = milvus::engine::snapshot::Field;
using FieldElement = milvus::engine::snapshot::FieldElement;
using Snapshots = milvus::engine::snapshot::Snapshots;
using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT;
using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy;
using Queue = milvus::BlockingQueue<ID_TYPE>;
using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<SegmentFile>;
struct PartitionCollector : public IteratePartitionHandler {
using ResourceT = Partition;
using BaseT = IteratePartitionHandler;
explicit PartitionCollector(ScopedSnapshotT ss) : BaseT(ss) {}
milvus::Status
PreIterate() override {
partition_names_.clear();
return milvus::Status::OK();
}
milvus::Status
Handle(const typename ResourceT::Ptr& partition) override {
partition_names_.push_back(partition->GetName());
return milvus::Status::OK();
}
std::vector<std::string> partition_names_;
};
using FilterT = std::function<bool(SegmentFile::Ptr)>;
struct SegmentFileCollector : public IterateSegmentFileHandler {
using ResourceT = SegmentFile;
using BaseT = IterateSegmentFileHandler;
explicit SegmentFileCollector(ScopedSnapshotT ss, const FilterT& filter)
: filter_(filter), BaseT(ss) {}
milvus::Status
PreIterate() override {
segment_files_.clear();
return milvus::Status::OK();
}
milvus::Status
Handle(const typename ResourceT::Ptr& segment_file) override {
if (!filter_(segment_file)) {
return milvus::Status::OK();
}
segment_files_.insert(segment_file->GetID());
return milvus::Status::OK();
}
FilterT filter_;
std::set<ID_TYPE> segment_files_;
};
struct WaitableObj {
bool notified_ = false;
std::mutex mutex_;
std::condition_variable cv_;
void
Wait() {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait(lck);
}
notified_ = false;
}
void
Notify() {
std::unique_lock<std::mutex> lck(mutex_);
notified_ = true;
lck.unlock();
cv_.notify_one();
}
};
ScopedSnapshotT
CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
context.collection = collection_schema;
auto vector_field = std::make_shared<Field>("vector", 0,
milvus::engine::snapshot::FieldType::VECTOR);
auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
milvus::engine::snapshot::FieldElementType::IVFSQ8);
auto int_field = std::make_shared<Field>("int", 0,
milvus::engine::snapshot::FieldType::INT32);
context.fields_schema[vector_field] = {vector_field_element};
context.fields_schema[int_field] = {};
auto op = std::make_shared<CreateCollectionOperation>(context);
op->Push();
ScopedSnapshotT ss;
auto status = op->GetSnapshot(ss);
return ss;
}
ScopedSnapshotT
CreatePartition(const std::string& collection_name, const PartitionContext& p_context, const LSN_TYPE& lsn) {
ScopedSnapshotT curr_ss;
ScopedSnapshotT ss;
auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
OperationContext context;
context.lsn = lsn;
auto op = std::make_shared<CreatePartitionOperation>(context, ss);
PartitionPtr partition;
status = op->CommitNewPartition(p_context, partition);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->Push();
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->GetSnapshot(curr_ss);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
return curr_ss;
}
TEST_F(SnapshotTest, ResourcesTest) {
int nprobe = 16;
......@@ -452,7 +275,7 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
ID_TYPE stale_ss_id;
auto worker1 = [&]() {
milvus::Status status;
Status status;
auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(ss->GetName(), collection_name);
......@@ -628,24 +451,19 @@ TEST_F(SnapshotTest, IndexTest) {
return ++lsn;
};
SegmentFileContext sf_context;
sf_context.field_name = "f_1_1";
sf_context.field_element_name = "fe_1_1";
sf_context.segment_id = 1;
sf_context.partition_id = 1;
ScopedSnapshotT ss;
auto status = Snapshots::GetInstance().GetSnapshot(ss, 1);
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
OperationContext context;
context.lsn = next_lsn();
context.prev_partition = ss->GetResource<Partition>(1);
context.prev_partition = ss->GetResource<Partition>(sf_context.partition_id);
auto build_op = std::make_shared<BuildOperation>(context, ss);
SegmentFilePtr seg_file;
status = build_op->CommitNewSegmentFile(sf_context, seg_file);
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
ASSERT_TRUE(seg_file);
......@@ -670,10 +488,6 @@ TEST_F(SnapshotTest, IndexTest) {
auto it_found = sf_collector->segment_files_.find(seg_file->GetID());
ASSERT_NE(it_found, sf_collector->segment_files_.end());
/* for (auto& i : sf_collector->segment_files_) { */
/* std::cout << i << std::endl; */
/* } */
/* std::cout << "XX " << seg_file->GetID() << std::endl; */
status = Snapshots::GetInstance().GetSnapshot(ss, 1);
ASSERT_TRUE(status.ok());
......@@ -683,7 +497,6 @@ TEST_F(SnapshotTest, IndexTest) {
drop_ctx.stale_segment_file = seg_file;
auto drop_op = std::make_shared<DropIndexOperation>(drop_ctx, ss);
status = drop_op->Push();
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
status = drop_op->GetSnapshot(ss);
......@@ -712,38 +525,19 @@ TEST_F(SnapshotTest, IndexTest) {
sf_collector->Iterate();
auto prev_total = sf_collector->segment_files_.size();
auto create_segment = [&](ID_TYPE partition_id) {
OperationContext context;
context.lsn = next_lsn();
context.prev_partition = ss->GetResource<Partition>(partition_id);
auto op = std::make_shared<NewSegmentOperation>(context, ss);
SegmentPtr new_seg;
status = op->CommitNewSegment(new_seg);
ASSERT_TRUE(status.ok());
ASSERT_FALSE(new_seg->ToString().empty());
SegmentFilePtr seg_file;
auto nsf_context = sf_context;
nsf_context.segment_id = new_seg->GetID();
nsf_context.partition_id = new_seg->GetPartitionId();
status = op->CommitNewSegmentFile(nsf_context, seg_file);
ASSERT_TRUE(status.ok());
status = op->Push();
ASSERT_TRUE(status.ok());
status = op->GetSnapshot(ss);
ASSERT_TRUE(status.ok());
};
auto new_total = 0;
auto partitions = ss->GetResources<Partition>();
for (auto& kv : partitions) {
num = RandomInt(2, 5);
for (auto i = 0; i < num; ++i) {
create_segment(kv.first);
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok());
}
new_total += num;
}
status = Snapshots::GetInstance().GetSnapshot(ss, ss->GetName());
ASSERT_TRUE(status.ok());
sf_collector = std::make_shared<SegmentFileCollector>(ss, filter2);
sf_collector->Iterate();
auto total = sf_collector->segment_files_.size();
......@@ -753,31 +547,58 @@ TEST_F(SnapshotTest, IndexTest) {
sf_context.field_element_name);
ASSERT_NE(field_element_id, 0);
auto filter3 = [&](SegmentFile::Ptr segment_file) -> bool {
if (segment_file->GetFieldElementId() == field_element_id) {
return true;
}
return false;
};
sf_collector = std::make_shared<SegmentFileCollector>(ss, filter3);
sf_collector->Iterate();
auto specified_segment_files_cnt = sf_collector->segment_files_.size();
OperationContext d_a_i_ctx;
d_a_i_ctx.lsn = next_lsn();
d_a_i_ctx.stale_field_element = ss->GetResource<FieldElement>(field_element_id);
auto drop_all_index_op = std::make_shared<DropAllIndexOperation>(d_a_i_ctx, ss);
status = drop_all_index_op->Push();
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
status = drop_all_index_op->GetSnapshot(ss);
ASSERT_TRUE(status.ok());
/* { */
/* auto& fields = ss->GetResources<Field>(); */
/* for (auto field_kv : fields) { */
/* auto field = field_kv.second; */
/* std::cout << "field " << field->GetID() << " " << field->GetName() << std::endl; */
/* auto& field_elements = ss->GetResources<FieldElement>(); */
/* for (auto element_kv : field_elements) { */
/* auto element = element_kv.second; */
/* if (element->GetFieldId() != field->GetID()) { */
/* continue; */
/* } */
/* std::cout << "\tfield_element " << element->GetID() << " " << element->GetName() << std::endl; */
/* } */
/* } */
/* } */
sf_collector = std::make_shared<SegmentFileCollector>(ss, filter2);
sf_collector->Iterate();
ASSERT_EQ(sf_collector->segment_files_.size(), total - new_total);
ASSERT_EQ(sf_collector->segment_files_.size(), total - specified_segment_files_cnt);
{
auto& field_elements = ss->GetResources<FieldElement>();
for (auto& kv : field_elements) {
ASSERT_NE(kv.second->GetID(), field_element_id);
}
}
}
TEST_F(SnapshotTest, OperationTest) {
milvus::Status status;
Status status;
std::string to_string;
LSN_TYPE lsn;
SegmentFileContext sf_context;
sf_context.field_name = "f_1_1";
sf_context.field_element_name = "fe_1_1";
sf_context.segment_id = 1;
sf_context.partition_id = 1;
ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, 1);
......@@ -794,6 +615,9 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(collection_commit->ToString().empty());
}
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
OperationContext merge_ctx;
std::set<ID_TYPE> stale_segment_commit_ids;
......@@ -949,7 +773,7 @@ TEST_F(SnapshotTest, OperationTest) {
}
TEST_F(SnapshotTest, CompoundTest1) {
milvus::Status status;
Status status;
std::atomic<LSN_TYPE> lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
......@@ -1290,7 +1114,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
TEST_F(SnapshotTest, CompoundTest2) {
milvus::Status status;
Status status;
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> LSN_TYPE& {
return ++lsn;
......
......@@ -14,8 +14,65 @@
#include <gtest/gtest.h>
#include <random>
#include <memory>
#include <tuple>
#include <vector>
#include <set>
#include <string>
#include "db/SSDBImpl.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ReferenceProxy.h"
#include "db/snapshot/ResourceHolders.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/Snapshots.h"
#include "db/snapshot/Store.h"
#include "db/snapshot/WrappedTypes.h"
using ID_TYPE = milvus::engine::snapshot::ID_TYPE;
using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE;
using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE;
using MappingT = milvus::engine::snapshot::MappingT;
using LoadOperationContext = milvus::engine::snapshot::LoadOperationContext;
using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext;
using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext;
using OperationContext = milvus::engine::snapshot::OperationContext;
using PartitionContext = milvus::engine::snapshot::PartitionContext;
using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation;
using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation;
using BuildOperation = milvus::engine::snapshot::BuildOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation;
using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation;
using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation;
using CreatePartitionOperation = milvus::engine::snapshot::CreatePartitionOperation;
using DropCollectionOperation = milvus::engine::snapshot::DropCollectionOperation;
using CollectionCommitsHolder = milvus::engine::snapshot::CollectionCommitsHolder;
using CollectionsHolder = milvus::engine::snapshot::CollectionsHolder;
using CollectionScopedT = milvus::engine::snapshot::CollectionScopedT;
using Collection = milvus::engine::snapshot::Collection;
using CollectionPtr = milvus::engine::snapshot::CollectionPtr;
using Partition = milvus::engine::snapshot::Partition;
using PartitionPtr = milvus::engine::snapshot::PartitionPtr;
using Segment = milvus::engine::snapshot::Segment;
using SegmentPtr = milvus::engine::snapshot::SegmentPtr;
using SegmentFile = milvus::engine::snapshot::SegmentFile;
using SegmentFilePtr = milvus::engine::snapshot::SegmentFilePtr;
using Field = milvus::engine::snapshot::Field;
using FieldElement = milvus::engine::snapshot::FieldElement;
using Snapshots = milvus::engine::snapshot::Snapshots;
using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT;
using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy;
using Queue = milvus::BlockingQueue<ID_TYPE>;
using TQueue = milvus::BlockingQueue<std::tuple<ID_TYPE, ID_TYPE>>;
using SoftDeleteCollectionOperation = milvus::engine::snapshot::SoftDeleteOperation<Collection>;
using ParamsField = milvus::engine::snapshot::ParamsField;
using IteratePartitionHandler = milvus::engine::snapshot::IterateHandler<Partition>;
using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler<SegmentFile>;
using SSDBImpl = milvus::engine::SSDBImpl;
using Status = milvus::Status;
inline int
RandomInt(int start, int end) {
......@@ -25,6 +82,167 @@ RandomInt(int start, int end) {
return dist(rng);
}
inline void
SFContextBuilder(SegmentFileContext& ctx, ScopedSnapshotT sss) {
auto field = sss->GetResources<Field>().begin()->second;
ctx.field_name = field->GetName();
for (auto& kv : sss->GetResources<FieldElement>()) {
ctx.field_element_name = kv.second->GetName();
break;
}
auto& segments = sss->GetResources<Segment>();
if (segments.size() == 0) {
return;
}
ctx.segment_id = sss->GetResources<Segment>().begin()->second->GetID();
ctx.partition_id = sss->GetResources<Segment>().begin()->second->GetPartitionId();
}
struct PartitionCollector : public IteratePartitionHandler {
using ResourceT = Partition;
using BaseT = IteratePartitionHandler;
explicit PartitionCollector(ScopedSnapshotT ss) : BaseT(ss) {}
Status
PreIterate() override {
partition_names_.clear();
return Status::OK();
}
Status
Handle(const typename ResourceT::Ptr& partition) override {
partition_names_.push_back(partition->GetName());
return Status::OK();
}
std::vector<std::string> partition_names_;
};
using FilterT = std::function<bool(SegmentFile::Ptr)>;
struct SegmentFileCollector : public IterateSegmentFileHandler {
using ResourceT = SegmentFile;
using BaseT = IterateSegmentFileHandler;
explicit SegmentFileCollector(ScopedSnapshotT ss, const FilterT& filter)
: filter_(filter), BaseT(ss) {}
Status
PreIterate() override {
segment_files_.clear();
return Status::OK();
}
Status
Handle(const typename ResourceT::Ptr& segment_file) override {
if (!filter_(segment_file)) {
return Status::OK();
}
segment_files_.insert(segment_file->GetID());
return Status::OK();
}
FilterT filter_;
std::set<ID_TYPE> segment_files_;
};
struct WaitableObj {
bool notified_ = false;
std::mutex mutex_;
std::condition_variable cv_;
void
Wait() {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait(lck);
}
notified_ = false;
}
void
Notify() {
std::unique_lock<std::mutex> lck(mutex_);
notified_ = true;
lck.unlock();
cv_.notify_one();
}
};
inline ScopedSnapshotT
CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
CreateCollectionContext context;
context.lsn = lsn;
auto collection_schema = std::make_shared<Collection>(collection_name);
context.collection = collection_schema;
auto vector_field = std::make_shared<Field>("vector", 0,
milvus::engine::snapshot::FieldType::VECTOR);
auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
milvus::engine::snapshot::FieldElementType::IVFSQ8);
auto int_field = std::make_shared<Field>("int", 0,
milvus::engine::snapshot::FieldType::INT32);
context.fields_schema[vector_field] = {vector_field_element};
context.fields_schema[int_field] = {};
auto op = std::make_shared<CreateCollectionOperation>(context);
op->Push();
ScopedSnapshotT ss;
auto status = op->GetSnapshot(ss);
return ss;
}
inline ScopedSnapshotT
CreatePartition(const std::string& collection_name, const PartitionContext& p_context, const LSN_TYPE& lsn) {
ScopedSnapshotT curr_ss;
ScopedSnapshotT ss;
auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
OperationContext context;
context.lsn = lsn;
auto op = std::make_shared<CreatePartitionOperation>(context, ss);
PartitionPtr partition;
status = op->CommitNewPartition(p_context, partition);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->Push();
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
status = op->GetSnapshot(curr_ss);
if (!status.ok()) {
std::cout << status.ToString() << std::endl;
return curr_ss;
}
return curr_ss;
}
inline Status
CreateSegment(ScopedSnapshotT ss, ID_TYPE partition_id, LSN_TYPE lsn, const SegmentFileContext& sf_context) {
OperationContext context;
context.lsn = lsn;
context.prev_partition = ss->GetResource<Partition>(partition_id);
auto op = std::make_shared<NewSegmentOperation>(context, ss);
SegmentPtr new_seg;
STATUS_CHECK(op->CommitNewSegment(new_seg));
SegmentFilePtr seg_file;
auto nsf_context = sf_context;
nsf_context.segment_id = new_seg->GetID();
nsf_context.partition_id = new_seg->GetPartitionId();
STATUS_CHECK(op->CommitNewSegmentFile(nsf_context, seg_file));
STATUS_CHECK(op->Push());
return op->GetSnapshot(ss);
}
class BaseTest : public ::testing::Test {
protected:
void
......@@ -46,7 +264,7 @@ class SnapshotTest : public BaseTest {
class SSDBTest : public BaseTest {
protected:
std::shared_ptr<milvus::engine::SSDBImpl> db_;
std::shared_ptr<SSDBImpl> db_;
void
SetUp() override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册