Commit 56848e6a authored by willzhang4a58

add cpu device

Parent e47e915f
@@ -343,7 +343,7 @@ void ChainGraph::BuildLossPrintStruct() {
   ParallelConf loss_print_pr_conf;
   loss_print_pr_conf.set_policy(kDataParallel);
   loss_print_pr_conf.add_device_name(
-      IDMgr::Singleton()->MachineName4MachineId(0) + ":0");
+      IDMgr::Singleton()->MachineName4MachineId(0) + ":persistence:1");
   auto loss_print_chain = NewNode<LossPrintChainNode>();
   loss_print_chain->mut_op_vec() = {loss_print_op};
   loss_print_chain->mut_parallel_desc().reset(
......
@@ -128,10 +128,10 @@ void ChainNode::GenSortedCompTaskNodes(CompTaskNodeHandler Handler) const {
   int64_t parallel_idx = 0;
   int64_t parallel_num = parallel_desc_->parallel_num();
   for (int64_t machine_id : parallel_desc_->sorted_machine_ids()) {
-    for (int64_t dev_phy_id : parallel_desc_->sorted_dev_phy_ids(machine_id)) {
+    for (int64_t thrd_id : parallel_desc_->sorted_thrd_ids(machine_id)) {
       CompTaskNode* comp_task_node = NewCompTaskNode();
       comp_task_node->set_machine_id(machine_id);
-      comp_task_node->set_thrd_id(dev_phy_id);
+      comp_task_node->set_thrd_id(thrd_id);
       comp_task_node->set_chain_node(this);
       comp_task_node->mut_parallel_ctx()->set_parallel_id(parallel_idx++);
       comp_task_node->mut_parallel_ctx()->set_parallel_num(parallel_num);
......
@@ -10,8 +10,8 @@ const std::string& IDMgr::MachineName4MachineId(int64_t machine_id) const {
 }
 DeviceType IDMgr::GetDeviceTypeFromThrdId(int64_t thrd_id) const {
-  if (thrd_id < device_num_per_machine_) {
-    return JobDesc::Singleton()->resource().device_type();
+  if (cpu_device_num_ <= thrd_id && thrd_id < xpu_device_num_) {
+    return DeviceType::kGPU;
   } else {
     return DeviceType::kCPU;
   }
@@ -24,21 +24,30 @@ int64_t IDMgr::NewTaskId(int64_t machine_id, int64_t thrd_id) {
   return machine_thrd_id | (thread_id2num_of_tasks_[machine_thrd_id]++);
 }
+int64_t IDMgr::GetGpuDeviceThrdId(int64_t dev_phy_id) const {
+  return cpu_device_num_ + dev_phy_id;
+}
+int64_t IDMgr::GetGpuDevPhyIdFromThrdId(int64_t thrd_id) const {
+  CHECK_GE(thrd_id, cpu_device_num_);
+  return thrd_id - cpu_device_num_;
+}
 int64_t IDMgr::AllocatePersistenceThrdId(int64_t machine_id) {
   int64_t& offset = persistence_thrd_offset_[machine_id];
-  int64_t ret = device_num_per_machine_ + offset;
+  int64_t ret = xpu_device_num_ + offset;
   offset = (offset + 1) % JobDesc::Singleton()->PersistenceWorkerNum();
   return ret;
 }
 int64_t IDMgr::AllocateBoxingThrdId(int64_t machine_id) {
   int64_t offset = boxing_thrd_offset_[machine_id];
-  int64_t ret = device_num_per_machine_
-                + JobDesc::Singleton()->PersistenceWorkerNum() + offset;
+  int64_t ret =
+      xpu_device_num_ + JobDesc::Singleton()->PersistenceWorkerNum() + offset;
   offset = (offset + 1) % JobDesc::Singleton()->BoxingWorkerNum();
   return ret;
 }
 int64_t IDMgr::CommNetThrdId() const {
-  return device_num_per_machine_ + JobDesc::Singleton()->PersistenceWorkerNum()
+  return xpu_device_num_ + JobDesc::Singleton()->PersistenceWorkerNum()
          + JobDesc::Singleton()->BoxingWorkerNum();
 }
@@ -75,19 +84,21 @@ int64_t IDMgr::NewWorkStreamId(int64_t machine_id, int64_t thrd_id) {
 IDMgr::IDMgr() {
   const Resource& resource = JobDesc::Singleton()->resource();
-  machine_num_ = resource.machine_size();
-  CHECK_LT(machine_num_, static_cast<int64_t>(1) << machine_id_bit_num_);
-  device_num_per_machine_ = resource.device_num_per_machine();
-  CHECK_LT(device_num_per_machine_,
+  int64_t machine_num = resource.machine_size();
+  CHECK_LT(machine_num, static_cast<int64_t>(1) << machine_id_bit_num_);
+  cpu_device_num_ = resource.cpu_device_num();
+  gpu_device_num_ = resource.gpu_device_num();
+  xpu_device_num_ = cpu_device_num_ + gpu_device_num_;
+  CHECK_LT(xpu_device_num_,
            (static_cast<int64_t>(1) << thread_id_bit_num_) - 3);
-  for (int64_t i = 0; i < machine_num_; ++i) {
+  for (int64_t i = 0; i < machine_num; ++i) {
     const std::string& machine_name = resource.machine(i).name();
     CHECK(machine_name2machine_id_.emplace(machine_name, i).second);
     CHECK(machine_id2machine_name_.emplace(i, machine_name).second);
   }
   regst_desc_id_count_ = 0;
-  persistence_thrd_offset_.assign(machine_num_, 0);
-  boxing_thrd_offset_.assign(machine_num_, 0);
+  persistence_thrd_offset_.assign(machine_num, 0);
+  boxing_thrd_offset_.assign(machine_num, 0);
 }
 int64_t IDMgr::GetMachineThrdId(int64_t machine_id, int64_t thrd_id) {
......
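
Taken together, the id_manager changes above lay out each machine's thread-id space as: CPU device threads first, then GPU device threads, then persistence, boxing, and CommNet threads. Below is a minimal standalone sketch of that layout in plain C++; the constants are made-up stand-ins for the Resource fields and worker counts, and the free functions only mirror the new IDMgr helpers rather than being part of the commit.

#include <cassert>
#include <cstdint>
#include <iostream>

// Hypothetical values standing in for resource.cpu_device_num(),
// resource.gpu_device_num(), PersistenceWorkerNum() and BoxingWorkerNum().
const int64_t kCpuDeviceNum = 4;
const int64_t kGpuDeviceNum = 2;
const int64_t kXpuDeviceNum = kCpuDeviceNum + kGpuDeviceNum;
const int64_t kPersistenceWorkerNum = 4;
const int64_t kBoxingWorkerNum = 4;

// thrd_id ranges implied by the diff:
//   [0, cpu)          -> CPU device threads
//   [cpu, xpu)        -> GPU device threads
//   [xpu, xpu+p)      -> persistence threads
//   [xpu+p, xpu+p+b)  -> boxing threads
//   xpu+p+b           -> CommNet thread
int64_t GpuDeviceThrdId(int64_t dev_phy_id) { return kCpuDeviceNum + dev_phy_id; }
int64_t GpuDevPhyIdFromThrdId(int64_t thrd_id) {
  assert(thrd_id >= kCpuDeviceNum);
  return thrd_id - kCpuDeviceNum;
}
int64_t CommNetThrdId() {
  return kXpuDeviceNum + kPersistenceWorkerNum + kBoxingWorkerNum;
}

int main() {
  std::cout << GpuDeviceThrdId(1) << "\n";        // 5: GPU 1 sits after the 4 CPU threads
  std::cout << GpuDevPhyIdFromThrdId(5) << "\n";  // 1: inverse mapping
  std::cout << CommNetThrdId() << "\n";           // 14: last thread id on the machine
  return 0;
}

With cpu_device_num = 4 and gpu_device_num = 2, GPU physical id 1 maps to thread id 5 and the CommNet thread sits at id 14, which matches the bounds checked in GetDeviceTypeFromThrdId above.
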
@@ -20,6 +20,11 @@ class IDMgr final {
   DeviceType GetDeviceTypeFromThrdId(int64_t thrd_id) const;
   int64_t NewTaskId(int64_t machine_id, int64_t thrd_id);
+  int64_t GetCpuDeviceThrdId(int64_t dev_phy_id) const { return dev_phy_id; }
+  int64_t GetGpuDeviceThrdId(int64_t dev_phy_id) const;
+  int64_t GetGpuDevPhyIdFromThrdId(int64_t thrd_id) const;
   int64_t AllocatePersistenceThrdId(int64_t machine_id);
   int64_t AllocateBoxingThrdId(int64_t machine_id);
   int64_t CommNetThrdId() const;
@@ -44,8 +49,9 @@ class IDMgr final {
   IDMgr();
   int64_t GetMachineThrdId(int64_t machine_id, int64_t thrd_id);
   int32_t machine_num_;
-  int64_t device_num_per_machine_;
+  int64_t cpu_device_num_;
+  int64_t gpu_device_num_;
+  int64_t xpu_device_num_;
   int64_t regst_desc_id_count_;
   HashMap<int64_t, int64_t> thread_id2num_of_tasks_;
   HashMap<int64_t, int64_t> thread_id2num_of_streams_;
......
@@ -28,7 +28,9 @@ class JobDesc final {
   size_t SizeOfOneDataId() const;
   bool use_rdma() const { return job_conf_.use_rdma(); }
   int64_t TotalMachineNum() const { return resource_.machine().size(); }
-  DeviceType GetDeviceType() const { return resource_.device_type(); }
+  int32_t CpuDeviceNum() const { return resource_.cpu_device_num(); }
+  int32_t GpuDeviceNum() const { return resource_.gpu_device_num(); }
+  int32_t XpuDeviceNum() const { return CpuDeviceNum() + GpuDeviceNum(); }
   int32_t PersistenceWorkerNum() const;
   int32_t BoxingWorkerNum() const;
   int32_t CommNetWorkerNum() const;
......
@@ -22,10 +22,8 @@ std::string GetAmdCtrlKey(int64_t machine_id) {
 void PushAvailableMemDescOfThisMachine() {
   const JobDesc* job_desc = JobDesc::Singleton();
   AvailableMemDescOfMachine this_machine_mem_desc;
-  if (job_desc->GetDeviceType() == DeviceType::kGPU) {
-    FOR_RANGE(int, i, 0, job_desc->resource().device_num_per_machine()) {
-      this_machine_mem_desc.add_zone_size(GetAvailableGpuMemSize(i));
-    }
+  FOR_RANGE(int, i, 0, job_desc->GpuDeviceNum()) {
+    this_machine_mem_desc.add_zone_size(GetAvailableGpuMemSize(i));
   }
   this_machine_mem_desc.add_zone_size(GetAvailableCpuMemSize());
   CtrlClient::Singleton()->PushKV(
......
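
With the device_type switch removed, every machine now always reports one memory zone per GPU followed by a single host zone, even for CPU-only jobs. A small illustrative sketch of that zone ordering, where the size getters are placeholders rather than the real GetAvailableGpuMemSize/GetAvailableCpuMemSize:

#include <cstddef>
#include <iostream>
#include <vector>

// Placeholder getters; the real code queries the GPU runtime and host OS.
size_t FakeGpuMemSize(int gpu_id) { return 8ull << 30; }
size_t FakeCpuMemSize() { return 64ull << 30; }

int main() {
  const int gpu_device_num = 2;  // assumed resource.gpu_device_num()
  std::vector<size_t> zone_size;
  for (int i = 0; i < gpu_device_num; ++i) {
    zone_size.push_back(FakeGpuMemSize(i));  // zone i corresponds to GPU i
  }
  zone_size.push_back(FakeCpuMemSize());  // last zone is host memory
  for (size_t i = 0; i < zone_size.size(); ++i) {
    std::cout << "zone " << i << ": " << zone_size[i] << " bytes\n";
  }
  return 0;
}
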
@@ -5,34 +5,59 @@ namespace oneflow {
 namespace {
 void ParseDeviceNameConf(const std::string& device_name, std::string* mchn_name,
-                         std::string* device_id_str) {
-  size_t delimiter_pos = device_name.rfind(":");
-  CHECK_NE(delimiter_pos, std::string::npos);
-  *mchn_name = device_name.substr(0, delimiter_pos);
-  *device_id_str = device_name.substr(delimiter_pos + 1);
+                         std::string* device_tag, std::string* device_id_str) {
+  size_t second_delimiter_pos = device_name.rfind(":");
+  CHECK_NE(second_delimiter_pos, std::string::npos);
+  size_t first_delimiter_pos = device_name.rfind(":", second_delimiter_pos - 1);
+  CHECK_NE(first_delimiter_pos, std::string::npos);
+  *mchn_name = device_name.substr(0, first_delimiter_pos);
+  *device_tag = device_name.substr(
+      first_delimiter_pos + 1, second_delimiter_pos - first_delimiter_pos - 1);
+  *device_id_str = device_name.substr(second_delimiter_pos + 1);
 }
 }  // namespace
 ParallelDesc::ParallelDesc(const ParallelConf& user_conf) {
   policy_ = user_conf.policy();
+  HashSet<std::string> machine_name_set;
+  std::string device_tag_check;
   for (const std::string& device_name : user_conf.device_name()) {
     std::string mchn_name;
+    std::string device_tag;
     std::string device_id_str;
-    ParseDeviceNameConf(device_name, &mchn_name, &device_id_str);
+    ParseDeviceNameConf(device_name, &mchn_name, &device_tag, &device_id_str);
+    if (device_tag_check == "") {
+      device_tag_check = device_tag;
+    } else if (device_tag_check == "persistence") {
+      CHECK_STREQ(device_tag.c_str(), "persistence");
+      CHECK(machine_name_set.find(mchn_name) == machine_name_set.end());
+    } else {
+      // do nothing
+    }
+    machine_name_set.insert(mchn_name);
+    if (device_tag == "persistence") {
+      int64_t part_num = oneflow_cast<int64_t>(device_id_str);
+      device_id_str = "0-" + std::to_string(part_num - 1);
+    }
     int64_t machine_id = IDMgr::Singleton()->MachineID4MachineName(mchn_name);
     sorted_machine_ids_.push_back(machine_id);
-    int64_t minus_pos = device_id_str.rfind("-");
+    int64_t minus_pos = device_id_str.find("-");
     if (minus_pos == std::string::npos) {
-      int64_t dev_phy_id = oneflow_cast<int64_t>(device_id_str);
-      machine_id2sorted_dev_phy_ids_[machine_id] = {dev_phy_id};
-      continue;
+      device_id_str = device_id_str + "-" + device_id_str;
+      minus_pos = device_id_str.find("-");
     }
     int64_t min_id = oneflow_cast<int64_t>(device_id_str.substr(0, minus_pos));
     int64_t max_id = oneflow_cast<int64_t>(device_id_str.substr(minus_pos + 1));
     CHECK_LE(min_id, max_id);
     for (int64_t dev_phy_id = min_id; dev_phy_id <= max_id; ++dev_phy_id) {
-      machine_id2sorted_dev_phy_ids_[machine_id].push_back(dev_phy_id);
+      int64_t thrd_id = -1;
+      if (device_tag == "gpu") {
+        thrd_id = IDMgr::Singleton()->GetGpuDeviceThrdId(dev_phy_id);
+      } else {
+        thrd_id = IDMgr::Singleton()->GetCpuDeviceThrdId(dev_phy_id);
+      }
+      machine_id2sorted_thrd_ids_[machine_id].push_back(thrd_id);
     }
   }
   ClearUp();
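
To make the new device_name handling concrete, here is a hedged, self-contained sketch. It mimics ParseDeviceNameConf and the constructor's range expansion rather than calling them, and std::stoll stands in for oneflow_cast; none of the helper names below are part of the commit.

#include <cassert>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Mimics the new ParseDeviceNameConf: "<machine>:<tag>:<id or id-range>".
void ParseDeviceName(const std::string& device_name, std::string* mchn_name,
                     std::string* device_tag, std::string* device_id_str) {
  size_t second = device_name.rfind(':');
  assert(second != std::string::npos);
  size_t first = device_name.rfind(':', second - 1);
  assert(first != std::string::npos);
  *mchn_name = device_name.substr(0, first);
  *device_tag = device_name.substr(first + 1, second - first - 1);
  *device_id_str = device_name.substr(second + 1);
}

// Mimics the constructor's range expansion: "3" becomes "3-3", "0-2" stays.
std::vector<int64_t> ExpandIds(std::string device_id_str) {
  size_t minus_pos = device_id_str.find('-');
  if (minus_pos == std::string::npos) {
    device_id_str = device_id_str + "-" + device_id_str;
    minus_pos = device_id_str.find('-');
  }
  int64_t min_id = std::stoll(device_id_str.substr(0, minus_pos));
  int64_t max_id = std::stoll(device_id_str.substr(minus_pos + 1));
  std::vector<int64_t> ids;
  for (int64_t i = min_id; i <= max_id; ++i) { ids.push_back(i); }
  return ids;
}

int main() {
  std::string mchn, tag, ids;
  ParseDeviceName("machine_xxx:gpu:0-2", &mchn, &tag, &ids);
  std::cout << mchn << " / " << tag << " / " << ids << "\n";   // machine_xxx / gpu / 0-2
  for (int64_t id : ExpandIds(ids)) { std::cout << id << " "; }  // 0 1 2
  std::cout << "\n";
  return 0;
}

For "machine_xxx:gpu:0-2" this yields machine name "machine_xxx", tag "gpu", and physical ids {0, 1, 2}, which the constructor above then turns into thread ids via GetGpuDeviceThrdId.
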
@@ -47,7 +72,7 @@ void ParallelDesc::RemoveNeedlessDevice(const std::string& op_name,
   int32_t device_cnt = 0;
   int64_t max_machine_id = -1;
   for (int64_t machine_id : sorted_machine_ids_) {
-    auto it = machine_id2sorted_dev_phy_ids_.find(machine_id);
+    auto it = machine_id2sorted_thrd_ids_.find(machine_id);
     int32_t cur_device_num = it->second.size();
     int32_t cur_device_max_num = max_device_num - device_cnt;
     if (cur_device_num > cur_device_max_num) {
@@ -71,7 +96,7 @@
     }
   }
   EraseIf<int64_t, std::vector<int64_t>>(
-      &machine_id2sorted_dev_phy_ids_,
+      &machine_id2sorted_thrd_ids_,
       [&](HashMap<int64_t, std::vector<int64_t>>::iterator it) {
        return it->first > max_machine_id;
       });
@@ -80,10 +105,10 @@ void ParallelDesc::RemoveNeedlessDevice(const std::string& op_name,
 void ParallelDesc::RemoveInvalidDevice(const std::string& op_name) {
   for (int64_t machine_id : sorted_machine_ids_) {
-    auto& sorted_dev_ids = machine_id2sorted_dev_phy_ids_.at(machine_id);
-    auto bound_it = std::lower_bound(
-        sorted_dev_ids.begin(), sorted_dev_ids.end(),
-        JobDesc::Singleton()->resource().device_num_per_machine());
+    auto& sorted_dev_ids = machine_id2sorted_thrd_ids_.at(machine_id);
+    auto bound_it =
+        std::lower_bound(sorted_dev_ids.begin(), sorted_dev_ids.end(),
+                         JobDesc::Singleton()->XpuDeviceNum());
     if (bound_it == sorted_dev_ids.end()) {
       continue;
     } else {
@@ -100,19 +125,18 @@ void ParallelDesc::RemoveInvalidDevice(const std::string& op_name) {
 bool ParallelDesc::Equal(const ParallelDesc& rhs) const {
   return policy_ == rhs.policy_
          && sorted_machine_ids_ == rhs.sorted_machine_ids_
-         && machine_id2sorted_dev_phy_ids_
-                == rhs.machine_id2sorted_dev_phy_ids_;
+         && machine_id2sorted_thrd_ids_ == rhs.machine_id2sorted_thrd_ids_;
 }
 void ParallelDesc::ClearUp() {
   EraseIf<int64_t, std::vector<int64_t>>(
-      &machine_id2sorted_dev_phy_ids_,
+      &machine_id2sorted_thrd_ids_,
       [](HashMap<int64_t, std::vector<int64_t>>::iterator it) {
         return it->second.empty();
       });
   sorted_machine_ids_.clear();
   parallel_num_ = 0;
-  for (auto& pair : machine_id2sorted_dev_phy_ids_) {
+  for (auto& pair : machine_id2sorted_thrd_ids_) {
     sorted_machine_ids_.push_back(pair.first);
     SortAndRemoveDuplication(&(pair.second));
     parallel_num_ += pair.second.size();
......
@@ -21,8 +21,8 @@ class ParallelDesc {
   const std::vector<int64_t>& sorted_machine_ids() const {
     return sorted_machine_ids_;
   }
-  const std::vector<int64_t>& sorted_dev_phy_ids(int64_t machine_id) const {
-    return machine_id2sorted_dev_phy_ids_.at(machine_id);
+  const std::vector<int64_t>& sorted_thrd_ids(int64_t machine_id) const {
+    return machine_id2sorted_thrd_ids_.at(machine_id);
   }
   int64_t parallel_num() const { return parallel_num_; }
@@ -44,7 +44,7 @@
   ParallelPolicy policy_;
   std::vector<int64_t> sorted_machine_ids_;
-  HashMap<int64_t, std::vector<int64_t>> machine_id2sorted_dev_phy_ids_;
+  HashMap<int64_t, std::vector<int64_t>> machine_id2sorted_thrd_ids_;
   int64_t parallel_num_;
 };
......
@@ -14,18 +14,24 @@ enum DeviceType {
 message Resource {
   repeated Machine machine = 1;
-  required int32 device_num_per_machine = 2;
-  required DeviceType device_type = 3;
+  required int32 cpu_device_num = 2;
+  required int32 gpu_device_num = 3;
   optional int32 persistence_worker_num = 4 [default = 4];
   optional int32 boxing_worker_num = 5 [default = 4];
   optional int32 comm_net_worker_num = 6 [default = 4];
 }
-// If one machine is named "machine_xxx" and device_num_per_machine = 4,
-// then we have 4 devices:
-//   "machine_xxx:0"
-//   "machine_xxx:1"
-//   "machine_xxx:2"
-//   "machine_xxx:3"
+// If one machine is named "machine_xxx", cpu_device_num = 4 and gpu_device_num = 2,
+// then we have the following device_names:
+//   "machine_xxx:cpu:0"
+//   "machine_xxx:cpu:1"
+//   "machine_xxx:cpu:2"
+//   "machine_xxx:cpu:3"
+//   "machine_xxx:gpu:0"
+//   "machine_xxx:gpu:1"
+//   "machine_xxx:persistence"
 //
-// "machine_xxx:0-2" means {"machine_xxx:0", "machine_xxx:1", "machine_xxx:2"}
+// "machine_xxx:persistence" is a special device_name for persistence ops such as "data_loader" and "print".
+// A "data_loader" placed on "machine_xxx:persistence:10" means: load 10 parts on "machine_xxx:persistence".
+// "machine_xxx:xpu:0-2" means {"machine_xxx:xpu:0", "machine_xxx:xpu:1", "machine_xxx:xpu:2"}
@@ -86,6 +86,7 @@ void Runtime::NewAllSingleton(const Plan& plan, bool is_experiment_phase) {
   EpollCommNet::Init();
 #endif
   SnapshotMgr::NewSingleton(plan);
+  MemoryAllocator::NewSingleton();
   RegstMgr::NewSingleton();
   ActorMsgBus::NewSingleton();
   ThreadMgr::NewSingleton();
@@ -95,6 +96,7 @@ void Runtime::DeleteAllSingleton() {
   ThreadMgr::DeleteSingleton();
   ActorMsgBus::DeleteSingleton();
   RegstMgr::DeleteSingleton();
+  MemoryAllocator::DeleteSingleton();
   SnapshotMgr::DeleteSingleton();
   delete CommNet::Singleton();
   RuntimeCtx::DeleteSingleton();
......
@@ -100,7 +100,8 @@ void RegstDesc::InferMemCase() {
   int64_t thrd_id = producer_->thrd_id();
   if (auto cp_hd_producer = dynamic_cast<const CopyHdTaskNode*>(producer_)) {
     if (cp_hd_producer->copy_type() == CopyHdOpConf::H2D) {
-      mem_case_.mutable_device_cuda_mem()->set_device_id(thrd_id);
+      mem_case_.mutable_device_cuda_mem()->set_device_id(
+          IDMgr::Singleton()->GetGpuDevPhyIdFromThrdId(thrd_id));
     } else {
       mem_case_.mutable_host_pinned_mem()->set_used_by_device(true);
       SetHostPinnedMemoryAccordingToConsumers(consumers_, &mem_case_);
@@ -110,7 +111,8 @@ void RegstDesc::InferMemCase() {
     SetHostPinnedMemoryAccordingToConsumers(consumers_, &mem_case_);
   } else {
     if (producer_->device_type() == kGPU) {
-      mem_case_.mutable_device_cuda_mem()->set_device_id(thrd_id);
+      mem_case_.mutable_device_cuda_mem()->set_device_id(
+          IDMgr::Singleton()->GetGpuDevPhyIdFromThrdId(thrd_id));
     } else {
       mem_case_.mutable_host_pageable_mem();
       SetHostPinnedMemoryAccordingToConsumers(consumers_, &mem_case_);
......
@@ -19,13 +19,13 @@ Thread* ThreadMgr::GetThrd(int64_t thrd_id) { return threads_.at(thrd_id); }
 ThreadMgr::ThreadMgr() {
   const JobDesc* job_desc = JobDesc::Singleton();
   int64_t thrd_id = 0;
-  // device
-  FOR_RANGE(int64_t, dev_id, 0, job_desc->resource().device_num_per_machine()) {
-    if (job_desc->resource().device_type() == kGPU) {
-      threads_.push_back(new GpuThread(thrd_id++, dev_id));
-    } else {
-      threads_.push_back(new CpuThread(thrd_id++));
-    }
+  // cpu device
+  FOR_RANGE(int64_t, i, 0, job_desc->CpuDeviceNum()) {
+    threads_.push_back(new CpuThread(thrd_id++));
+  }
+  // gpu device
+  FOR_RANGE(int64_t, i, 0, job_desc->GpuDeviceNum()) {
+    threads_.push_back(new GpuThread(thrd_id++, i));
   }
   // persistence
   FOR_RANGE(int64_t, i, 0, job_desc->PersistenceWorkerNum()) {
......