Commit 50c4f98d authored by G groot

merge from 0.6.0

......@@ -23,6 +23,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#77 - Support table partition
- \#127 - Support new Index type IVFPQ
- \#226 - Experimental shards middleware for Milvus
- \#346 - Support build index with multiple gpu
## Improvement
- \#255 - Add ivfsq8 test report detailed version
......
......@@ -27,9 +27,7 @@ metric_config:
port: 8080 # port prometheus uses to fetch metrics, must be in range [1025, 65534]
cache_config:
cpu_cache_capacity: 16 # GB, CPU memory used for cache, must be a positive integer
cpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered, must be in range (0.0, 1.0]
cpu_cache_capacity: 16 # GB, size of CPU memory used for cache, must be a positive integer
cache_insert_data: false # whether to load inserted data into cache, must be a boolean
engine_config:
......@@ -37,7 +35,10 @@ engine_config:
# if nq >= use_blas_threshold, use OpenBlas, slower with stable response times
gpu_search_threshold: 1000 # threshold beyond which the search computation is executed on GPUs only
resource_config:
search_resources: # define the device used for search computation
- cpu
index_build_device: cpu # CPU used for building index
gpu_resource_config:
enable_gpu: false # whether to enable GPU resources
cache_capacity: 4 # GB, size of GPU memory per card used for cache, must be a positive integer
search_resources: # define the GPU devices used for search computation, must be in format gpux
- gpu0
build_index_resources: # define the GPU devices used for index building, must be in format gpux
- gpu0
......@@ -27,10 +27,7 @@ metric_config:
port: 8080 # port prometheus uses to fetch metrics, must be in range [1025, 65534]
cache_config:
cpu_cache_capacity: 16 # GB, CPU memory used for cache, must be a positive integer
cpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered, must be in range (0.0, 1.0]
gpu_cache_capacity: 4 # GB, GPU memory used for cache, must be a positive integer
gpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered, must be in range (0.0, 1.0]
cpu_cache_capacity: 16 # GB, size of CPU memory used for cache, must be a positive integer
cache_insert_data: false # whether to load inserted data into cache, must be a boolean
engine_config:
......@@ -38,8 +35,10 @@ engine_config:
# if nq >= use_blas_threshold, use OpenBlas, slower with stable response times
gpu_search_threshold: 1000 # threshold beyond which the search computation is executed on GPUs only
resource_config:
search_resources: # define the devices used for search computation, must be in format: cpu or gpux
- cpu
gpu_resource_config:
enable_gpu: true # whether to enable GPU resources
cache_capacity: 4 # GB, size of GPU memory per card used for cache, must be a positive integer
search_resources: # define the GPU devices used for search computation, must be in format gpux
- gpu0
build_index_resources: # define the GPU devices used for index building, must be in format gpux
- gpu0
index_build_device: gpu0 # CPU / GPU used for building index, must be in format: cpu or gpux
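The two configuration fragments above replace the flat resource_config section with a dedicated gpu_resource_config block (enable_gpu, cache_capacity, cache_threshold, search_resources, build_index_resources). As a rough sketch of how these new settings are read back, using the getter names declared in the Config.h changes further down; Status return values are ignored here for brevity:

```cpp
#include <cstdint>
#include <vector>

#include "server/Config.h"

// Sketch only: pull the new gpu_resource_config values through the Config singleton.
void LoadGpuResourceConfig() {
    auto& config = milvus::server::Config::GetInstance();

    bool enable_gpu = false;
    config.GetGpuResourceConfigEnableGpu(enable_gpu);            // gpu_resource_config.enable_gpu

    int64_t cache_capacity = 0;                                  // GB of GPU memory per card
    config.GetGpuResourceConfigCacheCapacity(cache_capacity);

    float cache_threshold = 0.0f;                                // fraction kept after cache cleanup
    config.GetGpuResourceConfigCacheThreshold(cache_threshold);

    std::vector<int32_t> search_gpus;                            // e.g. {0} for "gpu0"
    config.GetGpuResourceConfigSearchResources(search_gpus);

    std::vector<int32_t> build_gpus;                             // devices used for index building
    config.GetGpuResourceConfigBuildIndexResources(build_gpus);
}
```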
......@@ -37,7 +37,7 @@ GpuCacheMgr::GpuCacheMgr() {
Status s;
int64_t gpu_cache_cap;
s = config.GetCacheConfigGpuCacheCapacity(gpu_cache_cap);
s = config.GetGpuResourceConfigCacheCapacity(gpu_cache_cap);
if (!s.ok()) {
SERVER_LOG_ERROR << s.message();
}
......@@ -45,7 +45,7 @@ GpuCacheMgr::GpuCacheMgr() {
cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL << 32);
float gpu_mem_threshold;
s = config.GetCacheConfigGpuCacheThreshold(gpu_mem_threshold);
s = config.GetGpuResourceConfigCacheThreshold(gpu_mem_threshold);
if (!s.ok()) {
SERVER_LOG_ERROR << s.message();
}
......
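GpuCacheMgr now reads its capacity and threshold from the gpu_resource_config getters instead of the old cache_config keys. A small self-contained sketch of the GB-to-bytes conversion and the cleanup watermark implied by the config comments; helper names are illustrative, not the actual GpuCacheMgr code:

```cpp
#include <cstdint>

constexpr int64_t G_BYTE = 1024LL * 1024LL * 1024LL;

// Configured capacity is given in GB per GPU card.
int64_t CapacityBytes(int64_t capacity_gb) {
    return capacity_gb * G_BYTE;
}

// threshold must be in (0.0, 1.0]: fraction of cached data kept when cleanup triggers.
int64_t CleanupTargetBytes(int64_t capacity_bytes, float threshold) {
    return static_cast<int64_t>(capacity_bytes * threshold);
}
```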
......@@ -144,7 +144,14 @@ ExecutionEngineImpl::HybridLoad() const {
}
const std::string key = location_ + ".quantizer";
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
server::Config& config = server::Config::GetInstance();
std::vector<int32_t> gpus;
Status s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
ENGINE_LOG_ERROR << s.message();
return;
}
// cache hit
{
......@@ -355,6 +362,7 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
gpu_num_ = device_id;
auto to_index_data = std::make_shared<ToIndexData>(PhysicalSize());
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(to_index_data);
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj);
......@@ -578,12 +586,16 @@ ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
Status
ExecutionEngineImpl::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetResourceConfigIndexBuildDevice(gpu_num_);
if (!s.ok()) {
return s;
std::vector<int32_t> gpu_ids;
Status s = config.GetGpuResourceConfigBuildIndexResources(gpu_ids);
for (auto id : gpu_ids) {
if (gpu_num_ == id) {
return Status::OK();
}
}
return Status::OK();
std::string msg = "Invalid gpu_num";
return Status(SERVER_INVALID_ARGUMENT, msg);
}
} // namespace engine
......
......@@ -85,7 +85,7 @@ JobMgr::worker_function() {
}
for (auto& task : tasks) {
calculate_path(task);
calculate_path(res_mgr_, task);
}
// disk resources should NEVER be empty.
......@@ -103,8 +103,8 @@ JobMgr::build_task(const JobPtr& job) {
}
void
JobMgr::calculate_path(const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask) {
JobMgr::calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask && task->type_ != TaskType::BuildIndexTask) {
return;
}
......@@ -114,9 +114,9 @@ JobMgr::calculate_path(const TaskPtr& task) {
std::vector<std::string> path;
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
auto src = res_mgr_->GetDiskResources()[0];
auto src = res_mgr->GetDiskResources()[0];
auto dest = spec_label->resource();
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
ShortestPath(src.lock(), dest.lock(), res_mgr, path);
task->path() = Path(path, path.size() - 1);
}
......
......@@ -59,8 +59,9 @@ class JobMgr : public interface::dumpable {
static std::vector<TaskPtr>
build_task(const JobPtr& job);
void
calculate_path(const TaskPtr& task);
public:
static void
calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task);
private:
bool running_ = false;
......
......@@ -45,18 +45,6 @@ std::mutex BuildMgrInst::mutex_;
void
load_simple_config() {
server::Config& config = server::Config::GetInstance();
std::string mode;
config.GetResourceConfigMode(mode);
std::vector<std::string> pool;
config.GetResourceConfigSearchResources(pool);
// get resources
auto gpu_ids = get_gpu_pool();
int32_t build_gpu_id;
config.GetResourceConfigIndexBuildDevice(build_gpu_id);
// create and connect
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
......@@ -64,21 +52,40 @@ load_simple_config() {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true));
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
// get resources
#ifdef MILVUS_GPU_VERSION
server::Config& config = server::Config::GetInstance();
std::vector<int32_t> gpu_ids;
config.GetGpuResourceConfigSearchResources(gpu_ids);
std::vector<int32_t> build_gpu_ids;
config.GetGpuResourceConfigBuildIndexResources(build_gpu_ids);
auto pcie = Connection("pcie", 12000);
bool find_build_gpu_id = false;
std::vector<int64_t> not_find_build_ids;
for (auto& build_id : build_gpu_ids) {
bool find_gpu_id = false;
for (auto& gpu_id : gpu_ids) {
if (gpu_id == build_id) {
find_gpu_id = true;
break;
}
}
if (not find_gpu_id) {
not_find_build_ids.emplace_back(build_id);
}
}
for (auto& gpu_id : gpu_ids) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
if (build_gpu_id == gpu_id) {
find_build_gpu_id = true;
}
}
if (not find_build_gpu_id) {
for (auto& not_find_id : not_find_build_ids) {
ResMgrInst::GetInstance()->Add(
ResourceFactory::Create(std::to_string(build_gpu_id), "GPU", build_gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(build_gpu_id), pcie);
ResourceFactory::Create(std::to_string(not_find_id), "GPU", not_find_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(not_find_id), pcie);
}
#endif
}
void
......
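The rewritten load_simple_config() registers every GPU listed in search_resources, then adds any build-index GPU that is not already a search GPU (collected in not_find_build_ids) so it still gets its own resource node and a PCIe edge from "cpu". A self-contained sketch of that set difference; the function and variable names below are illustrative, not taken from the source:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// GPUs that appear in build_index_resources but not in search_resources; these
// still need a "GPU" resource and a cpu -> gpu PCIe connection.
std::vector<int32_t> BuildOnlyGpus(const std::vector<int32_t>& search_gpus,
                                   const std::vector<int32_t>& build_gpus) {
    std::vector<int32_t> result;
    for (int32_t id : build_gpus) {
        if (std::find(search_gpus.begin(), search_gpus.end(), id) == search_gpus.end()) {
            result.push_back(id);
        }
    }
    return result;
}
```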
......@@ -21,11 +21,13 @@
#include "JobMgr.h"
#include "ResourceMgr.h"
#include "Scheduler.h"
#include "Utils.h"
#include "optimizer/BuildIndexPass.h"
#include "optimizer/FaissFlatPass.h"
#include "optimizer/FaissIVFFlatPass.h"
#include "optimizer/FaissIVFSQ8HPass.h"
#include "optimizer/FaissIVFSQ8Pass.h"
#include "optimizer/FallbackPass.h"
#include "optimizer/HybridPass.h"
#include "optimizer/LargeSQ8HPass.h"
#include "optimizer/OnlyCPUPass.h"
#include "optimizer/OnlyGPUPass.h"
#include "optimizer/Optimizer.h"
#include "server/Config.h"
......@@ -98,21 +100,14 @@ class OptimizerInst {
if (instance == nullptr) {
std::lock_guard<std::mutex> lock(mutex_);
if (instance == nullptr) {
server::Config& config = server::Config::GetInstance();
std::vector<std::string> search_resources;
bool has_cpu = false;
config.GetResourceConfigSearchResources(search_resources);
for (auto& resource : search_resources) {
if (resource == "cpu") {
has_cpu = true;
}
}
std::vector<PassPtr> pass_list;
pass_list.push_back(std::make_shared<LargeSQ8HPass>());
pass_list.push_back(std::make_shared<HybridPass>());
pass_list.push_back(std::make_shared<OnlyCPUPass>());
pass_list.push_back(std::make_shared<OnlyGPUPass>(has_cpu));
#ifdef MILVUS_GPU_VERSION
pass_list.push_back(std::make_shared<BuildIndexPass>());
pass_list.push_back(std::make_shared<FaissFlatPass>());
pass_list.push_back(std::make_shared<FaissIVFFlatPass>());
pass_list.push_back(std::make_shared<FaissIVFSQ8Pass>());
pass_list.push_back(std::make_shared<FaissIVFSQ8HPass>());
#endif
pass_list.push_back(std::make_shared<FallbackPass>());
instance = std::make_shared<Optimizer>(pass_list);
}
......
......@@ -108,10 +108,6 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
switch (task_table_type) {
case TaskLabelType::DEFAULT: {
Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
}
case TaskLabelType::SPECIFIED_RESOURCE: {
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
......
......@@ -18,7 +18,6 @@
#include "scheduler/TaskCreator.h"
#include "SchedInst.h"
#include "tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
#include "tasklabel/SpecResLabel.h"
namespace milvus {
......@@ -47,8 +46,7 @@ std::vector<TaskPtr>
TaskCreator::Create(const SearchJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& index_file : job->index_files()) {
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<XSearchTask>(index_file.second, label);
auto task = std::make_shared<XSearchTask>(index_file.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}
......@@ -70,12 +68,8 @@ TaskCreator::Create(const DeleteJobPtr& job) {
std::vector<TaskPtr>
TaskCreator::Create(const BuildIndexJobPtr& job) {
std::vector<TaskPtr> tasks;
// TODO(yukun): remove "disk" hardcode here
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
for (auto& to_index_file : job->to_index_files()) {
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}
......
......@@ -16,8 +16,6 @@
// under the License.
#include "scheduler/Utils.h"
#include "server/Config.h"
#include "utils/Log.h"
#ifdef MILVUS_GPU_VERSION
#include <cuda_runtime.h>
......@@ -46,42 +44,5 @@ get_num_gpu() {
return n_devices;
}
std::vector<uint64_t>
get_gpu_pool() {
std::vector<uint64_t> gpu_pool;
server::Config& config = server::Config::GetInstance();
std::vector<std::string> pool;
Status s = config.GetResourceConfigSearchResources(pool);
if (!s.ok()) {
SERVER_LOG_ERROR << s.message();
}
std::set<uint64_t> gpu_ids;
for (auto& resource : pool) {
if (resource == "cpu") {
continue;
} else {
if (resource.length() < 4 || resource.substr(0, 3) != "gpu") {
// error
exit(-1);
}
auto gpu_id = std::stoi(resource.substr(3));
if (gpu_id >= scheduler::get_num_gpu()) {
// error
exit(-1);
}
gpu_ids.insert(gpu_id);
}
}
for (auto& gpu_id : gpu_ids) {
gpu_pool.push_back(gpu_id);
}
return gpu_pool;
}
} // namespace scheduler
} // namespace milvus
......@@ -27,8 +27,5 @@ get_current_timestamp();
uint64_t
get_num_gpu();
std::vector<uint64_t>
get_gpu_pool();
} // namespace scheduler
} // namespace milvus
......@@ -36,10 +36,6 @@ class Action {
static void
PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest);
static void
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
static void
SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
......
......@@ -101,110 +101,46 @@ Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest)
dest->task_table().Put(task_item->task, task_item);
}
void
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
bool moved = false;
// to support test task, REFACTOR
if (resource->type() == ResourceType::CPU) {
if (auto index_engine = search_task->index_engine_) {
auto location = index_engine->GetLocation();
for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) {
auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_, dest_resource);
break;
}
}
}
}
if (not moved) {
PushTaskToNeighbourRandomly(task_item, resource);
}
}
}
void
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto& res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
// if (task->job_.lock()->type() == JobType::SEARCH) {
// auto label = task->label();
// auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
// if (spec_label->resource().lock()->type() == ResourceType::CPU) {
// std::vector<std::string> spec_path;
// spec_path.push_back(spec_label->resource().lock()->name());
// spec_path.push_back(resource->name());
// task->path() = Path(spec_path, spec_path.size() - 1);
// } else {
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
// uint64_t min_cost_idx = 0;
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->TotalTasks() == 0) {
// min_cost_idx = i;
// break;
// }
// uint64_t cost = compute_resources[i]->TaskAvgCost() *
// compute_resources[i]->NumOfTaskToExec() +
// transport_costs[i];
// if (min_cost > cost) {
// min_cost = cost;
// min_cost_idx = i;
// }
// }
//
// // step 3: set path in task
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
// task->path() = task_path;
// }
//
// } else
if (task->job_.lock()->type() == JobType::BUILD) {
// step2: Read device id in config
// get build index gpu resource
server::Config& config = server::Config::GetInstance();
int32_t build_index_gpu;
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
bool find_gpu_res = false;
if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->name() ==
res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
}
}
}
if (not find_gpu_res) {
task->path() = Path(paths[0], paths[0].size() - 1);
}
}
}
// if (resource->type() == ResourceType::DISK) {
// // step 1: calculate shortest path per resource, from disk to compute resource
// auto compute_resources = res_mgr->GetComputeResources();
// std::vector<std::vector<std::string>> paths;
// std::vector<uint64_t> transport_costs;
// for (auto& res : compute_resources) {
// std::vector<std::string> path;
// uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
// transport_costs.push_back(transport_cost);
// paths.emplace_back(path);
// }
// if (task->job_.lock()->type() == JobType::BUILD) {
// // step2: Read device id in config
// // get build index gpu resource
// server::Config& config = server::Config::GetInstance();
// int32_t build_index_gpu;
// Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
//
// bool find_gpu_res = false;
// if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->name() ==
// res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
// find_gpu_res = true;
// Path task_path(paths[i], paths[i].size() - 1);
// task->path() = task_path;
// break;
// }
// }
// }
// if (not find_gpu_res) {
// task->path() = Path(paths[0], paths[0].size() - 1);
// }
// }
// }
if (resource->name() == task->path().Last()) {
resource->WakeupExecutor();
......
......@@ -15,36 +15,38 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/OnlyCPUPass.h"
#include "scheduler/optimizer/BuildIndexPass.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
void
OnlyCPUPass::Init() {
BuildIndexPass::Init() {
server::Config& config = server::Config::GetInstance();
std::vector<int32_t> build_resources;
Status s = config.GetGpuResourceConfigBuildIndexResources(build_resources);
if (!s.ok()) {
throw;
}
}
bool
OnlyCPUPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask)
return false;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8 &&
search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFFLAT) {
BuildIndexPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::BuildIndexTask)
return false;
}
auto gpu_id = get_gpu_pool();
if (not gpu_id.empty())
if (build_gpu_ids_.empty())
return false;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
ResourcePtr res_ptr;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_gpu_ids_[specified_gpu_id_]);
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
specified_gpu_id_ = (specified_gpu_id_ + 1) % build_gpu_ids_.size();
return true;
}
......
......@@ -32,9 +32,9 @@
namespace milvus {
namespace scheduler {
class OnlyGPUPass : public Pass {
class BuildIndexPass : public Pass {
public:
explicit OnlyGPUPass(bool has_cpu);
BuildIndexPass() = default;
public:
void
......@@ -45,10 +45,10 @@ class OnlyGPUPass : public Pass {
private:
uint64_t specified_gpu_id_ = 0;
bool has_cpu_ = false;
std::vector<int32_t> build_gpu_ids_;
};
using OnlyGPUPassPtr = std::shared_ptr<OnlyGPUPass>;
using BuildIndexPassPtr = std::shared_ptr<BuildIndexPass>;
} // namespace scheduler
} // namespace milvus
......@@ -15,43 +15,53 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/OnlyGPUPass.h"
#include "scheduler/optimizer/FaissFlatPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
OnlyGPUPass::OnlyGPUPass(bool has_cpu) : has_cpu_(has_cpu) {
}
void
OnlyGPUPass::Init() {
FaissFlatPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
OnlyGPUPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask || has_cpu_)
FaissFlatPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8 &&
search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFFLAT &&
search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IDMAP) {
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IDMAP) {
return false;
}
auto gpu_id = get_gpu_pool();
if (gpu_id.empty())
return false;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpu_id[specified_gpu_id_]);
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
specified_gpu_id_ = specified_gpu_id_++ % gpu_id.size();
return true;
}
......
......@@ -33,9 +33,9 @@
namespace milvus {
namespace scheduler {
class LargeSQ8HPass : public Pass {
class FaissFlatPass : public Pass {
public:
LargeSQ8HPass() = default;
FaissFlatPass() = default;
public:
void
......@@ -47,9 +47,10 @@ class LargeSQ8HPass : public Pass {
private:
int32_t threshold_ = std::numeric_limits<int32_t>::max();
int64_t count_ = 0;
std::vector<int32_t> gpus;
};
using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>;
using FaissFlatPassPtr = std::shared_ptr<FaissFlatPass>;
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/FaissIVFFlatPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
void
FaissIVFFlatPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
FaissIVFFlatPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFFLAT) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}
} // namespace scheduler
} // namespace milvus
......@@ -18,6 +18,7 @@
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
......@@ -32,9 +33,9 @@
namespace milvus {
namespace scheduler {
class OnlyCPUPass : public Pass {
class FaissIVFFlatPass : public Pass {
public:
OnlyCPUPass() = default;
FaissIVFFlatPass() = default;
public:
void
......@@ -42,9 +43,14 @@ class OnlyCPUPass : public Pass {
bool
Run(const TaskPtr& task) override;
private:
int32_t threshold_ = std::numeric_limits<int32_t>::max();
int64_t count_ = 0;
std::vector<int32_t> gpus;
};
using OnlyCPUPassPtr = std::shared_ptr<OnlyCPUPass>;
using FaissIVFFlatPassPtr = std::shared_ptr<FaissIVFFlatPass>;
} // namespace scheduler
} // namespace milvus
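FaissFlatPass and FaissIVFFlatPass (and the IVFSQ8 / IVFSQ8H passes below) share the same dispatch rule: searches with nq below gpu_search_threshold stay on the cpu resource, larger ones are spread round-robin over the GPUs listed in search_resources. A minimal self-contained sketch of that rule; the types and names are illustrative, not the pass implementation itself:

```cpp
#include <cstdint>
#include <vector>

struct DeviceChoice {
    bool use_cpu;
    int32_t gpu_id;  // meaningful only when use_cpu == false
};

// nq below the threshold (or no GPUs configured) -> CPU; otherwise rotate over the
// configured search GPUs so consecutive large queries land on different cards.
DeviceChoice PickSearchDevice(uint64_t nq, int32_t threshold,
                              const std::vector<int32_t>& search_gpus, int64_t& counter) {
    if (search_gpus.empty() || nq < static_cast<uint64_t>(threshold)) {
        return {true, -1};
    }
    int32_t gpu_id = search_gpus[counter % static_cast<int64_t>(search_gpus.size())];
    ++counter;
    return {false, gpu_id};
}
```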
......@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/LargeSQ8HPass.h"
#include "scheduler/optimizer/FaissIVFSQ8HPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
......@@ -28,16 +28,17 @@ namespace milvus {
namespace scheduler {
void
LargeSQ8HPass::Init() {
FaissIVFSQ8HPass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
}
bool
LargeSQ8HPass::Run(const TaskPtr& task) {
FaissIVFSQ8HPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
......@@ -48,37 +49,16 @@ LargeSQ8HPass::Run(const TaskPtr& task) {
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
return false;
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
// std::vector<int64_t> all_free_mem;
// for (auto& gpu : gpus) {
// auto cache = cache::GpuCacheMgr::GetInstance(gpu);
// auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
// all_free_mem.push_back(free_mem);
// }
//
// auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
// auto best_index = std::distance(all_free_mem.begin(), max_e);
// auto best_device_id = gpus[best_index];
auto best_device_id = count_ % gpus.size();
count_++;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
if (not res_ptr) {
SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid.";
// TODO: throw critical error and exit
return false;
}
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFSQ8HPass : public Pass {
public:
FaissIVFSQ8HPass() = default;
public:
void
Init() override;
bool
Run(const TaskPtr& task) override;
private:
int32_t threshold_ = std::numeric_limits<int32_t>::max();
int64_t count_ = 0;
std::vector<int32_t> gpus;
};
using FaissIVFSQ8HPassPtr = std::shared_ptr<FaissIVFSQ8HPass>;
} // namespace scheduler
} // namespace milvus
......@@ -15,32 +15,54 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/HybridPass.h"
#include "scheduler/optimizer/FaissIVFSQ8Pass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
void
HybridPass::Init() {
FaissIVFSQ8Pass::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
if (!s.ok()) {
threshold_ = std::numeric_limits<int32_t>::max();
}
s = config.GetGpuResourceConfigSearchResources(gpus);
if (!s.ok()) {
throw;
}
}
bool
HybridPass::Run(const TaskPtr& task) {
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
if (task->Type() != TaskType::SearchTask)
FaissIVFSQ8Pass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ == (int)engine::EngineType::FAISS_IVFSQ8H) {
// TODO: remove "cpu" hardcode
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
return true;
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8) {
return false;
}
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
ResourcePtr res_ptr;
if (search_job->nq() < threshold_) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
auto best_device_id = count_ % gpus.size();
count_++;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
}
return false;
auto label = std::make_shared<SpecResLabel>(res_ptr);
task->label() = label;
return true;
}
} // namespace scheduler
......
......@@ -18,6 +18,7 @@
#include <condition_variable>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
......@@ -32,9 +33,9 @@
namespace milvus {
namespace scheduler {
class HybridPass : public Pass {
class FaissIVFSQ8Pass : public Pass {
public:
HybridPass() = default;
FaissIVFSQ8Pass() = default;
public:
void
......@@ -42,9 +43,14 @@ class HybridPass : public Pass {
bool
Run(const TaskPtr& task) override;
private:
int32_t threshold_ = std::numeric_limits<int32_t>::max();
int64_t count_ = 0;
std::vector<int32_t> gpus;
};
using HybridPassPtr = std::shared_ptr<HybridPass>;
using FaissIVFSQ8PassPtr = std::shared_ptr<FaissIVFSQ8Pass>;
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "TaskLabel.h"
#include <memory>
namespace milvus {
namespace scheduler {
class DefaultLabel : public TaskLabel {
public:
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {
}
};
using DefaultLabelPtr = std::shared_ptr<DefaultLabel>;
} // namespace scheduler
} // namespace milvus
......@@ -23,7 +23,6 @@ namespace milvus {
namespace scheduler {
enum class TaskLabelType {
DEFAULT, // means can be executed in any resource
SPECIFIED_RESOURCE, // means must executing in special resource
BROADCAST, // means all enable-executor resource must execute task
};
......
This diff has been collapsed.
......@@ -59,12 +59,8 @@ static const char* CONFIG_DB_PRELOAD_TABLE = "preload_table";
static const char* CONFIG_CACHE = "cache_config";
static const char* CONFIG_CACHE_CPU_CACHE_CAPACITY = "cpu_cache_capacity";
static const char* CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT = "16";
static const char* CONFIG_CACHE_GPU_CACHE_CAPACITY = "gpu_cache_capacity";
static const char* CONFIG_CACHE_GPU_CACHE_CAPACITY_DEFAULT = "4";
static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD = "cpu_mem_threshold";
static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD = "cpu_cache_threshold";
static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT = "0.85";
static const char* CONFIG_CACHE_GPU_CACHE_THRESHOLD = "gpu_mem_threshold";
static const char* CONFIG_CACHE_GPU_CACHE_THRESHOLD_DEFAULT = "0.85";
static const char* CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data";
static const char* CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT = "false";
......@@ -87,26 +83,23 @@ static const char* CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0";
static const char* CONFIG_ENGINE_GPU_SEARCH_THRESHOLD = "gpu_search_threshold";
static const char* CONFIG_ENGINE_GPU_SEARCH_THRESHOLD_DEFAULT = "1000";
/* resource config */
static const char* CONFIG_RESOURCE = "resource_config";
static const char* CONFIG_RESOURCE_MODE = "mode";
static const char* CONFIG_RESOURCE_MODE_DEFAULT = "simple";
static const char* CONFIG_RESOURCE_SEARCH_RESOURCES = "search_resources";
static const char* CONFIG_RESOURCE_SEARCH_RESOURCES_DELIMITER = ",";
#ifdef MILVUS_CPU_VERSION
static const char* CONFIG_RESOURCE_SEARCH_RESOURCES_DEFAULT = "cpu";
#else
static const char* CONFIG_RESOURCE_SEARCH_RESOURCES_DEFAULT = "cpu,gpu0";
#endif
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE = "index_build_device";
#ifdef MILVUS_CPU_VERSION
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "cpu";
/* gpu resource config */
static const char* CONFIG_GPU_RESOURCE = "gpu_resource_config";
static const char* CONFIG_GPU_RESOURCE_ENABLE_GPU = "enable_gpu";
#ifdef MILVUS_GPU_VERSION
static const char* CONFIG_GPU_RESOURCE_ENABLE_GPU_DEFAULT = "true";
#else
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "gpu0";
static const char* CONFIG_GPU_RESOURCE_ENABLE_GPU_DEFAULT = "false";
#endif
const int32_t CPU_DEVICE_ID = -1;
static const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY = "cache_capacity";
static const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY_DEFAULT = "4";
static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD = "cache_threshold";
static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD_DEFAULT = "0.85";
static const char* CONFIG_GPU_RESOURCE_DELIMITER = ",";
static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES = "search_resources";
static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES_DEFAULT = "gpu0";
static const char* CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES = "build_index_resources";
static const char* CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES_DEFAULT = "gpu0";
class Config {
public:
......@@ -170,10 +163,6 @@ class Config {
Status
CheckCacheConfigCpuCacheThreshold(const std::string& value);
Status
CheckCacheConfigGpuCacheCapacity(const std::string& value);
Status
CheckCacheConfigGpuCacheThreshold(const std::string& value);
Status
CheckCacheConfigCacheInsertData(const std::string& value);
/* engine config */
......@@ -184,13 +173,17 @@ class Config {
Status
CheckEngineConfigGpuSearchThreshold(const std::string& value);
/* resource config */
/* gpu resource config */
Status
CheckGpuResourceConfigEnableGpu(const std::string& value);
Status
CheckGpuResourceConfigCacheCapacity(const std::string& value);
Status
CheckResourceConfigMode(const std::string& value);
CheckGpuResourceConfigCacheThreshold(const std::string& value);
Status
CheckResourceConfigSearchResources(const std::vector<std::string>& value);
CheckGpuResourceConfigSearchResources(const std::vector<std::string>& value);
Status
CheckResourceConfigIndexBuildDevice(const std::string& value);
CheckGpuResourceConfigBuildIndexResources(const std::vector<std::string>& value);
std::string
GetConfigStr(const std::string& parent_key, const std::string& child_key, const std::string& default_value = "");
......@@ -239,10 +232,6 @@ class Config {
Status
GetCacheConfigCpuCacheThreshold(float& value);
Status
GetCacheConfigGpuCacheCapacity(int64_t& value);
Status
GetCacheConfigGpuCacheThreshold(float& value);
Status
GetCacheConfigCacheInsertData(bool& value);
/* engine config */
......@@ -253,13 +242,17 @@ class Config {
Status
GetEngineConfigGpuSearchThreshold(int32_t& value);
/* resource config */
/* gpu resource config */
Status
GetGpuResourceConfigEnableGpu(bool& value);
Status
GetGpuResourceConfigCacheCapacity(int64_t& value);
Status
GetResourceConfigMode(std::string& value);
GetGpuResourceConfigCacheThreshold(float& value);
Status
GetResourceConfigSearchResources(std::vector<std::string>& value);
GetGpuResourceConfigSearchResources(std::vector<int32_t>& value);
Status
GetResourceConfigIndexBuildDevice(int32_t& value);
GetGpuResourceConfigBuildIndexResources(std::vector<int32_t>& value);
public:
/* server config */
......@@ -300,10 +293,6 @@ class Config {
Status
SetCacheConfigCpuCacheThreshold(const std::string& value);
Status
SetCacheConfigGpuCacheCapacity(const std::string& value);
Status
SetCacheConfigGpuCacheThreshold(const std::string& value);
Status
SetCacheConfigCacheInsertData(const std::string& value);
/* engine config */
......@@ -314,13 +303,17 @@ class Config {
Status
SetEngineConfigGpuSearchThreshold(const std::string& value);
/* resource config */
/* gpu resource config */
Status
SetGpuResourceConfigEnableGpu(const std::string& value);
Status
SetGpuResourceConfigCacheCapacity(const std::string& value);
Status
SetResourceConfigMode(const std::string& value);
SetGpuResourceConfigCacheThreshold(const std::string& value);
Status
SetResourceConfigSearchResources(const std::string& value);
SetGpuResourceConfigSearchResources(const std::string& value);
Status
SetResourceConfigIndexBuildDevice(const std::string& value);
SetGpuResourceConfigBuildIndexResources(const std::string& value);
private:
std::unordered_map<std::string, std::unordered_map<std::string, std::string>> config_map_;
......
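The Config changes above expose the GPU lists as comma-separated "gpux" strings (CONFIG_GPU_RESOURCE_DELIMITER) on the setter side and as int32_t device ids on the getter side. A self-contained sketch of that parsing, written only to illustrate the expected value format rather than the actual Config implementation:

```cpp
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Parse "gpu0,gpu1" into {0, 1}; entries that do not match the "gpux" format are skipped.
std::vector<int32_t> ParseGpuList(const std::string& value) {
    std::vector<int32_t> ids;
    std::stringstream ss(value);
    std::string item;
    while (std::getline(ss, item, ',')) {               // delimiter is ","
        if (item.size() > 3 && item.substr(0, 3) == "gpu") {
            ids.push_back(std::stoi(item.substr(3)));   // "gpu0" -> 0
        }
    }
    return ids;
}
```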
......@@ -198,7 +198,7 @@ ValidationUtil::ValidatePartitionTags(const std::vector<std::string>& partition_
}
Status
ValidationUtil::ValidateGpuIndex(uint32_t gpu_index) {
ValidationUtil::ValidateGpuIndex(int32_t gpu_index) {
#ifdef MILVUS_GPU_VERSION
int num_devices = 0;
auto cuda_err = cudaGetDeviceCount(&num_devices);
......@@ -219,7 +219,7 @@ ValidationUtil::ValidateGpuIndex(uint32_t gpu_index) {
}
Status
ValidationUtil::GetGpuMemory(uint32_t gpu_index, size_t& memory) {
ValidationUtil::GetGpuMemory(int32_t gpu_index, size_t& memory) {
#ifdef MILVUS_GPU_VERSION
cudaDeviceProp deviceProp;
......
......@@ -62,10 +62,10 @@ class ValidationUtil {
ValidatePartitionTags(const std::vector<std::string>& partition_tags);
static Status
ValidateGpuIndex(uint32_t gpu_index);
ValidateGpuIndex(int32_t gpu_index);
static Status
GetGpuMemory(uint32_t gpu_index, size_t& memory);
GetGpuMemory(int32_t gpu_index, size_t& memory);
static Status
ValidateIpAddress(const std::string& ip_address);
......
......@@ -19,6 +19,8 @@
#ifdef MILVUS_GPU_VERSION
#include "knowhere/index/vector_index/helpers/FaissGpuResourceMgr.h"
#endif
#include "scheduler/Utils.h"
#include "server/Config.h"
#include <map>
......@@ -35,7 +37,6 @@ constexpr int64_t M_BYTE = 1024 * 1024;
Status
KnowhereResource::Initialize() {
#ifdef MILVUS_GPU_VERSION
struct GpuResourceSetting {
int64_t pinned_memory = 300 * M_BYTE;
int64_t temp_memory = 300 * M_BYTE;
......@@ -47,27 +48,22 @@ KnowhereResource::Initialize() {
// get build index gpu resource
server::Config& config = server::Config::GetInstance();
int32_t build_index_gpu;
s = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
std::vector<int32_t> build_index_gpus;
s = config.GetGpuResourceConfigBuildIndexResources(build_index_gpus);
if (!s.ok())
return s;
gpu_resources.insert(std::make_pair(build_index_gpu, GpuResourceSetting()));
for (auto gpu_id : build_index_gpus) {
gpu_resources.insert(std::make_pair(gpu_id, GpuResourceSetting()));
}
// get search gpu resource
std::vector<std::string> pool;
s = config.GetResourceConfigSearchResources(pool);
std::vector<int32_t> search_gpus;
s = config.GetGpuResourceConfigSearchResources(search_gpus);
if (!s.ok())
return s;
std::set<uint64_t> gpu_ids;
for (auto& resource : pool) {
if (resource.length() < 4 || resource.substr(0, 3) != "gpu") {
// invalid
continue;
}
auto gpu_id = std::stoi(resource.substr(3));
for (auto& gpu_id : search_gpus) {
gpu_resources.insert(std::make_pair(gpu_id, GpuResourceSetting()));
}
......
......@@ -159,6 +159,10 @@ DBTest::SetUp() {
auto default_conn = milvus::scheduler::Connection("IO", 500.0);
auto PCIE = milvus::scheduler::Connection("IO", 11000.0);
res_mgr->Connect("disk", "cpu", default_conn);
#ifdef MILVUS_GPU_VERSION
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("0", "GPU", 0, true, true));
res_mgr->Connect("cpu", "0", PCIE);
#endif
res_mgr->Start();
milvus::scheduler::SchedInst::GetInstance()->Start();
......
......@@ -21,7 +21,6 @@ set(test_files
${CMAKE_CURRENT_SOURCE_DIR}/test_algorithm.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_event.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_node.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_normal.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_resource.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_resource_factory.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_resource_mgr.cpp
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include "scheduler/ResourceFactory.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Scheduler.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "utils/Log.h"
namespace {
namespace ms = milvus::scheduler;
} // namespace
TEST(NormalTest, INST_TEST) {
// ResourceMgr only compose resources, provide unified event
auto res_mgr = ms::ResMgrInst::GetInstance();
res_mgr->Add(ms::ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(ms::ResourceFactory::Create("cpu", "CPU", 0, true, true));
auto IO = ms::Connection("IO", 500.0);
res_mgr->Connect("disk", "cpu", IO);
auto scheduler = ms::SchedInst::GetInstance();
res_mgr->Start();
scheduler->Start();
const uint64_t NUM_TASK = 2;
std::vector<std::shared_ptr<ms::TestTask>> tasks;
ms::TableFileSchemaPtr dummy = nullptr;
auto disks = res_mgr->GetDiskResources();
ASSERT_FALSE(disks.empty());
if (auto observe = disks[0].lock()) {
for (uint64_t i = 0; i < NUM_TASK; ++i) {
auto label = std::make_shared<ms::DefaultLabel>();
auto task = std::make_shared<ms::TestTask>(dummy, label);
task->label() = std::make_shared<ms::DefaultLabel>();
tasks.push_back(task);
observe->task_table().Put(task);
}
}
for (auto& task : tasks) {
task->Wait();
ASSERT_EQ(task->load_count_, 1);
ASSERT_EQ(task->exec_count_, 1);
}
scheduler->Stop();
res_mgr->Stop();
}
......@@ -24,7 +24,7 @@
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/Task.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
......@@ -182,8 +182,10 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto label = std::make_shared<SpecResLabel>(disk_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
std::vector<std::string> path{disk_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
disk_resource_->task_table().Put(task);
}
......@@ -208,8 +210,10 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto label = std::make_shared<SpecResLabel>(cpu_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
std::vector<std::string> path{cpu_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
cpu_resource_->task_table().Put(task);
}
......@@ -234,8 +238,10 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto label = std::make_shared<SpecResLabel>(gpu_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
std::vector<std::string> path{gpu_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
gpu_resource_->task_table().Put(task);
}
......@@ -260,8 +266,10 @@ TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto label = std::make_shared<SpecResLabel>(test_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
std::vector<std::string> path{test_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
test_resource_->task_table().Put(task);
}
......
......@@ -22,7 +22,6 @@
#include "scheduler/resource/GpuResource.h"
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
namespace milvus {
namespace scheduler {
......@@ -187,8 +186,7 @@ TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) {
auto callback = [&](EventPtr event) { flag = true; };
mgr1_->RegisterSubscriber(callback);
TableFileSchemaPtr dummy = nullptr;
auto label = std::make_shared<DefaultLabel>();
disk_res->task_table().Put(std::make_shared<TestTask>(dummy, label));
disk_res->task_table().Put(std::make_shared<TestTask>(dummy, nullptr));
sleep(1);
ASSERT_TRUE(flag);
}
......
......@@ -23,7 +23,6 @@
#include "scheduler/Scheduler.h"
#include "scheduler/resource/Resource.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "utils/Error.h"
#include "wrapper/VecIndex.h"
......@@ -150,46 +149,6 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) {
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
}
TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
dummy->location_ = "location";
insert_dummy_index_into_gpu_cache(1);
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy, label);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
cpu_resource_.lock()->task_table().Put(task);
}
sleep(3);
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().size(), NUM);
}
TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy1 = std::make_shared<TableFileSchema>();
dummy1->location_ = "location";
tasks.clear();
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<DefaultLabel>();
auto task = std::make_shared<TestTask>(dummy1, label);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
cpu_resource_.lock()->task_table().Put(task);
}
sleep(3);
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
class SchedulerTest2 : public testing::Test {
protected:
void
......
......@@ -18,7 +18,6 @@
#include <gtest/gtest.h>
#include "scheduler/TaskTable.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
/************ TaskTableBaseTest ************/
......@@ -162,9 +161,8 @@ class TaskTableBaseTest : public ::testing::Test {
SetUp() override {
milvus::scheduler::TableFileSchemaPtr dummy = nullptr;
invalid_task_ = nullptr;
auto label = std::make_shared<milvus::scheduler::DefaultLabel>();
task1_ = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
task2_ = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
task1_ = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
task2_ = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
}
milvus::scheduler::TaskPtr invalid_task_;
......@@ -320,8 +318,7 @@ class TaskTableAdvanceTest : public ::testing::Test {
SetUp() override {
milvus::scheduler::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) {
auto label = std::make_shared<milvus::scheduler::DefaultLabel>();
auto task = std::make_shared<milvus::scheduler::TestTask>(dummy, label);
auto task = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
table1_.Put(task);
}
......
......@@ -216,21 +216,6 @@ TEST_F(ConfigTest, SERVER_CONFIG_VALID_TEST) {
s = config.GetCacheConfigCpuCacheThreshold(float_val);
ASSERT_TRUE(float_val == cache_cpu_cache_threshold);
#ifdef MILVUS_GPU_VERSION
int64_t cache_gpu_cache_capacity = 1;
s = config.SetCacheConfigGpuCacheCapacity(std::to_string(cache_gpu_cache_capacity));
ASSERT_TRUE(s.ok());
s = config.GetCacheConfigGpuCacheCapacity(int64_val);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(int64_val == cache_gpu_cache_capacity);
float cache_gpu_cache_threshold = 0.2;
s = config.SetCacheConfigGpuCacheThreshold(std::to_string(cache_gpu_cache_threshold));
ASSERT_TRUE(s.ok());
s = config.GetCacheConfigGpuCacheThreshold(float_val);
ASSERT_TRUE(float_val == cache_gpu_cache_threshold);
#endif
bool cache_insert_data = true;
s = config.SetCacheConfigCacheInsertData(std::to_string(cache_insert_data));
ASSERT_TRUE(s.ok());
......@@ -259,42 +244,54 @@ TEST_F(ConfigTest, SERVER_CONFIG_VALID_TEST) {
ASSERT_TRUE(s.ok());
ASSERT_TRUE(int32_val == engine_gpu_search_threshold);
/* resource config */
std::string resource_mode = "simple";
s = config.SetResourceConfigMode(resource_mode);
/* gpu resource config */
bool resource_enable_gpu = true;
s = config.SetGpuResourceConfigEnableGpu(std::to_string(resource_enable_gpu));
ASSERT_TRUE(s.ok());
s = config.GetResourceConfigMode(str_val);
s = config.GetGpuResourceConfigEnableGpu(bool_val);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(str_val == resource_mode);
ASSERT_TRUE(bool_val == resource_enable_gpu);
#ifdef MILVUS_CPU_VERSION
std::vector<std::string> search_resources = {"cpu"};
#else
std::vector<std::string> search_resources = {"cpu", "gpu0"};
#endif
std::vector<std::string> res_vec;
std::string res_str;
#ifdef MILVUS_GPU_VERSION
int64_t gpu_cache_capacity = 1;
s = config.SetGpuResourceConfigCacheCapacity(std::to_string(gpu_cache_capacity));
ASSERT_TRUE(s.ok());
s = config.GetGpuResourceConfigCacheCapacity(int64_val);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(int64_val == gpu_cache_capacity);
float gpu_cache_threshold = 0.2;
s = config.SetGpuResourceConfigCacheThreshold(std::to_string(gpu_cache_threshold));
ASSERT_TRUE(s.ok());
s = config.GetGpuResourceConfigCacheThreshold(float_val);
ASSERT_TRUE(float_val == gpu_cache_threshold);
std::vector<std::string> search_resources = {"gpu0"};
std::vector<int32_t> search_res_vec;
std::string search_res_str;
milvus::server::StringHelpFunctions::MergeStringWithDelimeter(
search_resources, milvus::server::CONFIG_RESOURCE_SEARCH_RESOURCES_DELIMITER, res_str);
s = config.SetResourceConfigSearchResources(res_str);
search_resources, milvus::server::CONFIG_GPU_RESOURCE_DELIMITER, search_res_str);
s = config.SetGpuResourceConfigSearchResources(search_res_str);
ASSERT_TRUE(s.ok());
s = config.GetResourceConfigSearchResources(res_vec);
s = config.GetGpuResourceConfigSearchResources(search_res_vec);
ASSERT_TRUE(s.ok());
for (size_t i = 0; i < search_resources.size(); i++) {
ASSERT_TRUE(search_resources[i] == res_vec[i]);
ASSERT_TRUE(std::stoi(search_resources[i].substr(3)) == search_res_vec[i]);
}
#ifdef MILVUS_CPU_VERSION
int32_t resource_index_build_device = milvus::server::CPU_DEVICE_ID;
s = config.SetResourceConfigIndexBuildDevice("cpu");
#else
int32_t resource_index_build_device = 0;
s = config.SetResourceConfigIndexBuildDevice("gpu" + std::to_string(resource_index_build_device));
#endif
std::vector<std::string> build_index_resources = {"gpu0"};
std::vector<int32_t> build_index_res_vec;
std::string build_index_res_str;
milvus::server::StringHelpFunctions::MergeStringWithDelimeter(
build_index_resources, milvus::server::CONFIG_GPU_RESOURCE_DELIMITER, build_index_res_str);
s = config.SetGpuResourceConfigBuildIndexResources(build_index_res_str);
ASSERT_TRUE(s.ok());
s = config.GetResourceConfigIndexBuildDevice(int32_val);
s = config.GetGpuResourceConfigBuildIndexResources(build_index_res_vec);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(int32_val == resource_index_build_device);
for (size_t i = 0; i < build_index_resources.size(); i++) {
ASSERT_TRUE(std::stoi(build_index_resources[i].substr(3)) == build_index_res_vec[i]);
}
#endif
}
TEST_F(ConfigTest, SERVER_CONFIG_INVALID_TEST) {
......@@ -381,18 +378,6 @@ TEST_F(ConfigTest, SERVER_CONFIG_INVALID_TEST) {
s = config.SetCacheConfigCpuCacheThreshold("1.0");
ASSERT_FALSE(s.ok());
#ifdef MILVUS_GPU_VERSION
s = config.SetCacheConfigGpuCacheCapacity("a");
ASSERT_FALSE(s.ok());
s = config.SetCacheConfigGpuCacheCapacity("128");
ASSERT_FALSE(s.ok());
s = config.SetCacheConfigGpuCacheThreshold("a");
ASSERT_FALSE(s.ok());
s = config.SetCacheConfigGpuCacheThreshold("1.0");
ASSERT_FALSE(s.ok());
#endif
s = config.SetCacheConfigCacheInsertData("N");
ASSERT_FALSE(s.ok());
......@@ -408,20 +393,29 @@ TEST_F(ConfigTest, SERVER_CONFIG_INVALID_TEST) {
s = config.SetEngineConfigGpuSearchThreshold("-1");
ASSERT_FALSE(s.ok());
/* resource config */
s = config.SetResourceConfigMode("default");
/* gpu resource config */
s = config.SetGpuResourceConfigEnableGpu("ok");
ASSERT_FALSE(s.ok());
s = config.SetResourceConfigSearchResources("gpu10");
#ifdef MILVUS_GPU_VERSION
s = config.SetGpuResourceConfigCacheCapacity("a");
ASSERT_FALSE(s.ok());
s = config.SetGpuResourceConfigCacheCapacity("128");
ASSERT_FALSE(s.ok());
s = config.SetResourceConfigSearchResources("cpu");
ASSERT_TRUE(s.ok());
s = config.SetGpuResourceConfigCacheThreshold("a");
ASSERT_FALSE(s.ok());
s = config.SetGpuResourceConfigCacheThreshold("1.0");
ASSERT_FALSE(s.ok());
s = config.SetResourceConfigIndexBuildDevice("gup2");
s = config.SetGpuResourceConfigSearchResources("gpu10");
ASSERT_FALSE(s.ok());
s = config.SetResourceConfigIndexBuildDevice("gpu16");
s = config.SetGpuResourceConfigBuildIndexResources("gup2");
ASSERT_FALSE(s.ok());
s = config.SetGpuResourceConfigBuildIndexResources("gpu16");
ASSERT_FALSE(s.ok());
#endif
}
TEST_F(ConfigTest, SERVER_CONFIG_TEST) {
......@@ -438,4 +432,3 @@ TEST_F(ConfigTest, SERVER_CONFIG_TEST) {
s = config.ResetDefaultConfig();
ASSERT_TRUE(s.ok());
}
......@@ -85,7 +85,6 @@ class RpcHandlerTest : public testing::Test {
// DBWrapper::GetInstance().GetInstance().StartService();
// DBWrapper::GetInstance().GetInstance().StopService();
milvus::server::Config::GetInstance().SetResourceConfigMode("single");
milvus::server::DBWrapper::GetInstance().StartService();
// initialize handler, create table
......