diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 1d367916eace88d84e96c3da50efec5326460f21..08cd89d6defcdadab912613839be381b8dfe5b17 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -17,6 +17,8 @@ Please mark all change in change log and use the ticket from JIRA. - MS-430 - Search no result if index created with FLAT - MS-443 - Create index hang again - MS-436 - Delete vectors failed if index created with index_type: IVF_FLAT/IVF_SQ8 +- MS-450 - server hang after run stop_server.sh +- MS-449 - Add vectors twice success, once with ids, the other no ids ## Improvement - MS-327 - Clean code for milvus diff --git a/cpp/src/db/DB.h b/cpp/src/db/DB.h index 908714a08a61a8a7e50a379b49bbf8f413f09b12..d493c0f78cad2cacc77ac5edd7085dbe78a122b6 100644 --- a/cpp/src/db/DB.h +++ b/cpp/src/db/DB.h @@ -22,6 +22,9 @@ class DB { public: static void Open(const Options& options, DB** dbptr); + virtual Status Start() = 0; + virtual Status Stop() = 0; + virtual Status CreateTable(meta::TableSchema& table_schema_) = 0; virtual Status DeleteTable(const std::string& table_id, const meta::DatesT& dates) = 0; virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0; diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 36580dcec563db6187a18cedd46b08bf0040584a..b744899d561b98e5a0bd66b62847412158b37e60 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -41,17 +41,55 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1; DBImpl::DBImpl(const Options& options) : options_(options), - shutting_down_(false), + shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode); mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); - if (options.mode != Options::MODE::READ_ONLY) { + Start(); +} + +DBImpl::~DBImpl() { + Stop(); +} + +Status DBImpl::Start() { + if (!shutting_down_.load(std::memory_order_acquire)){ + return Status::OK(); + } + + //for distribute version, some nodes are read only + if (options_.mode != Options::MODE::READ_ONLY) { ENGINE_LOG_TRACE << "StartTimerTasks"; - StartTimerTasks(); + bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); } + shutting_down_.store(false, std::memory_order_release); + return Status::OK(); +} + +Status DBImpl::Stop() { + if (shutting_down_.load(std::memory_order_acquire)){ + return Status::OK(); + } + + shutting_down_.store(true, std::memory_order_release); + bg_timer_thread_.join(); + + //wait compaction/buildindex finish + for(auto& result : compact_thread_results_) { + result.wait(); + } + + for(auto& result : index_thread_results_) { + result.wait(); + } + + //makesure all memory data serialized + MemSerialize(); + + return Status::OK(); } Status DBImpl::CreateTable(meta::TableSchema& table_schema) { @@ -278,10 +316,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch return Status::OK(); } -void DBImpl::StartTimerTasks() { - bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); -} - void DBImpl::BackgroundTimerTask() { Status status; server::SystemInfo::GetInstance().Init(); @@ -741,13 +775,6 @@ Status DBImpl::Size(uint64_t& result) { return meta_ptr_->Size(result); } -DBImpl::~DBImpl() { - shutting_down_.store(true, std::memory_order_release); - bg_timer_thread_.join(); - std::set ids; - mem_mgr_->Serialize(ids); -} - } // namespace engine } // namespace milvus } // namespace zilliz diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index b8f43daf83724e5df0a224c6687cd14dcd42fcf0..7fb8295b2f37e72054e1acaaa9621f4d94fe3840 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -36,6 +36,9 @@ class DBImpl : public DB { explicit DBImpl(const Options &options); + Status Start() override; + Status Stop() override; + Status CreateTable(meta::TableSchema &table_schema) override; Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override; @@ -91,18 +94,15 @@ class DBImpl : public DB { ~DBImpl() override; private: - Status - QueryAsync(const std::string &table_id, - const meta::TableFilesSchema &files, - uint64_t k, - uint64_t nq, - uint64_t nprobe, - const float *vectors, - const meta::DatesT &dates, - QueryResults &results); - - - void StartTimerTasks(); + Status QueryAsync(const std::string &table_id, + const meta::TableFilesSchema &files, + uint64_t k, + uint64_t nq, + uint64_t nprobe, + const float *vectors, + const meta::DatesT &dates, + QueryResults &results); + void BackgroundTimerTask(); void StartMetricTask(); diff --git a/cpp/src/db/Utils.cpp b/cpp/src/db/Utils.cpp index 61675b9dc32184b67d7b2cbf8434111c27aa0258..efe238d86f4120e343b64725d383bfc199087749 100644 --- a/cpp/src/db/Utils.cpp +++ b/cpp/src/db/Utils.cpp @@ -152,10 +152,6 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) { && index1.metric_type_ == index2.metric_type_; } -bool UserDefinedId(int64_t flag) { - return flag & meta::FLAG_MASK_USERID; -} - meta::DateT GetDate(const std::time_t& t, int day_delta) { struct tm ltm; localtime_r(&t, <m); diff --git a/cpp/src/db/Utils.h b/cpp/src/db/Utils.h index d2b8a751f19ec21131b3b04cffbfee1feff10ea5..2094250a1f3c5fd71b81d742004033b5edaca1cb 100644 --- a/cpp/src/db/Utils.h +++ b/cpp/src/db/Utils.h @@ -28,8 +28,6 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& bool IsSameIndex(const TableIndex& index1, const TableIndex& index2); -bool UserDefinedId(int64_t flag); - meta::DateT GetDate(const std::time_t &t, int day_delta = 0); meta::DateT GetDate(); meta::DateT GetDateWithDelta(int day_delta); diff --git a/cpp/src/db/meta/MetaTypes.h b/cpp/src/db/meta/MetaTypes.h index e4c68decbc03acf4663904c661725ce3a2774205..e31be40ddc25c72476007cb66483b59f7d978bdd 100644 --- a/cpp/src/db/meta/MetaTypes.h +++ b/cpp/src/db/meta/MetaTypes.h @@ -22,7 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384; constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2; constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB; -constexpr int64_t FLAG_MASK_USERID = 1; +constexpr int64_t FLAG_MASK_NO_USERID = 0x1; +constexpr int64_t FLAG_MASK_HAS_USERID = 0x1<<1; typedef int DateT; const DateT EmptyDate = -1; diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 42352eed5e38f8d3790e3a9e0d1401ce35ef7ecc..3ee8cbfdb6e4197669a5758f742a5fc6996fc532 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -20,7 +20,7 @@ SchedulerPtr SchedInst::instance = nullptr; std::mutex SchedInst::mutex_; void -SchedServInit() { +StartSchedulerService() { server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE); auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren(); for (auto &resource : resources) { @@ -57,6 +57,11 @@ SchedServInit() { SchedInst::GetInstance()->Start(); } +void +StopSchedulerService() { + ResMgrInst::GetInstance()->Stop(); + SchedInst::GetInstance()->Stop(); +} } } } diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index 3ae36827a8211596aa28cfe02bfb329579478fa1..92f3575ebc20c846670ef0314a344a3511a86ff3 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -53,7 +53,10 @@ private: }; void -SchedServInit(); +StartSchedulerService(); + +void +StopSchedulerService(); } } diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index d5ca579773ee86875235096e4c3d034a01c2d7fb..647fea598e9f6879846ce12fc2fefa998473c075 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -17,6 +17,10 @@ namespace milvus { namespace server { DBWrapper::DBWrapper() { + +} + +ServerError DBWrapper::StartService() { //db config zilliz::milvus::engine::Options opt; ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB); @@ -91,7 +95,9 @@ DBWrapper::DBWrapper() { //create db instance std::string msg = opt.meta.path; try { - zilliz::milvus::engine::DB::Open(opt, &db_); + engine::DB* db = nullptr; + zilliz::milvus::engine::DB::Open(opt, &db); + db_.reset(db); } catch(std::exception& ex) { msg = ex.what(); } @@ -100,10 +106,18 @@ DBWrapper::DBWrapper() { std::cout << "ERROR! Failed to open database: " << msg << std::endl; kill(0, SIGUSR1); } + + db_->Start(); + + return SERVER_SUCCESS; } -DBWrapper::~DBWrapper() { - delete db_; +ServerError DBWrapper::StopService() { + if(db_) { + db_->Stop(); + } + + return SERVER_SUCCESS; } } diff --git a/cpp/src/server/DBWrapper.h b/cpp/src/server/DBWrapper.h index fdde4b157cfbaa71aa093966282acf8ae91d5e4a..8b25dc0d28291499de46aa89c193220a8d0a679d 100644 --- a/cpp/src/server/DBWrapper.h +++ b/cpp/src/server/DBWrapper.h @@ -5,8 +5,11 @@ ******************************************************************************/ #pragma once +#include "utils/Error.h" #include "db/DB.h" +#include + namespace zilliz { namespace milvus { namespace server { @@ -14,18 +17,27 @@ namespace server { class DBWrapper { private: DBWrapper(); - ~DBWrapper(); + ~DBWrapper() = default; public: - static zilliz::milvus::engine::DB* DB() { - static DBWrapper db_wrapper; - return db_wrapper.db(); + static DBWrapper& GetInstance() { + static DBWrapper wrapper; + return wrapper; + } + + static std::shared_ptr DB() { + return GetInstance().EngineDB(); } - zilliz::milvus::engine::DB* db() { return db_; } + ServerError StartService(); + ServerError StopService(); + + std::shared_ptr EngineDB() { + return db_; + } private: - zilliz::milvus::engine::DB* db_ = nullptr; + std::shared_ptr db_; }; } diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 5c0229e319adc31edc2b0982de443f22639317ae..2df42791309f4b842a41af978441b769d3e1ba59 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -21,6 +21,7 @@ #include #include "metrics/Metrics.h" +#include "DBWrapper.h" namespace zilliz { namespace milvus { @@ -158,7 +159,7 @@ Server::Start() { signal(SIGTERM, SignalUtil::HandleSignal); server::Metrics::GetInstance().Init(); server::SystemInfo::GetInstance().Init(); - engine::SchedServInit(); + std::cout << "Milvus server start successfully." << std::endl; StartService(); @@ -221,12 +222,16 @@ Server::LoadConfig() { void Server::StartService() { + engine::StartSchedulerService(); + DBWrapper::GetInstance().StartService(); grpc::GrpcMilvusServer::StartService(); } void Server::StopService() { grpc::GrpcMilvusServer::StopService(); + DBWrapper::GetInstance().StopService(); + engine::StopSchedulerService(); } } diff --git a/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp index baf9116169b08f3445ad2b93019d0dea27b60b4b..737f3dab95d72ce5340f2446c4c26044b8990dfe 100644 --- a/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp +++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp @@ -49,8 +49,6 @@ GrpcMilvusServer::StartService() { faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20); - DBWrapper::DB();//initialize db - std::string server_address(address + ":" + std::to_string(port)); ::grpc::ServerBuilder builder; diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp index 637b4c3b34154fd9c7307c2cf5653ad188f97be2..6f1a42b641b480c684816f5893390efa11de85eb 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp @@ -66,16 +66,18 @@ GrpcBaseTask::~GrpcBaseTask() { WaitToFinish(); } -ServerError -GrpcBaseTask::Execute() { +ServerError GrpcBaseTask::Execute() { error_code_ = OnExecute(); + Done(); + return error_code_; +} + +void GrpcBaseTask::Done() { done_ = true; finish_cond_.notify_all(); - return error_code_; } -ServerError -GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) { +ServerError GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) { error_code_ = error_code; error_msg_ = error_msg; @@ -83,8 +85,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) { return error_code_; } -ServerError -GrpcBaseTask::WaitToFinish() { +ServerError GrpcBaseTask::WaitToFinish() { std::unique_lock lock(finish_mtx_); finish_cond_.wait(lock, [this] { return done_; }); @@ -101,8 +102,7 @@ GrpcRequestScheduler::~GrpcRequestScheduler() { Stop(); } -void -GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { +void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { if (task_ptr == nullptr) { return; } @@ -120,8 +120,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *gr } } -void -GrpcRequestScheduler::Start() { +void GrpcRequestScheduler::Start() { if (!stopped_) { return; } @@ -129,8 +128,7 @@ GrpcRequestScheduler::Start() { stopped_ = false; } -void -GrpcRequestScheduler::Stop() { +void GrpcRequestScheduler::Stop() { if (stopped_) { return; } @@ -155,8 +153,7 @@ GrpcRequestScheduler::Stop() { SERVER_LOG_INFO << "Scheduler stopped"; } -ServerError -GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { +ServerError GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { if (task_ptr == nullptr) { return SERVER_NULL_POINTER; } @@ -174,33 +171,31 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { return task_ptr->WaitToFinish();//sync execution } -namespace { - void TakeTaskToExecute(TaskQueuePtr task_queue) { - if (task_queue == nullptr) { - return; - } - while (true) { - BaseTaskPtr task = task_queue->Take(); - if (task == nullptr) { - SERVER_LOG_ERROR << "Take null from task queue, stop thread"; - break;//stop the thread - } +void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { + if (task_queue == nullptr) { + return; + } + + while (true) { + BaseTaskPtr task = task_queue->Take(); + if (task == nullptr) { + SERVER_LOG_ERROR << "Take null from task queue, stop thread"; + break;//stop the thread + } - try { - ServerError err = task->Execute(); - if (err != SERVER_SUCCESS) { - SERVER_LOG_ERROR << "Task failed with code: " << err; - } - } catch (std::exception &ex) { - SERVER_LOG_ERROR << "Task failed to execute: " << ex.what(); + try { + ServerError err = task->Execute(); + if (err != SERVER_SUCCESS) { + SERVER_LOG_ERROR << "Task failed with code: " << err; } + } catch (std::exception &ex) { + SERVER_LOG_ERROR << "Task failed to execute: " << ex.what(); } } } -ServerError -GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { +ServerError GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { std::lock_guard lock(queue_mtx_); std::string group_name = task_ptr->TaskGroup(); @@ -212,7 +207,7 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { task_groups_.insert(std::make_pair(group_name, queue)); //start a thread - ThreadPtr thread = std::make_shared(&TakeTaskToExecute, queue); + ThreadPtr thread = std::make_shared(&GrpcRequestScheduler::TakeTaskToExecute, this, queue); execute_threads_.push_back(thread); SERVER_LOG_INFO << "Create new thread for task group: " << group_name; } diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h index a436e8dec6ceb36bd222b52aeeab63f1d3bae5f2..96be98836c6087fa2aaa64f671b1a7dc80b1efb5 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h @@ -25,30 +25,24 @@ protected: virtual ~GrpcBaseTask(); public: - ServerError - Execute(); + ServerError Execute(); - ServerError - WaitToFinish(); + void Done(); - std::string - TaskGroup() const { return task_group_; } + ServerError WaitToFinish(); - ServerError - ErrorCode() const { return error_code_; } + std::string TaskGroup() const { return task_group_; } - std::string - ErrorMsg() const { return error_msg_; } + ServerError ErrorCode() const { return error_code_; } - bool - IsAsync() const { return async_; } + std::string ErrorMsg() const { return error_msg_; } + + bool IsAsync() const { return async_; } protected: - virtual ServerError - OnExecute() = 0; + virtual ServerError OnExecute() = 0; - ServerError - SetError(ServerError error_code, const std::string &msg); + ServerError SetError(ServerError error_code, const std::string &msg); protected: mutable std::mutex finish_mtx_; @@ -77,19 +71,18 @@ public: void Stop(); - ServerError - ExecuteTask(const BaseTaskPtr &task_ptr); + ServerError ExecuteTask(const BaseTaskPtr &task_ptr); - static void - ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status); + static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status); protected: GrpcRequestScheduler(); virtual ~GrpcRequestScheduler(); - ServerError - PutTaskToQueue(const BaseTaskPtr &task_ptr); + void TakeTaskToExecute(TaskQueuePtr task_queue); + + ServerError PutTaskToQueue(const BaseTaskPtr &task_ptr); private: mutable std::mutex queue_mtx_; diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp index 12390256af203746172a75b4232138012a5579cd..1b9bc935fae454cc7c663c8366049f12ce619a18 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -457,21 +457,17 @@ InsertTask::OnExecute() { } } + //step 3: check table flag //all user provide id, or all internal id - uint64_t row_count = 0; - DBWrapper::DB()->GetTableRowCount(table_info.table_id_, row_count); - bool empty_table = (row_count == 0); bool user_provide_ids = !insert_param_->row_id_array().empty(); - if(!empty_table) { - //user already provided id before, all insert action require user id - if(engine::utils::UserDefinedId(table_info.flag_) && !user_provide_ids) { - return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch"); - } + //user already provided id before, all insert action require user id + if((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) { + return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch"); + } - //user didn't provided id before, no need to provide user id - if(!engine::utils::UserDefinedId(table_info.flag_) && user_provide_ids) { - return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch"); - } + //user didn't provided id before, no need to provide user id + if((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) { + return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch"); } rc.RecordSection("check validation"); @@ -482,7 +478,7 @@ InsertTask::OnExecute() { ProfilerStart(fname.c_str()); #endif - //step 3: prepare float data + //step 4: prepare float data std::vector vec_f(insert_param_->row_record_array_size() * table_info.dimension_, 0); // TODO: change to one dimension array in protobuf or use multiple-thread to copy the data @@ -505,7 +501,7 @@ InsertTask::OnExecute() { rc.ElapseFromBegin("prepare vectors data"); - //step 4: insert vectors + //step 5: insert vectors auto vec_count = (uint64_t) insert_param_->row_record_array_size(); std::vector vec_ids(insert_param_->row_id_array_size(), 0); if(!insert_param_->row_id_array().empty()) { @@ -530,11 +526,10 @@ InsertTask::OnExecute() { return SetError(SERVER_ILLEGAL_VECTOR_ID, msg); } - //step 5: update table flag - if(empty_table && user_provide_ids) { - stat = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), - table_info.flag_ | engine::meta::FLAG_MASK_USERID); - } + //step 6: update table flag + user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID + : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID; + stat = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_); #ifdef MILVUS_ENABLE_PROFILING ProfilerStop();