未验证 提交 d0ff3906 编写于 作者: X XuPeng-SH 提交者: GitHub

Cherry pick master 20201204 (#4370)

* Phase 1 of implementation of read-write seperation (#4214)

* (db/snapshot): add SnapshotPolicy
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* (db/snapshot): add some snapshot policy
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* (db/snapshot): add policy factory
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* add ut
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* implement ut for snapshot policy
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* fix clang format
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* add some comments
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* Phase 2 of implementation of read-write seperation (#4299)

* ro mode impl 1
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* ro mode impl 2
Signed-off-by: NXuPeng-SH <xupeng3112@163.com>

* add TimerContext.h
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* Add Timer in Server
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 2
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 3
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 4
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 5
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 6
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 7
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 8
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 9
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 10
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 11
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 12
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 13
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 14
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 15
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* update for timer integration 16
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* fix compile error from latest compiler version (#4359)
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* fix format
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 67fa089d
......@@ -85,6 +85,11 @@ InitConfig() {
{"general.timezone", CreateStringConfig_("general.timezone", _MODIFIABLE, &config.general.timezone.value,
"UTC+8", is_timezone_valid, nullptr)},
{"general.meta_uri", CreateStringConfig("general.meta_uri", &config.general.meta_uri.value, "sqlite://:@:/")},
{"general.stale_snapshots_count",
CreateIntegerConfig("general.stale_snapshots_count", 0, 100, &config.general.stale_snapshots_count.value, 0)},
{"general.stale_snapshots_duration",
CreateIntegerConfig("general.stale_snapshots_duration", 0, std::numeric_limits<int64_t>::max(),
&config.general.stale_snapshots_duration.value, 10)},
/* network */
{"network.bind.address",
......@@ -191,6 +196,7 @@ InitConfig() {
}
const char* config_file_template = R"(
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
......@@ -226,9 +232,19 @@ cluster:
# | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | |
# | 'mysql', replace other texts with real values. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# stale_snapshots_count| Specify how many stale snapshots to be kept before GC. It | Integer | 0 |
# | is ignored if deployed with cluster enabled | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# stale_snapshots_duration | | Integer | 10 |
# | Specify how long the stale snapshots can be GC'ed. The unit| | |
# | is second. It is only effective if deployed with cluster | | |
# | enabled and cluster.role is rw | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
general:
timezone: @general.timezone@
meta_uri: @general.meta_uri@
stale_snapshots_count: @general.stale_snapshots_count@
stale_snapshots_duration: @general.stale_snapshots_duration@
#----------------------+------------------------------------------------------------+------------+-----------------+
# Network Config | Description | Type | Default |
......
......@@ -89,6 +89,8 @@ struct ServerConfig {
struct General {
String timezone{"unknown"};
String meta_uri{"unknown"};
Integer stale_snapshots_count{0};
Integer stale_snapshots_duration{0};
} general;
struct Network {
......
......@@ -50,6 +50,7 @@ constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
static const Status PERMISSION_ERROR = Status(DB_PERMISSION_ERROR, "Write permission needed!");
} // namespace
#define CHECK_AVAILABLE \
......@@ -57,6 +58,16 @@ static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown
return SHUTDOWN_ERROR; \
}
#define WRITE_PERMISSION_NEEDED_RETURN_STATUS \
if (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY) { \
return PERMISSION_ERROR; \
}
#define WRITE_PERMISSION_NEEDED_NO_RETURN \
if (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY) { \
return; \
}
DBImpl::DBImpl(const DBOptions& options)
: options_(options), available_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1), index_task_tracker_(3) {
mem_mgr_ = MemManagerFactory::Build(options_);
......@@ -98,13 +109,15 @@ DBImpl::Start() {
// server may be closed unexpected, these un-merge files need to be merged when server restart
// and soft-delete files need to be deleted when server restart
snapshot::IDS_TYPE collection_ids;
snapshot::Snapshots::GetInstance().GetCollectionIds(collection_ids);
std::set<int64_t> merge_ids;
for (auto id : collection_ids) {
merge_ids.insert(id);
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
snapshot::IDS_TYPE collection_ids;
snapshot::Snapshots::GetInstance().GetCollectionIds(collection_ids);
std::set<int64_t> merge_ids;
for (auto id : collection_ids) {
merge_ids.insert(id);
}
StartMergeTask(merge_ids, true);
}
StartMergeTask(merge_ids, true);
// for distribute version, some nodes are read only
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
......@@ -164,6 +177,7 @@ DBImpl::Stop() {
Status
DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
auto ctx = context;
......@@ -202,6 +216,7 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
Status
DBImpl::DropCollection(const std::string& collection_name) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << collection_name;
......@@ -278,6 +293,7 @@ DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
Status
DBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
......@@ -297,6 +313,7 @@ DBImpl::CreatePartition(const std::string& collection_name, const std::string& p
Status
DBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
......@@ -350,6 +367,7 @@ DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::stri
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
const std::string& field_name, const CollectionIndex& index) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
SetThreadName("create_index");
LOG_ENGINE_DEBUG_ << "Create index for collection: " << collection_name << " field: " << field_name;
......@@ -444,6 +462,7 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
Status
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name << " field: " << field_name;
......@@ -475,6 +494,7 @@ DBImpl::DescribeIndex(const std::string& collection_name, const std::string& fie
Status
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
idx_t op_id) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
if (data_chunk == nullptr) {
......@@ -622,6 +642,7 @@ DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_ar
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
......@@ -766,6 +787,7 @@ DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& col
Status
DBImpl::Flush(const std::string& collection_name) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
Status status;
......@@ -788,6 +810,7 @@ DBImpl::Flush(const std::string& collection_name) {
Status
DBImpl::Flush() {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Begin flush all collections";
......@@ -799,6 +822,7 @@ DBImpl::Flush() {
Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name, double threshold) {
WRITE_PERMISSION_NEEDED_RETURN_STATUS;
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
......
......@@ -11,6 +11,7 @@
#include "db/snapshot/CompoundOperations.h"
#include <algorithm>
#include <map>
#include <memory>
#include <sstream>
......@@ -934,6 +935,31 @@ GetSnapshotIDsOperation::GetIDs() const {
return ids_;
}
GetAllActiveSnapshotIDsOperation::GetAllActiveSnapshotIDsOperation()
: BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound) {
}
Status
GetAllActiveSnapshotIDsOperation::DoExecute(StorePtr store) {
std::vector<CollectionCommitPtr> ccs;
STATUS_CHECK(store->GetActiveResources<CollectionCommit>(ccs));
for (auto& cc : ccs) {
auto cid = cc->GetCollectionId();
auto it = cid_ccid_.find(cid);
if (it == cid_ccid_.end()) {
cid_ccid_[cid] = cc->GetID();
} else {
cid_ccid_[cid] = std::max(it->second, cc->GetID());
}
}
return Status::OK();
}
const std::map<ID_TYPE, ID_TYPE>&
GetAllActiveSnapshotIDsOperation::GetIDs() const {
return cid_ccid_;
}
GetCollectionIDsOperation::GetCollectionIDsOperation(bool reversed)
: BaseT(OperationContext(), ScopedSnapshotT()), reversed_(reversed) {
}
......
......@@ -327,6 +327,21 @@ class GetCollectionIDsOperation : public Operations {
IDS_TYPE ids_;
};
class GetAllActiveSnapshotIDsOperation : public Operations {
public:
using BaseT = Operations;
GetAllActiveSnapshotIDsOperation();
Status DoExecute(StorePtr) override;
const std::map<ID_TYPE, ID_TYPE>&
GetIDs() const;
protected:
std::map<ID_TYPE, ID_TYPE> cid_ccid_;
};
class DropCollectionOperation : public CompoundBaseOperation<DropCollectionOperation> {
public:
using BaseT = CompoundBaseOperation<DropCollectionOperation>;
......
......@@ -16,6 +16,7 @@
#include <string>
#include <vector>
#include "config/ServerConfig.h"
#include "db/snapshot/MetaEvent.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/ResourceHelper.h"
......@@ -31,6 +32,9 @@ class InActiveResourcesGCEvent : public GCEvent, public Operations {
using Ptr = std::shared_ptr<InActiveResourcesGCEvent>;
InActiveResourcesGCEvent() : Operations(OperationContext(), ScopedSnapshotT(), OperationsType::O_Leaf) {
auto is_cluster = config.cluster.enable();
auto role = config.cluster.role();
read_only_ = is_cluster && (role == ClusterRole::RO);
}
~InActiveResourcesGCEvent() = default;
......@@ -43,6 +47,9 @@ class InActiveResourcesGCEvent : public GCEvent, public Operations {
Status
OnExecute(StorePtr store) override {
LOG_ENGINE_INFO_ << "Executing InActiveResourcesGCEvent";
if (read_only_) {
return Status::OK();
}
STATUS_CHECK(ClearInActiveResources<Collection>(store));
STATUS_CHECK(ClearInActiveResources<CollectionCommit>(store));
......@@ -89,6 +96,8 @@ class InActiveResourcesGCEvent : public GCEvent, public Operations {
return Status::OK();
}
bool read_only_ = false;
};
} // namespace snapshot
......
......@@ -16,6 +16,7 @@
#include <set>
#include <string>
#include "config/ServerConfig.h"
#include "db/snapshot/MetaEvent.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/ResourceHelper.h"
......@@ -30,12 +31,18 @@ class ResourceGCEvent : public GCEvent {
using Ptr = std::shared_ptr<ResourceGCEvent>;
explicit ResourceGCEvent(typename ResourceT::Ptr res) : res_(res) {
auto is_cluster = config.cluster.enable();
auto role = config.cluster.role();
read_only_ = is_cluster && (role == ClusterRole::RO);
}
~ResourceGCEvent() = default;
Status
Process(StorePtr store) override {
if (read_only_) {
return Status::OK();
}
/* mark resource as 'deleted' in meta */
auto sd_op = std::make_shared<SoftDeleteOperation<ResourceT>>(res_->GetID());
STATUS_CHECK((*sd_op)(store));
......@@ -93,6 +100,8 @@ class ResourceGCEvent : public GCEvent {
private:
typename ResourceT::Ptr res_;
bool read_only_ = false;
};
} // namespace milvus::engine::snapshot
......@@ -28,6 +28,8 @@ CollectionCommitOperation::DoExecute(StorePtr store) {
return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
}
resource_ = std::make_shared<CollectionCommit>(*prev_resource);
resource_->SetCreatedTime(GetMicroSecTimeStamp());
resource_->SetUpdatedTime(resource_->GetCreatedTime());
resource_->ResetStatus();
row_cnt = resource_->GetRowCount();
size = resource_->GetSize();
......@@ -116,6 +118,8 @@ PartitionCommitOperation::DoExecute(StorePtr store) {
PartitionPtr partition = nullptr;
if (prev_resource) {
resource_ = std::make_shared<PartitionCommit>(*prev_resource);
resource_->SetCreatedTime(GetMicroSecTimeStamp());
resource_->SetUpdatedTime(resource_->GetCreatedTime());
resource_->SetID(0);
resource_->ResetStatus();
row_cnt = resource_->GetRowCount();
......@@ -152,6 +156,8 @@ PartitionCommitOperation::DoExecute(StorePtr store) {
}
}
partition = GetStartedSS()->GetResource<Partition>(prev_resource->GetPartitionId());
partition->SetCreatedTime(GetMicroSecTimeStamp());
partition->SetUpdatedTime(partition->GetCreatedTime());
} else {
if (!context_.new_partition) {
std::stringstream emsg;
......@@ -240,6 +246,8 @@ SegmentCommitOperation::DoExecute(StorePtr store) {
int64_t size = 0;
if (prev_resource) {
resource_ = std::make_shared<SegmentCommit>(*prev_resource);
resource_->SetCreatedTime(GetMicroSecTimeStamp());
resource_->SetUpdatedTime(resource_->GetCreatedTime());
resource_->SetID(0);
resource_->ResetStatus();
size = resource_->GetSize();
......@@ -303,6 +311,8 @@ FieldCommitOperation::DoExecute(StorePtr store) {
if (prev_resource) {
resource_ = std::make_shared<FieldCommit>(*prev_resource);
resource_->SetCreatedTime(GetMicroSecTimeStamp());
resource_->SetUpdatedTime(resource_->GetCreatedTime());
resource_->SetID(0);
resource_->ResetStatus();
for (auto& fe : context_.stale_field_elements) {
......@@ -337,6 +347,8 @@ SchemaCommitOperation::DoExecute(StorePtr store) {
}
resource_ = std::make_shared<SchemaCommit>(*prev_resource);
resource_->SetCreatedTime(GetMicroSecTimeStamp());
resource_->SetUpdatedTime(resource_->GetCreatedTime());
resource_->SetID(0);
resource_->ResetStatus();
for (auto& fc : context_.stale_field_commits) {
......
......@@ -17,15 +17,18 @@ namespace milvus {
namespace engine {
namespace snapshot {
SnapshotHolder::SnapshotHolder(ID_TYPE collection_id, GCHandler gc_handler, size_t num_versions)
: collection_id_(collection_id), num_versions_(num_versions), gc_handler_(gc_handler) {
/* SnapshotHolder::SnapshotHolder(ID_TYPE collection_id, GCHandler gc_handler, size_t num_versions) */
/* : collection_id_(collection_id), num_versions_(num_versions), gc_handler_(gc_handler) { */
/* } */
SnapshotHolder::SnapshotHolder(ID_TYPE collection_id, SnapshotPolicyPtr policy, GCHandler gc_handler)
: collection_id_(collection_id), policy_(policy), gc_handler_(gc_handler) {
}
SnapshotHolder::~SnapshotHolder() {
bool release = false;
for (auto& ss_kv : active_) {
if (!ss_kv.second->GetCollection()->IsActive()) {
ReadyForRelease(ss_kv.second);
for (auto& [_, ss] : active_) {
if (!ss->GetCollection()->IsActive()) {
ReadyForRelease(ss);
release = true;
}
}
......@@ -106,12 +109,42 @@ SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) const {
return status;
}
int
SnapshotHolder::NumOfSnapshot() const {
std::unique_lock<std::mutex> lock(mutex_);
return active_.size();
}
bool
SnapshotHolder::IsActive(Snapshot::Ptr& ss) {
auto collection = ss->GetCollection();
return collection && collection->IsActive();
}
Status
SnapshotHolder::ApplyEject() {
Status status;
Snapshot::Ptr oldest_ss;
{
std::unique_lock<std::mutex> lock(mutex_);
if (active_.size() == 0) {
return Status(SS_EMPTY_HOLDER,
"SnapshotHolder::ApplyEject: Empty holder found for " + std::to_string(collection_id_));
}
if (!policy_->ShouldEject(active_, false)) {
return status;
}
auto oldest_it = active_.find(min_id_);
oldest_ss = oldest_it->second;
active_.erase(oldest_it);
if (active_.size() > 0) {
min_id_ = active_.begin()->first;
}
}
ReadyForRelease(oldest_ss);
return status;
}
Status
SnapshotHolder::Add(StorePtr store, ID_TYPE id) {
Status status;
......@@ -153,7 +186,10 @@ SnapshotHolder::Add(StorePtr store, ID_TYPE id) {
}
active_[id] = ss;
if (active_.size() <= num_versions_) {
/* if (active_.size() <= num_versions_) { */
/* return status; */
/* } */
if (!policy_->ShouldEject(active_)) {
return status;
}
......
......@@ -16,6 +16,7 @@
#include <memory>
#include <vector>
#include "db/snapshot/Snapshot.h"
#include "db/snapshot/SnapshotPolicy.h"
namespace milvus {
namespace engine {
......@@ -25,7 +26,7 @@ class SnapshotHolder {
public:
using ScopedPtr = std::shared_ptr<ScopedSnapshotT>;
explicit SnapshotHolder(ID_TYPE collection_id, GCHandler gc_handler = nullptr, size_t num_versions = 1);
explicit SnapshotHolder(ID_TYPE collection_id, SnapshotPolicyPtr policy, GCHandler gc_handler = nullptr);
ID_TYPE
GetID() const {
......@@ -44,14 +45,19 @@ class SnapshotHolder {
return Status::OK();
}
// Return how many snapshots in active
int
NumOfSnapshot() const;
bool
IsActive(Snapshot::Ptr& ss);
Status
ApplyEject();
~SnapshotHolder();
private:
/* Status */
/* LoadNoLock(ID_TYPE collection_commit_id, CollectionCommitPtr& cc); */
Status
LoadNoLock(ID_TYPE collection_commit_id, CollectionCommitPtr& cc, StorePtr store);
......@@ -67,8 +73,7 @@ class SnapshotHolder {
ID_TYPE min_id_ = std::numeric_limits<ID_TYPE>::max();
ID_TYPE max_id_ = std::numeric_limits<ID_TYPE>::min();
std::map<ID_TYPE, Snapshot::Ptr> active_;
std::vector<Snapshot::Ptr> to_release_;
size_t num_versions_ = 1;
SnapshotPolicyPtr policy_;
GCHandler gc_handler_;
};
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/snapshot/SnapshotPolicy.h"
#include "db/Utils.h"
namespace milvus {
namespace engine {
namespace snapshot {
SnapshotNumPolicy::SnapshotNumPolicy(size_t num) : num_(num) {
}
bool
SnapshotNumPolicy::ShouldEject(const MapT& ids, bool alive) {
bool should = true;
if (ids.size() <= num_) {
should = false;
}
return should;
}
SnapshotDurationPolicy::SnapshotDurationPolicy(TS_TYPE us) : us_(us) {
}
bool
SnapshotDurationPolicy::ShouldEject(const MapT& ids, bool alive) {
if (ids.size() == 0 || (alive && ids.size() <= 1)) {
return false;
}
bool should = true;
auto ss = ids.begin()->second;
auto now_us = GetMicroSecTimeStamp();
if (now_us - ss->GetCollectionCommit()->GetCreatedTime() < us_) {
should = false;
}
/* LOG_ENGINE_DEBUG_ << " now= " << now_us << " should=" << should; */
/* LOG_ENGINE_DEBUG_ << "VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV " << ids.size() << " xxxxx " << should; */
/* for (auto it : ids) { */
/* LOG_ENGINE_DEBUG_ << " id=" << it.first << " ts=" << it.second->GetCollectionCommit()->GetCreatedTime(); */
/* } */
return should;
}
} // namespace snapshot
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <map>
#include <memory>
#include "db/snapshot/Snapshot.h"
namespace milvus {
namespace engine {
namespace snapshot {
// A base class for snapshot policy
class SnapshotPolicy {
public:
using MapT = std::map<ID_TYPE, Snapshot::Ptr>;
// Check if should eject any snapshot in ids
virtual bool
ShouldEject(const MapT& ids, bool alive = true) = 0;
virtual ~SnapshotPolicy() {
}
};
using SnapshotPolicyPtr = std::shared_ptr<SnapshotPolicy>;
// A policy that keeps upto specified num of snapshots
class SnapshotNumPolicy : public SnapshotPolicy {
public:
explicit SnapshotNumPolicy(size_t num);
bool
ShouldEject(const MapT& ids, bool alive = true) override;
protected:
// Num of snapshots
size_t num_;
};
// A policy that keeps all snapshots within specified duration
class SnapshotDurationPolicy : public SnapshotPolicy {
public:
explicit SnapshotDurationPolicy(TS_TYPE us);
bool
ShouldEject(const MapT& ids, bool alive = true) override;
protected:
// Duration in us
TS_TYPE us_;
};
} // namespace snapshot
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/snapshot/SnapshotPolicyFactory.h"
#include <fiu/fiu-local.h>
#include "db/Utils.h"
namespace milvus {
namespace engine {
namespace snapshot {
constexpr TS_TYPE US_PER_MS = 1000;
constexpr TS_TYPE MS_PER_S = 1000;
constexpr TS_TYPE US_PER_S = US_PER_MS * MS_PER_S;
SnapshotPolicyPtr
SnapshotPolicyFactory::Build(ServerConfig& server_config) {
auto is_cluster = server_config.cluster.enable();
auto role = server_config.cluster.role();
fiu_do_on("snapshot.policy.ro_cluster", {
is_cluster = true;
role = ClusterRole::RO;
});
fiu_do_on("snapshot.policy.w_cluster", {
is_cluster = true;
role = ClusterRole::RW;
});
// If server is working with cluster disabled, apply SnapshotNumPolicy
// If server is working with cluster enabled and the role is ClusterRole::RO, apply SnapshotNumPolicy
if (!is_cluster | (role == ClusterRole::RO)) {
auto nums = server_config.general.stale_snapshots_count();
fiu_do_on("snapshot.policy.stale_count_0", { nums = 0; });
fiu_do_on("snapshot.policy.stale_count_1", { nums = 1; });
fiu_do_on("snapshot.policy.stale_count_4", { nums = 4; });
fiu_do_on("snapshot.policy.stale_count_8", { nums = 8; });
fiu_do_on("snapshot.policy.stale_count_16", { nums = 16; });
fiu_do_on("snapshot.policy.stale_count_32", { nums = 32; });
return std::make_shared<SnapshotNumPolicy>(nums + 1);
}
// Otherwise, apply SnapshotDurationPolicy
auto duration = server_config.general.stale_snapshots_duration() * US_PER_S;
fiu_do_on("snapshot.policy.duration_1ms", { duration = MS_PER_S; });
fiu_do_on("snapshot.policy.duration_10ms", { duration = 10 * MS_PER_S; });
fiu_do_on("snapshot.policy.duration_50ms", { duration = 50 * MS_PER_S; });
fiu_do_on("snapshot.policy.duration_100ms", { duration = 100 * MS_PER_S; });
fiu_do_on("snapshot.policy.duration_500ms", { duration = 500 * MS_PER_S; });
return std::make_shared<SnapshotDurationPolicy>(duration);
}
} // namespace snapshot
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <map>
#include <memory>
#include "config/ServerConfig.h"
#include "db/snapshot/SnapshotPolicy.h"
namespace milvus {
namespace engine {
namespace snapshot {
// A class that takes server config, then generates snapshot policy
class SnapshotPolicyFactory {
public:
// Return a SnapshotPolicy
static SnapshotPolicyPtr
Build(ServerConfig& server_config);
};
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -17,13 +17,14 @@
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/InActiveResourcesGCEvent.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/SnapshotPolicyFactory.h"
#include "utils/CommonUtil.h"
#include "utils/TimerContext.h"
namespace milvus::engine::snapshot {
/* Status */
/* Snapshots::DropAll() { */
/* } */
static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 500 * 1000;
static constexpr int DEFAULT_WRITER_TIMER_INTERVAL_US = 2000 * 1000;
Status
Snapshots::DropCollection(ID_TYPE collection_id, const LSN_TYPE& lsn) {
......@@ -48,9 +49,28 @@ Snapshots::DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn) {
op->Push();
auto status = op->GetStatus();
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
name_id_map_.erase(context.collection->GetName());
holders_.erase(context.collection->GetID());
std::vector<SnapshotHolderPtr> holders;
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
alive_cids_.erase(context.collection->GetID());
name_id_map_.erase(context.collection->GetName());
/* holders_.erase(context.collection->GetID()); */
auto h = holders_.find(context.collection->GetID());
if (h != holders_.end()) {
/* inactive_holders_[h->first] = h->second; */
holders.push_back(h->second);
holders_.erase(h);
}
}
{
std::unique_lock<std::shared_timed_mutex> lock(inactive_mtx_);
for (auto& h : holders) {
inactive_holders_[h->GetID()] = h;
}
holders.clear();
}
return status;
}
......@@ -70,6 +90,14 @@ Snapshots::DropPartition(const ID_TYPE& collection_id, const ID_TYPE& partition_
return op->GetStatus();
}
Status
Snapshots::NumOfSnapshot(const std::string& collection_name, int& num) const {
SnapshotHolderPtr holder;
STATUS_CHECK(GetHolder(collection_name, holder));
num = holder->NumOfSnapshot();
return Status::OK();
}
Status
Snapshots::LoadSnapshot(StorePtr store, ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, bool scoped) {
SnapshotHolderPtr holder;
......@@ -120,7 +148,9 @@ Snapshots::LoadNoLock(StorePtr store, ID_TYPE collection_id, SnapshotHolderPtr&
emsg << "Snapshots::LoadNoLock: No collection commit is found for collection " << collection_id;
return Status(SS_NOT_FOUND_ERROR, emsg.str());
}
holder = std::make_shared<SnapshotHolder>(collection_id,
auto policy = SnapshotPolicyFactory::Build(config);
holder = std::make_shared<SnapshotHolder>(collection_id, policy,
std::bind(&Snapshots::SnapshotGCCallback, this, std::placeholders::_1));
for (auto c_c_id : collection_commit_ids) {
holder->Add(store, c_c_id);
......@@ -184,6 +214,7 @@ Snapshots::LoadHolder(StorePtr store, const ID_TYPE& collection_id, SnapshotHold
ScopedSnapshotT ss;
STATUS_CHECK(holder->Load(store, ss));
name_id_map_[ss->GetName()] = collection_id;
alive_cids_.insert(collection_id);
return Status::OK();
}
......@@ -200,18 +231,108 @@ Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) con
return Status::OK();
}
void
Snapshots::OnReaderTimer(const boost::system::error_code& ec) {
auto op = std::make_shared<GetAllActiveSnapshotIDsOperation>();
auto status = (*op)(store_);
if (!status.ok()) {
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer failed: " << status.message();
// TODO: Should be monitored
return;
}
auto ids = op->GetIDs();
ScopedSnapshotT ss;
std::set<ID_TYPE> alive_cids;
for (auto& [cid, ccid] : ids) {
/* std::cout << "cid: " << cid << " ccid: " << ccid << std::endl; */
auto status = LoadSnapshot(store_, ss, cid, ccid);
if (!status.ok()) {
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer failed: " << status.message();
}
if (ss && ss->GetCollection()->IsActive()) {
alive_cids.insert(cid);
}
/* std::cout << ss->ToString() << std::endl; */
}
auto op2 = std::make_shared<GetCollectionIDsOperation>();
status = (*op2)(store_);
if (!status.ok()) {
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer failed: " << status.message();
// TODO: Should be monitored
return;
}
auto aids = op2->GetIDs();
std::set<ID_TYPE> diff;
/* std::set_difference(alive_cids_.begin(), alive_cids_.end(), alive_cids.begin(), alive_cids.end(), */
/* std::inserter(diff, diff.begin())); */
std::set_difference(alive_cids_.begin(), alive_cids_.end(), aids.begin(), aids.end(),
std::inserter(diff, diff.begin()));
for (auto& cid : diff) {
ScopedSnapshotT ss;
status = GetSnapshot(ss, cid);
if (!status.ok()) {
// TODO: Should not happen
continue;
}
alive_cids_.erase(cid);
name_id_map_.erase(ss->GetName());
holders_.erase(cid);
}
}
void
Snapshots::OnWriterTimer(const boost::system::error_code& ec) {
// Single mode
if (!config.cluster.enable()) {
std::unique_lock<std::shared_timed_mutex> lock(inactive_mtx_);
inactive_holders_.clear();
return;
}
// Cluster RW mode
std::unique_lock<std::shared_timed_mutex> lock(inactive_mtx_);
auto it = inactive_holders_.cbegin();
auto it_next = it;
for (; it != inactive_holders_.cend(); it = it_next) {
++it_next;
auto status = it->second->ApplyEject();
if (status.code() == SS_EMPTY_HOLDER) {
inactive_holders_.erase(it);
}
}
}
Status
Snapshots::RegisterTimers(TimerManager* mgr) {
auto is_cluster = config.cluster.enable();
auto role = config.cluster.role();
if (is_cluster && (role == ClusterRole::RO)) {
TimerContext::Context ctx;
ctx.interval_us = DEFAULT_READER_TIMER_INTERVAL_US;
ctx.handler = std::bind(&Snapshots::OnReaderTimer, this, std::placeholders::_1);
mgr->AddTimer(ctx);
} else {
TimerContext::Context ctx;
ctx.interval_us = DEFAULT_WRITER_TIMER_INTERVAL_US;
ctx.handler = std::bind(&Snapshots::OnWriterTimer, this, std::placeholders::_1);
mgr->AddTimer(ctx);
}
return Status::OK();
}
Status
Snapshots::Reset() {
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
holders_.clear();
alive_cids_.clear();
name_id_map_.clear();
to_release_.clear();
inactive_holders_.clear();
return Status::OK();
}
void
Snapshots::SnapshotGCCallback(Snapshot::Ptr ss_ptr) {
/* to_release_.push_back(ss_ptr); */
ss_ptr->UnRef();
LOG_ENGINE_DEBUG_ << "Snapshot " << ss_ptr->GetID() << " ref_count = " << ss_ptr->ref_count() << " To be removed";
}
......@@ -228,16 +349,17 @@ Snapshots::StartService() {
kill(0, SIGUSR1);
}
auto store = snapshot::Store::Build(config.general.meta_uri(), meta_path, codec::Codec::instance().GetSuffixSet());
snapshot::OperationExecutor::Init(store);
store_ = snapshot::Store::Build(config.general.meta_uri(), meta_path, codec::Codec::instance().GetSuffixSet());
snapshot::OperationExecutor::Init(store_);
snapshot::OperationExecutor::GetInstance().Start();
snapshot::EventExecutor::Init(store);
snapshot::EventExecutor::Init(store_);
snapshot::EventExecutor::GetInstance().Start();
return snapshot::Snapshots::GetInstance().Init(store);
return snapshot::Snapshots::GetInstance().Init(store_);
}
Status
Snapshots::StopService() {
Reset();
snapshot::EventExecutor::GetInstance().Stop();
snapshot::OperationExecutor::GetInstance().Stop();
return Status::OK();
......
......@@ -15,6 +15,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <thread>
......@@ -22,6 +23,8 @@
#include "db/snapshot/SnapshotHolder.h"
#include "db/snapshot/Store.h"
#include "utils/Status.h"
#include "utils/ThreadPool.h"
#include "utils/TimerManager.h"
namespace milvus::engine::snapshot {
......@@ -59,11 +62,17 @@ class Snapshots {
Status
DropPartition(const ID_TYPE& collection_id, const ID_TYPE& partition_id, const LSN_TYPE& lsn);
Status
NumOfSnapshot(const std::string& collection_name, int& num) const;
Status
Reset();
Status Init(StorePtr);
Status
RegisterTimers(TimerManager* mgr);
public:
Status
StartService();
......@@ -78,6 +87,11 @@ class Snapshots {
Status
DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn);
void
OnReaderTimer(const boost::system::error_code&);
void
OnWriterTimer(const boost::system::error_code&);
Status
LoadNoLock(StorePtr store, ID_TYPE collection_id, SnapshotHolderPtr& holder);
Status
......@@ -85,8 +99,11 @@ class Snapshots {
mutable std::shared_timed_mutex mutex_;
std::map<ID_TYPE, SnapshotHolderPtr> holders_;
std::set<ID_TYPE> alive_cids_;
std::map<std::string, ID_TYPE> name_id_map_;
std::vector<Snapshot::Ptr> to_release_;
mutable std::shared_timed_mutex inactive_mtx_;
std::map<ID_TYPE, SnapshotHolderPtr> inactive_holders_;
StorePtr store_;
};
} // namespace milvus::engine::snapshot
......@@ -178,6 +178,13 @@ class Store : public std::enable_shared_from_this<Store> {
return adapter_->SelectBy<ResourceT>(StateField::Name, filter_states, return_vs);
}
template <typename ResourceT>
Status
GetActiveResources(std::vector<typename ResourceT::Ptr>& return_vs) {
std::vector<State> filter_states = {State::ACTIVE};
return adapter_->SelectBy<ResourceT>(StateField::Name, filter_states, return_vs);
}
template <typename ResourceT>
Status
RemoveResource(ID_TYPE id) {
......
......@@ -54,7 +54,7 @@ DBWrapper::StartService() {
}
// wal
opt.wal_enable_ = config.wal.enable();
opt.wal_enable_ = config.wal.enable() && (opt.mode_ != engine::DBOptions::CLUSTER_READONLY);
if (opt.wal_enable_) {
opt.wal_path_ = config.wal.path();
}
......
......@@ -15,6 +15,7 @@
#include <unistd.h>
#include <boost/filesystem.hpp>
#include <cstring>
#include <memory>
#include <unordered_map>
#include "config/ConfigMgr.h"
......@@ -204,7 +205,15 @@ Server::Start() {
server::Metrics::GetInstance().Init();
server::SystemInfo::GetInstance().Init();
return StartService();
STATUS_CHECK(StartService());
// TODO: Pool size should be configurable
STATUS_CHECK(SetPoolSize(2));
STATUS_CHECK(engine::snapshot::Snapshots::GetInstance().RegisterTimers(this));
STATUS_CHECK(TimerManager::Start());
STATUS_CHECK(TimerManager::Run());
return Status::OK();
} catch (std::exception& ex) {
std::string str = "Milvus server encounter exception: " + std::string(ex.what());
return Status(SERVER_UNEXPECTED_ERROR, str);
......@@ -302,6 +311,8 @@ Server::StopService() {
// get error message "Milvus server is shutdown!"
// storage::S3ClientWrapper::GetInstance().StopService();
TimerManager::Stop();
DBWrapper::GetInstance().StopService();
web::WebServer::GetInstance().Stop();
grpc::GrpcServer::GetInstance().Stop();
......
......@@ -11,15 +11,19 @@
#pragma once
#include <boost/asio.hpp>
#include <string>
#include <vector>
#include "config/ServerConfig.h"
#include "utils/Status.h"
#include "utils/ThreadPool.h"
#include "utils/TimerContext.h"
#include "utils/TimerManager.h"
namespace milvus::server {
class Server {
class Server : public TimerManager {
public:
static Server&
GetInstance();
......@@ -28,9 +32,9 @@ class Server {
Init(int64_t daemonized, const std::string& pid_filename, const std::string& config_filename);
Status
Start();
Start() override;
void
Stop();
Stop() override;
private:
Server() = default;
......
......@@ -113,6 +113,7 @@ constexpr ErrorCode DB_PARTITION_NOT_FOUND = ToDbErrorCode(10);
constexpr ErrorCode DB_OUT_OF_STORAGE = ToDbErrorCode(11);
constexpr ErrorCode DB_META_QUERY_FAILED = ToDbErrorCode(12);
constexpr ErrorCode DB_FILE_NOT_FOUND = ToDbErrorCode(13);
constexpr ErrorCode DB_PERMISSION_ERROR = ToDbErrorCode(14);
// knowhere error code
constexpr ErrorCode KNOWHERE_ERROR = ToKnowhereErrorCode(1);
......@@ -139,5 +140,6 @@ constexpr ErrorCode SS_OPERATION_PENDING = ToSSErrorCode(9);
constexpr ErrorCode SS_TIMEOUT = ToSSErrorCode(10);
constexpr ErrorCode SS_NOT_COMMITED = ToSSErrorCode(11);
constexpr ErrorCode SS_COLLECTION_DROPPED = ToSSErrorCode(12);
constexpr ErrorCode SS_EMPTY_HOLDER = ToSSErrorCode(13);
} // namespace milvus
......@@ -12,6 +12,7 @@
#pragma once
#include <fiu/fiu-local.h>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
......@@ -35,6 +36,9 @@ class ThreadPool {
auto
enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
void
Stop();
~ThreadPool();
private:
......@@ -51,11 +55,11 @@ class ThreadPool {
std::condition_variable condition_;
bool stop;
std::atomic_bool stop_;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads, size_t queue_size) : max_queue_size_(queue_size), stop(false) {
inline ThreadPool::ThreadPool(size_t threads, size_t queue_size) : max_queue_size_(queue_size), stop_(false) {
for (size_t i = 0; i < threads; ++i)
workers_.emplace_back([this] {
for (;;) {
......@@ -63,8 +67,8 @@ inline ThreadPool::ThreadPool(size_t threads, size_t queue_size) : max_queue_siz
{
std::unique_lock<std::mutex> lock(this->queue_mutex_);
this->condition_.wait(lock, [this] { return this->stop || !this->tasks_.empty(); });
if (this->stop && this->tasks_.empty())
this->condition_.wait(lock, [this] { return this->stop_ || !this->tasks_.empty(); });
if (this->stop_ && this->tasks_.empty())
return;
task = std::move(this->tasks_.front());
this->tasks_.pop();
......@@ -84,13 +88,13 @@ ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_o
auto task = std::make_shared<std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
fiu_do_on("ThreadPool.enqueue.stop_is_true", stop = true);
fiu_do_on("ThreadPool.enqueue.stop_is_true", stop_ = true);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
this->condition_.wait(lock, [this] { return this->tasks_.size() < max_queue_size_; });
// don't allow enqueueing after stopping the pool
if (stop)
if (stop_)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks_.emplace([task]() { (*task)(); });
......@@ -99,11 +103,14 @@ ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_o
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
inline void
ThreadPool::Stop() {
if (stop_) {
return;
}
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop = true;
stop_ = true;
}
condition_.notify_all();
for (std::thread& worker : workers_) {
......@@ -111,4 +118,11 @@ inline ThreadPool::~ThreadPool() {
}
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
Stop();
}
using ThreadPoolPtr = std::shared_ptr<ThreadPool>;
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "utils/Log.h"
#include "utils/ThreadPool.h"
namespace milvus {
struct TimerContext {
using HandlerT = std::function<void(const boost::system::error_code&)>;
struct Context {
/* Context(int interval_us, HandlerT& handler, ThreadPoolPtr pool = nullptr) */
/* : interval_(interval_us), handler_(handler), timer_(io, interval_), pool_(pool) { */
/* } */
int interval_us;
HandlerT handler;
ThreadPoolPtr pool = nullptr;
};
TimerContext(boost::asio::io_service& io, int interval_us, HandlerT& handler, ThreadPoolPtr pool)
: io_(io), interval_(interval_us), handler_(handler), timer_(io, interval_), pool_(pool) {
}
TimerContext(boost::asio::io_service& io, Context& context)
: io_(io),
interval_(context.interval_us),
handler_(context.handler),
timer_(io, interval_),
pool_(context.pool) {
}
void
Reschedule(const boost::system::error_code& ec);
boost::asio::io_service& io_;
boost::posix_time::microseconds interval_;
boost::asio::deadline_timer timer_;
HandlerT handler_;
ThreadPoolPtr pool_;
};
inline void
TimerContext::Reschedule(const boost::system::error_code& ec) {
try {
pool_->enqueue(handler_, ec);
} catch (std::exception& ex) {
LOG_SERVER_ERROR_ << "Fail to enqueue handler: " << std::string(ex.what());
}
boost::system::error_code e;
auto new_expires = timer_.expires_at() + interval_;
timer_.expires_at(new_expires, e);
if (e) {
LOG_SERVER_ERROR_ << "Fail to Reschedule: " << e;
}
timer_.async_wait(std::bind(&TimerContext::Reschedule, this, std::placeholders::_1));
}
using TimerContextPtr = std::shared_ptr<TimerContext>;
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <utility>
#include "utils/Log.h"
#include "utils/TimerManager.h"
namespace milvus {
TimerManager::TimerManager(unsigned int pool_size) : pool_size_(pool_size) {
}
TimerManager::~TimerManager() {
}
Status
TimerManager::SetPoolSize(unsigned int pool_size) {
if (timer_exeutors_) {
return Status(SERVER_UNEXPECTED_ERROR, "Cannot set pool size since it has been set already");
}
pool_size_ = pool_size;
return Status::OK();
}
Status
TimerManager::Run() {
boost::system::error_code ec;
io_.run(ec);
Status status;
if (ec) {
status = Status(SERVER_UNEXPECTED_ERROR, ec.message());
}
return status;
}
Status
TimerManager::Start() {
for (auto& timer : timers_) {
timer->timer_.async_wait(std::bind(&TimerContext::Reschedule, timer, std::placeholders::_1));
}
return Status::OK();
}
void
TimerManager::Stop() {
boost::system::error_code ec;
for (auto& timer : timers_) {
timer->timer_.cancel(ec);
if (ec) {
LOG_SERVER_ERROR_ << "Fail to cancel timer: " << ec;
}
}
if (timer_exeutors_) {
timer_exeutors_->Stop();
}
}
void
TimerManager::AddTimer(int interval_us, TimerContext::HandlerT handler) {
if (!timer_exeutors_) {
timer_exeutors_ = std::make_shared<ThreadPool>(pool_size_);
}
timers_.emplace_back(std::make_shared<TimerContext>(io_, interval_us, handler, timer_exeutors_));
}
void
TimerManager::AddTimer(const TimerContext::Context& ctx) {
if (!timer_exeutors_) {
timer_exeutors_ = std::make_shared<ThreadPool>(pool_size_);
}
TimerContext::Context context(ctx);
context.pool = timer_exeutors_;
timers_.emplace_back(std::make_shared<TimerContext>(io_, context));
}
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <boost/asio.hpp>
#include <functional>
#include <memory>
#include <vector>
#include "utils/Status.h"
#include "utils/ThreadPool.h"
#include "utils/TimerContext.h"
namespace milvus {
class TimerManager {
public:
TimerManager() = default;
explicit TimerManager(unsigned int pool_size);
Status
SetPoolSize(unsigned int pool_size);
void
AddTimer(int interval_us, TimerContext::HandlerT handler);
void
AddTimer(const TimerContext::Context& ctx);
virtual Status
Run();
virtual Status
Start();
virtual void
Stop();
virtual ~TimerManager();
protected:
boost::asio::io_service io_;
ThreadPoolPtr timer_exeutors_;
unsigned int pool_size_;
std::vector<TimerContextPtr> timers_;
};
using TimerManagerPtr = std::shared_ptr<TimerManager>;
} // namespace milvus
......@@ -283,58 +283,60 @@ TEST_F(SnapshotTest, DropCollectionTest) {
ASSERT_FALSE(status.ok());
}
TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
std::string collection_name("c1");
LSN_TYPE lsn = 1;
ID_TYPE stale_ss_id;
auto worker1 = [&]() {
Status status;
auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(ss->GetName(), collection_name);
stale_ss_id = ss->GetID();
ScopedSnapshotT a_ss;
status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
ASSERT_TRUE(status.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ASSERT_FALSE(ss->GetCollection()->IsActive());
status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
ASSERT_FALSE(status.ok());
auto c_c = CollectionCommitsHolder::GetInstance().GetResource(stale_ss_id, false);
ASSERT_TRUE(c_c);
ASSERT_EQ(c_c->GetID(), stale_ss_id);
};
auto worker2 = [&] {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto status = Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
ASSERT_TRUE(status.ok());
ScopedSnapshotT a_ss;
status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
ASSERT_FALSE(status.ok());
};
auto worker3 = [&] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_FALSE(ss);
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
ASSERT_EQ(ss->GetName(), collection_name);
};
std::thread t1 = std::thread(worker1);
std::thread t2 = std::thread(worker2);
std::thread t3 = std::thread(worker3);
t1.join();
t2.join();
t3.join();
auto c_c = CollectionCommitsHolder::GetInstance().GetResource(stale_ss_id, false);
ASSERT_FALSE(c_c);
}
/* TEST_F(SnapshotTest, ConCurrentCollectionOperation) { */
/* std::string collection_name("c1"); */
/* LSN_TYPE lsn = 1; */
/* ID_TYPE stale_ss_id; */
/* auto worker1 = [&]() { */
/* Status status; */
/* auto ss = CreateCollection(collection_name, ++lsn); */
/* ASSERT_TRUE(ss); */
/* ASSERT_EQ(ss->GetName(), collection_name); */
/* stale_ss_id = ss->GetID(); */
/* ScopedSnapshotT a_ss; */
/* status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name); */
/* ASSERT_TRUE(status.ok()); */
/* std::this_thread::sleep_for(std::chrono::milliseconds(80)); */
/* ASSERT_FALSE(ss->GetCollection()->IsActive()); */
/* status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name); */
/* ASSERT_FALSE(status.ok()); */
/* auto c_c = CollectionCommitsHolder::GetInstance().GetResource(stale_ss_id, false); */
/* ASSERT_TRUE(c_c); */
/* ASSERT_EQ(c_c->GetID(), stale_ss_id); */
/* }; */
/* auto worker2 = [&] { */
/* std::this_thread::sleep_for(std::chrono::milliseconds(50)); */
/* auto status = Snapshots::GetInstance().DropCollection(collection_name, ++lsn); */
/* ASSERT_TRUE(status.ok()); */
/* ScopedSnapshotT a_ss; */
/* status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name); */
/* ASSERT_FALSE(status.ok()); */
/* }; */
/* auto worker3 = [&] { */
/* std::this_thread::sleep_for(std::chrono::milliseconds(20)); */
/* auto ss = CreateCollection(collection_name, ++lsn); */
/* ASSERT_FALSE(ss); */
/* std::this_thread::sleep_for(std::chrono::milliseconds(80)); */
/* ss = CreateCollection(collection_name, ++lsn); */
/* ASSERT_TRUE(ss); */
/* ASSERT_EQ(ss->GetName(), collection_name); */
/* }; */
/* std::thread t1 = std::thread(worker1); */
/* std::thread t2 = std::thread(worker2); */
/* std::thread t3 = std::thread(worker3); */
/* t1.join(); */
/* t2.join(); */
/* t3.join(); */
/* auto c_c = CollectionCommitsHolder::GetInstance().GetResource(stale_ss_id, false); */
/* ASSERT_FALSE(c_c); */
/* } */
TEST_F(SnapshotTest, PartitionTest) {
fiu_enable("snapshot.policy.w_cluster", 1, nullptr, 0);
fiu_enable("snapshot.policy.duration_10ms", 1, nullptr, 0);
std::string collection_name("c1");
LSN_TYPE lsn = 1;
auto ss = CreateCollection(collection_name, ++lsn);
......@@ -429,6 +431,9 @@ TEST_F(SnapshotTest, PartitionTest) {
ASSERT_TRUE(status.ok());
ASSERT_EQ(curr_ss->NumberOfPartitions(), total_partition_num - i - 1);
}
fiu_disable("snapshot.policy.w_cluster");
fiu_disable("snapshot.policy.duration_10ms");
}
TEST_F(SnapshotTest, PartitionTest2) {
......@@ -458,6 +463,100 @@ TEST_F(SnapshotTest, PartitionTest2) {
ASSERT_FALSE(status.ok()) << status.ToString();
}
TEST_F(SnapshotTest, SnapshotPolicyTest) {
ScopedSnapshotT ss;
auto build_env = [&](const std::string& collection_name, int ms) {
LSN_TYPE lsn = 1;
ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
auto& partitions = ss->GetResources<Partition>();
auto total_row_cnt = 0;
for (auto& kv : partitions) {
auto num = RandomInt(10, 10);
for (auto i = 0; i < num; ++i) {
auto row_cnt = RandomInt(100, 100);
ASSERT_TRUE(CreateSegment(ss, kv.first, ++lsn, sf_context, row_cnt).ok());
total_row_cnt += row_cnt;
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
};
auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
ASSERT_TRUE(status.ok());
};
// Test case:
// cluster: disable
// stale_snapshots_count: 0
// Check the num of snapshot should be 1
{
build_env("c1", 1);
int num;
auto status = Snapshots::GetInstance().NumOfSnapshot("c1", num);
ASSERT_TRUE(status.ok());
/* std::cout << "-------------------------------------------------------" << std::endl; */
/* std::cout << "c1 has " << num << " snapshots" << std::endl; */
ASSERT_EQ(num, 1);
}
// Test case:
// cluster: disable
// stale_snapshots_count: 4
// Check the num of snapshot should be 5
{
fiu_enable("snapshot.policy.stale_count_4", 1, nullptr, 0);
build_env("c2", 1);
int num;
auto status = Snapshots::GetInstance().NumOfSnapshot("c2", num);
ASSERT_TRUE(status.ok());
/* std::cout << "-------------------------------------------------------" << std::endl; */
/* std::cout << "c2 has " << num << " snapshots" << std::endl; */
ASSERT_EQ(num, 5);
fiu_disable("snapshot.policy.stale_count_4");
}
// Test case:
// cluster: enable
// role: ClusterRole::RW
// stale_snapshots_duration: 50ms
// Check the num of snapshot should be [2,3]
{
fiu_enable("snapshot.policy.w_cluster", 1, nullptr, 0);
fiu_enable("snapshot.policy.duration_50ms", 1, nullptr, 0);
build_env("c3", 30);
int num;
auto status = Snapshots::GetInstance().NumOfSnapshot("c3", num);
ASSERT_TRUE(status.ok());
/* std::cout << "-------------------------------------------------------" << std::endl; */
/* std::cout << "c3 has " << num << " snapshots" << std::endl; */
ASSERT_TRUE(num<=3 && num >=2);
fiu_disable("snapshot.policy.w_cluster");
fiu_disable("snapshot.policy.duration_50ms");
}
// Test case:
// cluster: enable
// role: ClusterRole::RW
// stale_snapshots_duration: 100ms
// Check the num of snapshot should be [3,4,5]
{
fiu_enable("snapshot.policy.w_cluster", 1, nullptr, 0);
fiu_enable("snapshot.policy.duration_100ms", 1, nullptr, 0);
build_env("c4", 28);
int num;
auto status = Snapshots::GetInstance().NumOfSnapshot("c4", num);
ASSERT_TRUE(status.ok());
/* std::cout << "-------------------------------------------------------" << std::endl; */
/* std::cout << "c4 has " << num << " snapshots" << std::endl; */
std::cout << "num=" << num << std::endl;
ASSERT_TRUE(num<=5 && num >=3);
fiu_disable("snapshot.policy.w_cluster");
fiu_disable("snapshot.policy.duration_100ms");
}
}
TEST_F(SnapshotTest, DropSegmentTest){
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
......
......@@ -146,6 +146,8 @@ SnapshotTest::SetUp() {
void
SnapshotTest::TearDown() {
BaseTest::SnapshotStop();
auto path = "/tmp/milvus_ss/db";
std::experimental::filesystem::remove_all(path);
BaseTest::TearDown();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册