未验证 提交 1449cf09 编写于 作者: X XuPeng-SH 提交者: GitHub

(db/snapshot): Handle timeout with external metastore (#3075)

* (db/snapshot): update for row count
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix bug in NewSegmentOperation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove dummy print
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add some test for row count
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update size logic
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update size logic related ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): rollback if operation is not done
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): clean store
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): remove some dependency
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update for store
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update Store.h
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update store related code
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add field element modification operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): change new operation name
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): Add Segment File Operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): crtp for BaseResource
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add InActiveResourcesGCEvent
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix ut error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update snapshot segmentcommit operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update drop all index operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update ut
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix lint error
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix gc
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix gc segment files
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor Event related
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): change 1
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): change for GC event
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): fix build error for high version of boost filesystem
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): small change
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update compound operations
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add operation test
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): update for CompoundSegmentsOperation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add test for CompoundSegmentsOperation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add time stat in operation
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): timeout handling
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): timeout handling 2
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): timeout handling 3
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
Co-authored-by: NWang XiangYu <xy.wang@zilliz.com>
上级 68e1a8da
......@@ -70,6 +70,7 @@ Operations::ToString() const {
ss << " | " << FailureString();
}
}
ss << " | " << execution_time_ / 1000 << " ms";
return ss.str();
}
......@@ -183,7 +184,6 @@ Operations::GetSnapshot(ScopedSnapshotT& ss) const {
STATUS_CHECK(CheckPrevSnapshot());
STATUS_CHECK(CheckDone());
STATUS_CHECK(CheckIDSNotEmpty());
/* status = Snapshots::GetInstance().GetSnapshot(ss, prev_ss_->GetCollectionId(), ids_.back()); */
ss = context_.latest_ss;
return Status::OK();
}
......@@ -197,7 +197,11 @@ Operations::ApplyToStore(StorePtr store) {
Done(store);
return status_;
}
auto start_time = std::chrono::high_resolution_clock::now();
auto status = OnExecute(store);
execution_time_ =
std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() - start_time)
.count();
SetStatus(status);
Done(store);
return status_;
......@@ -241,9 +245,42 @@ Operations::DoExecute(StorePtr store) {
return Status::OK();
}
Status
Operations::OnApplyTimeoutCallback(StorePtr store) {
ApplyContext context;
context.on_succes_cb = std::bind(&Operations::OnApplySuccessCallback, this, std::placeholders::_1);
context.on_error_cb = std::bind(&Operations::OnApplyErrorCallback, this, std::placeholders::_1);
auto try_times = 0;
auto status = store->ApplyOperation(*this, context);
while (status.code() == SS_TIMEOUT && !HasAborted()) {
std::cout << GetName() << " Timeout! Try " << ++try_times << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
status = store->ApplyOperation(*this, context);
}
return status;
}
Status
Operations::OnApplySuccessCallback(ID_TYPE result_id) {
SetStepResult(result_id);
return Status::OK();
}
Status
Operations::OnApplyErrorCallback(Status status) {
return status;
}
Status
Operations::PostExecute(StorePtr store) {
return store->ApplyOperation(*this);
ApplyContext context;
context.on_succes_cb = std::bind(&Operations::OnApplySuccessCallback, this, std::placeholders::_1);
context.on_error_cb = std::bind(&Operations::OnApplyErrorCallback, this, std::placeholders::_1);
context.on_timeout_cb = std::bind(&Operations::OnApplyTimeoutCallback, this, std::placeholders::_1);
return store->ApplyOperation(*this, context);
}
template <typename ResourceT>
......@@ -263,7 +300,7 @@ Operations::RollBack() {
}
Operations::~Operations() {
if (!status_.ok() || !done_) {
if ((!status_.ok() || !done_) && status_.code() != SS_TIMEOUT) {
RollBack();
}
}
......
......@@ -35,9 +35,6 @@ namespace engine {
namespace snapshot {
using CheckStaleFunc = std::function<Status(ScopedSnapshotT&)>;
// using StepsHolderT = std::tuple<CollectionCommit::SetT, Collection::SetT, SchemaCommit::SetT, FieldCommit::SetT,
// Field::SetT, FieldElement::SetT, PartitionCommit::SetT, Partition::SetT,
// SegmentCommit::SetT, Segment::SetT, SegmentFile::SetT>;
template <typename ResourceT>
using StepsContextSet = std::set<typename ResourceContext<ResourceT>::Ptr>;
using StepsHolderT =
......@@ -50,6 +47,8 @@ enum OperationsType { Invalid, W_Leaf, O_Leaf, W_Compound, O_Compound };
class Operations : public std::enable_shared_from_this<Operations> {
public:
using TimeoutCBT = std::function<Status(const Status&)>;
Operations(const OperationContext& context, ScopedSnapshotT prev_ss,
const OperationsType& type = OperationsType::Invalid);
......@@ -161,12 +160,26 @@ class Operations : public std::enable_shared_from_this<Operations> {
virtual Status
OnSnapshotDropped();
void
Abort() {
aborted_ = true;
}
bool
HasAborted() const {
return aborted_;
}
virtual ~Operations();
friend std::ostream&
operator<<(std::ostream& out, const Operations& operation);
protected:
virtual Status
OnApplySuccessCallback(ID_TYPE result_id);
virtual Status OnApplyErrorCallback(Status);
virtual Status OnApplyTimeoutCallback(StorePtr);
virtual std::string
SuccessString() const;
virtual std::string
......@@ -193,6 +206,8 @@ class Operations : public std::enable_shared_from_this<Operations> {
std::condition_variable finish_cond_;
ID_TYPE uid_;
OperationsType type_;
double execution_time_ = 0;
std::atomic_bool aborted_ = false;
};
template <typename StepT>
......
......@@ -14,6 +14,7 @@
#include "codecs/Codec.h"
#include "db/Utils.h"
#include "db/meta/MetaFactory.h"
#include "db/meta/MetaSession.h"
#include "db/snapshot/ResourceContext.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Resources.h"
......@@ -23,6 +24,7 @@
#include "utils/Log.h"
#include "utils/Status.h"
#include <fiu-local.h>
#include <stdlib.h>
#include <time.h>
#include <any>
......@@ -47,6 +49,17 @@ namespace milvus {
namespace engine {
namespace snapshot {
class Store;
struct ApplyContext {
using ApplyFunc = std::function<Status(int64_t&)>;
using SuccessCBT = std::function<Status(ID_TYPE)>;
using ErrorCBT = std::function<Status(Status)>;
using TimeoutCBT = std::function<Status(std::shared_ptr<Store>)>;
SuccessCBT on_succes_cb = {};
ErrorCBT on_error_cb = {};
TimeoutCBT on_timeout_cb = {};
};
class Store : public std::enable_shared_from_this<Store> {
public:
using Ptr = typename std::shared_ptr<Store>;
......@@ -78,7 +91,7 @@ class Store : public std::enable_shared_from_this<Store> {
template <typename OpT>
Status
ApplyOperation(OpT& op) {
ApplyOperation(OpT& op, ApplyContext& context) {
auto session = adapter_->CreateSession();
std::apply(
[&](auto&... step_context_set) {
......@@ -89,8 +102,17 @@ class Store : public std::enable_shared_from_this<Store> {
ID_TYPE result_id;
auto status = session->Commit(result_id);
if (status.ok()) {
op.SetStepResult(result_id);
fiu_do_on("Store.ApplyOperation.mock_timeout", { status = Status(SS_TIMEOUT, "Mock Timeout"); });
if (status.ok() && context.on_succes_cb) {
return context.on_succes_cb(result_id);
} else if (status.code() == SS_TIMEOUT && context.on_timeout_cb) {
return context.on_timeout_cb(shared_from_this());
/* return context.on_timeout_cb( */
/* std::bind(static_cast<Status(meta::MetaSession::*)(int64_t&)>( */
/* &meta::MetaSession::Commit), session, std::placeholders::_1)); */
} else if (context.on_error_cb) {
return context.on_error_cb(status);
}
return status;
......@@ -374,7 +396,6 @@ class Store : public std::enable_shared_from_this<Store> {
};
using StorePtr = Store::Ptr;
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -132,5 +132,7 @@ constexpr ErrorCode SS_NOT_ACTIVE_ERROR = ToSSErrorCode(6);
constexpr ErrorCode SS_CONSTRAINT_CHECK_ERROR = ToSSErrorCode(7);
constexpr ErrorCode SS_INVALID_ARGUMENT_ERROR = ToSSErrorCode(8);
constexpr ErrorCode SS_OPERATION_PENDING = ToSSErrorCode(9);
constexpr ErrorCode SS_TIMEOUT = ToSSErrorCode(10);
constexpr ErrorCode SS_NOT_COMMITED = ToSSErrorCode(11);
} // namespace milvus
......@@ -19,7 +19,7 @@
#include <string>
#include <thread>
#include <utility>
#include <fiu-local.h>
#include <fiu-control.h>
#include <random>
#include "cache/CpuCacheMgr.h"
......@@ -182,10 +182,13 @@ BaseTest::SnapshotStop() {
void
BaseTest::SetUp() {
InitLog();
fiu_init(0);
fiu_enable_random("Store.ApplyOperation.mock_timeout", 1, nullptr, 0, 0.2);
}
void
BaseTest::TearDown() {
fiu_disable("Store.ApplyOperation.mock_timeout");
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册