提交 3bf12c0d 编写于 作者: J Jin Hai 提交者: GitHub

Merge pull request #165 from fishpenguin/branch-0.5.1-yk

 #164 - Add CPU version for building index 

Former-commit-id: 0787555623a1e34dc7c9d52a95e674100e445e95
......@@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#115 - Using new structure for tasktable
- \#139 - New config option use_gpu_threshold
- \#146 - Add only GPU and only CPU version for IVF_SQ8 and IVF_FLAT
- \#164 - Add CPU version for building index
## Improvement
- \#64 - Improvement dump function in scheduler
......
......@@ -104,20 +104,25 @@ JobMgr::build_task(const JobPtr& job) {
void
JobMgr::calculate_path(const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask) {
return;
}
if (task->type_ == TaskType::SearchTask) {
if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) {
return;
}
if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) {
return;
std::vector<std::string> path;
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
auto src = res_mgr_->GetDiskResources()[0];
auto dest = spec_label->resource();
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
task->path() = Path(path, path.size() - 1);
} else if (task->type_ == TaskType::BuildIndexTask) {
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
auto src = res_mgr_->GetDiskResources()[0];
auto dest = spec_label->resource();
std::vector<std::string> path;
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
task->path() = Path(path, path.size() - 1);
}
std::vector<std::string> path;
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
auto src = res_mgr_->GetDiskResources()[0];
auto dest = spec_label->resource();
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
task->path() = Path(path, path.size() - 1);
}
} // namespace scheduler
......
......@@ -55,8 +55,8 @@ load_simple_config() {
// get resources
auto gpu_ids = get_gpu_pool();
int32_t build_gpu_id;
config.GetResourceConfigIndexBuildDevice(build_gpu_id);
int32_t index_build_device_id;
config.GetResourceConfigIndexBuildDevice(index_build_device_id);
// create and connect
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
......@@ -70,15 +70,15 @@ load_simple_config() {
for (auto& gpu_id : gpu_ids) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
if (build_gpu_id == gpu_id) {
if (index_build_device_id == gpu_id) {
find_build_gpu_id = true;
}
}
if (not find_build_gpu_id) {
if (not find_build_gpu_id && index_build_device_id != server::CPU_DEVICE_ID) {
ResMgrInst::GetInstance()->Add(
ResourceFactory::Create(std::to_string(build_gpu_id), "GPU", build_gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(build_gpu_id), pcie);
ResourceFactory::Create(std::to_string(index_build_device_id), "GPU", index_build_device_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(index_build_device_id), pcie);
}
}
......
......@@ -106,7 +106,6 @@ class OptimizerInst {
has_cpu = true;
}
}
std::vector<PassPtr> pass_list;
pass_list.push_back(std::make_shared<LargeSQ8HPass>());
pass_list.push_back(std::make_shared<HybridPass>());
......
......@@ -70,8 +70,15 @@ TaskCreator::Create(const DeleteJobPtr& job) {
std::vector<TaskPtr>
TaskCreator::Create(const BuildIndexJobPtr& job) {
std::vector<TaskPtr> tasks;
// TODO(yukun): remove "disk" hardcode here
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
server::Config& config = server::Config::GetInstance();
int32_t build_index_id;
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_id);
ResourcePtr res_ptr;
if (build_index_id == server::CPU_DEVICE_ID) {
res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
} else {
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_index_id);
}
for (auto& to_index_file : job->to_index_files()) {
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
......
......@@ -138,73 +138,41 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou
std::shared_ptr<LoadCompletedEvent> event) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto& res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
// if (task->job_.lock()->type() == JobType::SEARCH) {
// auto label = task->label();
// auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
// if (spec_label->resource().lock()->type() == ResourceType::CPU) {
// std::vector<std::string> spec_path;
// spec_path.push_back(spec_label->resource().lock()->name());
// spec_path.push_back(resource->name());
// task->path() = Path(spec_path, spec_path.size() - 1);
// } else {
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
// uint64_t min_cost_idx = 0;
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->TotalTasks() == 0) {
// min_cost_idx = i;
// break;
// }
// uint64_t cost = compute_resources[i]->TaskAvgCost() *
// compute_resources[i]->NumOfTaskToExec() +
// transport_costs[i];
// if (min_cost > cost) {
// min_cost = cost;
// min_cost_idx = i;
// }
// }
//
// // step 3: set path in task
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
// task->path() = task_path;
// }
//
// } else
if (task->job_.lock()->type() == JobType::BUILD) {
// step2: Read device id in config
// get build index gpu resource
server::Config& config = server::Config::GetInstance();
int32_t build_index_gpu;
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
bool find_gpu_res = false;
if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->name() ==
res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
}
}
}
if (not find_gpu_res) {
task->path() = Path(paths[0], paths[0].size() - 1);
}
}
}
// if (resource->type() == ResourceType::DISK) {
// // step 1: calculate shortest path per resource, from disk to compute resource
// auto compute_resources = res_mgr->GetComputeResources();
// std::vector<std::vector<std::string>> paths;
// std::vector<uint64_t> transport_costs;
// for (auto& res : compute_resources) {
// std::vector<std::string> path;
// uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
// transport_costs.push_back(transport_cost);
// paths.emplace_back(path);
// }
// if (task->job_.lock()->type() == JobType::BUILD) {
// // step2: Read device id in config
// // get build index gpu resource
// server::Config& config = server::Config::GetInstance();
// int32_t build_index_gpu;
// Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
//
// bool find_gpu_res = false;
// if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->name() ==
// res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
// find_gpu_res = true;
// Path task_path(paths[i], paths[i].size() - 1);
// task->path() = task_path;
// break;
// }
// }
// }
// if (not find_gpu_res) {
// task->path() = Path(paths[0], paths[0].size() - 1);
// }
// }
// }
if (resource->name() == task->path().Last()) {
resource->WakeupExecutor();
......
......@@ -46,7 +46,7 @@ OnlyGPUPass::Run(const TaskPtr& task) {
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
specified_gpu_id_ = specified_gpu_id_++ % gpu_id.size();
specified_gpu_id_ = (specified_gpu_id_ + 1) % gpu_id.size();
return true;
}
......
......@@ -590,15 +590,18 @@ Config::CheckCacheConfigGpuCacheCapacity(const std::string& value) {
return Status(SERVER_INVALID_ARGUMENT, msg);
} else {
uint64_t gpu_cache_capacity = std::stoi(value) * GB;
int gpu_index;
Status s = GetResourceConfigIndexBuildDevice(gpu_index);
int device_id;
Status s = GetResourceConfigIndexBuildDevice(device_id);
if (!s.ok()) {
return s;
}
if (device_id == server::CPU_DEVICE_ID)
return Status::OK();
size_t gpu_memory;
if (!ValidationUtil::GetGpuMemory(gpu_index, gpu_memory).ok()) {
std::string msg = "Fail to get GPU memory for GPU device: " + std::to_string(gpu_index);
if (!ValidationUtil::GetGpuMemory(device_id, gpu_memory).ok()) {
std::string msg = "Fail to get GPU memory for GPU device: " + std::to_string(device_id);
return Status(SERVER_UNEXPECTED_ERROR, msg);
} else if (gpu_cache_capacity >= gpu_memory) {
std::string msg = "Invalid gpu cache capacity: " + value +
......@@ -1013,7 +1016,12 @@ Config::GetResourceConfigIndexBuildDevice(int32_t& value) {
return s;
}
value = std::stoi(str.substr(3));
if (str == "cpu") {
value = CPU_DEVICE_ID;
} else {
value = std::stoi(str.substr(3));
}
return Status::OK();
}
......
......@@ -95,6 +95,8 @@ static const char* CONFIG_RESOURCE_SEARCH_RESOURCES = "search_resources";
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE = "index_build_device";
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "gpu0";
const int32_t CPU_DEVICE_ID = -1;
class Config {
public:
static Config&
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册