From f457cb4328c9cc85e1c87b4f8261c593e7e1a08d Mon Sep 17 00:00:00 2001 From: starlord Date: Sat, 31 Aug 2019 12:07:48 +0800 Subject: [PATCH] fix server exist hang Former-commit-id: 08bf7c7731e35f6befe741729f931c45cc3baa45 --- cpp/src/db/DB.h | 3 + cpp/src/db/DBImpl.cpp | 55 +++++++++++---- cpp/src/db/DBImpl.h | 24 +++---- cpp/src/scheduler/SchedInst.cpp | 7 +- cpp/src/scheduler/SchedInst.h | 5 +- cpp/src/server/DBWrapper.cpp | 20 +++++- cpp/src/server/DBWrapper.h | 24 +++++-- cpp/src/server/Server.cpp | 7 +- cpp/src/server/grpc_impl/GrpcMilvusServer.cpp | 2 - .../server/grpc_impl/GrpcRequestScheduler.cpp | 67 +++++++++---------- .../server/grpc_impl/GrpcRequestScheduler.h | 37 +++++----- 11 files changed, 153 insertions(+), 98 deletions(-) diff --git a/cpp/src/db/DB.h b/cpp/src/db/DB.h index 908714a0..d493c0f7 100644 --- a/cpp/src/db/DB.h +++ b/cpp/src/db/DB.h @@ -22,6 +22,9 @@ class DB { public: static void Open(const Options& options, DB** dbptr); + virtual Status Start() = 0; + virtual Status Stop() = 0; + virtual Status CreateTable(meta::TableSchema& table_schema_) = 0; virtual Status DeleteTable(const std::string& table_id, const meta::DatesT& dates) = 0; virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0; diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 36580dce..b744899d 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -41,17 +41,55 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1; DBImpl::DBImpl(const Options& options) : options_(options), - shutting_down_(false), + shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode); mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); - if (options.mode != Options::MODE::READ_ONLY) { + Start(); +} + +DBImpl::~DBImpl() { + Stop(); +} + +Status DBImpl::Start() { + if (!shutting_down_.load(std::memory_order_acquire)){ + return Status::OK(); + } + + //for distribute version, some nodes are read only + if (options_.mode != Options::MODE::READ_ONLY) { ENGINE_LOG_TRACE << "StartTimerTasks"; - StartTimerTasks(); + bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); } + shutting_down_.store(false, std::memory_order_release); + return Status::OK(); +} + +Status DBImpl::Stop() { + if (shutting_down_.load(std::memory_order_acquire)){ + return Status::OK(); + } + + shutting_down_.store(true, std::memory_order_release); + bg_timer_thread_.join(); + + //wait compaction/buildindex finish + for(auto& result : compact_thread_results_) { + result.wait(); + } + + for(auto& result : index_thread_results_) { + result.wait(); + } + + //makesure all memory data serialized + MemSerialize(); + + return Status::OK(); } Status DBImpl::CreateTable(meta::TableSchema& table_schema) { @@ -278,10 +316,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch return Status::OK(); } -void DBImpl::StartTimerTasks() { - bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); -} - void DBImpl::BackgroundTimerTask() { Status status; server::SystemInfo::GetInstance().Init(); @@ -741,13 +775,6 @@ Status DBImpl::Size(uint64_t& result) { return meta_ptr_->Size(result); } -DBImpl::~DBImpl() { - shutting_down_.store(true, std::memory_order_release); - bg_timer_thread_.join(); - std::set ids; - mem_mgr_->Serialize(ids); -} - } // namespace engine } // namespace milvus } // namespace zilliz diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index b8f43daf..7fb8295b 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -36,6 +36,9 @@ class DBImpl : public DB { explicit DBImpl(const Options &options); + Status Start() override; + Status Stop() override; + Status CreateTable(meta::TableSchema &table_schema) override; Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override; @@ -91,18 +94,15 @@ class DBImpl : public DB { ~DBImpl() override; private: - Status - QueryAsync(const std::string &table_id, - const meta::TableFilesSchema &files, - uint64_t k, - uint64_t nq, - uint64_t nprobe, - const float *vectors, - const meta::DatesT &dates, - QueryResults &results); - - - void StartTimerTasks(); + Status QueryAsync(const std::string &table_id, + const meta::TableFilesSchema &files, + uint64_t k, + uint64_t nq, + uint64_t nprobe, + const float *vectors, + const meta::DatesT &dates, + QueryResults &results); + void BackgroundTimerTask(); void StartMetricTask(); diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 972c2e26..b8fb0f22 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -19,7 +19,7 @@ SchedulerPtr SchedInst::instance = nullptr; std::mutex SchedInst::mutex_; void -SchedServInit() { +StartSchedulerService() { server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE); auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren(); for (auto &resource : resources) { @@ -52,6 +52,11 @@ SchedServInit() { SchedInst::GetInstance()->Start(); } +void +StopSchedulerService() { + ResMgrInst::GetInstance()->Stop(); + SchedInst::GetInstance()->Stop(); +} } } } diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index 3ae36827..92f3575e 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -53,7 +53,10 @@ private: }; void -SchedServInit(); +StartSchedulerService(); + +void +StopSchedulerService(); } } diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index d5ca5797..647fea59 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -17,6 +17,10 @@ namespace milvus { namespace server { DBWrapper::DBWrapper() { + +} + +ServerError DBWrapper::StartService() { //db config zilliz::milvus::engine::Options opt; ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB); @@ -91,7 +95,9 @@ DBWrapper::DBWrapper() { //create db instance std::string msg = opt.meta.path; try { - zilliz::milvus::engine::DB::Open(opt, &db_); + engine::DB* db = nullptr; + zilliz::milvus::engine::DB::Open(opt, &db); + db_.reset(db); } catch(std::exception& ex) { msg = ex.what(); } @@ -100,10 +106,18 @@ DBWrapper::DBWrapper() { std::cout << "ERROR! Failed to open database: " << msg << std::endl; kill(0, SIGUSR1); } + + db_->Start(); + + return SERVER_SUCCESS; } -DBWrapper::~DBWrapper() { - delete db_; +ServerError DBWrapper::StopService() { + if(db_) { + db_->Stop(); + } + + return SERVER_SUCCESS; } } diff --git a/cpp/src/server/DBWrapper.h b/cpp/src/server/DBWrapper.h index fdde4b15..8b25dc0d 100644 --- a/cpp/src/server/DBWrapper.h +++ b/cpp/src/server/DBWrapper.h @@ -5,8 +5,11 @@ ******************************************************************************/ #pragma once +#include "utils/Error.h" #include "db/DB.h" +#include + namespace zilliz { namespace milvus { namespace server { @@ -14,18 +17,27 @@ namespace server { class DBWrapper { private: DBWrapper(); - ~DBWrapper(); + ~DBWrapper() = default; public: - static zilliz::milvus::engine::DB* DB() { - static DBWrapper db_wrapper; - return db_wrapper.db(); + static DBWrapper& GetInstance() { + static DBWrapper wrapper; + return wrapper; + } + + static std::shared_ptr DB() { + return GetInstance().EngineDB(); } - zilliz::milvus::engine::DB* db() { return db_; } + ServerError StartService(); + ServerError StopService(); + + std::shared_ptr EngineDB() { + return db_; + } private: - zilliz::milvus::engine::DB* db_ = nullptr; + std::shared_ptr db_; }; } diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 5c0229e3..2df42791 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -21,6 +21,7 @@ #include #include "metrics/Metrics.h" +#include "DBWrapper.h" namespace zilliz { namespace milvus { @@ -158,7 +159,7 @@ Server::Start() { signal(SIGTERM, SignalUtil::HandleSignal); server::Metrics::GetInstance().Init(); server::SystemInfo::GetInstance().Init(); - engine::SchedServInit(); + std::cout << "Milvus server start successfully." << std::endl; StartService(); @@ -221,12 +222,16 @@ Server::LoadConfig() { void Server::StartService() { + engine::StartSchedulerService(); + DBWrapper::GetInstance().StartService(); grpc::GrpcMilvusServer::StartService(); } void Server::StopService() { grpc::GrpcMilvusServer::StopService(); + DBWrapper::GetInstance().StopService(); + engine::StopSchedulerService(); } } diff --git a/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp index baf91161..737f3dab 100644 --- a/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp +++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp @@ -49,8 +49,6 @@ GrpcMilvusServer::StartService() { faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20); - DBWrapper::DB();//initialize db - std::string server_address(address + ":" + std::to_string(port)); ::grpc::ServerBuilder builder; diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp index 637b4c3b..6f1a42b6 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp @@ -66,16 +66,18 @@ GrpcBaseTask::~GrpcBaseTask() { WaitToFinish(); } -ServerError -GrpcBaseTask::Execute() { +ServerError GrpcBaseTask::Execute() { error_code_ = OnExecute(); + Done(); + return error_code_; +} + +void GrpcBaseTask::Done() { done_ = true; finish_cond_.notify_all(); - return error_code_; } -ServerError -GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) { +ServerError GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) { error_code_ = error_code; error_msg_ = error_msg; @@ -83,8 +85,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) { return error_code_; } -ServerError -GrpcBaseTask::WaitToFinish() { +ServerError GrpcBaseTask::WaitToFinish() { std::unique_lock lock(finish_mtx_); finish_cond_.wait(lock, [this] { return done_; }); @@ -101,8 +102,7 @@ GrpcRequestScheduler::~GrpcRequestScheduler() { Stop(); } -void -GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { +void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { if (task_ptr == nullptr) { return; } @@ -120,8 +120,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *gr } } -void -GrpcRequestScheduler::Start() { +void GrpcRequestScheduler::Start() { if (!stopped_) { return; } @@ -129,8 +128,7 @@ GrpcRequestScheduler::Start() { stopped_ = false; } -void -GrpcRequestScheduler::Stop() { +void GrpcRequestScheduler::Stop() { if (stopped_) { return; } @@ -155,8 +153,7 @@ GrpcRequestScheduler::Stop() { SERVER_LOG_INFO << "Scheduler stopped"; } -ServerError -GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { +ServerError GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { if (task_ptr == nullptr) { return SERVER_NULL_POINTER; } @@ -174,33 +171,31 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { return task_ptr->WaitToFinish();//sync execution } -namespace { - void TakeTaskToExecute(TaskQueuePtr task_queue) { - if (task_queue == nullptr) { - return; - } - while (true) { - BaseTaskPtr task = task_queue->Take(); - if (task == nullptr) { - SERVER_LOG_ERROR << "Take null from task queue, stop thread"; - break;//stop the thread - } +void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { + if (task_queue == nullptr) { + return; + } + + while (true) { + BaseTaskPtr task = task_queue->Take(); + if (task == nullptr) { + SERVER_LOG_ERROR << "Take null from task queue, stop thread"; + break;//stop the thread + } - try { - ServerError err = task->Execute(); - if (err != SERVER_SUCCESS) { - SERVER_LOG_ERROR << "Task failed with code: " << err; - } - } catch (std::exception &ex) { - SERVER_LOG_ERROR << "Task failed to execute: " << ex.what(); + try { + ServerError err = task->Execute(); + if (err != SERVER_SUCCESS) { + SERVER_LOG_ERROR << "Task failed with code: " << err; } + } catch (std::exception &ex) { + SERVER_LOG_ERROR << "Task failed to execute: " << ex.what(); } } } -ServerError -GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { +ServerError GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { std::lock_guard lock(queue_mtx_); std::string group_name = task_ptr->TaskGroup(); @@ -212,7 +207,7 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { task_groups_.insert(std::make_pair(group_name, queue)); //start a thread - ThreadPtr thread = std::make_shared(&TakeTaskToExecute, queue); + ThreadPtr thread = std::make_shared(&GrpcRequestScheduler::TakeTaskToExecute, this, queue); execute_threads_.push_back(thread); SERVER_LOG_INFO << "Create new thread for task group: " << group_name; } diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h index a436e8de..96be9883 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h @@ -25,30 +25,24 @@ protected: virtual ~GrpcBaseTask(); public: - ServerError - Execute(); + ServerError Execute(); - ServerError - WaitToFinish(); + void Done(); - std::string - TaskGroup() const { return task_group_; } + ServerError WaitToFinish(); - ServerError - ErrorCode() const { return error_code_; } + std::string TaskGroup() const { return task_group_; } - std::string - ErrorMsg() const { return error_msg_; } + ServerError ErrorCode() const { return error_code_; } - bool - IsAsync() const { return async_; } + std::string ErrorMsg() const { return error_msg_; } + + bool IsAsync() const { return async_; } protected: - virtual ServerError - OnExecute() = 0; + virtual ServerError OnExecute() = 0; - ServerError - SetError(ServerError error_code, const std::string &msg); + ServerError SetError(ServerError error_code, const std::string &msg); protected: mutable std::mutex finish_mtx_; @@ -77,19 +71,18 @@ public: void Stop(); - ServerError - ExecuteTask(const BaseTaskPtr &task_ptr); + ServerError ExecuteTask(const BaseTaskPtr &task_ptr); - static void - ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status); + static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status); protected: GrpcRequestScheduler(); virtual ~GrpcRequestScheduler(); - ServerError - PutTaskToQueue(const BaseTaskPtr &task_ptr); + void TakeTaskToExecute(TaskQueuePtr task_queue); + + ServerError PutTaskToQueue(const BaseTaskPtr &task_ptr); private: mutable std::mutex queue_mtx_; -- GitLab