未验证 提交 4e143775 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Update DropAllIndexOperation and some small changes (#3087)

* (db/snapshot): update for row count
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bug in NewSegmentOperation
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove dummy print
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add some test for row count
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update size logic
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update size logic related ut
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): rollback if operation is not done
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): clean store
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove some dependency
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update for store
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update Store.h
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update store related code
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add field element modification operation
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): change new operation name
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add Segment File Operation
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): crtp for BaseResource
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add InActiveResourcesGCEvent
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix ut error
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update snapshot segmentcommit operation
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update drop all index operation
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update ut
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix gc
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix gc segment files
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor Event related
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): change 1
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): change for GC event
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix build error for high version of boost filesystem
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update compound operations
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add operation test
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update for CompoundSegmentsOperation
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add test for CompoundSegmentsOperation
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): add time stat in operation
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): timeout handling
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): timeout handling 2
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): timeout handling 3
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): small refactor
Signed-off-by: peng.xu <peng.xu@zilliz.com>

* (db/snapshot): update for DropAllIndexOperation
Signed-off-by: peng.xu <peng.xu@zilliz.com>
Co-authored-by: Wang XiangYu <xy.wang@zilliz.com>
上级 a85660be
......@@ -25,7 +25,7 @@
namespace milvus {
namespace engine {
struct LoadVectorFieldElementHandler : public snapshot::IterateHandler<snapshot::FieldElement> {
struct LoadVectorFieldElementHandler : public snapshot::FieldElementIterator {
using ResourceT = snapshot::FieldElement;
using BaseT = snapshot::IterateHandler<ResourceT>;
LoadVectorFieldElementHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
......@@ -38,7 +38,7 @@ struct LoadVectorFieldElementHandler : public snapshot::IterateHandler<snapshot:
const snapshot::FieldPtr field_;
};
struct LoadVectorFieldHandler : public snapshot::IterateHandler<snapshot::Field> {
struct LoadVectorFieldHandler : public snapshot::FieldIterator {
using ResourceT = snapshot::Field;
using BaseT = snapshot::IterateHandler<ResourceT>;
LoadVectorFieldHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss);
......@@ -49,7 +49,7 @@ struct LoadVectorFieldHandler : public snapshot::IterateHandler<snapshot::Field>
const server::ContextPtr context_;
};
struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::SegmentCommit> {
struct SegmentsToSearchCollector : public snapshot::SegmentCommitIterator {
using ResourceT = snapshot::SegmentCommit;
using BaseT = snapshot::IterateHandler<ResourceT>;
SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, snapshot::IDS_TYPE& segment_ids);
......@@ -60,7 +60,7 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::Seg
snapshot::IDS_TYPE& segment_ids_;
};
struct SegmentsToIndexCollector : public snapshot::IterateHandler<snapshot::SegmentCommit> {
struct SegmentsToIndexCollector : public snapshot::SegmentCommitIterator {
using ResourceT = snapshot::SegmentCommit;
using BaseT = snapshot::IterateHandler<ResourceT>;
SegmentsToIndexCollector(snapshot::ScopedSnapshotT ss, const std::string& field_name,
......@@ -74,7 +74,7 @@ struct SegmentsToIndexCollector : public snapshot::IterateHandler<snapshot::Segm
};
///////////////////////////////////////////////////////////////////////////////
struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler<snapshot::Segment> {
struct GetEntityByIdSegmentHandler : public snapshot::SegmentIterator {
using ResourceT = snapshot::Segment;
using BaseT = snapshot::IterateHandler<ResourceT>;
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
......
......@@ -742,8 +742,8 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col
auto field_json = field->GetParams();
auto dimension = field_json[milvus::knowhere::meta::DIM];
auto segment_commit = snapshot->GetSegmentCommitBySegmentId(segment->GetID());
auto row_count = segment_commit->GetRowCount();
snapshot::SIZE_TYPE row_count;
snapshot->GetSegmentRowCount(segment->GetID(), row_count);
milvus::json conf = index_info.extra_params_;
conf[knowhere::meta::DIM] = dimension;
......
......@@ -17,6 +17,7 @@
#include <vector>
#include "db/meta/MetaAdapter.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ResourceContext.h"
#include "db/snapshot/Snapshots.h"
......@@ -383,18 +384,21 @@ DropAllIndexOperation::DropAllIndexOperation(const OperationContext& context, Sc
Status
DropAllIndexOperation::PreCheck() {
if (context_.stale_field_element == nullptr) {
if (context_.stale_field_elements.size() == 0) {
std::stringstream emsg;
emsg << GetRepr() << ". Stale field element is requried";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
if (!GetStartedSS()->GetResource<FieldElement>(context_.stale_field_element->GetID())) {
std::stringstream emsg;
emsg << GetRepr() << ". Specified field element " << context_.stale_field_element->GetName();
emsg << " is stale";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
for (auto stale_fe : context_.stale_field_elements) {
if (!GetStartedSS()->GetResource<FieldElement>(stale_fe->GetID())) {
std::stringstream emsg;
emsg << GetRepr() << ". Specified field element " << stale_fe->GetName();
emsg << " is stale";
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
}
// TODO: Check type
return Status::OK();
}
......@@ -406,7 +410,6 @@ DropAllIndexOperation::DoExecute(StorePtr store) {
OperationContext cc_context;
{
auto context = context_;
context.stale_field_elements.push_back(context.stale_field_element);
FieldCommitOperation fc_op(context, GetAdjustedSS());
STATUS_CHECK(fc_op(store));
......@@ -430,20 +433,40 @@ DropAllIndexOperation::DoExecute(StorePtr store) {
}
std::map<ID_TYPE, std::vector<SegmentCommitPtr>> p_sc_map;
for (auto& kv : segment_files) {
if (kv.second->GetFieldElementId() != context_.stale_field_element->GetID()) {
continue;
}
std::set<ID_TYPE> stale_fe_ids;
for (auto& fe : context_.stale_field_elements) {
stale_fe_ids.insert(fe->GetID());
}
auto seg_executor = [&](const SegmentPtr& segment, SegmentIterator* handler) -> Status {
auto sf_ids = handler->ss_->GetSegmentFileIds(segment->GetID());
if (sf_ids.size() == 0) {
return Status::OK();
}
auto context = context_;
context.stale_segment_files.push_back(kv.second.Get());
for (auto& sf_id : sf_ids) {
auto sf = handler->ss_->GetResource<SegmentFile>(sf_id);
if (stale_fe_ids.find(sf->GetFieldElementId()) == stale_fe_ids.end()) {
continue;
}
context.stale_segment_files.push_back(sf);
}
if (context.stale_segment_files.size() == 0) {
return Status::OK();
}
SegmentCommitOperation sc_op(context, GetAdjustedSS());
STATUS_CHECK(sc_op(store));
STATUS_CHECK(sc_op.GetResource(context.new_segment_commit));
auto segc_ctx_p = ResourceContextBuilder<SegmentCommit>().SetOp(meta::oUpdate).CreatePtr();
AddStepWithLsn(*context.new_segment_commit, context.lsn, segc_ctx_p);
p_sc_map[context.new_segment_commit->GetPartitionId()].push_back(context.new_segment_commit);
}
return Status::OK();
};
auto segment_iter = std::make_shared<SegmentIterator>(GetAdjustedSS(), seg_executor);
segment_iter->Iterate();
STATUS_CHECK(segment_iter->GetStatus());
for (auto& kv : p_sc_map) {
auto& partition_id = kv.first;
......
......@@ -66,9 +66,6 @@ struct OperationContext {
std::vector<SegmentPtr> stale_segments;
FieldPtr prev_field = nullptr;
FieldElementPtr prev_field_element = nullptr;
FieldElementPtr stale_field_element = nullptr;
std::vector<FieldElementPtr> new_field_elements;
std::vector<FieldElementPtr> stale_field_elements;
......
......@@ -72,6 +72,7 @@ struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
using CollectionIterator = IterateHandler<Collection>;
using PartitionIterator = IterateHandler<Partition>;
using SegmentCommitIterator = IterateHandler<SegmentCommit>;
using SegmentIterator = IterateHandler<Segment>;
using SegmentFileIterator = IterateHandler<SegmentFile>;
using FieldIterator = IterateHandler<Field>;
......
......@@ -130,6 +130,19 @@ Snapshot::Snapshot(StorePtr store, ID_TYPE ss_id) {
RefAll();
}
// Fetches the row count recorded on the segment commit that belongs to
// `segment_id`.
//
// @param segment_id  id of the segment whose commit is looked up.
// @param row_cnt     out-parameter; assigned only when a commit is found.
// @return Status::OK() on success, or an SS_NOT_FOUND_ERROR status naming the
//         segment id when GetSegmentCommitBySegmentId yields nothing.
Status
Snapshot::GetSegmentRowCount(ID_TYPE segment_id, SIZE_TYPE& row_cnt) const {
    // Success path first: a usable commit means we can answer immediately.
    if (auto commit = GetSegmentCommitBySegmentId(segment_id)) {
        row_cnt = commit->GetRowCount();
        return Status::OK();
    }

    // No commit for this segment in the snapshot: report which id was requested.
    std::stringstream not_found_msg;
    not_found_msg << "Snapshot::GetSegmentRowCount: Specified segment \"" << segment_id << "\" not found";
    return Status(SS_NOT_FOUND_ERROR, not_found_msg.str());
}
FieldPtr
Snapshot::GetField(const std::string& name) const {
auto it = field_names_map_.find(name);
......
......@@ -131,7 +131,6 @@ class Snapshot : public ReferenceProxy {
GetFieldElement(const std::string& field_name, const std::string& field_element_name,
FieldElementPtr& field_element) const;
// PXU TODO: add const. Need to change Scopedxxxx::Get
SegmentCommitPtr
GetSegmentCommitBySegmentId(ID_TYPE segment_id) const {
auto it = seg_segc_map_.find(segment_id);
......@@ -140,6 +139,9 @@ class Snapshot : public ReferenceProxy {
return GetResource<SegmentCommit>(it->second);
}
Status
GetSegmentRowCount(ID_TYPE segment_id, SIZE_TYPE&) const;
std::vector<std::string>
GetPartitionNames() const {
std::vector<std::string> names;
......
......@@ -573,14 +573,14 @@ TEST_F(SnapshotTest, IndexTest) {
OperationContext d_a_i_ctx;
d_a_i_ctx.lsn = next_lsn();
d_a_i_ctx.stale_field_element = ss->GetResource<FieldElement>(field_element_id);
d_a_i_ctx.stale_field_elements.push_back(ss->GetResource<FieldElement>(field_element_id));
FieldElement::Ptr fe;
status = ss->GetFieldElement(sf_context.field_name, sf_context.field_element_name,
fe);
ASSERT_TRUE(status.ok());
ASSERT_EQ(fe, d_a_i_ctx.stale_field_element);
ASSERT_EQ(fe, d_a_i_ctx.stale_field_elements[0]);
auto drop_all_index_op = std::make_shared<DropAllIndexOperation>(d_a_i_ctx, ss);
status = drop_all_index_op->Push();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册