提交 bf243965 编写于 作者: W wxyu

SQ8H in GPU


Former-commit-id: 0075a759b0e2368f63c0ef9b75372f2afceafc34
上级 fcff83e0
...@@ -60,6 +60,7 @@ class IVFSQHybrid : public GPUIVFSQ { ...@@ -60,6 +60,7 @@ class IVFSQHybrid : public GPUIVFSQ {
void void
UnsetQuantizer(); UnsetQuantizer();
// todo(xiaojun): return void => VecIndex
void void
LoadData(const knowhere::QuantizerPtr& q, const Config& conf); LoadData(const knowhere::QuantizerPtr& q, const Config& conf);
......
...@@ -65,7 +65,7 @@ class ExecutionEngine { ...@@ -65,7 +65,7 @@ class ExecutionEngine {
Load(bool to_cache = true) = 0; Load(bool to_cache = true) = 0;
virtual Status virtual Status
CopyToGpu(uint64_t device_id) = 0; CopyToGpu(uint64_t device_id, bool hybrid) = 0;
virtual Status virtual Status
CopyToIndexFileToGpu(uint64_t device_id) = 0; CopyToIndexFileToGpu(uint64_t device_id) = 0;
...@@ -80,7 +80,7 @@ class ExecutionEngine { ...@@ -80,7 +80,7 @@ class ExecutionEngine {
Merge(const std::string& location) = 0; Merge(const std::string& location) = 0;
virtual Status virtual Status
Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const = 0; Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels, bool hybrid) const = 0;
virtual std::shared_ptr<ExecutionEngine> virtual std::shared_ptr<ExecutionEngine>
BuildIndex(const std::string& location, EngineType engine_type) = 0; BuildIndex(const std::string& location, EngineType engine_type) = 0;
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include <stdexcept> #include <stdexcept>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include <src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h>
namespace milvus { namespace milvus {
namespace engine { namespace engine {
...@@ -245,7 +246,39 @@ ExecutionEngineImpl::Load(bool to_cache) { ...@@ -245,7 +246,39 @@ ExecutionEngineImpl::Load(bool to_cache) {
} }
Status Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
if (hybrid) {
auto key = location_ + ".quantizer";
auto quantizer =
std::static_pointer_cast<CachedQuantizer>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(key));
auto conf = std::make_shared<knowhere::QuantizerCfg>();
conf->gpu_id = device_id;
if (quantizer) {
// cache hit
conf->mode = 2;
index_->SetQuantizer(quantizer->Data());
index_->LoadData(quantizer->Data(), conf);
} else {
// cache miss
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
return Status(DB_ERROR, "index is null");
}
conf->mode = 1;
auto q = index_->LoadQuantizer(conf);
index_->SetQuantizer(q);
conf->mode = 2;
index_->LoadData(q, conf);
// cache
auto cached_quantizer = std::make_shared<CachedQuantizer>(q);
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
}
return Status::OK();
}
auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_)); auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
bool already_in_cache = (index != nullptr); bool already_in_cache = (index != nullptr);
if (already_in_cache) { if (already_in_cache) {
...@@ -390,7 +423,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t ...@@ -390,7 +423,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t
Status Status
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances,
int64_t* labels) const { int64_t* labels, bool hybrid) const {
if (index_ == nullptr) { if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
return Status(DB_ERROR, "index is null"); return Status(DB_ERROR, "index is null");
...@@ -406,7 +439,9 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr ...@@ -406,7 +439,9 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr
auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType()); auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
auto conf = adapter->MatchSearch(temp_conf, index_->GetType()); auto conf = adapter->MatchSearch(temp_conf, index_->GetType());
HybridLoad(); if (hybrid) {
HybridLoad();
}
auto status = index_->Search(n, data, distances, labels, conf); auto status = index_->Search(n, data, distances, labels, conf);
......
...@@ -56,7 +56,7 @@ class ExecutionEngineImpl : public ExecutionEngine { ...@@ -56,7 +56,7 @@ class ExecutionEngineImpl : public ExecutionEngine {
Load(bool to_cache) override; Load(bool to_cache) override;
Status Status
CopyToGpu(uint64_t device_id) override; CopyToGpu(uint64_t device_id, bool hybrid = false) override;
Status Status
CopyToIndexFileToGpu(uint64_t device_id) override; CopyToIndexFileToGpu(uint64_t device_id) override;
...@@ -71,7 +71,13 @@ class ExecutionEngineImpl : public ExecutionEngine { ...@@ -71,7 +71,13 @@ class ExecutionEngineImpl : public ExecutionEngine {
Merge(const std::string& location) override; Merge(const std::string& location) override;
Status Status
Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const override; Search(int64_t n,
const float* data,
int64_t k,
int64_t nprobe,
float* distances,
int64_t* labels,
bool hybrid = false) const override;
ExecutionEnginePtr ExecutionEnginePtr
BuildIndex(const std::string& location, EngineType engine_type) override; BuildIndex(const std::string& location, EngineType engine_type) override;
......
...@@ -20,8 +20,10 @@ ...@@ -20,8 +20,10 @@
#include "TaskCreator.h" #include "TaskCreator.h"
#include "optimizer/Optimizer.h" #include "optimizer/Optimizer.h"
#include "task/Task.h" #include "task/Task.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "scheduler/optimizer/Optimizer.h"
#include "scheduler/Algorithm.h"
#include <src/scheduler/optimizer/Optimizer.h>
#include <utility> #include <utility>
namespace milvus { namespace milvus {
...@@ -60,7 +62,9 @@ void ...@@ -60,7 +62,9 @@ void
JobMgr::worker_function() { JobMgr::worker_function() {
while (running_) { while (running_) {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !queue_.empty(); }); cv_.wait(lock, [this] {
return !queue_.empty();
});
auto job = queue_.front(); auto job = queue_.front();
queue_.pop(); queue_.pop();
lock.unlock(); lock.unlock();
...@@ -73,6 +77,10 @@ JobMgr::worker_function() { ...@@ -73,6 +77,10 @@ JobMgr::worker_function() {
OptimizerInst::GetInstance()->Run(task); OptimizerInst::GetInstance()->Run(task);
} }
for (auto& task: tasks) {
calculate_path(task);
}
// disk resources NEVER be empty. // disk resources NEVER be empty.
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
for (auto& task : tasks) { for (auto& task : tasks) {
...@@ -87,5 +95,23 @@ JobMgr::build_task(const JobPtr& job) { ...@@ -87,5 +95,23 @@ JobMgr::build_task(const JobPtr& job) {
return TaskCreator::Create(job); return TaskCreator::Create(job);
} }
void
JobMgr::calculate_path(const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask) {
return;
}
if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) {
return;
}
std::vector<std::string> path;
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
auto src = res_mgr_->GetDiskResources()[0];
auto dest = spec_label->resource();
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
task->path() = Path(path, path.size() - 1);
}
} // namespace scheduler } // namespace scheduler
} // namespace milvus } // namespace milvus
...@@ -52,9 +52,12 @@ class JobMgr { ...@@ -52,9 +52,12 @@ class JobMgr {
void void
worker_function(); worker_function();
std::vector<TaskPtr> static std::vector<TaskPtr>
build_task(const JobPtr& job); build_task(const JobPtr& job);
void
calculate_path(const TaskPtr& task);
private: private:
bool running_ = false; bool running_ = false;
std::queue<JobPtr> queue_; std::queue<JobPtr> queue_;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "ResourceMgr.h" #include "ResourceMgr.h"
#include "Scheduler.h" #include "Scheduler.h"
#include "optimizer/HybridPass.h" #include "optimizer/HybridPass.h"
#include "optimizer/LargeSQ8HPass.h"
#include "optimizer/Optimizer.h" #include "optimizer/Optimizer.h"
#include <memory> #include <memory>
...@@ -91,9 +92,9 @@ class OptimizerInst { ...@@ -91,9 +92,9 @@ class OptimizerInst {
if (instance == nullptr) { if (instance == nullptr) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (instance == nullptr) { if (instance == nullptr) {
HybridPassPtr pass_ptr = std::make_shared<HybridPass>();
std::vector<PassPtr> pass_list; std::vector<PassPtr> pass_list;
pass_list.push_back(pass_ptr); pass_list.push_back(std::make_shared<LargeSQ8HPass>());
pass_list.push_back(std::make_shared<HybridPass>());
instance = std::make_shared<Optimizer>(pass_list); instance = std::make_shared<Optimizer>(pass_list);
} }
} }
......
...@@ -145,37 +145,38 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr ...@@ -145,37 +145,38 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
transport_costs.push_back(transport_cost); transport_costs.push_back(transport_cost);
paths.emplace_back(path); paths.emplace_back(path);
} }
if (task->job_.lock()->type() == JobType::SEARCH) { // if (task->job_.lock()->type() == JobType::SEARCH) {
auto label = task->label(); // auto label = task->label();
auto spec_label = std::static_pointer_cast<SpecResLabel>(label); // auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
if (spec_label->resource().lock()->type() == ResourceType::CPU) { // if (spec_label->resource().lock()->type() == ResourceType::CPU) {
std::vector<std::string> spec_path; // std::vector<std::string> spec_path;
spec_path.push_back(spec_label->resource().lock()->name()); // spec_path.push_back(spec_label->resource().lock()->name());
spec_path.push_back(resource->name()); // spec_path.push_back(resource->name());
task->path() = Path(spec_path, spec_path.size() - 1); // task->path() = Path(spec_path, spec_path.size() - 1);
} else { // } else {
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost // // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max(); // uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0; // uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) { // for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) { // if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i; // min_cost_idx = i;
break; // break;
} // }
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + // uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() +
transport_costs[i]; // transport_costs[i];
if (min_cost > cost) { // if (min_cost > cost) {
min_cost = cost; // min_cost = cost;
min_cost_idx = i; // min_cost_idx = i;
} // }
} // }
//
// step 3: set path in task // // step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); // Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path; // task->path() = task_path;
} // }
//
} else if (task->job_.lock()->type() == JobType::BUILD) { // } else
if (task->job_.lock()->type() == JobType::BUILD) {
// step2: Read device id in config // step2: Read device id in config
// get build index gpu resource // get build index gpu resource
server::Config& config = server::Config::GetInstance(); server::Config& config = server::Config::GetInstance();
...@@ -201,12 +202,13 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr ...@@ -201,12 +202,13 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
} }
if (resource->name() == task->path().Last()) { if (resource->name() == task->path().Last()) {
resource->WakeupLoader(); resource->WakeupExecutor();
} else { } else {
auto next_res_name = task->path().Next(); auto next_res_name = task->path().Next();
auto next_res = res_mgr.lock()->GetResource(next_res_name); auto next_res = res_mgr.lock()->GetResource(next_res_name);
event->task_table_item_->Move(); if (event->task_table_item_->Move()) {
next_res->task_table().Put(task); next_res->task_table().Put(task);
}
} }
} }
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cache/GpuCacheMgr.h"
#include "scheduler/Utils.h"
#include "scheduler/optimizer/LargeSQ8HPass.h"
#include "scheduler/SchedInst.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
bool
LargeSQ8HPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8H) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
if (search_job->nq() < 100) {
return false;
}
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
std::vector<int64_t> all_free_mem;
for (auto& gpu : gpus) {
auto cache = cache::GpuCacheMgr::GetInstance(gpu);
auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
all_free_mem.push_back(free_mem);
}
auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
auto best_index = std::distance(all_free_mem.begin(), max_e);
auto best_device_id = gpus[best_index];
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
if (not res_ptr) {
SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid.";
// TODO: throw critical error and exit
return false;
}
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
return true;
}
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "Pass.h"
namespace milvus {
namespace scheduler {
class LargeSQ8HPass : public Pass {
public:
LargeSQ8HPass() = default;
public:
bool
Run(const TaskPtr& task) override;
};
using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>;
} // namespace scheduler
} // namespace milvus
...@@ -111,11 +111,12 @@ Resource::pick_task_load() { ...@@ -111,11 +111,12 @@ Resource::pick_task_load() {
TaskTableItemPtr TaskTableItemPtr
Resource::pick_task_execute() { Resource::pick_task_execute() {
auto indexes = task_table_.PickToExecute(3); auto indexes = task_table_.PickToExecute(std::numeric_limits<uint64_t>::max());
for (auto index : indexes) { for (auto index : indexes) {
// try to set one task executing, then return // try to set one task executing, then return
if (task_table_.Execute(index)) if (task_table_[index]->task->path().Last() == name() && task_table_.Execute(index)) {
return task_table_.Get(index); return task_table_.Get(index);
}
// else try next // else try next
} }
return nullptr; return nullptr;
...@@ -125,7 +126,9 @@ void ...@@ -125,7 +126,9 @@ void
Resource::loader_function() { Resource::loader_function() {
while (running_) { while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_); std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; }); load_cv_.wait(lock, [&] {
return load_flag_;
});
load_flag_ = false; load_flag_ = false;
lock.unlock(); lock.unlock();
while (true) { while (true) {
...@@ -151,7 +154,9 @@ Resource::executor_function() { ...@@ -151,7 +154,9 @@ Resource::executor_function() {
} }
while (running_) { while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_); std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; }); exec_cv_.wait(lock, [&] {
return exec_flag_;
});
exec_flag_ = false; exec_flag_ = false;
lock.unlock(); lock.unlock();
while (true) { while (true) {
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include <string> #include <string>
#include <thread> #include <thread>
#include <utility> #include <utility>
#include <src/scheduler/SchedInst.h>
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
...@@ -121,7 +122,11 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { ...@@ -121,7 +122,11 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
stat = index_engine_->Load(); stat = index_engine_->Load();
type_str = "DISK2CPU"; type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) { } else if (type == LoadType::CPU2GPU) {
stat = index_engine_->CopyToGpu(device_id); bool hybrid = false;
if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H) {
hybrid = true;
}
stat = index_engine_->CopyToGpu(device_id, hybrid);
type_str = "CPU2GPU"; type_str = "CPU2GPU";
} else if (type == LoadType::GPU2CPU) { } else if (type == LoadType::GPU2CPU) {
stat = index_engine_->CopyToCpu(); stat = index_engine_->CopyToCpu();
...@@ -204,7 +209,12 @@ XSearchTask::Execute() { ...@@ -204,7 +209,12 @@ XSearchTask::Execute() {
try { try {
// step 2: search // step 2: search
index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data()); bool hybrid = false;
if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H &&
ResMgrInst::GetInstance()->GetResource(path().Last())->type() == ResourceType::CPU) {
hybrid = true;
}
index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data(), hybrid);
double span = rc.RecordSection(hdr + ", do search"); double span = rc.RecordSection(hdr + ", do search");
// search_job->AccumSearchCost(span); // search_job->AccumSearchCost(span);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册