未验证 提交 ec4ecdcd 编写于 作者: X XuPeng-SH 提交者: GitHub

optimization for read node sync metadata (#4556)

* optimize read-write mode sync logic
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

update 2
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

optimize read node sync logic
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for optimization
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 56d557e9
......@@ -936,15 +936,16 @@ GetSnapshotIDsOperation::GetIDs() const {
return ids_;
}
GetAllActiveSnapshotIDsOperation::GetAllActiveSnapshotIDsOperation()
: BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound) {
GetAllActiveSnapshotIDsOperation::GetAllActiveSnapshotIDsOperation(const RangeContext& context)
: BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound), updated_time_range_(context) {
}
Status
GetAllActiveSnapshotIDsOperation::DoExecute(StorePtr store) {
std::vector<CollectionCommitPtr> ccs;
STATUS_CHECK(store->GetActiveResourcesByAttrs<CollectionCommit>(ccs, {meta::F_ID, meta::F_COLLECTON_ID}));
/* STATUS_CHECK(store->GetActiveResources<CollectionCommit>(ccs)); */
STATUS_CHECK(store->GetActiveResourcesByAttrs<CollectionCommit>(
ccs, {meta::F_ID, meta::F_COLLECTON_ID}, updated_time_range_.upper_bound_, updated_time_range_.low_bound_));
for (auto& cc : ccs) {
auto cid = cc->GetCollectionId();
auto it = cid_ccid_.find(cid);
......@@ -953,6 +954,7 @@ GetAllActiveSnapshotIDsOperation::DoExecute(StorePtr store) {
} else {
cid_ccid_[cid] = std::max(it->second, cc->GetID());
}
latest_update_ = std::max(latest_update_, cc->GetUpdatedTime());
}
return Status::OK();
}
......
......@@ -11,6 +11,7 @@
#pragma once
#include <limits>
#include <map>
#include <set>
#include <string>
......@@ -331,15 +332,22 @@ class GetAllActiveSnapshotIDsOperation : public Operations {
public:
using BaseT = Operations;
GetAllActiveSnapshotIDsOperation();
explicit GetAllActiveSnapshotIDsOperation(const RangeContext& context);
Status DoExecute(StorePtr) override;
const std::map<ID_TYPE, ID_TYPE>&
GetIDs() const;
TS_TYPE
GetLatestUpdatedTime() const {
return latest_update_;
}
protected:
std::map<ID_TYPE, ID_TYPE> cid_ccid_;
RangeContext updated_time_range_;
TS_TYPE latest_update_ = std::numeric_limits<TS_TYPE>::min();
};
class DropCollectionOperation : public CompoundBaseOperation<DropCollectionOperation> {
......
......@@ -12,6 +12,7 @@
#pragma once
#include <iostream>
#include <limits>
#include <map>
#include <string>
#include <unordered_map>
......@@ -25,6 +26,11 @@ namespace milvus {
namespace engine {
namespace snapshot {
struct RangeContext {
TS_TYPE upper_bound_ = std::numeric_limits<TS_TYPE>::max();
TS_TYPE low_bound_ = std::numeric_limits<TS_TYPE>::min();
};
struct PartitionContext {
std::string name;
ID_TYPE id = 0;
......
......@@ -19,13 +19,14 @@
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/SnapshotPolicyFactory.h"
#include "utils/CommonUtil.h"
#include "utils/TimeRecorder.h"
#include "utils/TimerContext.h"
#include <utility>
namespace milvus::engine::snapshot {
static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 100 * 1000;
static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 120 * 1000;
static constexpr int DEFAULT_WRITER_TIMER_INTERVAL_US = 2000 * 1000;
Status
......@@ -245,9 +246,11 @@ Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) con
}
void
Snapshots::OnReaderTimer(const boost::system::error_code& ec) {
Snapshots::OnReaderTimer(const boost::system::error_code& ec, TimerContext* timer) {
std::chrono::time_point<std::chrono::system_clock> start = std::chrono::system_clock::now();
auto op = std::make_shared<GetAllActiveSnapshotIDsOperation>();
RangeContext ctx;
ctx.low_bound_ = latest_updated_;
auto op = std::make_shared<GetAllActiveSnapshotIDsOperation>(ctx);
auto status = (*op)(store_);
if (!status.ok()) {
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer::GetAllActiveSnapshotIDsOperation failed: " << status.message();
......@@ -260,11 +263,15 @@ Snapshots::OnReaderTimer(const boost::system::error_code& ec) {
}
return;
}
latest_updated_ = std::max(op->GetLatestUpdatedTime(), latest_updated_.load());
auto ids = op->GetIDs();
ScopedSnapshotT ss;
std::set<ID_TYPE> alive_cids;
std::set<ID_TYPE> this_invalid_cids;
bool diff_found = false;
for (auto& [cid, ccid] : ids) {
status = LoadSnapshot(store_, ss, cid, ccid);
if (status.code() == SS_NOT_ACTIVE_ERROR) {
......@@ -347,13 +354,20 @@ Snapshots::OnReaderTimer(const boost::system::error_code& ec) {
auto exe_time =
std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() - start)
.count();
if (exe_time > DEFAULT_READER_TIMER_INTERVAL_US) {
LOG_ENGINE_WARNING_ << "OnReaderTimer takes too much time: " << exe_time << " us";
reader_time_acc_(exe_time);
if (boost::accumulators::count(reader_time_acc_) >= 100) {
auto acc = reader_time_acc_;
reader_time_acc_ = {};
auto mean_val = boost::accumulators::mean(acc);
auto min_val = boost::accumulators::min(acc);
auto max_val = boost::accumulators::max(acc);
LOG_SERVER_INFO_ << "OnReaderTimer Stastics [US]: MEAN=" << mean_val << ", MIN=" << min_val
<< ", MAX=" << max_val;
}
}
void
Snapshots::OnWriterTimer(const boost::system::error_code& ec) {
Snapshots::OnWriterTimer(const boost::system::error_code& ec, TimerContext* timer) {
// Single mode
if (!config.cluster.enable()) {
std::unique_lock<std::shared_timed_mutex> lock(inactive_mtx_);
......@@ -392,7 +406,7 @@ Snapshots::RegisterTimers(TimerManager* mgr) {
ctx.interval_us = low_limit;
}
LOG_SERVER_INFO_ << "OnReaderTimer INTERVAL: " << ctx.interval_us << " US";
ctx.handler = std::bind(&Snapshots::OnReaderTimer, this, std::placeholders::_1);
ctx.handler = std::bind(&Snapshots::OnReaderTimer, this, std::placeholders::_1, std::placeholders::_2);
mgr->AddTimer(ctx);
} else {
TimerContext::Context ctx;
......@@ -410,7 +424,7 @@ Snapshots::RegisterTimers(TimerManager* mgr) {
}
LOG_SERVER_INFO_ << "OnWriterTimer INTERVAL: " << ctx.interval_us << " US";
ctx.handler = std::bind(&Snapshots::OnWriterTimer, this, std::placeholders::_1);
ctx.handler = std::bind(&Snapshots::OnWriterTimer, this, std::placeholders::_1, std::placeholders::_2);
mgr->AddTimer(ctx);
}
return Status::OK();
......
......@@ -11,7 +11,15 @@
#pragma once
#include <algorithm>
#include <atomic>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/count.hpp>
#include <boost/accumulators/statistics/max.hpp>
#include <boost/accumulators/statistics/mean.hpp>
#include <boost/accumulators/statistics/min.hpp>
#include <boost/accumulators/statistics/stats.hpp>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
......@@ -88,9 +96,9 @@ class Snapshots {
DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn);
void
OnReaderTimer(const boost::system::error_code&);
OnReaderTimer(const boost::system::error_code&, TimerContext*);
void
OnWriterTimer(const boost::system::error_code&);
OnWriterTimer(const boost::system::error_code&, TimerContext*);
Status
LoadNoLock(StorePtr store, ID_TYPE collection_id, SnapshotHolderPtr& holder);
......@@ -105,6 +113,14 @@ class Snapshots {
std::map<ID_TYPE, SnapshotHolderPtr> inactive_holders_;
std::set<ID_TYPE> invalid_ssid_;
StorePtr store_;
std::atomic<TS_TYPE> latest_updated_ = std::numeric_limits<TS_TYPE>::min();
boost::accumulators::accumulator_set<
double, boost::accumulators::stats<boost::accumulators::tag::count, boost::accumulators::tag::mean,
boost::accumulators::tag::max, boost::accumulators::tag::min>
/* boost::accumulators::features<boost::accumulators::tag::count> */
>
reader_time_acc_;
};
} // namespace milvus::engine::snapshot
......@@ -14,6 +14,8 @@
#include "codecs/Codec.h"
#include "db/Utils.h"
#include "db/meta/MetaFactory.h"
#include "db/meta/condition/MetaFilter.h"
#include "db/meta/condition/MetaRelation.h"
#include "db/snapshot/ResourceContext.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
......@@ -31,6 +33,7 @@
#include <functional>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <set>
......@@ -181,9 +184,16 @@ class Store : public std::enable_shared_from_this<Store> {
template <typename ResourceT>
Status
GetActiveResourcesByAttrs(std::vector<typename ResourceT::Ptr>& return_vs,
const std::vector<std::string>& target_attrs) {
const std::vector<std::string>& target_attrs, int64_t upper_bound, int64_t low_bound) {
std::vector<State> filter_states = {State::ACTIVE};
return adapter_->SelectByAttrs<ResourceT>(StateField::Name, filter_states, target_attrs, return_vs);
auto relation = meta::ONE_(meta::Range_<ResourceT, StateField>(meta::Range::EQ, State::ACTIVE));
if (upper_bound < std::numeric_limits<TS_TYPE>::max()) {
relation = meta::AND_(relation, meta::Range_<ResourceT, UpdatedOnField>(meta::Range::LTE, upper_bound));
}
if (low_bound > std::numeric_limits<TS_TYPE>::min()) {
relation = meta::AND_(relation, meta::Range_<ResourceT, UpdatedOnField>(meta::Range::GTE, low_bound));
}
return adapter_->Query<ResourceT>(relation, return_vs);
}
template <typename ResourceT>
......
......@@ -24,7 +24,7 @@
namespace milvus {
struct TimerContext {
using HandlerT = std::function<void(const boost::system::error_code&)>;
using HandlerT = std::function<void(const boost::system::error_code&, TimerContext* ctx)>;
struct Context {
/* Context(int interval_us, HandlerT& handler, ThreadPoolPtr pool = nullptr) */
/* : interval_(interval_us), handler_(handler), timer_(io, interval_), pool_(pool) { */
......@@ -58,7 +58,7 @@ struct TimerContext {
inline void
TimerContext::Reschedule(const boost::system::error_code& ec) {
try {
pool_->enqueue(handler_, ec);
pool_->enqueue(handler_, ec, this);
} catch (std::exception& ex) {
LOG_SERVER_ERROR_ << "Fail to enqueue handler: " << std::string(ex.what());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册