未验证 提交 442e6aee 编写于 作者: X XuPeng-SH 提交者: GitHub

(snapshot/db): add ChangeSegmentFileOperation and related test (#3050)

* (db/snapshot): update for row count
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bug in NewSegmentOperation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove dummy print
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add some test for row count
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update size logic
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update size logic related ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): rollback if operation is not done
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): clean store
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove some dependency
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update for store
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update Store.h
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update store related code
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add field element modification operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): change new operation name
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add Segment File Operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): crtp for BaseResource
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add InActiveResourcesGCEvent
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix ut error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update snapshot segmentcommit operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update drop all index operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix gc
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix gc segment files
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor Event related
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): change 1
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): change for GC event
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix build error for high version of boost filesystem
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update compound operations
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add operation test
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 cd4b667b
......@@ -529,7 +529,7 @@ ExecutionEngineImpl::BuildIndex() {
// add segment files
snapshot::OperationContext context;
context.prev_partition = snapshot->GetResource<snapshot::Partition>(segment->GetPartitionId());
auto build_op = std::make_shared<snapshot::AddSegmentFileOperation>(context, snapshot);
auto build_op = std::make_shared<snapshot::ChangeSegmentFileOperation>(context, snapshot);
auto add_segment_file = [&](const std::string& element_name, snapshot::SegmentFilePtr& seg_file) -> Status {
snapshot::SegmentFileContext sf_context;
......
......@@ -26,15 +26,24 @@ namespace milvus {
namespace engine {
namespace snapshot {
// Constructor: forwards the operation context and the previous snapshot unchanged to the
// BaseT base-class constructor; the body itself does no further work.
AddSegmentFileOperation::AddSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
ChangeSegmentFileOperation::ChangeSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
: BaseT(context, prev_ss) {
}
Status
AddSegmentFileOperation::DoExecute(StorePtr store) {
STATUS_CHECK(CheckStale(std::bind(&AddSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1,
ChangeSegmentFileOperation::DoExecute(StorePtr store) {
STATUS_CHECK(CheckStale(std::bind(&ChangeSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1,
context_.new_segment_files[0]->GetSegmentId())));
ID_TYPE segment_id = 0;
for (auto& stale_segment_file : context_.stale_segment_files) {
if (segment_id == 0) {
segment_id = stale_segment_file->GetSegmentId();
} else if (segment_id != stale_segment_file->GetSegmentId()) {
return Status(SS_INVALID_CONTEX_ERROR, "All segment files should be of same segment");
}
}
auto update_size = [&](SegmentFilePtr& file) {
auto update_ctx = ResourceContextBuilder<SegmentFile>().SetOp(meta::oUpdate).CreatePtr();
update_ctx->AddAttr(SizeField::Name);
......@@ -42,6 +51,11 @@ AddSegmentFileOperation::DoExecute(StorePtr store) {
};
for (auto& new_file : context_.new_segment_files) {
if (segment_id == 0) {
segment_id = new_file->GetSegmentId();
} else if (segment_id != new_file->GetSegmentId()) {
return Status(SS_INVALID_CONTEX_ERROR, "All segment files should be of same segment");
}
update_size(new_file);
}
......@@ -93,7 +107,7 @@ AddSegmentFileOperation::DoExecute(StorePtr store) {
}
Status
AddSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const {
ChangeSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const {
auto segment = latest_snapshot->GetResource<Segment>(segment_id);
if (!segment) {
std::stringstream emsg;
......@@ -104,16 +118,16 @@ AddSegmentFileOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_
}
// Records the requested row-count change on the operation.
// Stores `delta` in delta_ and `sub` in sub_ and always returns Status::OK(); nothing is
// validated or applied to any resource here.
// NOTE(review): `sub` presumably means the delta is subtracted rather than added -- confirm
// against the code that consumes delta_ / sub_.
Status
AddSegmentFileOperation::CommitRowCountDelta(SIZE_TYPE delta, bool sub) {
ChangeSegmentFileOperation::CommitRowCountDelta(SIZE_TYPE delta, bool sub) {
delta_ = delta;
sub_ = sub;
return Status::OK();
}
Status
AddSegmentFileOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
ChangeSegmentFileOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
STATUS_CHECK(CheckStale(
std::bind(&AddSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id)));
std::bind(&ChangeSegmentFileOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id)));
auto segment = GetStartedSS()->GetResource<Segment>(context.segment_id);
if (!segment || (context_.new_segment_files.size() > 0 &&
......@@ -270,7 +284,7 @@ DropAllIndexOperation::DoExecute(StorePtr store) {
}
auto context = context_;
context.stale_segment_file = kv.second.Get();
context.stale_segment_files.push_back(kv.second.Get());
SegmentCommitOperation sc_op(context, GetAdjustedSS());
STATUS_CHECK(sc_op(store));
STATUS_CHECK(sc_op.GetResource(context.new_segment_commit));
......@@ -306,7 +320,7 @@ DropIndexOperation::DropIndexOperation(const OperationContext& context, ScopedSn
Status
DropIndexOperation::PreCheck() {
if (context_.stale_segment_file == nullptr) {
if (context_.stale_segment_files.size() == 0) {
std::stringstream emsg;
emsg << GetRepr() << ". Stale segment is requried";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
......
......@@ -58,12 +58,12 @@ class CompoundBaseOperation : public Operations {
}
};
class AddSegmentFileOperation : public CompoundBaseOperation<AddSegmentFileOperation> {
class ChangeSegmentFileOperation : public CompoundBaseOperation<ChangeSegmentFileOperation> {
public:
using BaseT = CompoundBaseOperation<AddSegmentFileOperation>;
using BaseT = CompoundBaseOperation<ChangeSegmentFileOperation>;
static constexpr const char* Name = "B";
AddSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
ChangeSegmentFileOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
Status DoExecute(StorePtr) override;
......
......@@ -68,9 +68,20 @@ OperationContext::ToString() const {
}
ss << "]";
}
if (stale_segment_file) {
ss << ",S_SF=" << stale_segment_file->GetID();
// Append the ids of the stale segment files as ",S_SF=[id<INNER_DELIMITER>id...]".
if (stale_segment_files.size() > 0) {
    ss << ",S_SF=[";
    bool first = true;
    // Iterate stale_segment_files: this section is labelled S_SF and is guarded by
    // stale_segment_files being non-empty. Iterating new_segment_files here would print
    // the wrong ids (or an empty list when only stale files are present).
    for (auto& f : stale_segment_files) {
        if (!first) {
            ss << INNER_DELIMITER;
        }
        ss << f->GetID();
        first = false;
    }
    ss << "]";
}
if (new_segment_files.size() > 0) {
ss << ",N_SF=[";
bool first = true;
......
......@@ -63,7 +63,6 @@ struct OperationContext {
CollectionCommitPtr new_collection_commit = nullptr;
CollectionPtr new_collection = nullptr;
SegmentFilePtr stale_segment_file = nullptr;
std::vector<SegmentPtr> stale_segments;
FieldPtr prev_field = nullptr;
......@@ -83,6 +82,7 @@ struct OperationContext {
PartitionCommitPtr stale_partition_commit = nullptr;
SegmentFile::VecT new_segment_files;
SegmentFile::VecT stale_segment_files;
CollectionPtr collection = nullptr;
LSN_TYPE lsn = 0;
......
......@@ -205,8 +205,8 @@ SegmentCommit::Ptr
SegmentCommitOperation::GetPrevResource() const {
// Finds the segment commit this operation builds on by asking the started snapshot for the
// commit of the segment owning the first new segment file, or -- when there are no new
// files -- the first stale segment file. Only element [0] of either list is consulted.
if (context_.new_segment_files.size() > 0) {
return GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_files[0]->GetSegmentId());
} else if (context_.stale_segment_file != nullptr) {
return GetStartedSS()->GetSegmentCommitBySegmentId(context_.stale_segment_file->GetSegmentId());
} else if (context_.stale_segment_files.size() > 0) {
return GetStartedSS()->GetSegmentCommitBySegmentId(context_.stale_segment_files[0]->GetSegmentId());
}
// Neither list has an entry, so there is no segment to look up.
return nullptr;
}
......@@ -220,9 +220,9 @@ SegmentCommitOperation::DoExecute(StorePtr store) {
resource_->SetID(0);
resource_->ResetStatus();
size = resource_->GetSize();
if (context_.stale_segment_file) {
resource_->GetMappings().erase(context_.stale_segment_file->GetID());
size -= context_.stale_segment_file->GetSize();
for (auto& stale_segment_file : context_.stale_segment_files) {
resource_->GetMappings().erase(stale_segment_file->GetID());
size -= stale_segment_file->GetSize();
}
} else {
resource_ = std::make_shared<SegmentCommit>(GetStartedSS()->GetLatestSchemaCommitId(),
......@@ -240,14 +240,10 @@ SegmentCommitOperation::DoExecute(StorePtr store) {
// Validates the segment-file lists in the operation context.
// Returns SS_INVALID_CONTEX_ERROR (message prefixed with GetRepr()) when the context fails
// the checks below, otherwise Status::OK().
Status
SegmentCommitOperation::PreCheck() {
if (context_.stale_segment_file == nullptr && context_.new_segment_files.size() == 0) {
// Reject a context that carries neither stale nor new segment files.
if (context_.stale_segment_files.size() == 0 && context_.new_segment_files.size() == 0) {
std::stringstream emsg;
emsg << GetRepr() << ". new_segment_files should not be empty in context";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
} else if (context_.stale_segment_file != nullptr && context_.new_segment_files.size() > 0) {
std::stringstream emsg;
emsg << GetRepr() << ". new_segment_files should be empty in context";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
return Status::OK();
}
......
......@@ -479,7 +479,7 @@ TEST_F(SnapshotTest, IndexTest) {
OperationContext context;
context.lsn = next_lsn();
context.prev_partition = ss->GetResource<Partition>(sf_context.partition_id);
auto build_op = std::make_shared<AddSegmentFileOperation>(context, ss);
auto build_op = std::make_shared<ChangeSegmentFileOperation>(context, ss);
SegmentFilePtr seg_file;
status = build_op->CommitNewSegmentFile(sf_context, seg_file);
ASSERT_TRUE(status.ok());
......@@ -510,7 +510,7 @@ TEST_F(SnapshotTest, IndexTest) {
OperationContext drop_ctx;
drop_ctx.lsn = next_lsn();
drop_ctx.stale_segment_file = seg_file;
drop_ctx.stale_segment_files.push_back(seg_file);
auto drop_op = std::make_shared<DropIndexOperation>(drop_ctx, ss);
status = drop_op->Push();
ASSERT_TRUE(status.ok());
......@@ -673,7 +673,7 @@ TEST_F(SnapshotTest, OperationTest) {
{
OperationContext context;
context.lsn = ++lsn;
auto build_op = std::make_shared<AddSegmentFileOperation>(context, ss);
auto build_op = std::make_shared<ChangeSegmentFileOperation>(context, ss);
SegmentFilePtr seg_file;
status = build_op->CommitNewSegmentFile(sf_context, seg_file);
std::cout << status.ToString() << std::endl;
......@@ -841,7 +841,7 @@ TEST_F(SnapshotTest, OperationTest) {
{
OperationContext context;
context.lsn = ++lsn;
auto build_op = std::make_shared<AddSegmentFileOperation>(context, new_ss);
auto build_op = std::make_shared<ChangeSegmentFileOperation>(context, new_ss);
SegmentFilePtr seg_file;
auto new_sf_context = sf_context;
new_sf_context.segment_id = new_seg_id;
......@@ -852,7 +852,7 @@ TEST_F(SnapshotTest, OperationTest) {
{
OperationContext context;
context.lsn = ++lsn;
auto op = std::make_shared<AddSegmentFileOperation>(context, ss);
auto op = std::make_shared<ChangeSegmentFileOperation>(context, ss);
SegmentFilePtr seg_file;
auto new_sf_context = sf_context;
new_sf_context.segment_id = merge_seg->GetID();
......@@ -918,7 +918,7 @@ TEST_F(SnapshotTest, OperationTest) {
{
OperationContext context;
context.lsn = ++lsn;
auto build_op = std::make_shared<AddSegmentFileOperation>(context, ss);
auto build_op = std::make_shared<ChangeSegmentFileOperation>(context, ss);
SegmentFilePtr seg_file;
auto new_sf_context = sf_context;
new_sf_context.segment_id = merge_seg->GetID();
......@@ -938,6 +938,77 @@ TEST_F(SnapshotTest, OperationTest) {
Snapshots::GetInstance().Reset();
}
// Replaces one segment file of an existing segment via ChangeSegmentFileOperation (one stale
// file out, one new file in, plus a row-count delta) and checks that the collection commit's
// row count and size reflect the change.
TEST_F(SnapshotTest, ChangeSegmentFileOperationTest) {
    LSN_TYPE lsn = 0;
    std::string collection_name("c1");
    auto ss = CreateCollection(collection_name, ++lsn);
    ASSERT_TRUE(ss);

    SegmentFileContext sf_context;
    SFContextBuilder(sf_context, ss);

    // Create a random number of segments in every partition and track the expected row total.
    auto& partitions = ss->GetResources<Partition>();
    auto total_row_cnt = 0;
    for (auto& kv : partitions) {
        auto num = RandomInt(2, 5);
        for (auto i = 0; i < num; ++i) {
            auto row_cnt = RandomInt(100, 200);
            ASSERT_TRUE(CreateSegment(ss, kv.first, ++lsn, sf_context, row_cnt).ok());
            total_row_cnt += row_cnt;
        }
    }

    // Re-fetch the snapshot so it includes the segments created above.
    auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(total_row_cnt, ss->GetCollectionCommit()->GetRowCount());

    auto total_size = ss->GetCollectionCommit()->GetSize();

    // Pick the first segment and one of its files to act as the stale file.
    auto target_segment = ss->GetResources<Segment>().begin()->second.Get();
    auto sf_ids = ss->GetSegmentFileIds(target_segment->GetID());
    ASSERT_GT(sf_ids.size(), 0);
    auto stale_sf = ss->GetResource<SegmentFile>(*(sf_ids.begin()));
    ASSERT_TRUE(stale_sf);
    std::cout << stale_sf->GetSize() << std::endl;

    OperationContext context;
    context.lsn = ++lsn;
    context.stale_segment_files.push_back(stale_sf);
    auto op = std::make_shared<ChangeSegmentFileOperation>(context, ss);

    // The new file targets the same segment / partition / collection as the stale one.
    SegmentFilePtr seg_file;
    sf_context.field_name = "vector";
    sf_context.field_element_name = "_raw";
    sf_context.segment_id = stale_sf->GetSegmentId();
    sf_context.partition_id = stale_sf->GetPartitionId();
    sf_context.collection_id = stale_sf->GetCollectionId();
    status = op->CommitNewSegmentFile(sf_context, seg_file);
    ASSERT_TRUE(status.ok()) << status.message();
    ASSERT_TRUE(seg_file);

    // Guard the lookup: without this a missing commit would crash the test binary on the
    // dereference below instead of reporting a test failure.
    auto prev_segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
    ASSERT_TRUE(prev_segment_commit);
    ASSERT_FALSE(prev_segment_commit->ToString().empty());

    // Expected size: the new file's size is added, the stale file's size is removed.
    auto new_size = RandomInt(1000, 20000);
    seg_file->SetSize(new_size);
    total_size += new_size;
    total_size -= stale_sf->GetSize();

    // Expected row count: half of the segment's rows are taken away by the delta.
    auto delta = prev_segment_commit->GetRowCount() / 2;
    status = op->CommitRowCountDelta(delta);
    ASSERT_TRUE(status.ok()) << status.message();
    total_row_cnt -= delta;

    status = op->Push();
    ASSERT_TRUE(status.ok()) << status.message();

    status = op->GetSnapshot(ss);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(ss->GetCollectionCommit()->GetRowCount(), total_row_cnt);
    ASSERT_EQ(ss->GetCollectionCommit()->GetSize(), total_size);
}
TEST_F(SnapshotTest, CompoundTest1) {
Status status;
std::atomic<LSN_TYPE> lsn = 0;
......@@ -988,7 +1059,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
OperationContext context;
context.lsn = next_lsn();
auto build_op = std::make_shared<AddSegmentFileOperation>(context, latest_ss);
auto build_op = std::make_shared<ChangeSegmentFileOperation>(context, latest_ss);
SegmentFilePtr seg_file;
build_sf_context.segment_id = seg_id;
status = build_op->CommitNewSegmentFile(build_sf_context, seg_file);
......@@ -1332,7 +1403,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
OperationContext context;
context.lsn = next_lsn();
auto build_op = std::make_shared<AddSegmentFileOperation>(context, latest_ss);
auto build_op = std::make_shared<ChangeSegmentFileOperation>(context, latest_ss);
SegmentFilePtr seg_file;
build_sf_context.segment_id = seg_id;
status = build_op->CommitNewSegmentFile(build_sf_context, seg_file);
......
......@@ -46,7 +46,7 @@ using PartitionContext = milvus::engine::snapshot::PartitionContext;
using DropIndexOperation = milvus::engine::snapshot::DropIndexOperation;
using AddFieldElementOperation = milvus::engine::snapshot::AddFieldElementOperation;
using DropAllIndexOperation = milvus::engine::snapshot::DropAllIndexOperation;
using AddSegmentFileOperation = milvus::engine::snapshot::AddSegmentFileOperation;
using ChangeSegmentFileOperation = milvus::engine::snapshot::ChangeSegmentFileOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation;
using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册