diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 563164d458143a8950731e8d2f2da3fdbaa3e9f3..86a883f21ad8f34c71e8881b8a67c62e4c508b6f 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -5,6 +5,7 @@ Please mark all change in change log and use the ticket from JIRA. # MegaSearch 0.3.0 (TBD) ## Bug +- MS-80 - Fix server hang issue ## Improvement diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/IndexLoaderQueue.cpp index 2840e6564e8d3aa83d601315e52d0a37d3010333..52d27831cb74e165c59978ee54799568a4fd3e12 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.cpp +++ b/cpp/src/db/scheduler/IndexLoaderQueue.cpp @@ -13,12 +13,6 @@ namespace zilliz { namespace vecwise { namespace engine { -IndexLoaderQueue& -IndexLoaderQueue::GetInstance() { - static IndexLoaderQueue instance; - return instance; -} - void IndexLoaderQueue::Put(const SearchContextPtr &search_context) { std::unique_lock lock(mtx); @@ -26,6 +20,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) { if(search_context == nullptr) { queue_.push_back(nullptr); + empty_.notify_all(); return; } diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.h b/cpp/src/db/scheduler/IndexLoaderQueue.h index f0d71dcbd7d54d56daa0c0aac41264d78d92e663..3850a8de8b5614edd82998d21d69b90a8f34d43a 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.h +++ b/cpp/src/db/scheduler/IndexLoaderQueue.h @@ -26,18 +26,15 @@ public: using IndexLoaderContextPtr = std::shared_ptr; class IndexLoaderQueue { -private: +public: IndexLoaderQueue() : mtx(), full_(), empty_() {} IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete; IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete; -public: using LoaderQueue = std::list; - static IndexLoaderQueue& GetInstance(); - void Put(const SearchContextPtr &search_context); IndexLoaderContextPtr Take(); diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp index c18f95d04d218067c1b82a16081227dc42b06460..137cac30c7ab4d5188a7e57e3d74650478c2127b 100644 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -55,8 +55,7 @@ void CollectDurationMetrics(int index_type, double total_time) { } SearchScheduler::SearchScheduler() - : thread_pool_(2), - stopped_(true) { + : stopped_(true) { Start(); } @@ -75,8 +74,13 @@ SearchScheduler::Start() { return true; } - thread_pool_.enqueue(&SearchScheduler::IndexLoadWorker, this); - thread_pool_.enqueue(&SearchScheduler::SearchWorker, this); + stopped_ = false; + + search_queue_.SetCapacity(2); + + index_load_thread_ = std::make_shared(&SearchScheduler::IndexLoadWorker, this); + search_thread_ = std::make_shared(&SearchScheduler::SearchWorker, this); + return true; } @@ -86,29 +90,34 @@ SearchScheduler::Stop() { return true; } - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - index_queue.Put(nullptr); + if(index_load_thread_) { + index_load_queue_.Put(nullptr); + index_load_thread_->join(); + index_load_thread_ = nullptr; + } + + if(search_thread_) { + search_queue_.Put(nullptr); + search_thread_->join(); + search_thread_ = nullptr; + } - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); - search_queue.Put(nullptr); + stopped_ = true; return true; } bool SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) { - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - index_queue.Put(search_context); + index_load_queue_.Put(search_context); return true; } bool SearchScheduler::IndexLoadWorker() { - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); while(true) { - IndexLoaderContextPtr context = index_queue.Take(); + IndexLoaderContextPtr context = index_load_queue_.Take(); if(context == nullptr) { SERVER_LOG_INFO << "Stop thread for index loading"; break;//exit @@ -137,7 +146,7 @@ SearchScheduler::IndexLoadWorker() { task_ptr->index_type_ = context->file_->file_type_; task_ptr->index_engine_ = index_ptr; task_ptr->search_contexts_.swap(context->search_contexts_); - search_queue.Put(task_ptr); + search_queue_.Put(task_ptr); } return true; @@ -145,9 +154,8 @@ SearchScheduler::IndexLoadWorker() { bool SearchScheduler::SearchWorker() { - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); while(true) { - SearchTaskPtr task_ptr = search_queue.Take(); + SearchTaskPtr task_ptr = search_queue_.Take(); if(task_ptr == nullptr) { SERVER_LOG_INFO << "Stop thread for searching"; break;//exit diff --git a/cpp/src/db/scheduler/SearchScheduler.h b/cpp/src/db/scheduler/SearchScheduler.h index 24c85395fcb1ef76f8aa0c37a33de6dec272b262..6a5b7b344fcd77467eb9a093b94085820238fc43 100644 --- a/cpp/src/db/scheduler/SearchScheduler.h +++ b/cpp/src/db/scheduler/SearchScheduler.h @@ -6,7 +6,8 @@ #pragma once #include "SearchContext.h" -#include "utils/ThreadPool.h" +#include "IndexLoaderQueue.h" +#include "SearchTaskQueue.h" namespace zilliz { namespace vecwise { @@ -30,7 +31,12 @@ private: bool SearchWorker(); private: - server::ThreadPool thread_pool_; + std::shared_ptr index_load_thread_; + std::shared_ptr search_thread_; + + IndexLoaderQueue index_load_queue_; + SearchTaskQueue search_queue_; + bool stopped_ = true; }; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.cpp b/cpp/src/db/scheduler/SearchTaskQueue.cpp index 101dc818c2b520d01bdf038d03ca4041b8647d65..7b18f8cb6939d66dee7461049f53ddd26d45cda5 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.cpp +++ b/cpp/src/db/scheduler/SearchTaskQueue.cpp @@ -94,18 +94,6 @@ void CalcScore(uint64_t vector_count, } - -SearchTaskQueue::SearchTaskQueue() { - SetCapacity(4); -} - - -SearchTaskQueue& -SearchTaskQueue::GetInstance() { - static SearchTaskQueue instance; - return instance; -} - bool SearchTask::DoSearch() { if(index_engine_ == nullptr) { return false; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTaskQueue.h index bd8e9d7f245859d16d9720ce92ed29b772f48349..ef0f77ef9d9e8b71a3d2890a8c709166a4cb5886 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.h +++ b/cpp/src/db/scheduler/SearchTaskQueue.h @@ -27,21 +27,7 @@ public: }; using SearchTaskPtr = std::shared_ptr; - -class SearchTaskQueue : public server::BlockingQueue { -private: - SearchTaskQueue(); - - SearchTaskQueue(const SearchTaskQueue &rhs) = delete; - - SearchTaskQueue &operator=(const SearchTaskQueue &rhs) = delete; - -public: - static SearchTaskQueue& GetInstance(); - -private: - -}; +using SearchTaskQueue = server::BlockingQueue; }