提交 49c52830 编写于 作者: J jinhai

Merge branch 'branch-0.3.0' into 'branch-0.3.0'

MS-80 fix hang issue

See merge request megasearch/vecwise_engine!85

Former-commit-id: 583d6c0a0915be203fa26b235958f6c161075fd4
......@@ -5,6 +5,7 @@ Please mark all change in change log and use the ticket from JIRA.
# MegaSearch 0.3.0 (TBD)
## Bug
- MS-80 - Fix server hang issue
## Improvement
......
......@@ -13,12 +13,6 @@ namespace zilliz {
namespace vecwise {
namespace engine {
IndexLoaderQueue&
IndexLoaderQueue::GetInstance() {
static IndexLoaderQueue instance;
return instance;
}
void
IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
std::unique_lock <std::mutex> lock(mtx);
......@@ -26,6 +20,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
if(search_context == nullptr) {
queue_.push_back(nullptr);
empty_.notify_all();
return;
}
......
......@@ -26,18 +26,15 @@ public:
using IndexLoaderContextPtr = std::shared_ptr<IndexLoaderContext>;
class IndexLoaderQueue {
private:
public:
IndexLoaderQueue() : mtx(), full_(), empty_() {}
IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete;
IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete;
public:
using LoaderQueue = std::list<IndexLoaderContextPtr>;
static IndexLoaderQueue& GetInstance();
void Put(const SearchContextPtr &search_context);
IndexLoaderContextPtr Take();
......
......@@ -55,8 +55,7 @@ void CollectDurationMetrics(int index_type, double total_time) {
}
SearchScheduler::SearchScheduler()
: thread_pool_(2),
stopped_(true) {
: stopped_(true) {
Start();
}
......@@ -75,8 +74,13 @@ SearchScheduler::Start() {
return true;
}
thread_pool_.enqueue(&SearchScheduler::IndexLoadWorker, this);
thread_pool_.enqueue(&SearchScheduler::SearchWorker, this);
stopped_ = false;
search_queue_.SetCapacity(2);
index_load_thread_ = std::make_shared<std::thread>(&SearchScheduler::IndexLoadWorker, this);
search_thread_ = std::make_shared<std::thread>(&SearchScheduler::SearchWorker, this);
return true;
}
......@@ -86,29 +90,34 @@ SearchScheduler::Stop() {
return true;
}
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
index_queue.Put(nullptr);
if(index_load_thread_) {
index_load_queue_.Put(nullptr);
index_load_thread_->join();
index_load_thread_ = nullptr;
}
if(search_thread_) {
search_queue_.Put(nullptr);
search_thread_->join();
search_thread_ = nullptr;
}
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
search_queue.Put(nullptr);
stopped_ = true;
return true;
}
bool
SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) {
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
index_queue.Put(search_context);
index_load_queue_.Put(search_context);
return true;
}
bool
SearchScheduler::IndexLoadWorker() {
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
while(true) {
IndexLoaderContextPtr context = index_queue.Take();
IndexLoaderContextPtr context = index_load_queue_.Take();
if(context == nullptr) {
SERVER_LOG_INFO << "Stop thread for index loading";
break;//exit
......@@ -137,7 +146,7 @@ SearchScheduler::IndexLoadWorker() {
task_ptr->index_type_ = context->file_->file_type_;
task_ptr->index_engine_ = index_ptr;
task_ptr->search_contexts_.swap(context->search_contexts_);
search_queue.Put(task_ptr);
search_queue_.Put(task_ptr);
}
return true;
......@@ -145,9 +154,8 @@ SearchScheduler::IndexLoadWorker() {
bool
SearchScheduler::SearchWorker() {
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
while(true) {
SearchTaskPtr task_ptr = search_queue.Take();
SearchTaskPtr task_ptr = search_queue_.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop thread for searching";
break;//exit
......
......@@ -6,7 +6,8 @@
#pragma once
#include "SearchContext.h"
#include "utils/ThreadPool.h"
#include "IndexLoaderQueue.h"
#include "SearchTaskQueue.h"
namespace zilliz {
namespace vecwise {
......@@ -30,7 +31,12 @@ private:
bool SearchWorker();
private:
server::ThreadPool thread_pool_;
std::shared_ptr<std::thread> index_load_thread_;
std::shared_ptr<std::thread> search_thread_;
IndexLoaderQueue index_load_queue_;
SearchTaskQueue search_queue_;
bool stopped_ = true;
};
......
......@@ -94,18 +94,6 @@ void CalcScore(uint64_t vector_count,
}
SearchTaskQueue::SearchTaskQueue() {
SetCapacity(4);
}
SearchTaskQueue&
SearchTaskQueue::GetInstance() {
static SearchTaskQueue instance;
return instance;
}
bool SearchTask::DoSearch() {
if(index_engine_ == nullptr) {
return false;
......
......@@ -27,21 +27,7 @@ public:
};
using SearchTaskPtr = std::shared_ptr<SearchTask>;
class SearchTaskQueue : public server::BlockingQueue<SearchTaskPtr> {
private:
SearchTaskQueue();
SearchTaskQueue(const SearchTaskQueue &rhs) = delete;
SearchTaskQueue &operator=(const SearchTaskQueue &rhs) = delete;
public:
static SearchTaskQueue& GetInstance();
private:
};
using SearchTaskQueue = server::BlockingQueue<SearchTaskPtr>;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册