未验证 提交 efe1034f 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Update DropAllIndexOperation (#2947)

* (db/snapshot): update for row count
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bug in NewSegmentOperation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove dummy print
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add some test for row count
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update size logic
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update size logic related ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): rollback if operation is not done
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): clean store
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove some dependency
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update for store
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update Store.h
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update store related code
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add field element modification operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): change new operation name
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add Segment File Operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): crtp for BaseResource
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add InActiveResourcesGCEvent
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix ut error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update snapshot segmentcommit operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update drop all index operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 b3608e1a
......@@ -610,7 +610,9 @@ SSDBImpl::DropIndex(const std::string& collection_name, const std::string& field
// SS TODO: Check Index Type
snapshot::OperationContext context;
STATUS_CHECK(ss->GetFieldElement(field_name, element_name, context.stale_field_element));
snapshot::FieldElementPtr stale_field_element;
STATUS_CHECK(ss->GetFieldElement(field_name, element_name, stale_field_element));
context.stale_field_elements.push_back(stale_field_element);
auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
STATUS_CHECK(op->Push());
......
......@@ -13,10 +13,12 @@
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <vector>
#include "db/meta/MetaAdapter.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ResourceContext.h"
#include "db/snapshot/Snapshots.h"
......@@ -217,30 +219,41 @@ DropAllIndexOperation::DropAllIndexOperation(const OperationContext& context, Sc
Status
DropAllIndexOperation::PreCheck() {
if (context_.stale_field_element == nullptr) {
if (context_.stale_field_elements.size() == 0) {
std::stringstream emsg;
emsg << GetRepr() << ". Stale field element is requried";
emsg << GetRepr() << ". Stale field element is empty";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
if (!GetStartedSS()->GetResource<FieldElement>(context_.stale_field_element->GetID())) {
std::set<ID_TYPE> field_ids;
for (auto& stale_field_element : context_.stale_field_elements) {
if (!GetStartedSS()->GetResource<FieldElement>(stale_field_element->GetID())) {
std::stringstream emsg;
emsg << GetRepr() << ". Specified field element " << stale_field_element->GetName();
emsg << " is stale";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
field_ids.insert(stale_field_element->GetFieldId());
}
if (field_ids.size() > 1) {
std::stringstream emsg;
emsg << GetRepr() << ". Specified field element " << context_.stale_field_element->GetName();
emsg << " is stale";
emsg << GetRepr() << ". All stale field elements should be of same field";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
// TODO: Check type
return Status::OK();
}
Status
DropAllIndexOperation::DoExecute(StorePtr store) {
auto& segment_files = GetAdjustedSS()->GetResources<SegmentFile>();
/* auto& segment_files = GetAdjustedSS()->GetResources<SegmentFile>(); */
OperationContext cc_context;
{
auto context = context_;
context.stale_field_elements.push_back(context.stale_field_element);
FieldCommitOperation fc_op(context, GetAdjustedSS());
STATUS_CHECK(fc_op(store));
......@@ -264,20 +277,29 @@ DropAllIndexOperation::DoExecute(StorePtr store) {
}
std::map<ID_TYPE, std::vector<SegmentCommitPtr>> p_sc_map;
for (auto& kv : segment_files) {
if (kv.second->GetFieldElementId() != context_.stale_field_element->GetID()) {
continue;
}
auto executor = [&](const Segment::Ptr& segment, SegmentIterator* handler) -> Status {
auto context = context_;
context.stale_segment_file = kv.second.Get();
for (auto& stale_element : context.stale_field_elements) {
auto segment_file = handler->ss_->GetSegmentFile(segment->GetID(), stale_element->GetID());
if (segment_file) {
context.stale_segment_files.push_back(segment_file);
}
}
if (context.stale_segment_files.size() == 0) {
return Status::OK();
}
SegmentCommitOperation sc_op(context, GetAdjustedSS());
STATUS_CHECK(sc_op(store));
STATUS_CHECK(sc_op.GetResource(context.new_segment_commit));
auto segc_ctx_p = ResourceContextBuilder<SegmentCommit>().SetOp(meta::oUpdate).CreatePtr();
AddStepWithLsn(*context.new_segment_commit, context.lsn, segc_ctx_p);
p_sc_map[context.new_segment_commit->GetPartitionId()].push_back(context.new_segment_commit);
}
return Status::OK();
};
auto segment_iterator = std::make_shared<SegmentIterator>(GetAdjustedSS(), executor);
segment_iterator->Iterate();
STATUS_CHECK(segment_iterator->GetStatus());
for (auto& kv : p_sc_map) {
auto& partition_id = kv.first;
......@@ -306,7 +328,7 @@ DropIndexOperation::DropIndexOperation(const OperationContext& context, ScopedSn
Status
DropIndexOperation::PreCheck() {
if (context_.stale_segment_file == nullptr) {
if (context_.stale_segment_files.size() == 0) {
std::stringstream emsg;
emsg << GetRepr() << ". Stale segment is requried";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
......
......@@ -68,8 +68,17 @@ OperationContext::ToString() const {
}
ss << "]";
}
if (stale_segment_file) {
ss << ",S_SF=" << stale_segment_file->GetID();
if (stale_segment_files.size() > 0) {
ss << ",S_SF=[";
bool first = true;
for (auto& f : stale_segment_files) {
if (!first) {
ss << INNER_DELIMITER;
}
ss << f->GetID();
first = false;
}
ss << "]";
}
if (new_segment_files.size() > 0) {
ss << ",N_SF=[";
......
......@@ -62,12 +62,11 @@ struct OperationContext {
CollectionCommitPtr new_collection_commit = nullptr;
CollectionPtr new_collection = nullptr;
SegmentFilePtr stale_segment_file = nullptr;
SegmentFile::VecT stale_segment_files;
std::vector<SegmentPtr> stale_segments;
FieldPtr prev_field = nullptr;
FieldElementPtr prev_field_element = nullptr;
FieldElementPtr stale_field_element = nullptr;
std::vector<FieldElementPtr> new_field_elements;
std::vector<FieldElementPtr> stale_field_elements;
......
......@@ -205,8 +205,8 @@ SegmentCommit::Ptr
SegmentCommitOperation::GetPrevResource() const {
if (context_.new_segment_files.size() > 0) {
return GetStartedSS()->GetSegmentCommitBySegmentId(context_.new_segment_files[0]->GetSegmentId());
} else if (context_.stale_segment_file != nullptr) {
return GetStartedSS()->GetSegmentCommitBySegmentId(context_.stale_segment_file->GetSegmentId());
} else if (context_.stale_segment_files.size() != 0) {
return GetStartedSS()->GetSegmentCommitBySegmentId(context_.stale_segment_files[0]->GetSegmentId());
}
return nullptr;
}
......@@ -220,10 +220,11 @@ SegmentCommitOperation::DoExecute(StorePtr store) {
resource_->SetID(0);
resource_->ResetStatus();
size = resource_->GetSize();
if (context_.stale_segment_file) {
resource_->GetMappings().erase(context_.stale_segment_file->GetID());
size -= context_.stale_segment_file->GetSize();
for (auto& stale_file : context_.stale_segment_files) {
resource_->GetMappings().erase(stale_file->GetID());
size -= stale_file->GetSize();
}
} else {
resource_ = std::make_shared<SegmentCommit>(GetStartedSS()->GetLatestSchemaCommitId(),
context_.new_segment_files[0]->GetPartitionId(),
......@@ -240,11 +241,11 @@ SegmentCommitOperation::DoExecute(StorePtr store) {
Status
SegmentCommitOperation::PreCheck() {
if (context_.stale_segment_file == nullptr && context_.new_segment_files.size() == 0) {
if (context_.stale_segment_files.size() == 0 && context_.new_segment_files.size() == 0) {
std::stringstream emsg;
emsg << GetRepr() << ". new_segment_files should not be empty in context";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
} else if (context_.stale_segment_file != nullptr && context_.new_segment_files.size() > 0) {
} else if (context_.stale_segment_files.size() > 0 && context_.new_segment_files.size() > 0) {
std::stringstream emsg;
emsg << GetRepr() << ". new_segment_files should be empty in context";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
......
......@@ -510,7 +510,7 @@ TEST_F(SnapshotTest, IndexTest) {
OperationContext drop_ctx;
drop_ctx.lsn = next_lsn();
drop_ctx.stale_segment_file = seg_file;
drop_ctx.stale_segment_files.push_back(seg_file);
auto drop_op = std::make_shared<DropIndexOperation>(drop_ctx, ss);
status = drop_op->Push();
ASSERT_TRUE(status.ok());
......@@ -573,38 +573,24 @@ TEST_F(SnapshotTest, IndexTest) {
OperationContext d_a_i_ctx;
d_a_i_ctx.lsn = next_lsn();
d_a_i_ctx.stale_field_element = ss->GetResource<FieldElement>(field_element_id);
d_a_i_ctx.stale_field_elements.push_back(ss->GetResource<FieldElement>(field_element_id));
FieldElement::Ptr fe;
status = ss->GetFieldElement(sf_context.field_name, sf_context.field_element_name,
fe);
ASSERT_TRUE(status.ok());
ASSERT_EQ(fe, d_a_i_ctx.stale_field_element);
ASSERT_EQ(fe, d_a_i_ctx.stale_field_elements[0]);
std::cout << ss->ToString() << std::endl;
auto drop_all_index_op = std::make_shared<DropAllIndexOperation>(d_a_i_ctx, ss);
status = drop_all_index_op->Push();
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
status = drop_all_index_op->GetSnapshot(ss);
ASSERT_TRUE(status.ok());
/* { */
/* auto& fields = ss->GetResources<Field>(); */
/* for (auto field_kv : fields) { */
/* auto field = field_kv.second; */
/* std::cout << "field " << field->GetID() << " " << field->GetName() << std::endl; */
/* auto& field_elements = ss->GetResources<FieldElement>(); */
/* for (auto element_kv : field_elements) { */
/* auto element = element_kv.second; */
/* if (element->GetFieldId() != field->GetID()) { */
/* continue; */
/* } */
/* std::cout << "\tfield_element " << element->GetID() << " " << element->GetName() << std::endl; */
/* } */
/* } */
/* } */
sf_collector = std::make_shared<SegmentFileCollector>(ss, filter2);
sf_collector->Iterate();
ASSERT_EQ(sf_collector->segment_files_.size(), total - specified_segment_files_cnt);
......@@ -615,6 +601,59 @@ TEST_F(SnapshotTest, IndexTest) {
ASSERT_NE(kv.second->GetID(), field_element_id);
}
}
{
auto& fields = ss->GetResources<Field>();
OperationContext dai_ctx;
for (auto& field : fields) {
auto elements = ss->GetFieldElementsByField(field.second->GetName());
ASSERT_GE(elements.size(), 1);
dai_ctx.stale_field_elements.push_back(elements[0]);
}
ASSERT_GT(dai_ctx.stale_field_elements.size(), 1);
auto op = std::make_shared<DropAllIndexOperation>(dai_ctx, ss);
status = op->Push();
ASSERT_FALSE(status.ok());
}
{
auto& fields = ss->GetResources<Field>();
ASSERT_GT(fields.size(), 0);
OperationContext dai_ctx;
std::string field_name;
std::set<ID_TYPE> stale_element_ids;
for (auto& field : fields) {
field_name = field.second->GetName();
auto elements = ss->GetFieldElementsByField(field_name);
ASSERT_GE(elements.size(), 2);
for (auto& element : elements) {
stale_element_ids.insert(element->GetID());
}
dai_ctx.stale_field_elements = std::move(elements);
break;
}
std::set<ID_TYPE> stale_segment_ids;
auto& segment_files = ss->GetResources<SegmentFile>();
for (auto& kv : segment_files) {
auto& id = kv.first;
auto& segment_file = kv.second;
auto it = stale_element_ids.find(segment_file->GetFieldElementId());
if (it != stale_element_ids.end()) {
stale_segment_ids.insert(id);
}
}
auto prev_segment_file_cnt = segment_files.size();
ASSERT_GT(dai_ctx.stale_field_elements.size(), 1);
auto op = std::make_shared<DropAllIndexOperation>(dai_ctx, ss);
status = op->Push();
ASSERT_TRUE(status.ok());
status = op->GetSnapshot(ss);
ASSERT_TRUE(status.ok());
ASSERT_EQ(ss->GetResources<SegmentFile>().size() + stale_segment_ids.size(), prev_segment_file_cnt);
}
}
TEST_F(SnapshotTest, OperationTest) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册