未验证 提交 c2459614 编写于 作者: B BossZou 提交者: GitHub

Optimize snapshot GC executor (#3528)

* Optimize gc event executor (#3465)
Signed-off-by: Nyinghao.zou <yinghao.zou@zilliz.com>
上级 b8074591
...@@ -61,6 +61,7 @@ Please mark all changes in change log and use the issue from GitHub ...@@ -61,6 +61,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#2884 Using BlockingQueue in JobMgr - \#2884 Using BlockingQueue in JobMgr
- \#3220 Enable -Werror to improve code quality - \#3220 Enable -Werror to improve code quality
- \#3449 Upgrade master version to v0.11.0 - \#3449 Upgrade master version to v0.11.0
- \#3465 Optimize gc event executor
## Task ## Task
......
...@@ -33,5 +33,7 @@ constexpr int64_t MAX_SCRIPT_FILE_SIZE = 256 * MB; // max file size of ...@@ -33,5 +33,7 @@ constexpr int64_t MAX_SCRIPT_FILE_SIZE = 256 * MB; // max file size of
constexpr int64_t BUILD_INEDX_RETRY_TIMES = 3; // retry times if build index failed constexpr int64_t BUILD_INEDX_RETRY_TIMES = 3; // retry times if build index failed
constexpr const char* DB_FOLDER = "/db";
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
...@@ -90,15 +90,6 @@ DBImpl::Start() { ...@@ -90,15 +90,6 @@ DBImpl::Start() {
return Status::OK(); return Status::OK();
} }
// snapshot
auto store = snapshot::Store::Build(options_.meta_.backend_uri_, options_.meta_.path_,
codec::Codec::instance().GetSuffixSet());
snapshot::OperationExecutor::Init(store);
snapshot::OperationExecutor::GetInstance().Start();
snapshot::EventExecutor::Init(store);
snapshot::EventExecutor::GetInstance().Start();
snapshot::Snapshots::GetInstance().Init(store);
knowhere::enable_faiss_logging(); knowhere::enable_faiss_logging();
// LOG_ENGINE_TRACE_ << "DB service start"; // LOG_ENGINE_TRACE_ << "DB service start";
...@@ -155,9 +146,6 @@ DBImpl::Stop() { ...@@ -155,9 +146,6 @@ DBImpl::Stop() {
bg_metric_thread_.join(); bg_metric_thread_.join();
} }
snapshot::EventExecutor::GetInstance().Stop();
snapshot::OperationExecutor::GetInstance().Stop();
// LOG_ENGINE_TRACE_ << "DB service stop"; // LOG_ENGINE_TRACE_ << "DB service stop";
return Status::OK(); return Status::OK();
} }
......
...@@ -958,7 +958,7 @@ CreateCollectionOperation::DoExecute(StorePtr store) { ...@@ -958,7 +958,7 @@ CreateCollectionOperation::DoExecute(StorePtr store) {
auto status = store->CreateResource<Collection>( auto status = store->CreateResource<Collection>(
Collection(c_context_.collection->GetName(), c_context_.collection->GetParams()), collection); Collection(c_context_.collection->GetName(), c_context_.collection->GetParams()), collection);
if (!status.ok()) { if (!status.ok()) {
std::cerr << status.ToString() << std::endl; LOG_ENGINE_ERROR_ << status.ToString();
return status; return status;
} }
auto c_ctx_p = ResourceContextBuilder<Collection>().SetOp(meta::oUpdate).CreatePtr(); auto c_ctx_p = ResourceContextBuilder<Collection>().SetOp(meta::oUpdate).CreatePtr();
......
...@@ -13,18 +13,22 @@ ...@@ -13,18 +13,22 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <queue>
#include <thread> #include <thread>
#include <utility>
#include "db/SimpleWaitNotify.h"
#include "db/snapshot/MetaEvent.h" #include "db/snapshot/MetaEvent.h"
#include "utils/BlockingQueue.h" #include "utils/BlockingQueue.h"
namespace milvus { namespace milvus::engine::snapshot {
namespace engine {
namespace snapshot {
using EventPtr = std::shared_ptr<MetaEvent>; using EventPtr = std::shared_ptr<MetaEvent>;
using ThreadPtr = std::shared_ptr<std::thread>; using ThreadPtr = std::shared_ptr<std::thread>;
using EventQueue = BlockingQueue<EventPtr>; using EventQueue = std::queue<EventPtr>;
constexpr size_t EVENT_QUEUE_SIZE = 4096;
constexpr size_t EVENT_TIMING_INTERVAL = 5;
class EventExecutor { class EventExecutor {
public: public:
...@@ -52,35 +56,51 @@ class EventExecutor { ...@@ -52,35 +56,51 @@ class EventExecutor {
} }
Status Status
Submit(const EventPtr& evt) { Submit(const EventPtr& evt, bool flush = false) {
if (evt == nullptr) { if (evt == nullptr) {
return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid Resource"); return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid Resource");
} }
Enqueue(evt); Enqueue(evt, flush);
return Status::OK(); return Status::OK();
} }
void void
Start() { Start() {
if (thread_ptr_ == nullptr) { if (running_.exchange(true)) {
thread_ptr_ = std::make_shared<std::thread>(&EventExecutor::ThreadMain, this); return;
}
if (gc_thread_ptr_ == nullptr) {
gc_thread_ptr_ = std::make_shared<std::thread>(&EventExecutor::GCThread, this);
}
if (timing_thread_ptr_ == nullptr) {
timing_thread_ptr_ = std::make_shared<std::thread>(&EventExecutor::TimingThread, this);
} }
} }
void void
Stop() { Stop() {
if (thread_ptr_ != nullptr) { if (!running_.exchange(false)) {
Enqueue(nullptr); // executor has been stopped, just return
thread_ptr_->join(); return;
thread_ptr_ = nullptr; }
std::cout << "EventExecutor Stopped" << std::endl;
timing_.Notify();
if (timing_thread_ptr_ != nullptr) {
timing_thread_ptr_->join();
timing_thread_ptr_ = nullptr;
}
if (gc_thread_ptr_ != nullptr) {
gc_thread_ptr_->join();
gc_thread_ptr_ = nullptr;
} }
} }
private: private:
EventExecutor() { EventExecutor() = default;
queue_.SetCapacity(10000);
}
static EventExecutor& static EventExecutor&
GetInstanceImpl() { GetInstanceImpl() {
...@@ -89,24 +109,79 @@ class EventExecutor { ...@@ -89,24 +109,79 @@ class EventExecutor {
} }
void void
ThreadMain() { GCThread() {
Status status; Status status;
while (true) { while (true) {
EventPtr evt = queue_.Take(); auto front = cache_queues_.Take();
if (evt == nullptr) {
while (front && !front->empty()) {
EventPtr event_front(front->front());
front->pop();
if (event_front == nullptr) {
continue;
}
/* std::cout << std::this_thread::get_id() << " Dequeue Event " << std::endl; */
status = event_front->Process(store_);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "EventExecutor Handle Event Error: " << status.ToString();
}
}
if ((!initialized_.load() || !running_.load()) && cache_queues_.Empty()) {
break; break;
} }
/* std::cout << std::this_thread::get_id() << " Dequeue Event " << std::endl; */ }
status = evt->Process(store_); }
if (!status.ok()) {
std::cout << "EventExecutor Handle Event Error: " << status.ToString() << std::endl; void
TimingThread() {
while (true) {
timing_.Wait_For(std::chrono::seconds(EVENT_TIMING_INTERVAL));
std::shared_ptr<EventQueue> queue;
{
std::unique_lock<std::mutex> lock(mutex_);
if (queue_ != nullptr && !queue_->empty()) {
queue = std::move(queue_);
queue_ = nullptr;
}
}
if (queue) {
cache_queues_.Put(queue);
}
if (!running_.load() || !initialized_.load()) {
cache_queues_.Put(nullptr);
break;
} }
} }
} }
void void
Enqueue(const EventPtr& evt) { Enqueue(const EventPtr& evt, bool flush = false) {
queue_.Put(evt); if (!initialized_.load() || !running_.load()) {
LOG_ENGINE_WARNING_ << "GcEvent exiting ...";
return;
}
bool need_notify = flush;
{
std::unique_lock<std::mutex> lock(mutex_);
if (queue_ == nullptr) {
queue_ = std::make_shared<EventQueue>();
}
queue_->push(evt);
if (queue_->size() >= EVENT_QUEUE_SIZE) {
need_notify = true;
}
}
if (!initialized_.load() || !running_.load() || need_notify) {
timing_.Notify();
}
if (evt != nullptr) { if (evt != nullptr) {
/* std::cout << std::this_thread::get_id() << " Enqueue Event " << std::endl; */ /* std::cout << std::this_thread::get_id() << " Enqueue Event " << std::endl; */
} }
...@@ -114,12 +189,15 @@ class EventExecutor { ...@@ -114,12 +189,15 @@ class EventExecutor {
EventExecutor(const EventExecutor&) = delete; EventExecutor(const EventExecutor&) = delete;
ThreadPtr thread_ptr_ = nullptr; ThreadPtr timing_thread_ptr_ = nullptr;
EventQueue queue_; ThreadPtr gc_thread_ptr_ = nullptr;
SimpleWaitNotify timing_;
std::mutex mutex_;
std::shared_ptr<EventQueue> queue_;
BlockingQueue<std::shared_ptr<EventQueue>> cache_queues_;
std::atomic_bool initialized_ = false; std::atomic_bool initialized_ = false;
std::atomic_bool running_ = false;
StorePtr store_; StorePtr store_;
}; };
} // namespace snapshot } // namespace milvus::engine::snapshot
} // namespace engine
} // namespace milvus
...@@ -42,7 +42,7 @@ class InActiveResourcesGCEvent : public GCEvent, public Operations { ...@@ -42,7 +42,7 @@ class InActiveResourcesGCEvent : public GCEvent, public Operations {
Status Status
OnExecute(StorePtr store) override { OnExecute(StorePtr store) override {
std::cout << "Executing InActiveResourcesGCEvent" << std::endl; LOG_ENGINE_INFO_ << "Executing InActiveResourcesGCEvent";
STATUS_CHECK(ClearInActiveResources<Collection>(store)); STATUS_CHECK(ClearInActiveResources<Collection>(store));
STATUS_CHECK(ClearInActiveResources<CollectionCommit>(store)); STATUS_CHECK(ClearInActiveResources<CollectionCommit>(store));
...@@ -78,7 +78,7 @@ class InActiveResourcesGCEvent : public GCEvent, public Operations { ...@@ -78,7 +78,7 @@ class InActiveResourcesGCEvent : public GCEvent, public Operations {
/* std::cout << "[GC] Remove file " << res_->ToString() << " " << res_path << " " << ok << std::endl; */ /* std::cout << "[GC] Remove file " << res_->ToString() << " " << res_path << " " << ok << std::endl; */
} else { } else {
RemoveWithSuffix<ResourceT>(res, res_path, store->GetSuffixSet()); RemoveWithSuffix<ResourceT>(res, res_path, store->GetSuffixSet());
std::cout << "[GC] Remove stale " << res_path << " for " << res->ToString() << std::endl; LOG_ENGINE_DEBUG_ << "[GC] Remove stale " << res_path << " for " << res->ToString();
} }
/* remove resource from meta */ /* remove resource from meta */
......
...@@ -40,11 +40,11 @@ class GCEvent : virtual public MetaEvent { ...@@ -40,11 +40,11 @@ class GCEvent : virtual public MetaEvent {
auto adjusted = path + suffix; auto adjusted = path + suffix;
if (std::experimental::filesystem::is_regular_file(adjusted)) { if (std::experimental::filesystem::is_regular_file(adjusted)) {
auto ok = std::experimental::filesystem::remove(adjusted); auto ok = std::experimental::filesystem::remove(adjusted);
std::cout << "[GC] Remove FILE " << res->ToString() << " " << adjusted << " " << ok << std::endl; LOG_ENGINE_DEBUG_ << "[GC] Remove FILE " << res->ToString() << " " << adjusted << " " << ok;
return; return;
} }
} }
std::cout << "[GC] Remove STALE OBJECT " << path << " for " << res->ToString() << std::endl; LOG_ENGINE_DEBUG_ << "[GC] Remove STALE OBJECT " << path << " for " << res->ToString();
} }
}; };
......
...@@ -73,7 +73,7 @@ class OperationExecutor { ...@@ -73,7 +73,7 @@ class OperationExecutor {
Enqueue(nullptr); Enqueue(nullptr);
thread_ptr_->join(); thread_ptr_->join();
thread_ptr_ = nullptr; thread_ptr_ = nullptr;
std::cout << "OperationExecutor Stopped" << std::endl; LOG_ENGINE_INFO_ << "OperationExecutor Stopped";
} }
} }
......
...@@ -21,9 +21,7 @@ ...@@ -21,9 +21,7 @@
#include "db/snapshot/ResourceGCEvent.h" #include "db/snapshot/ResourceGCEvent.h"
#include "db/snapshot/Snapshots.h" #include "db/snapshot/Snapshots.h"
namespace milvus { namespace milvus::engine::snapshot {
namespace engine {
namespace snapshot {
static ID_TYPE UID = 1; static ID_TYPE UID = 1;
...@@ -114,7 +112,7 @@ Operations::Done(StorePtr store) { ...@@ -114,7 +112,7 @@ Operations::Done(StorePtr store) {
/* context_.new_collection_commit->GetCollectionId(), holder.rbegin()->GetID()); */ /* context_.new_collection_commit->GetCollectionId(), holder.rbegin()->GetID()); */
/* } */ /* } */
/* } */ /* } */
std::cout << ToString() << std::endl; LOG_ENGINE_DEBUG_ << ToString();
} }
finish_cond_.notify_all(); finish_cond_.notify_all();
} }
...@@ -255,7 +253,7 @@ Operations::OnApplyTimeoutCallback(StorePtr store) { ...@@ -255,7 +253,7 @@ Operations::OnApplyTimeoutCallback(StorePtr store) {
auto status = store->ApplyOperation(*this, context); auto status = store->ApplyOperation(*this, context);
while (status.code() == SS_TIMEOUT && !HasAborted()) { while (status.code() == SS_TIMEOUT && !HasAborted()) {
std::cout << GetName() << " Timeout! Try " << ++try_times << std::endl; LOG_ENGINE_WARNING_ << GetName() << " Timeout! Try " << ++try_times;
std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(1));
status = store->ApplyOperation(*this, context); status = store->ApplyOperation(*this, context);
} }
...@@ -290,7 +288,7 @@ ApplyRollBack(std::set<std::shared_ptr<ResourceContext<ResourceT>>>& step_contex ...@@ -290,7 +288,7 @@ ApplyRollBack(std::set<std::shared_ptr<ResourceContext<ResourceT>>>& step_contex
auto res = step_context->Resource(); auto res = step_context->Resource();
auto evt_ptr = std::make_shared<ResourceGCEvent<ResourceT>>(res); auto evt_ptr = std::make_shared<ResourceGCEvent<ResourceT>>(res);
EventExecutor::GetInstance().Submit(evt_ptr); EventExecutor::GetInstance().Submit(evt_ptr);
std::cout << "Rollback " << typeid(ResourceT).name() << ": " << res->GetID() << std::endl; LOG_ENGINE_DEBUG_ << "Rollback " << typeid(ResourceT).name() << ": " << res->GetID();
} }
} }
...@@ -305,6 +303,4 @@ Operations::~Operations() { ...@@ -305,6 +303,4 @@ Operations::~Operations() {
} }
} }
} // namespace snapshot } // namespace milvus::engine::snapshot
} // namespace engine
} // namespace milvus
...@@ -48,16 +48,16 @@ class ResourceGCEvent : public GCEvent { ...@@ -48,16 +48,16 @@ class ResourceGCEvent : public GCEvent {
if (res_path.empty()) { if (res_path.empty()) {
/* std::cout << "[GC] No remove action for " << res_->ToString() << std::endl; */ /* std::cout << "[GC] No remove action for " << res_->ToString() << std::endl; */
} else if (std::experimental::filesystem::is_directory(res_path)) { } else if (std::experimental::filesystem::is_directory(res_path)) {
std::cout << "[GC] Remove DIR " << res_->ToString() << " " << res_path << std::endl; LOG_ENGINE_DEBUG_ << "[GC] Remove DIR " << res_->ToString() << " " << res_path;
auto ok = std::experimental::filesystem::remove_all(res_path); auto ok = std::experimental::filesystem::remove_all(res_path);
} else if (std::experimental::filesystem::is_regular_file(res_path)) { } else if (std::experimental::filesystem::is_regular_file(res_path)) {
std::cout << "[GC] Remove FILE " << res_->ToString() << " " << res_path << std::endl; LOG_ENGINE_DEBUG_ << "[GC] Remove FILE " << res_->ToString() << " " << res_path;
auto ok = std::experimental::filesystem::remove(res_path); auto ok = std::experimental::filesystem::remove(res_path);
} else { } else {
RemoveWithSuffix<ResourceT>(res_, res_path, store->GetSuffixSet()); RemoveWithSuffix<ResourceT>(res_, res_path, store->GetSuffixSet());
} }
} catch (const std::experimental::filesystem::filesystem_error& er) { } catch (const std::experimental::filesystem::filesystem_error& er) {
std::cout << "[GC] " << er.what() << std::endl; LOG_SERVER_ERROR_ << "[GC] Error when removing path " << res_path << ": " << er.what();
if (do_throw) { if (do_throw) {
throw; throw;
} }
...@@ -73,7 +73,7 @@ class ResourceGCEvent : public GCEvent { ...@@ -73,7 +73,7 @@ class ResourceGCEvent : public GCEvent {
break; break;
} }
std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::cout << "[GC] Retry remove: " << res_path << std::endl; LOG_ENGINE_DEBUG_ << "[GC] Retry remove: " << res_path;
} }
auto hard_delete_meta = [](ID_TYPE id, StorePtr store) -> Status { auto hard_delete_meta = [](ID_TYPE id, StorePtr store) -> Status {
...@@ -82,7 +82,7 @@ class ResourceGCEvent : public GCEvent { ...@@ -82,7 +82,7 @@ class ResourceGCEvent : public GCEvent {
}; };
if (!status.ok()) { if (!status.ok()) {
std::cout << "[GC] Stale Resource: " << res_path << " need to be cleanup later" << std::endl; LOG_ENGINE_ERROR_ << "[GC] Stale Resource: " << res_path << " need to be cleanup later";
return status; return status;
} }
......
...@@ -105,12 +105,12 @@ class ResourceHolder { ...@@ -105,12 +105,12 @@ class ResourceHolder {
virtual void virtual void
Dump(const std::string& tag = "") { Dump(const std::string& tag = "") {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
std::cout << typeid(*this).name() << " Dump Start [" << tag << "]:" << id_map_.size() << std::endl; LOG_ENGINE_DEBUG_ << typeid(*this).name() << " Dump Start [" << tag << "]:" << id_map_.size();
for (auto& kv : id_map_) { for (auto& kv : id_map_) {
/* std::cout << "\t" << kv.second->ToString() << std::endl; */ /* std::cout << "\t" << kv.second->ToString() << std::endl; */
std::cout << "\t" << kv.first << " RefCnt " << kv.second->ref_count() << std::endl; LOG_ENGINE_DEBUG_ << "\t" << kv.first << " RefCnt " << kv.second->ref_count();
} }
std::cout << typeid(*this).name() << " Dump End [" << tag << "]" << std::endl; LOG_ENGINE_DEBUG_ << typeid(*this).name() << " Dump End [" << tag << "]";
} }
private: private:
......
...@@ -272,13 +272,13 @@ class Snapshot : public ReferenceProxy { ...@@ -272,13 +272,13 @@ class Snapshot : public ReferenceProxy {
void void
DumpResource(const std::string& tag = "") const { DumpResource(const std::string& tag = "") const {
auto& resources = GetResources<ResourceT>(); auto& resources = GetResources<ResourceT>();
std::cout << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << " Start [" << tag LOG_ENGINE_DEBUG_ << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << " Start [" << tag
<< "]:" << resources.size() << std::endl; << "]:" << resources.size();
for (auto& kv : resources) { for (auto& kv : resources) {
std::cout << "\t" << kv.second->ToString() << std::endl; LOG_ENGINE_DEBUG_ << "\t" << kv.second->ToString();
} }
std::cout << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << " End [" << tag LOG_ENGINE_DEBUG_ << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << " End [" << tag
<< "]:" << resources.size() << std::endl; << "]:" << resources.size();
} }
template <typename T> template <typename T>
......
...@@ -10,13 +10,16 @@ ...@@ -10,13 +10,16 @@
// or implied. See the License for the specific language governing permissions and limitations under the License. // or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/snapshot/Snapshots.h" #include "db/snapshot/Snapshots.h"
#include "config/ServerConfig.h"
#include "db/Constants.h"
#include "db/snapshot/CompoundOperations.h" #include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/EventExecutor.h" #include "db/snapshot/EventExecutor.h"
#include "db/snapshot/InActiveResourcesGCEvent.h" #include "db/snapshot/InActiveResourcesGCEvent.h"
#include "db/snapshot/OperationExecutor.h"
#include "utils/CommonUtil.h"
namespace milvus { namespace milvus::engine::snapshot {
namespace engine {
namespace snapshot {
/* Status */ /* Status */
/* Snapshots::DropAll() { */ /* Snapshots::DropAll() { */
...@@ -128,7 +131,7 @@ Snapshots::LoadNoLock(StorePtr store, ID_TYPE collection_id, SnapshotHolderPtr& ...@@ -128,7 +131,7 @@ Snapshots::LoadNoLock(StorePtr store, ID_TYPE collection_id, SnapshotHolderPtr&
Status Status
Snapshots::Init(StorePtr store) { Snapshots::Init(StorePtr store) {
auto event = std::make_shared<InActiveResourcesGCEvent>(); auto event = std::make_shared<InActiveResourcesGCEvent>();
EventExecutor::GetInstance().Submit(event); EventExecutor::GetInstance().Submit(event, true);
STATUS_CHECK(event->WaitToFinish()); STATUS_CHECK(event->WaitToFinish());
auto op = std::make_shared<GetCollectionIDsOperation>(); auto op = std::make_shared<GetCollectionIDsOperation>();
STATUS_CHECK((*op)(store)); STATUS_CHECK((*op)(store));
...@@ -209,10 +212,34 @@ void ...@@ -209,10 +212,34 @@ void
Snapshots::SnapshotGCCallback(Snapshot::Ptr ss_ptr) { Snapshots::SnapshotGCCallback(Snapshot::Ptr ss_ptr) {
/* to_release_.push_back(ss_ptr); */ /* to_release_.push_back(ss_ptr); */
ss_ptr->UnRef(); ss_ptr->UnRef();
std::cout << "Snapshot " << ss_ptr->GetID() << " ref_count = " << ss_ptr->ref_count() << " To be removed" LOG_ENGINE_DEBUG_ << "Snapshot " << ss_ptr->GetID() << " ref_count = " << ss_ptr->ref_count() << " To be removed";
<< std::endl; }
Status
Snapshots::StartService() {
auto meta_path = config.storage.path() + DB_FOLDER;
// create db root path
auto s = CommonUtil::CreateDirectory(meta_path);
if (!s.ok()) {
std::cerr << "Error: Failed to create database primary path: " << meta_path
<< ". Possible reason: db_config.primary_path is wrong in milvus.yaml or not available." << std::endl;
kill(0, SIGUSR1);
}
auto store = snapshot::Store::Build(config.general.meta_uri(), meta_path, codec::Codec::instance().GetSuffixSet());
snapshot::OperationExecutor::Init(store);
snapshot::OperationExecutor::GetInstance().Start();
snapshot::EventExecutor::Init(store);
snapshot::EventExecutor::GetInstance().Start();
return snapshot::Snapshots::GetInstance().Init(store);
}
Status
Snapshots::StopService() {
snapshot::EventExecutor::GetInstance().Stop();
snapshot::OperationExecutor::GetInstance().Stop();
return Status::OK();
} }
} // namespace snapshot } // namespace milvus::engine::snapshot
} // namespace engine
} // namespace milvus
...@@ -23,9 +23,7 @@ ...@@ -23,9 +23,7 @@
#include "db/snapshot/Store.h" #include "db/snapshot/Store.h"
#include "utils/Status.h" #include "utils/Status.h"
namespace milvus { namespace milvus::engine::snapshot {
namespace engine {
namespace snapshot {
class Snapshots { class Snapshots {
public: public:
...@@ -66,6 +64,13 @@ class Snapshots { ...@@ -66,6 +64,13 @@ class Snapshots {
Status Init(StorePtr); Status Init(StorePtr);
public:
Status
StartService();
Status
StopService();
private: private:
void void
SnapshotGCCallback(Snapshot::Ptr ss_ptr); SnapshotGCCallback(Snapshot::Ptr ss_ptr);
...@@ -84,6 +89,4 @@ class Snapshots { ...@@ -84,6 +89,4 @@ class Snapshots {
std::vector<Snapshot::Ptr> to_release_; std::vector<Snapshot::Ptr> to_release_;
}; };
} // namespace snapshot } // namespace milvus::engine::snapshot
} // namespace engine
} // namespace milvus
...@@ -244,7 +244,7 @@ class Store : public std::enable_shared_from_this<Store> { ...@@ -244,7 +244,7 @@ class Store : public std::enable_shared_from_this<Store> {
DoReset() { DoReset() {
auto status = adapter_->TruncateAll(); auto status = adapter_->TruncateAll();
if (!status.ok()) { if (!status.ok()) {
std::cout << "TruncateAll failed: " << status.ToString() << std::endl; LOG_ENGINE_ERROR_ << "TruncateAll failed: " << status.ToString();
} }
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <vector> #include <vector>
#include "config/ServerConfig.h" #include "config/ServerConfig.h"
#include "db/Constants.h"
#include "db/DBFactory.h" #include "db/DBFactory.h"
#include "db/snapshot/OperationExecutor.h" #include "db/snapshot/OperationExecutor.h"
#include "utils/CommonUtil.h" #include "utils/CommonUtil.h"
...@@ -35,7 +36,7 @@ DBWrapper::StartService() { ...@@ -35,7 +36,7 @@ DBWrapper::StartService() {
opt.meta_.backend_uri_ = config.general.meta_uri(); opt.meta_.backend_uri_ = config.general.meta_uri();
std::string path = config.storage.path(); std::string path = config.storage.path();
opt.meta_.path_ = path + "/db"; opt.meta_.path_ = path + engine::DB_FOLDER;
opt.auto_flush_interval_ = config.storage.auto_flush_interval(); opt.auto_flush_interval_ = config.storage.auto_flush_interval();
opt.metric_enable_ = config.metric.enable(); opt.metric_enable_ = config.metric.enable();
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <unordered_map> #include <unordered_map>
#include "config/ServerConfig.h" #include "config/ServerConfig.h"
#include "db/snapshot/Snapshots.h"
#include "index/archive/KnowhereResource.h" #include "index/archive/KnowhereResource.h"
#include "log/LogMgr.h" #include "log/LogMgr.h"
#include "metrics/Metrics.h" #include "metrics/Metrics.h"
...@@ -250,6 +251,8 @@ Server::StartService() { ...@@ -250,6 +251,8 @@ Server::StartService() {
goto FAIL; goto FAIL;
} }
engine::snapshot::Snapshots::GetInstance().StartService();
scheduler::StartSchedulerService(); scheduler::StartSchedulerService();
stat = DBWrapper::GetInstance().StartService(); stat = DBWrapper::GetInstance().StartService();
...@@ -280,6 +283,7 @@ Server::StopService() { ...@@ -280,6 +283,7 @@ Server::StopService() {
grpc::GrpcServer::GetInstance().Stop(); grpc::GrpcServer::GetInstance().Stop();
DBWrapper::GetInstance().StopService(); DBWrapper::GetInstance().StopService();
scheduler::StopSchedulerService(); scheduler::StopSchedulerService();
engine::snapshot::Snapshots::GetInstance().StopService();
engine::KnowhereResource::Finalize(); engine::KnowhereResource::Finalize();
} }
......
...@@ -24,12 +24,14 @@ ...@@ -24,12 +24,14 @@
#include "db/SnapshotUtils.h" #include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h" #include "db/SnapshotVisitor.h"
#include "db/snapshot/IterateHandler.h" #include "db/snapshot/IterateHandler.h"
#include "db/snapshot/InActiveResourcesGCEvent.h"
#include "db/snapshot/ResourceHelper.h" #include "db/snapshot/ResourceHelper.h"
#include "db/utils.h" #include "db/utils.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/Segment.h" #include "segment/Segment.h"
using SegmentVisitor = milvus::engine::SegmentVisitor; using SegmentVisitor = milvus::engine::SegmentVisitor;
using InActiveResourcesGCEvent = milvus::engine::snapshot::InActiveResourcesGCEvent;
namespace { namespace {
const char* VECTOR_FIELD_NAME = "vector"; const char* VECTOR_FIELD_NAME = "vector";
...@@ -284,7 +286,7 @@ TEST_F(DBTest, CollectionTest) { ...@@ -284,7 +286,7 @@ TEST_F(DBTest, CollectionTest) {
}; };
std::string c1 = "c1"; std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn()); auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok()) << status.ToString();
ScopedSnapshotT ss; ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, c1); status = Snapshots::GetInstance().GetSnapshot(ss, c1);
...@@ -672,6 +674,9 @@ TEST_F(DBTest, MergeTest) { ...@@ -672,6 +674,9 @@ TEST_F(DBTest, MergeTest) {
// wait to merge finished // wait to merge finished
sleep(2); sleep(2);
auto event = std::make_shared<InActiveResourcesGCEvent>();
milvus::engine::snapshot::EventExecutor::GetInstance().Submit(event, true);
event->WaitToFinish();
// validate entities count // validate entities count
int64_t row_count = 0; int64_t row_count = 0;
......
...@@ -27,6 +27,9 @@ using FType = milvus::engine::DataType; ...@@ -27,6 +27,9 @@ using FType = milvus::engine::DataType;
using FEType = milvus::engine::FieldElementType; using FEType = milvus::engine::FieldElementType;
using InActiveResourcesGCEvent = milvus::engine::snapshot::InActiveResourcesGCEvent; using InActiveResourcesGCEvent = milvus::engine::snapshot::InActiveResourcesGCEvent;
template<typename T>
using ResourceGCEvent = milvus::engine::snapshot::ResourceGCEvent<T>;
using EventExecutor = milvus::engine::snapshot::EventExecutor;
TEST_F(EventTest, TestInActiveResGcEvent) { TEST_F(EventTest, TestInActiveResGcEvent) {
CollectionPtr collection; CollectionPtr collection;
...@@ -104,8 +107,6 @@ TEST_F(EventTest, TestInActiveResGcEvent) { ...@@ -104,8 +107,6 @@ TEST_F(EventTest, TestInActiveResGcEvent) {
auto event = std::make_shared<InActiveResourcesGCEvent>(); auto event = std::make_shared<InActiveResourcesGCEvent>();
status = event->Process(store_); status = event->Process(store_);
// milvus::engine::snapshot::EventExecutor::GetInstance().Submit(event);
// status = event->WaitToFinish();
ASSERT_TRUE(status.ok()) << status.ToString(); ASSERT_TRUE(status.ok()) << status.ToString();
std::vector<FieldElementPtr> field_elements; std::vector<FieldElementPtr> field_elements;
...@@ -152,3 +153,47 @@ TEST_F(EventTest, TestInActiveResGcEvent) { ...@@ -152,3 +153,47 @@ TEST_F(EventTest, TestInActiveResGcEvent) {
ASSERT_TRUE(store_->GetInActiveResources<CollectionCommit>(collection_commits).ok()); ASSERT_TRUE(store_->GetInActiveResources<CollectionCommit>(collection_commits).ok());
ASSERT_TRUE(collection_commits.empty()); ASSERT_TRUE(collection_commits.empty());
} }
TEST_F(EventTest, GcTest) {
std::string collection_name = "event_test_gc_test";
CollectionPtr collection;
auto cc = Collection(collection_name);
cc.Activate();
auto status = store_->CreateResource(std::move(cc), collection);
ASSERT_TRUE(status.ok()) << status.ToString();
CollectionPtr collection2;
status = store_->GetResource<Collection>(collection->GetID(), collection2);
ASSERT_TRUE(status.ok()) << status.ToString();
auto event = std::make_shared<ResourceGCEvent<Collection>>(collection);
status = event->Process(store_);
ASSERT_TRUE(status.ok()) << status.ToString();
CollectionPtr rcollection;
status = store_->GetResource<Collection>(collection->GetID(),rcollection);
ASSERT_FALSE(status.ok());
}
TEST_F(EventTest, GcBlockingTest) {
size_t max_count = 1000;
std::vector<CollectionPtr> collections;
for (size_t i = 0; i < max_count; i++) {
CollectionPtr collection;
std::string name = "test_gc_c_" + std::to_string(i);
auto status = store_->CreateResource(Collection(name), collection);
ASSERT_TRUE(status.ok()) << status.ToString();
collections.push_back(collection);
}
auto start = std::chrono::system_clock::now();
for (auto& collection : collections) {
auto gc_event = std::make_shared<ResourceGCEvent<Collection>>(collection);
EventExecutor::GetInstance().Submit(gc_event);
}
auto end = std::chrono::system_clock::now();
auto count = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
ASSERT_LT(count, 1000);
}
...@@ -137,7 +137,7 @@ void ...@@ -137,7 +137,7 @@ void
SnapshotTest::SetUp() { SnapshotTest::SetUp() {
BaseTest::SetUp(); BaseTest::SetUp();
DBOptions options; DBOptions options;
options.meta_.path_ = "/tmp/milvus_ss"; options.meta_.path_ = "/tmp/milvus_ss/db";
options.meta_.backend_uri_ = "mock://:@:/"; options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false; options.wal_enable_ = false;
BaseTest::SnapshotStart(true, options); BaseTest::SnapshotStart(true, options);
...@@ -155,7 +155,7 @@ DBTest::GetOptions() { ...@@ -155,7 +155,7 @@ DBTest::GetOptions() {
milvus::cache::CpuCacheMgr::GetInstance().SetCapacity(256 * milvus::engine::MB); milvus::cache::CpuCacheMgr::GetInstance().SetCapacity(256 * milvus::engine::MB);
auto options = DBOptions(); auto options = DBOptions();
options.meta_.path_ = "/tmp/milvus_ss"; options.meta_.path_ = "/tmp/milvus_ss/db";
options.meta_.backend_uri_ = "mock://:@:/"; options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false; options.wal_enable_ = false;
options.auto_flush_interval_ = 1; options.auto_flush_interval_ = 1;
...@@ -165,7 +165,9 @@ DBTest::GetOptions() { ...@@ -165,7 +165,9 @@ DBTest::GetOptions() {
void void
DBTest::SetUp() { DBTest::SetUp() {
BaseTest::SetUp(); BaseTest::SetUp();
BaseTest::SnapshotStart(false, GetOptions()); auto options = GetOptions();
std::experimental::filesystem::create_directories(options.meta_.path_);
BaseTest::SnapshotStart(false, options);
dummy_context_ = std::make_shared<milvus::server::Context>("dummy_request_id"); dummy_context_ = std::make_shared<milvus::server::Context>("dummy_request_id");
opentracing::mocktracer::MockTracerOptions tracer_options; opentracing::mocktracer::MockTracerOptions tracer_options;
...@@ -175,9 +177,6 @@ DBTest::SetUp() { ...@@ -175,9 +177,6 @@ DBTest::SetUp() {
auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span); auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
dummy_context_->SetTraceContext(trace_context); dummy_context_->SetTraceContext(trace_context);
db_ = milvus::engine::DBFactory::BuildDB(GetOptions());
db_->Start();
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance(); auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
res_mgr->Clear(); res_mgr->Clear();
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, false)); res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, false));
...@@ -194,6 +193,9 @@ DBTest::SetUp() { ...@@ -194,6 +193,9 @@ DBTest::SetUp() {
milvus::scheduler::SchedInst::GetInstance()->Start(); milvus::scheduler::SchedInst::GetInstance()->Start();
milvus::scheduler::JobMgrInst::GetInstance()->Start(); milvus::scheduler::JobMgrInst::GetInstance()->Start();
milvus::scheduler::CPUBuilderInst::GetInstance()->Start(); milvus::scheduler::CPUBuilderInst::GetInstance()->Start();
db_ = milvus::engine::DBFactory::BuildDB(GetOptions());
db_->Start();
} }
void void
...@@ -220,7 +222,7 @@ void ...@@ -220,7 +222,7 @@ void
SegmentTest::SetUp() { SegmentTest::SetUp() {
BaseTest::SetUp(); BaseTest::SetUp();
DBOptions options; DBOptions options;
options.meta_.path_ = "/tmp/milvus_ss"; options.meta_.path_ = "/tmp/milvus_ss/db";
options.meta_.backend_uri_ = "mock://:@:/"; options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false; options.wal_enable_ = false;
BaseTest::SnapshotStart(false, options); BaseTest::SnapshotStart(false, options);
...@@ -231,9 +233,9 @@ SegmentTest::SetUp() { ...@@ -231,9 +233,9 @@ SegmentTest::SetUp() {
void void
SegmentTest::TearDown() { SegmentTest::TearDown() {
BaseTest::SnapshotStop();
db_->Stop(); db_->Stop();
db_ = nullptr; db_ = nullptr;
BaseTest::SnapshotStop();
BaseTest::TearDown(); BaseTest::TearDown();
} }
...@@ -257,7 +259,7 @@ void ...@@ -257,7 +259,7 @@ void
SchedulerTest::SetUp() { SchedulerTest::SetUp() {
BaseTest::SetUp(); BaseTest::SetUp();
DBOptions options; DBOptions options;
options.meta_.path_ = "/tmp/milvus_ss"; options.meta_.path_ = "/tmp/milvus_ss/db";
options.meta_.backend_uri_ = "mock://:@:/"; options.meta_.backend_uri_ = "mock://:@:/";
options.wal_enable_ = false; options.wal_enable_ = false;
BaseTest::SnapshotStart(true, options); BaseTest::SnapshotStart(true, options);
...@@ -303,10 +305,17 @@ EventTest::SetUp() { ...@@ -303,10 +305,17 @@ EventTest::SetUp() {
auto uri = "mock://:@:/"; auto uri = "mock://:@:/";
store_ = Store::Build(uri, "/tmp/milvus_ss/db"); store_ = Store::Build(uri, "/tmp/milvus_ss/db");
store_->DoReset(); store_->DoReset();
milvus::engine::snapshot::OperationExecutor::Init(store_);
milvus::engine::snapshot::OperationExecutor::GetInstance().Start();
milvus::engine::snapshot::EventExecutor::Init(store_);
milvus::engine::snapshot::EventExecutor::GetInstance().Start();
} }
void void
EventTest::TearDown() { EventTest::TearDown() {
milvus::engine::snapshot::EventExecutor::GetInstance().Stop();
milvus::engine::snapshot::OperationExecutor::GetInstance().Stop();
} }
///////////////////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "config/ConfigMgr.h" #include "config/ConfigMgr.h"
#include "db/snapshot/EventExecutor.h" #include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h" #include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
#include "scheduler/ResourceFactory.h" #include "scheduler/ResourceFactory.h"
#include "scheduler/SchedInst.h" #include "scheduler/SchedInst.h"
#include "server/DBWrapper.h" #include "server/DBWrapper.h"
...@@ -158,11 +159,11 @@ static const char* CONTROLLER_TEST_VALID_CONFIG_STR = ...@@ -158,11 +159,11 @@ static const char* CONTROLLER_TEST_VALID_CONFIG_STR =
"\n" "\n"
"general:\n" "general:\n"
" timezone: UTC+8\n" " timezone: UTC+8\n"
" meta_uri: sqlite://:@:/\n" " meta_uri: mock://:@:/\n"
"\n" "\n"
"network:\n" "network:\n"
" bind.address: 0.0.0.0\n" " bind.address: 0.0.0.0\n"
" bind.port: 19530\n" " bind.port: 19540\n"
" http.enable: true\n" " http.enable: true\n"
" http.port: 29999\n" " http.port: 29999\n"
"\n" "\n"
...@@ -323,7 +324,18 @@ class WebControllerTest : public ::testing::Test { ...@@ -323,7 +324,18 @@ class WebControllerTest : public ::testing::Test {
fs.close(); fs.close();
milvus::ConfigMgr::GetInstance().Init(); milvus::ConfigMgr::GetInstance().Init();
milvus::ConfigMgr::GetInstance().Load(config_path); // milvus::ConfigMgr::GetInstance().Set("general.meta_uri", "mock://:@:/");
// milvus::ConfigMgr::GetInstance().Set("storage.path", CONTROLLER_TEST_CONFIG_DIR);
// milvus::ConfigMgr::GetInstance().Set("network.http.enable", "true");
// milvus::ConfigMgr::GetInstance().Set("network.http.port", "20121");
auto& config = milvus::ConfigMgr::GetInstance();
// milvus::ConfigMgr::GetInstance().Init();
config.Load(config_path);
// milvus::ConfigMgr::GetInstance().Set("general.meta_uri", "mock://:@:/");
milvus::engine::snapshot::Snapshots::GetInstance().StartService();
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance(); auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
res_mgr->Clear(); res_mgr->Clear();
...@@ -358,7 +370,6 @@ class WebControllerTest : public ::testing::Test { ...@@ -358,7 +370,6 @@ class WebControllerTest : public ::testing::Test {
fs.flush(); fs.flush();
fs.close(); fs.close();
// milvus::ConfigMgr::GetInstance().Init();
milvus::ConfigMgr::GetInstance().Load(config_path); milvus::ConfigMgr::GetInstance().Load(config_path);
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider); OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider);
...@@ -382,6 +393,8 @@ class WebControllerTest : public ::testing::Test { ...@@ -382,6 +393,8 @@ class WebControllerTest : public ::testing::Test {
milvus::scheduler::ResMgrInst::GetInstance()->Stop(); milvus::scheduler::ResMgrInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Clear(); milvus::scheduler::ResMgrInst::GetInstance()->Clear();
milvus::engine::snapshot::Snapshots::GetInstance().StopService();
boost::filesystem::remove_all(CONTROLLER_TEST_CONFIG_DIR); boost::filesystem::remove_all(CONTROLLER_TEST_CONFIG_DIR);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册