提交 f457cb43 编写于 作者: S starlord

fix server exit hang


Former-commit-id: 08bf7c7731e35f6befe741729f931c45cc3baa45
上级 7d4f3569
......@@ -22,6 +22,9 @@ class DB {
public:
static void Open(const Options& options, DB** dbptr);
virtual Status Start() = 0;
virtual Status Stop() = 0;
virtual Status CreateTable(meta::TableSchema& table_schema_) = 0;
virtual Status DeleteTable(const std::string& table_id, const meta::DatesT& dates) = 0;
virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0;
......
......@@ -41,17 +41,55 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
// Builds the meta backend and the memory manager from `options`, then starts
// the background work unless the instance is opened read-only.
//
// shutting_down_ starts out as true ("stopped"): Start() returns early unless
// the flag is true, so any other initial value would turn the Start() call
// below into a no-op. The member is initialised exactly once.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(true),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    if (options.mode != Options::MODE::READ_ONLY) {
        Start();
    }
}
// Destructor: shuts the engine down by delegating to Stop(). The Status
// returned by Stop() is not inspected.
DBImpl::~DBImpl() {
    this->Stop();
}
// Starts the engine's background work.
//
// Returns immediately when the engine is already running (shutting_down_ is
// false). Otherwise the flag is cleared and, unless the instance is
// read-only, BackgroundTimerTask is launched on bg_timer_thread_.
//
// Returns Status::OK() in every case.
Status DBImpl::Start() {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // Publish "running" before the worker exists. Stop() signals shutdown
    // through this same flag, so a worker that started while the flag was
    // still true could observe a stale shutdown request.
    // NOTE(review): BackgroundTimerTask's loop is not visible here - confirm
    // that it polls shutting_down_.
    shutting_down_.store(false, std::memory_order_release);

    //for distribute version, some nodes are read only
    if (options_.mode != Options::MODE::READ_ONLY) {
        ENGINE_LOG_TRACE << "StartTimerTasks";
        // Launch the worker exactly once: assigning a new std::thread to one
        // that is already joinable calls std::terminate.
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}
// Stops the engine's background work.
//
// Sequence: mark the engine as shutting down, join the timer thread if one
// was started, wait for every pending compaction and index-build result,
// then serialize the in-memory data. A call made while the engine is already
// stopped returns immediately.
//
// Returns Status::OK() in every case.
Status DBImpl::Stop() {
    // exchange() tests and sets the flag in one atomic step, so two
    // concurrent callers cannot both pass the check and both try to join.
    if (shutting_down_.exchange(true, std::memory_order_acq_rel)) {
        return Status::OK();
    }

    // Start() only launches the timer thread when the instance is not
    // read-only; join() on a thread that was never started throws
    // std::system_error, so check joinable() first.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    //wait compaction/buildindex finish
    for (auto& result : compact_thread_results_) {
        result.wait();
    }

    for (auto& result : index_thread_results_) {
        result.wait();
    }

    //makesure all memory data serialized
    MemSerialize();

    return Status::OK();
}
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
......@@ -278,10 +316,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
return Status::OK();
}
void DBImpl::StartTimerTasks() {
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
void DBImpl::BackgroundTimerTask() {
Status status;
server::SystemInfo::GetInstance().Init();
......@@ -741,13 +775,6 @@ Status DBImpl::Size(uint64_t& result) {
return meta_ptr_->Size(result);
}
// Destructor: signals shutdown, waits for the timer thread and serializes
// the memory manager's data.
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);

    // The timer thread is only started for instances that are not read-only;
    // join() on a thread that was never started throws, which inside a
    // destructor terminates the process.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    std::set<std::string> ids;
    mem_mgr_->Serialize(ids);
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -36,6 +36,9 @@ class DBImpl : public DB {
explicit DBImpl(const Options &options);
Status Start() override;
Status Stop() override;
Status CreateTable(meta::TableSchema &table_schema) override;
Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override;
......@@ -91,18 +94,15 @@ class DBImpl : public DB {
~DBImpl() override;
private:
Status
QueryAsync(const std::string &table_id,
const meta::TableFilesSchema &files,
uint64_t k,
uint64_t nq,
uint64_t nprobe,
const float *vectors,
const meta::DatesT &dates,
QueryResults &results);
void StartTimerTasks();
Status QueryAsync(const std::string &table_id,
const meta::TableFilesSchema &files,
uint64_t k,
uint64_t nq,
uint64_t nprobe,
const float *vectors,
const meta::DatesT &dates,
QueryResults &results);
void BackgroundTimerTask();
void StartMetricTask();
......
......@@ -19,7 +19,7 @@ SchedulerPtr SchedInst::instance = nullptr;
std::mutex SchedInst::mutex_;
void
SchedServInit() {
StartSchedulerService() {
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
for (auto &resource : resources) {
......@@ -52,6 +52,11 @@ SchedServInit() {
SchedInst::GetInstance()->Start();
}
// Stops the resource-manager instance first, then the scheduler instance.
void
StopSchedulerService() {
    const auto& resource_manager = ResMgrInst::GetInstance();
    resource_manager->Stop();

    const auto& scheduler = SchedInst::GetInstance();
    scheduler->Stop();
}
}
}
}
......@@ -53,7 +53,10 @@ private:
};
void
SchedServInit();
StartSchedulerService();
void
StopSchedulerService();
}
}
......
......@@ -17,6 +17,10 @@ namespace milvus {
namespace server {
// The constructor does no work; the database is opened by StartService().
DBWrapper::DBWrapper() = default;
ServerError DBWrapper::StartService() {
//db config
zilliz::milvus::engine::Options opt;
ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
......@@ -91,7 +95,9 @@ DBWrapper::DBWrapper() {
//create db instance
std::string msg = opt.meta.path;
try {
zilliz::milvus::engine::DB::Open(opt, &db_);
engine::DB* db = nullptr;
zilliz::milvus::engine::DB::Open(opt, &db);
db_.reset(db);
} catch(std::exception& ex) {
msg = ex.what();
}
......@@ -100,10 +106,18 @@ DBWrapper::DBWrapper() {
std::cout << "ERROR! Failed to open database: " << msg << std::endl;
kill(0, SIGUSR1);
}
db_->Start();
return SERVER_SUCCESS;
}
DBWrapper::~DBWrapper() {
delete db_;
// Stops the wrapped engine when one exists. Always reports SERVER_SUCCESS;
// the Status returned by the engine's Stop() is not inspected.
ServerError DBWrapper::StopService() {
    if (!db_) {
        // nothing was opened, nothing to stop
        return SERVER_SUCCESS;
    }

    db_->Stop();
    return SERVER_SUCCESS;
}
}
......
......@@ -5,8 +5,11 @@
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include "db/DB.h"
#include <memory>
namespace zilliz {
namespace milvus {
namespace server {
......@@ -14,18 +17,27 @@ namespace server {
class DBWrapper {
private:
DBWrapper();
~DBWrapper();
~DBWrapper() = default;
public:
static zilliz::milvus::engine::DB* DB() {
static DBWrapper db_wrapper;
return db_wrapper.db();
static DBWrapper& GetInstance() {
static DBWrapper wrapper;
return wrapper;
}
static std::shared_ptr<engine::DB> DB() {
return GetInstance().EngineDB();
}
zilliz::milvus::engine::DB* db() { return db_; }
ServerError StartService();
ServerError StopService();
std::shared_ptr<engine::DB> EngineDB() {
return db_;
}
private:
zilliz::milvus::engine::DB* db_ = nullptr;
std::shared_ptr<engine::DB> db_;
};
}
......
......@@ -21,6 +21,7 @@
#include <src/scheduler/SchedInst.h>
#include "metrics/Metrics.h"
#include "DBWrapper.h"
namespace zilliz {
namespace milvus {
......@@ -158,7 +159,7 @@ Server::Start() {
signal(SIGTERM, SignalUtil::HandleSignal);
server::Metrics::GetInstance().Init();
server::SystemInfo::GetInstance().Init();
engine::SchedServInit();
std::cout << "Milvus server start successfully." << std::endl;
StartService();
......@@ -221,12 +222,16 @@ Server::LoadConfig() {
void
Server::StartService() {
engine::StartSchedulerService();
DBWrapper::GetInstance().StartService();
grpc::GrpcMilvusServer::StartService();
}
void
Server::StopService() {
grpc::GrpcMilvusServer::StopService();
DBWrapper::GetInstance().StopService();
engine::StopSchedulerService();
}
}
......
......@@ -49,8 +49,6 @@ GrpcMilvusServer::StartService() {
faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20);
DBWrapper::DB();//initialize db
std::string server_address(address + ":" + std::to_string(port));
::grpc::ServerBuilder builder;
......
......@@ -66,16 +66,18 @@ GrpcBaseTask::~GrpcBaseTask() {
WaitToFinish();
}
// Runs the task via OnExecute(), records its result in error_code_ and marks
// the task as finished.
//
// Done() is called on every exit path, including when OnExecute() throws.
// Otherwise a caller blocked in WaitToFinish() would never be released. The
// exception itself is still propagated to the caller of Execute().
//
// Returns the error code produced by OnExecute().
ServerError GrpcBaseTask::Execute() {
    try {
        error_code_ = OnExecute();
    } catch (...) {
        Done();
        throw;
    }

    Done();
    return error_code_;
}
void GrpcBaseTask::Done() {
done_ = true;
finish_cond_.notify_all();
return error_code_;
}
ServerError
GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
ServerError GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
error_code_ = error_code;
error_msg_ = error_msg;
......@@ -83,8 +85,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
return error_code_;
}
ServerError
GrpcBaseTask::WaitToFinish() {
ServerError GrpcBaseTask::WaitToFinish() {
std::unique_lock<std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; });
......@@ -101,8 +102,7 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
Stop();
}
void
GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
if (task_ptr == nullptr) {
return;
}
......@@ -120,8 +120,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *gr
}
}
void
GrpcRequestScheduler::Start() {
void GrpcRequestScheduler::Start() {
if (!stopped_) {
return;
}
......@@ -129,8 +128,7 @@ GrpcRequestScheduler::Start() {
stopped_ = false;
}
void
GrpcRequestScheduler::Stop() {
void GrpcRequestScheduler::Stop() {
if (stopped_) {
return;
}
......@@ -155,8 +153,7 @@ GrpcRequestScheduler::Stop() {
SERVER_LOG_INFO << "Scheduler stopped";
}
ServerError
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
ServerError GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
if (task_ptr == nullptr) {
return SERVER_NULL_POINTER;
}
......@@ -174,33 +171,31 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
return task_ptr->WaitToFinish();//sync execution
}
namespace {
void TakeTaskToExecute(TaskQueuePtr task_queue) {
if (task_queue == nullptr) {
return;
}
while (true) {
BaseTaskPtr task = task_queue->Take();
if (task == nullptr) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread";
break;//stop the thread
}
void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
if (task_queue == nullptr) {
return;
}
while (true) {
BaseTaskPtr task = task_queue->Take();
if (task == nullptr) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread";
break;//stop the thread
}
try {
ServerError err = task->Execute();
if (err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Task failed with code: " << err;
}
} catch (std::exception &ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
try {
ServerError err = task->Execute();
if (err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Task failed with code: " << err;
}
} catch (std::exception &ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
}
}
}
ServerError
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
ServerError GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
std::lock_guard<std::mutex> lock(queue_mtx_);
std::string group_name = task_ptr->TaskGroup();
......@@ -212,7 +207,7 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
task_groups_.insert(std::make_pair(group_name, queue));
//start a thread
ThreadPtr thread = std::make_shared<std::thread>(&TakeTaskToExecute, queue);
ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
execute_threads_.push_back(thread);
SERVER_LOG_INFO << "Create new thread for task group: " << group_name;
}
......
......@@ -25,30 +25,24 @@ protected:
virtual ~GrpcBaseTask();
public:
ServerError
Execute();
ServerError Execute();
ServerError
WaitToFinish();
void Done();
std::string
TaskGroup() const { return task_group_; }
ServerError WaitToFinish();
ServerError
ErrorCode() const { return error_code_; }
std::string TaskGroup() const { return task_group_; }
std::string
ErrorMsg() const { return error_msg_; }
ServerError ErrorCode() const { return error_code_; }
bool
IsAsync() const { return async_; }
std::string ErrorMsg() const { return error_msg_; }
bool IsAsync() const { return async_; }
protected:
virtual ServerError
OnExecute() = 0;
virtual ServerError OnExecute() = 0;
ServerError
SetError(ServerError error_code, const std::string &msg);
ServerError SetError(ServerError error_code, const std::string &msg);
protected:
mutable std::mutex finish_mtx_;
......@@ -77,19 +71,18 @@ public:
void Stop();
ServerError
ExecuteTask(const BaseTaskPtr &task_ptr);
ServerError ExecuteTask(const BaseTaskPtr &task_ptr);
static void
ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
protected:
GrpcRequestScheduler();
virtual ~GrpcRequestScheduler();
ServerError
PutTaskToQueue(const BaseTaskPtr &task_ptr);
void TakeTaskToExecute(TaskQueuePtr task_queue);
ServerError PutTaskToQueue(const BaseTaskPtr &task_ptr);
private:
mutable std::mutex queue_mtx_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册