提交 b6e51b26 编写于 作者: J jinhai

Merge remote-tracking branch 'upstream/branch-0.3.0' into branch-0.3.0


Former-commit-id: f431335445d2c54f939dfd84a8dfdf0e4d2df4a2
...@@ -5,6 +5,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -5,6 +5,7 @@ Please mark all change in change log and use the ticket from JIRA.
# MegaSearch 0.3.0 (TBD) # MegaSearch 0.3.0 (TBD)
## Bug ## Bug
- MS-80 - Fix server hang issue
## Improvement ## Improvement
- MS-82 - Update server startup welcome message - MS-82 - Update server startup welcome message
......
...@@ -13,12 +13,6 @@ namespace zilliz { ...@@ -13,12 +13,6 @@ namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
IndexLoaderQueue&
IndexLoaderQueue::GetInstance() {
static IndexLoaderQueue instance;
return instance;
}
void void
IndexLoaderQueue::Put(const SearchContextPtr &search_context) { IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
std::unique_lock <std::mutex> lock(mtx); std::unique_lock <std::mutex> lock(mtx);
...@@ -26,6 +20,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) { ...@@ -26,6 +20,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
if(search_context == nullptr) { if(search_context == nullptr) {
queue_.push_back(nullptr); queue_.push_back(nullptr);
empty_.notify_all();
return; return;
} }
......
...@@ -26,18 +26,15 @@ public: ...@@ -26,18 +26,15 @@ public:
using IndexLoaderContextPtr = std::shared_ptr<IndexLoaderContext>; using IndexLoaderContextPtr = std::shared_ptr<IndexLoaderContext>;
class IndexLoaderQueue { class IndexLoaderQueue {
private: public:
IndexLoaderQueue() : mtx(), full_(), empty_() {} IndexLoaderQueue() : mtx(), full_(), empty_() {}
IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete; IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete;
IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete; IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete;
public:
using LoaderQueue = std::list<IndexLoaderContextPtr>; using LoaderQueue = std::list<IndexLoaderContextPtr>;
static IndexLoaderQueue& GetInstance();
void Put(const SearchContextPtr &search_context); void Put(const SearchContextPtr &search_context);
IndexLoaderContextPtr Take(); IndexLoaderContextPtr Take();
......
...@@ -55,8 +55,7 @@ void CollectDurationMetrics(int index_type, double total_time) { ...@@ -55,8 +55,7 @@ void CollectDurationMetrics(int index_type, double total_time) {
} }
SearchScheduler::SearchScheduler() SearchScheduler::SearchScheduler()
: thread_pool_(2), : stopped_(true) {
stopped_(true) {
Start(); Start();
} }
...@@ -75,8 +74,13 @@ SearchScheduler::Start() { ...@@ -75,8 +74,13 @@ SearchScheduler::Start() {
return true; return true;
} }
thread_pool_.enqueue(&SearchScheduler::IndexLoadWorker, this); stopped_ = false;
thread_pool_.enqueue(&SearchScheduler::SearchWorker, this);
search_queue_.SetCapacity(2);
index_load_thread_ = std::make_shared<std::thread>(&SearchScheduler::IndexLoadWorker, this);
search_thread_ = std::make_shared<std::thread>(&SearchScheduler::SearchWorker, this);
return true; return true;
} }
...@@ -86,29 +90,34 @@ SearchScheduler::Stop() { ...@@ -86,29 +90,34 @@ SearchScheduler::Stop() {
return true; return true;
} }
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); if(index_load_thread_) {
index_queue.Put(nullptr); index_load_queue_.Put(nullptr);
index_load_thread_->join();
index_load_thread_ = nullptr;
}
if(search_thread_) {
search_queue_.Put(nullptr);
search_thread_->join();
search_thread_ = nullptr;
}
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); stopped_ = true;
search_queue.Put(nullptr);
return true; return true;
} }
bool bool
SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) { SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) {
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); index_load_queue_.Put(search_context);
index_queue.Put(search_context);
return true; return true;
} }
bool bool
SearchScheduler::IndexLoadWorker() { SearchScheduler::IndexLoadWorker() {
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
while(true) { while(true) {
IndexLoaderContextPtr context = index_queue.Take(); IndexLoaderContextPtr context = index_load_queue_.Take();
if(context == nullptr) { if(context == nullptr) {
SERVER_LOG_INFO << "Stop thread for index loading"; SERVER_LOG_INFO << "Stop thread for index loading";
break;//exit break;//exit
...@@ -137,7 +146,7 @@ SearchScheduler::IndexLoadWorker() { ...@@ -137,7 +146,7 @@ SearchScheduler::IndexLoadWorker() {
task_ptr->index_type_ = context->file_->file_type_; task_ptr->index_type_ = context->file_->file_type_;
task_ptr->index_engine_ = index_ptr; task_ptr->index_engine_ = index_ptr;
task_ptr->search_contexts_.swap(context->search_contexts_); task_ptr->search_contexts_.swap(context->search_contexts_);
search_queue.Put(task_ptr); search_queue_.Put(task_ptr);
} }
return true; return true;
...@@ -145,9 +154,8 @@ SearchScheduler::IndexLoadWorker() { ...@@ -145,9 +154,8 @@ SearchScheduler::IndexLoadWorker() {
bool bool
SearchScheduler::SearchWorker() { SearchScheduler::SearchWorker() {
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
while(true) { while(true) {
SearchTaskPtr task_ptr = search_queue.Take(); SearchTaskPtr task_ptr = search_queue_.Take();
if(task_ptr == nullptr) { if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop thread for searching"; SERVER_LOG_INFO << "Stop thread for searching";
break;//exit break;//exit
......
...@@ -6,7 +6,8 @@ ...@@ -6,7 +6,8 @@
#pragma once #pragma once
#include "SearchContext.h" #include "SearchContext.h"
#include "utils/ThreadPool.h" #include "IndexLoaderQueue.h"
#include "SearchTaskQueue.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -30,7 +31,12 @@ private: ...@@ -30,7 +31,12 @@ private:
bool SearchWorker(); bool SearchWorker();
private: private:
server::ThreadPool thread_pool_; std::shared_ptr<std::thread> index_load_thread_;
std::shared_ptr<std::thread> search_thread_;
IndexLoaderQueue index_load_queue_;
SearchTaskQueue search_queue_;
bool stopped_ = true; bool stopped_ = true;
}; };
......
...@@ -94,18 +94,6 @@ void CalcScore(uint64_t vector_count, ...@@ -94,18 +94,6 @@ void CalcScore(uint64_t vector_count,
} }
SearchTaskQueue::SearchTaskQueue() {
SetCapacity(4);
}
SearchTaskQueue&
SearchTaskQueue::GetInstance() {
static SearchTaskQueue instance;
return instance;
}
bool SearchTask::DoSearch() { bool SearchTask::DoSearch() {
if(index_engine_ == nullptr) { if(index_engine_ == nullptr) {
return false; return false;
......
...@@ -27,21 +27,7 @@ public: ...@@ -27,21 +27,7 @@ public:
}; };
using SearchTaskPtr = std::shared_ptr<SearchTask>; using SearchTaskPtr = std::shared_ptr<SearchTask>;
using SearchTaskQueue = server::BlockingQueue<SearchTaskPtr>;
class SearchTaskQueue : public server::BlockingQueue<SearchTaskPtr> {
private:
SearchTaskQueue();
SearchTaskQueue(const SearchTaskQueue &rhs) = delete;
SearchTaskQueue &operator=(const SearchTaskQueue &rhs) = delete;
public:
static SearchTaskQueue& GetInstance();
private:
};
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册