提交 db1b37d4 编写于 作者: G groot

fix problems


Former-commit-id: c90a6834c4848fbaed1a1b0c753979f307db4d27
上级 1f906416
......@@ -10,6 +10,8 @@ Please mark all change in change log and use the ticket from JIRA.
## New Feature
- MS-57 - Implement index load/search pipeline
## Task
# MegaSearch 0.2.0 (2019-05-31)
......@@ -37,7 +39,6 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-37 - Add query, cache usage, disk write speed and file data size metrics
- MS-30 - Use faiss v1.5.2
- MS-54 - cmake: Change Thrift third party URL to github.com
- MS-57 - Implement index load/search pipeline
## Task
......
......@@ -8,7 +8,6 @@
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "utils/Log.h"
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "IndexLoaderQueue.h"
#include "SearchContext.h"

#include <memory>
namespace zilliz {
namespace vecwise {
namespace engine {
// Strategy interface that decides how a search request is distributed onto
// the index-loader queue (e.g. giving priority to index files that are
// already resident in memory).
class IScheduleStrategy {
public:
    virtual ~IScheduleStrategy() = default;  // defaulted: interface owns no resources

    // Schedule `search_context` onto `loader_list`.
    // Implementations may attach the context to an existing loader entry or
    // create new entries for index files not yet queued.
    // Returns false when `search_context` is null, true otherwise
    // (contract taken from MemScheduleStrategy — confirm for other impls).
    virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0;
};

// Shared-ownership handle handed out by the strategy factory.
using ScheduleStrategyPtr = std::shared_ptr<IScheduleStrategy>;
}
}
}
\ No newline at end of file
......@@ -37,7 +37,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
ScheduleStrategyPtr strategy = CreateStrategy();
ScheduleStrategyPtr strategy = StrategyFactory::CreateMemStrategy();
strategy->Schedule(search_context, queue_);
empty_.notify_all();
......
......@@ -16,48 +16,47 @@ namespace engine {
class MemScheduleStrategy : public IScheduleStrategy {
public:
bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override;
};
ScheduleStrategyPtr CreateStrategy() {
ScheduleStrategyPtr strategy(new MemScheduleStrategy());
return strategy;
}
bool MemScheduleStrategy::Schedule(const SearchContextPtr &search_context,
IndexLoaderQueue::LoaderQueue &loader_list) {
if(search_context == nullptr) {
return false;
}
bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override {
if(search_context == nullptr) {
return false;
}
SearchContext::Id2IndexMap index_files = search_context->GetIndexMap();
//some index loader already exists
for(auto iter = loader_list.begin(); iter != loader_list.end(); ++iter) {
if(index_files.find((*iter)->file_->id) != index_files.end()){
SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
index_files.erase((*iter)->file_->id);
(*iter)->search_contexts_.push_back(search_context);
SearchContext::Id2IndexMap index_files = search_context->GetIndexMap();
//some index loader already exists
for(auto& loader : loader_list) {
if(index_files.find(loader->file_->id) != index_files.end()){
SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
index_files.erase(loader->file_->id);
loader->search_contexts_.push_back(search_context);
}
}
}
//index_files still contains some index files, create new loader
for(auto iter = index_files.begin(); iter != index_files.end(); ++iter) {
SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << iter->second->location;
IndexLoaderContextPtr new_loader = std::make_shared<IndexLoaderContext>();
new_loader->search_contexts_.push_back(search_context);
new_loader->file_ = iter->second;
//index_files still contains some index files, create new loader
for(auto& pair : index_files) {
SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location;
IndexLoaderContextPtr new_loader = std::make_shared<IndexLoaderContext>();
new_loader->search_contexts_.push_back(search_context);
new_loader->file_ = pair.second;
auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(iter->second->location);
if(index != nullptr) {
//if the index file has been in memory, increase its priority
loader_list.push_front(new_loader);
} else {
//index file not in memory, put it to tail
loader_list.push_back(new_loader);
auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location);
if(index != nullptr) {
//if the index file has been in memory, increase its priority
loader_list.push_front(new_loader);
} else {
//index file not in memory, put it to tail
loader_list.push_back(new_loader);
}
}
return true;
}
};
return true;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ScheduleStrategyPtr StrategyFactory::CreateMemStrategy() {
ScheduleStrategyPtr strategy(new MemScheduleStrategy());
return strategy;
}
}
......
......@@ -5,24 +5,20 @@
******************************************************************************/
#pragma once
#include "IndexLoaderQueue.h"
#include "SearchContext.h"
#include "IScheduleStrategy.h"
namespace zilliz {
namespace vecwise {
namespace engine {
class IScheduleStrategy {
public:
virtual ~IScheduleStrategy() {}
class StrategyFactory {
private:
StrategyFactory() {}
virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0;
public:
static ScheduleStrategyPtr CreateMemStrategy();
};
using ScheduleStrategyPtr = std::shared_ptr<IScheduleStrategy>;
ScheduleStrategyPtr CreateStrategy();
}
}
}
......@@ -5,7 +5,7 @@
******************************************************************************/
#pragma once
#include "../MetaTypes.h"
#include "db/MetaTypes.h"
#include <unordered_map>
#include <vector>
......@@ -24,9 +24,9 @@ public:
bool AddIndexFile(TableFileSchemaPtr& index_file);
uint64_t Topk() const { return topk_; }
uint64_t Nq() const { return nq_; }
const float* Vectors() const { return vectors_; }
uint64_t topk() const { return topk_; }
uint64_t nq() const { return nq_; }
const float* vectors() const { return vectors_; }
using Id2IndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
const Id2IndexMap& GetIndexMap() const { return map_index_files_; }
......
......@@ -71,6 +71,7 @@ SearchScheduler::IndexLoadWorker() {
while(true) {
IndexLoaderContextPtr context = index_queue.Take();
if(context == nullptr) {
SERVER_LOG_INFO << "Stop thread for index loading";
break;//exit
}
......@@ -88,20 +89,25 @@ SearchScheduler::IndexLoadWorker() {
<< file_size/(1024*1024) << " M";
//metric
if(context->file_->file_type == meta::TableFileSchema::RAW) {
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
} else if(context->file_->file_type == meta::TableFileSchema::TO_INDEX) {
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
} else {
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
switch(context->file_->file_type) {
case meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
break;
}
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
break;
}
default: {
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
break;
}
}
//put search task to another queue
......@@ -122,6 +128,7 @@ SearchScheduler::SearchWorker() {
while(true) {
SearchTaskPtr task_ptr = search_queue.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop thread for searching";
break;//exit
}
......
......@@ -42,7 +42,7 @@ using SearchTaskPtr = std::shared_ptr<SearchTaskClass>;
class SearchTaskQueue : public server::BlockingQueue<SearchTaskPtr> {
private:
SearchTaskQueue() {}
SearchTaskQueue();
SearchTaskQueue(const SearchTaskQueue &rhs) = delete;
......
......@@ -58,6 +58,12 @@ void TopkResult(SearchContext::ResultSet &result_src,
}
}
// Constructor: bound the queue to 4 pending search tasks so producers block
// instead of queueing unbounded work (back-pressure on the index loader).
// NOTE(review): 4 is a magic number — confirm the intended bound and consider
// naming it as a constant.
SearchTaskQueue::SearchTaskQueue() {
SetCapacity(4);
}
SearchTaskQueue&
SearchTaskQueue::GetInstance() {
static SearchTaskQueue s_instance;
......@@ -75,21 +81,23 @@ bool SearchTask<trait>::DoSearch() {
std::vector<long> output_ids;
std::vector<float> output_distence;
for(auto& context : search_contexts_) {
auto inner_k = index_engine_->Count() < context->Topk() ? index_engine_->Count() : context->Topk();
output_ids.resize(inner_k*context->Nq());
output_distence.resize(inner_k*context->Nq());
auto inner_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk();
output_ids.resize(inner_k*context->nq());
output_distence.resize(inner_k*context->nq());
try {
index_engine_->Search(context->Nq(), context->Vectors(), inner_k, output_distence.data(),
index_engine_->Search(context->nq(), context->vectors(), inner_k, output_distence.data(),
output_ids.data());
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
continue;
}
rc.Record("do search");
SearchContext::ResultSet result_set;
ClusterResult(output_ids, output_distence, context->Nq(), inner_k, result_set);
ClusterResult(output_ids, output_distence, context->nq(), inner_k, result_set);
rc.Record("cluster result");
TopkResult(result_set, inner_k, context->GetResult());
rc.Record("reduce topk");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册