diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index a8a5af195f32f1017e652a75a005cd0577baa1e5..85c488ebdee4ca6168cb309d4bc632cb2e8223c1 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -5,6 +5,7 @@ Please mark all change in change log and use the ticket from JIRA. # MegaSearch 0.3.0 (TBD) ## Bug +- MS-80 - Fix server hang issue ## Improvement - MS-82 - Update server startup welcome message diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/IndexLoaderQueue.cpp index 7d7ae07374c3eb52216cde0e35e7e3f1aeeff536..2522815520ccc512f74d86118eccc54f7fe0da95 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.cpp +++ b/cpp/src/db/scheduler/IndexLoaderQueue.cpp @@ -13,12 +13,6 @@ namespace zilliz { namespace milvus { namespace engine { -IndexLoaderQueue& -IndexLoaderQueue::GetInstance() { - static IndexLoaderQueue instance; - return instance; -} - void IndexLoaderQueue::Put(const SearchContextPtr &search_context) { std::unique_lock lock(mtx); @@ -26,6 +20,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) { if(search_context == nullptr) { queue_.push_back(nullptr); + empty_.notify_all(); return; } diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.h b/cpp/src/db/scheduler/IndexLoaderQueue.h index 150dde84841e212de69fd65f77b693098eec5dea..4f6dcfcd67c50bf4c2bdabc5de14925e5d4eb65e 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.h +++ b/cpp/src/db/scheduler/IndexLoaderQueue.h @@ -26,18 +26,15 @@ public: using IndexLoaderContextPtr = std::shared_ptr; class IndexLoaderQueue { -private: +public: IndexLoaderQueue() : mtx(), full_(), empty_() {} IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete; IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete; -public: using LoaderQueue = std::list; - static IndexLoaderQueue& GetInstance(); - void Put(const SearchContextPtr &search_context); IndexLoaderContextPtr Take(); diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp index 2235ee9df3bf08c7ecdf0c5542eedda6406df619..756abea4cdf9ef4e09956e301920c1839dd56f97 100644 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -55,8 +55,7 @@ void CollectDurationMetrics(int index_type, double total_time) { } SearchScheduler::SearchScheduler() - : thread_pool_(2), - stopped_(true) { + : stopped_(true) { Start(); } @@ -75,8 +74,13 @@ SearchScheduler::Start() { return true; } - thread_pool_.enqueue(&SearchScheduler::IndexLoadWorker, this); - thread_pool_.enqueue(&SearchScheduler::SearchWorker, this); + stopped_ = false; + + search_queue_.SetCapacity(2); + + index_load_thread_ = std::make_shared(&SearchScheduler::IndexLoadWorker, this); + search_thread_ = std::make_shared(&SearchScheduler::SearchWorker, this); + return true; } @@ -86,29 +90,34 @@ SearchScheduler::Stop() { return true; } - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - index_queue.Put(nullptr); + if(index_load_thread_) { + index_load_queue_.Put(nullptr); + index_load_thread_->join(); + index_load_thread_ = nullptr; + } + + if(search_thread_) { + search_queue_.Put(nullptr); + search_thread_->join(); + search_thread_ = nullptr; + } - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); - search_queue.Put(nullptr); + stopped_ = true; return true; } bool SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) { - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - index_queue.Put(search_context); + index_load_queue_.Put(search_context); return true; } bool SearchScheduler::IndexLoadWorker() { - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); while(true) { - IndexLoaderContextPtr context = index_queue.Take(); + IndexLoaderContextPtr context = index_load_queue_.Take(); if(context == nullptr) { SERVER_LOG_INFO << "Stop thread for index loading"; break;//exit @@ -137,7 +146,7 @@ SearchScheduler::IndexLoadWorker() { task_ptr->index_type_ = context->file_->file_type_; task_ptr->index_engine_ = index_ptr; task_ptr->search_contexts_.swap(context->search_contexts_); - search_queue.Put(task_ptr); + search_queue_.Put(task_ptr); } return true; @@ -145,9 +154,8 @@ SearchScheduler::IndexLoadWorker() { bool SearchScheduler::SearchWorker() { - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); while(true) { - SearchTaskPtr task_ptr = search_queue.Take(); + SearchTaskPtr task_ptr = search_queue_.Take(); if(task_ptr == nullptr) { SERVER_LOG_INFO << "Stop thread for searching"; break;//exit diff --git a/cpp/src/db/scheduler/SearchScheduler.h b/cpp/src/db/scheduler/SearchScheduler.h index 8b797908ef1cfd200783af005e134cd9c904d5aa..6e84c43684c6c83913425b72ba726c1f1d74cbf0 100644 --- a/cpp/src/db/scheduler/SearchScheduler.h +++ b/cpp/src/db/scheduler/SearchScheduler.h @@ -6,7 +6,8 @@ #pragma once #include "SearchContext.h" -#include "utils/ThreadPool.h" +#include "IndexLoaderQueue.h" +#include "SearchTaskQueue.h" namespace zilliz { namespace milvus { @@ -30,7 +31,12 @@ private: bool SearchWorker(); private: - server::ThreadPool thread_pool_; + std::shared_ptr index_load_thread_; + std::shared_ptr search_thread_; + + IndexLoaderQueue index_load_queue_; + SearchTaskQueue search_queue_; + bool stopped_ = true; }; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.cpp b/cpp/src/db/scheduler/SearchTaskQueue.cpp index f077001f901764bad2cbde967c5a46e50dedb94f..a0ed0834d03a6e5b7993d3dbfa4e43ee03ed3883 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.cpp +++ b/cpp/src/db/scheduler/SearchTaskQueue.cpp @@ -94,18 +94,6 @@ void CalcScore(uint64_t vector_count, } - -SearchTaskQueue::SearchTaskQueue() { - SetCapacity(4); -} - - -SearchTaskQueue& -SearchTaskQueue::GetInstance() { - static SearchTaskQueue instance; - return instance; -} - bool SearchTask::DoSearch() { if(index_engine_ == nullptr) { return false; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTaskQueue.h index 7231870bd34a7d9d0eddbf3fd7da6c441fad2e41..e5841cd1d5a1a2f55872a10c2ee46c3758548805 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.h +++ b/cpp/src/db/scheduler/SearchTaskQueue.h @@ -27,21 +27,7 @@ public: }; using SearchTaskPtr = std::shared_ptr; - -class SearchTaskQueue : public server::BlockingQueue { -private: - SearchTaskQueue(); - - SearchTaskQueue(const SearchTaskQueue &rhs) = delete; - - SearchTaskQueue &operator=(const SearchTaskQueue &rhs) = delete; - -public: - static SearchTaskQueue& GetInstance(); - -private: - -}; +using SearchTaskQueue = server::BlockingQueue; }