Unverified commit cfe8b0a3 authored by groot, committed by GitHub

stop build index thread when server is shutdown (#3982)

* stop build index thread when server is shutdown
Signed-off-by: groot <yihua.mo@zilliz.com>

* typo
Signed-off-by: groot <yihua.mo@zilliz.com>
Parent edb0e840
......@@ -58,17 +58,13 @@ constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
} // namespace
#define CHECK_INITIALIZED \
if (!initialized_.load(std::memory_order_acquire)) { \
return SHUTDOWN_ERROR; \
#define CHECK_AVAILABLE \
if (!ServiceAvailable()) { \
return SHUTDOWN_ERROR; \
}
DBImpl::DBImpl(const DBOptions& options)
: options_(options),
initialized_(false),
merge_thread_pool_(1, 1),
index_thread_pool_(1, 1),
index_task_tracker_(3) {
: options_(options), available_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1), index_task_tracker_(3) {
mem_mgr_ = MemManagerFactory::Build(options_);
merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);
......@@ -84,19 +80,29 @@ DBImpl::~DBImpl() {
DBImpl::Stop();
}
bool
DBImpl::ServiceAvailable() {
return available_.load(std::memory_order_acquire);
}
void
DBImpl::SetAvailable(bool available) {
available_.store(available, std::memory_order_release);
}
////////////////////////////////////////////////////////////////////////////////
// External APIs
////////////////////////////////////////////////////////////////////////////////
Status
DBImpl::Start() {
if (initialized_.load(std::memory_order_acquire)) {
if (ServiceAvailable()) {
return Status::OK();
}
knowhere::enable_faiss_logging();
// LOG_ENGINE_TRACE_ << "DB service start";
initialized_.store(true, std::memory_order_release);
SetAvailable(true);
// TODO: merge files
......@@ -123,11 +129,11 @@ DBImpl::Start() {
Status
DBImpl::Stop() {
if (!initialized_.load(std::memory_order_acquire)) {
if (!ServiceAvailable()) {
return Status::OK();
}
initialized_.store(false, std::memory_order_release);
SetAvailable(false);
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
// flush all without merge
......@@ -136,11 +142,14 @@ DBImpl::Stop() {
// wait flush thread finish
swn_flush_.Notify();
bg_flush_thread_.join();
LOG_ENGINE_DEBUG_ << "DBImpl::Stop bg_flush_thread_.join()";
WaitMergeFileFinish();
LOG_ENGINE_DEBUG_ << "DBImpl::Stop WaitMergeFileFinish";
swn_index_.Notify();
index_req_swn_.Notify();
bg_index_thread_.join();
LOG_ENGINE_DEBUG_ << "DBImpl::Stop bg_index_thread_.join()";
}
// wait metric thread exit
......@@ -155,7 +164,7 @@ DBImpl::Stop() {
Status
DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
auto ctx = context;
......@@ -193,7 +202,7 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
Status
DBImpl::DropCollection(const std::string& collection_name) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << collection_name;
......@@ -215,7 +224,7 @@ DBImpl::DropCollection(const std::string& collection_name) {
Status
DBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
......@@ -226,7 +235,7 @@ DBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
Status
DBImpl::ListCollections(std::vector<std::string>& names) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
names.clear();
return snapshot::Snapshots::GetInstance().GetCollectionNames(names);
......@@ -235,7 +244,7 @@ DBImpl::ListCollections(std::vector<std::string>& names) {
Status
DBImpl::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
......@@ -250,7 +259,7 @@ DBImpl::GetCollectionInfo(const std::string& collection_name, snapshot::Collecti
Status
DBImpl::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
STATUS_CHECK(GetSnapshotInfo(collection_name, collection_stats));
return Status::OK();
......@@ -258,7 +267,7 @@ DBImpl::GetCollectionStats(const std::string& collection_name, milvus::json& col
Status
DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
......@@ -269,7 +278,7 @@ DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
Status
DBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
......@@ -288,7 +297,7 @@ DBImpl::CreatePartition(const std::string& collection_name, const std::string& p
Status
DBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
......@@ -310,7 +319,7 @@ DBImpl::DropPartition(const std::string& collection_name, const std::string& par
Status
DBImpl::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(server::ValidatePartitionTags({partition_tag}));
......@@ -330,7 +339,7 @@ DBImpl::HasPartition(const std::string& collection_name, const std::string& part
Status
DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
......@@ -342,7 +351,7 @@ DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::stri
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
const std::string& field_name, const CollectionIndex& index) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Create index for collection: " << collection_name << " field: " << field_name;
......@@ -385,6 +394,12 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
// step 4: iterate over segments that need index built, wait until all segments are built
while (true) {
// server is going to shut down, quit this thread
if (!ServiceAvailable()) {
LOG_ENGINE_DEBUG_ << "Build index stopped since DB service going to exit";
break;
}
// start background build index thread
std::vector<std::string> collection_names = {collection_name};
StartBuildIndexTask(collection_names, true);
......@@ -430,7 +445,7 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
Status
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name << " field: " << field_name;
......@@ -449,7 +464,7 @@ DBImpl::DropIndex(const std::string& collection_name, const std::string& field_n
Status
DBImpl::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Describe index for collection: " << collection_name << " field: " << field_name;
......@@ -461,7 +476,7 @@ DBImpl::DescribeIndex(const std::string& collection_name, const std::string& fie
Status
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
idx_t op_id) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
if (data_chunk == nullptr) {
return Status(DB_ERROR, "Null pointer");
......@@ -590,7 +605,7 @@ Status
DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
......@@ -608,7 +623,7 @@ DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_ar
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
......@@ -636,7 +651,7 @@ DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNum
Status
DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
TimeRecorder rc("DBImpl::Query");
......@@ -705,7 +720,7 @@ DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_pt
Status
DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
......@@ -736,7 +751,7 @@ DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id,
Status
DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force) {
CHECK_INITIALIZED;
CHECK_AVAILABLE
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
......@@ -750,9 +765,7 @@ DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& col
Status
DBImpl::Flush(const std::string& collection_name) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
CHECK_AVAILABLE
Status status;
bool has_collection = false;
......@@ -774,9 +787,7 @@ DBImpl::Flush(const std::string& collection_name) {
Status
DBImpl::Flush() {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Begin flush all collections";
InternalFlush();
......@@ -787,9 +798,7 @@ DBImpl::Flush() {
Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name, double threshold) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
CHECK_AVAILABLE
LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
......@@ -913,7 +922,7 @@ DBImpl::TimingFlushThread() {
SetThreadName("timing_flush");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
if (!ServiceAvailable()) {
LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
break;
}
......@@ -963,7 +972,7 @@ DBImpl::TimingMetricThread() {
SetThreadName("timing_metric");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
if (!ServiceAvailable()) {
LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
break;
}
......@@ -1027,9 +1036,14 @@ DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names, bool
}
// start build index job
LOG_ENGINE_DEBUG_ << "Create BuildIndexJob for " << segment_ids.size() << " segments of " << collection_name;
// build one segment at a time, for two reasons:
// 1. we don't need to wait for all segments to finish indexing when the milvus server stops
// 2. avoid building index for deleted segments
snapshot::IDS_TYPE segment_to_build = {segment_ids[0]};
LOG_ENGINE_DEBUG_ << "Create BuildIndexJob for segment " << segment_to_build[0] << " of " << collection_name;
cache::CpuCacheMgr::GetInstance().PrintInfo(); // print cache info before build index
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_ids);
scheduler::BuildIndexJobPtr job =
std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_to_build);
IncreaseLiveBuildTaskNum();
scheduler::JobMgrInst::GetInstance()->Put(job);
......@@ -1046,8 +1060,14 @@ DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names, bool
LOG_ENGINE_ERROR_ << job->status().message();
}
// notify index request to return
// notify the index request to return (once all segments have been indexed)
index_req_swn_.Notify();
// quit this thread if the milvus server is going to shut down
if (!ServiceAvailable()) {
LOG_ENGINE_DEBUG_ << "DB background build index thread exit";
break;
}
}
}
......@@ -1056,11 +1076,11 @@ DBImpl::TimingIndexThread() {
SetThreadName("timing_index");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
if (!ServiceAvailable()) {
WaitMergeFileFinish();
WaitBuildIndexFinish();
LOG_ENGINE_DEBUG_ << "DB background thread exit";
LOG_ENGINE_DEBUG_ << "DB background timing index thread exit";
break;
}
......@@ -1125,7 +1145,7 @@ DBImpl::BackgroundMerge(std::set<int64_t> collection_ids, bool force_merge_all)
<< " reason:" << status.message();
}
if (!initialized_.load(std::memory_order_acquire)) {
if (!ServiceAvailable()) {
LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection id: " << collection_id;
break;
}
......
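The DBImpl changes above all reduce to one pattern: each background loop (flush, metric, merge, build index) loads a std::atomic<bool> availability flag with acquire semantics at the top of every round, and Stop() clears the flag with release semantics, notifies the waiting threads, and joins them, so at most one more segment is indexed after shutdown begins. Below is a minimal standalone sketch of that pattern, not the actual Milvus code; the names BackgroundWorker and Loop are hypothetical, and the 5-second sleep stands in for WAIT_BUILD_INDEX_INTERVAL.

// Minimal standalone sketch (not the actual Milvus code) of the shutdown pattern applied above:
// an atomic availability flag with acquire/release ordering, a condition variable so Stop()
// can wake a sleeping worker, and a join so the thread exits cleanly.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

class BackgroundWorker {
 public:
    void Start() {
        available_.store(true, std::memory_order_release);
        thread_ = std::thread([this] { Loop(); });
    }

    void Stop() {
        available_.store(false, std::memory_order_release);
        cv_.notify_all();            // wake the worker if it is sleeping between rounds
        if (thread_.joinable()) {
            thread_.join();          // wait for the loop to observe the flag and return
        }
    }

 private:
    void Loop() {
        while (true) {
            if (!available_.load(std::memory_order_acquire)) {
                break;               // server is shutting down, quit this thread
            }
            // ... do one bounded unit of work here, e.g. build index for one segment ...
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait_for(lock, std::chrono::seconds(5),  // stand-in for WAIT_BUILD_INDEX_INTERVAL
                         [this] { return !available_.load(std::memory_order_acquire); });
        }
    }

    std::atomic<bool> available_{false};
    std::thread thread_;
    std::mutex mutex_;
    std::condition_variable cv_;
};

int main() {
    BackgroundWorker worker;
    worker.Start();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    worker.Stop();   // returns promptly even though the worker sleeps up to 5 seconds
    return 0;
}

Because the wait uses a predicate on the flag, Stop() never has to wait out the full interval: either the worker is mid-round and breaks at the next check, or it is sleeping and is woken immediately.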
......@@ -128,6 +128,12 @@ class DBImpl : public DB, public ConfigObserver {
IsBuildingIndex() override;
private:
bool
ServiceAvailable();
void
SetAvailable(bool available);
void
InternalFlush(const std::string& collection_name = "", bool merge = true);
......@@ -175,7 +181,7 @@ class DBImpl : public DB, public ConfigObserver {
private:
DBOptions options_;
std::atomic<bool> initialized_;
std::atomic<bool> available_;
MemManagerPtr mem_mgr_;
MergeManagerPtr merge_mgr_ptr_;
......
......@@ -283,10 +283,27 @@ FAIL:
void
Server::StopService() {
// Note: don't change the stop sequence unless you have a good reason.
// The WebServer and GrpcServer behave similarly: when asked to stop, they wait for all
// in-flight remote requests to return. Most requests depend on the DBImpl interface
// (wrapped by DBWrapper), so we must stop DBWrapper first to let DBImpl know the server
// is shutting down. DBImpl then notifies unfinished tasks and breaks its working threads.
// Once DBImpl finishes its work, the remote requests can return.
//
// A typical case:
// insert millions of entities (assume there are a lot of segments), then invoke create_index
// to build index; on the command line, execute stop_server.sh to stop the milvus_server.
// If we stop DBWrapper before GrpcServer, milvus_server waits for the current segment to
// finish indexing, then exits; but if we stop GrpcServer before DBWrapper, milvus_server
// waits for all segments to finish indexing before it exits.
//
// Note: if any request arrives before GrpcServer::Stop() but after DBWrapper has been
// stopped, that request gets the error message "Milvus server is shutdown!"
// storage::S3ClientWrapper::GetInstance().StopService();
DBWrapper::GetInstance().StopService();
web::WebServer::GetInstance().Stop();
grpc::GrpcServer::GetInstance().Stop();
DBWrapper::GetInstance().StopService();
scheduler::StopSchedulerService();
engine::snapshot::Snapshots::GetInstance().StopService();
engine::KnowhereResource::Finalize();
......
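The comment above makes the ordering concrete: WebServer::Stop() and GrpcServer::Stop() wait for in-flight requests, and a long request such as CreateIndex only returns early once DBImpl has been marked unavailable, so DBWrapper must be stopped first. A minimal sketch of that dependency follows; the Db and RpcServer types and the 100-segment request are hypothetical stand-ins, not the real Milvus classes.

// Minimal sketch (hypothetical Db/RpcServer types, not the real Milvus classes) of the
// ordering argument above: the RPC server's Stop() waits for in-flight handlers, and a
// long handler only returns early if the DB has already been marked unavailable.
#include <atomic>
#include <chrono>
#include <thread>

struct Db {
    std::atomic<bool> available{true};
    void CreateIndexRequest() {                        // stands in for a long-running request
        for (int segment = 0; segment < 100; ++segment) {
            if (!available.load(std::memory_order_acquire)) {
                return;                                // DB stopped: give up and return early
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(50));  // "build one segment"
        }
    }
};

struct RpcServer {
    std::thread handler;
    void Serve(Db& db) { handler = std::thread([&db] { db.CreateIndexRequest(); }); }
    void Stop() { if (handler.joinable()) handler.join(); }  // waits for in-flight requests
};

int main() {
    Db db;
    RpcServer rpc;
    rpc.Serve(db);
    db.available.store(false, std::memory_order_release);  // stop the DB first...
    rpc.Stop();  // ...so this returns after at most one segment instead of all 100
    return 0;
}

Swapping the last two statements in main() makes rpc.Stop() wait for all 100 simulated segments, which is exactly the slow-shutdown behavior the reordering in StopService() avoids.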