Unverified · Commit 13b2a48d authored by Juncheng, committed by GitHub

Remove IDMgr::GetGpuPhyIdFromThrdId/IDMgr::GetDeviceTypeFromThrdId (#6169)

* Remove IDMgr::GetGpuPhyIdFromThrdId/IDMgr::GetDeviceTypeFromThrdId

* CHECK(new_task_id_)
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
Parent 332c05c5
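
In short, call sites that previously decoded a serialized thrd_id through the global IDMgr now read the device information from the task's StreamId directly. A rough before/after sketch of the pattern (the helper names here are made up; TaskNode, StreamId, DeviceId and IDMgr are the types touched in the diff below):

#include "oneflow/core/graph/task_node.h"
#include "oneflow/core/job/id_manager.h"

namespace oneflow {

// Old pattern: the serialized thrd_id is decoded by the global IDMgr.
inline int64_t GpuIndexOld(const TaskNode* node) {
  return Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(node->thrd_id());
}

// New pattern: the TaskNode exposes the StreamId of its cached TaskId,
// and the DeviceId inside it carries the device type and index.
inline int64_t GpuIndexNew(const TaskNode* node) {
  return node->stream_id().device_id().device_index();
}

}  // namespace oneflow
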
......
@@ -151,7 +151,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
       thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(src_node->machine_id());
     } else if (src_node->device_type() == DeviceType::kGPU) {
 #ifdef WITH_CUDA
-      thrd_id = GetBoxingGpuThrdId(src_node->machine_id(), src_node->GpuPhyId(), "D2H");
+      thrd_id = GetBoxingGpuThrdId(src_node->machine_id(),
+                                   src_node->stream_id().device_id().device_index(), "D2H");
 #else
       UNIMPLEMENTED();
 #endif
......
@@ -275,7 +276,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
     } else if (in_pd.device_type() == DeviceType::kGPU) {
 #ifdef WITH_CUDA
       TaskNode* node = in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()));
-      local_concat_thrd_id = GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), "D2H");
+      local_concat_thrd_id = GetBoxingGpuThrdId(
+          node->machine_id(), node->stream_id().device_id().device_index(), "D2H");
 #else
       UNIMPLEMENTED();
 #endif
......
@@ -336,7 +338,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
     } else if (in_pd.device_type() == DeviceType::kGPU) {
 #ifdef WITH_CUDA
       TaskNode* node = in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()));
-      local_add_thrd_id = GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), "D2H");
+      local_add_thrd_id = GetBoxingGpuThrdId(
+          node->machine_id(), node->stream_id().device_id().device_index(), "D2H");
 #else
       UNIMPLEMENTED();
 #endif
......
@@ -382,7 +385,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
     } else if (in_pd.device_type() == DeviceType::kGPU) {
 #ifdef WITH_CUDA
       TaskNode* node = in_nodes.at(in_ids_on_machine.front());
-      local_add_thrd_id = GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), "H2D");
+      local_add_thrd_id = GetBoxingGpuThrdId(
+          node->machine_id(), node->stream_id().device_id().device_index(), "H2D");
 #else
       UNIMPLEMENTED();
 #endif
......
......
@@ -35,8 +35,8 @@ bool SubTskGphBuilderUtil::HasEmptySliceIfSplit(int64_t parallel_num,
 }
 
 bool SubTskGphBuilderUtil::IsOnSameGPU(const TaskNode* lhs, const TaskNode* rhs) {
-  return lhs->machine_id() == rhs->machine_id() && lhs->device_type() == DeviceType::kGPU
-         && rhs->device_type() == DeviceType::kGPU && lhs->GpuPhyId() == rhs->GpuPhyId();
+  return lhs->stream_id().device_id() == rhs->stream_id().device_id()
+         && lhs->stream_id().device_id().device_type() == DeviceType::kGPU;
 }
 
 bool SubTskGphBuilderUtil::IsBoxingS2S(const cfg::SbpParallel& src, const cfg::SbpParallel& dst) {
......
@@ -104,19 +104,18 @@ int64_t SubTskGphBuilderUtil::GetDistance(const ParallelDesc& src_parallel_desc,
 }
 
 int64_t SubTskGphBuilderUtil::GetDistance(const TaskNode* src, const TaskNode* dst) {
-  const auto GetDevPhyId = [](const DeviceType device_type, const int64_t thrd_id) -> int64_t {
-    if (device_type == DeviceType::kGPU) {
-      return Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id);
-    } else if (device_type == DeviceType::kCPU) {
+  const auto GetDevPhyId = [](const TaskNode* node) -> int64_t {
+    const DeviceId& device_id = node->stream_id().device_id();
+    if (device_id.device_type() == DeviceType::kCPU) {
       return 0;
     } else {
-      UNIMPLEMENTED();
+      return device_id.device_index();
     }
   };
   const DeviceType src_device_type = src->device_type();
-  const int64_t src_dev_phy_id = GetDevPhyId(src_device_type, src->thrd_id());
+  const int64_t src_dev_phy_id = GetDevPhyId(src);
   const DeviceType dst_device_type = dst->device_type();
-  const int64_t dst_dev_phy_id = GetDevPhyId(dst_device_type, dst->thrd_id());
+  const int64_t dst_dev_phy_id = GetDevPhyId(dst);
   return GetDistance(src->machine_id(), src_dev_phy_id, src_device_type, dst->machine_id(),
                      dst_dev_phy_id, dst_device_type);
 }
......
......
@@ -65,7 +65,8 @@ void CopyHdTaskNode::InitProducedRegstMemCase(MemoryCase* mem_case) {
   if (copy_type_ == CopyHdOpConf::H2D) {
     TaskNode::InitProducedRegstMemCase(mem_case);
   } else if (copy_type_ == CopyHdOpConf::D2H) {
-    mem_case->mutable_host_mem()->mutable_cuda_pinned_mem()->set_device_id(GpuPhyId());
+    mem_case->mutable_host_mem()->mutable_cuda_pinned_mem()->set_device_id(
+        stream_id().device_id().device_index());
   } else {
     UNIMPLEMENTED();
   }
......
......
@@ -686,7 +686,7 @@ void TaskGraph::ForEachGpuDeviceNodes(
   HashMap<std::pair<int64_t, int64_t>, HashSet<TaskNode*>> global_dev_phy_id2nodes;
   ForEachNode([&](TaskNode* task_node) {
     if (task_node->device_type() != DeviceType::kGPU) { return; }
-    int64_t dev_phy_id = Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(task_node->thrd_id());
+    int64_t dev_phy_id = task_node->stream_id().device_id().device_index();
     global_dev_phy_id2nodes[{task_node->machine_id(), dev_phy_id}].emplace(task_node);
   });
   for (const auto& pair : global_dev_phy_id2nodes) { Handler(pair.second); }
......
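
A hypothetical caller of ForEachGpuDeviceNodes, to illustrate what the per-(machine, device) buckets built above are handed to. The handler signature is inferred from the Handler(pair.second) call in the hunk; the logging body is only an example:

// Sketch only: visit every GPU's group of task nodes and report the bucket size.
task_graph->ForEachGpuDeviceNodes([](const HashSet<TaskNode*>& dev_nodes) {
  LOG(INFO) << "task nodes grouped on one (machine, device) pair: " << dev_nodes.size();
});
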
......
@@ -64,10 +64,13 @@ std::shared_ptr<RegstDesc> TaskNode::GetSoleConsumedRegst(const std::string& nam
   return vec.front();
 }
 
-DeviceType TaskNode::device_type() const {
-  return Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(thrd_id_);
+const StreamId& TaskNode::stream_id() const {
+  CHECK(new_task_id_);
+  return new_task_id_->stream_id();
 }
 
+DeviceType TaskNode::device_type() const { return stream_id().device_id().device_type(); }
+
 void TaskNode::set_machine_id(int64_t val) {
   CHECK_EQ(machine_id_, -1);
   machine_id_ = val;
......
@@ -310,8 +313,7 @@ void TaskNode::InitProducedRegstMemCase(MemoryCase* mem_case) {
   if (device_type() == DeviceType::kCPU) {
     mem_case->mutable_host_mem();
   } else if (device_type() == DeviceType::kGPU) {
-    mem_case->mutable_device_cuda_mem()->set_device_id(
-        Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id_));
+    mem_case->mutable_device_cuda_mem()->set_device_id(stream_id().device_id().device_index());
   } else {
     UNIMPLEMENTED();
   }
......
@@ -319,7 +321,8 @@ void TaskNode::InitProducedRegstMemCase(MemoryCase* mem_case) {
 
 void TaskNode::PinConsumedRegstMemCase(MemoryCase* mem_case) {
   if (mem_case->has_host_mem() && device_type() == DeviceType::kGPU) {
-    mem_case->mutable_host_mem()->mutable_cuda_pinned_mem()->set_device_id(GpuPhyId());
+    mem_case->mutable_host_mem()->mutable_cuda_pinned_mem()->set_device_id(
+        stream_id().device_id().device_index());
   }
 }
 
......
@@ -336,8 +339,8 @@ void TaskNode::UpdateTaskId() {
   CHECK_NE(machine_id_, -1);
   CHECK_NE(thrd_id_, -1);
   StreamId stream_id = DeserializeStreamIdFromInt64(thrd_id_);
-  TaskId task_id = Global<IDMgr>::Get()->GetTaskIdGenerator()->Generate(stream_id);
-  task_id_ = SerializeTaskIdToInt64(task_id);
+  new_task_id_.reset(new TaskId(Global<IDMgr>::Get()->GetTaskIdGenerator()->Generate(stream_id)));
+  task_id_ = SerializeTaskIdToInt64(*new_task_id_);
 }
 
 void TaskNode::EraseConsumedRegstsByName(const std::string& name) {
......
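
The stream_id() accessor added above is only valid after UpdateTaskId() has populated new_task_id_, which is what CHECK(new_task_id_) guards. Code that only holds a raw serialized thrd_id (and no TaskNode) can still recover the same device information the way UpdateTaskId does; an illustrative helper, not part of the commit:

#include "oneflow/core/graph/id_serialization.h"  // DeserializeStreamIdFromInt64

namespace oneflow {

// Illustration: the serialized thrd_id is an encoded StreamId, so the device
// type/index that IDMgr used to expose can be read after a single decode.
inline int64_t DeviceIndexFromThrdId(int64_t thrd_id) {
  const StreamId stream_id = DeserializeStreamIdFromInt64(thrd_id);
  return stream_id.device_id().device_index();
}

}  // namespace oneflow
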
......
@@ -53,6 +53,7 @@ class TaskNode : public Node<TaskNode, TaskEdge> {
   int64_t machine_id() const { return machine_id_; }
   int64_t thrd_id() const { return thrd_id_; }
   int64_t task_id() const { return task_id_; }
+  const StreamId& stream_id() const;
   int64_t chain_id() const { return chain_id_; }
   int64_t order_in_graph() const { return order_in_graph_; }
   const ExecGraph& exec_gph() const { return exec_gph_; }
......
@@ -67,7 +68,6 @@ class TaskNode : public Node<TaskNode, TaskEdge> {
   }
   DeviceType device_type() const;
   virtual const ParallelContext* parallel_ctx() const { return nullptr; }
-  int64_t GpuPhyId() const { return Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id_); }
 
   // Setters
   void set_machine_id(int64_t val);
......
@@ -150,6 +150,7 @@ class TaskNode : public Node<TaskNode, TaskEdge> {
   int64_t task_id_;
   int64_t chain_id_;
   int64_t order_in_graph_;
+  std::unique_ptr<TaskId> new_task_id_;
 
   ExecGraph exec_gph_;
   HashMap<std::string, std::shared_ptr<RegstDesc>> produced_regsts_;
......
......
@@ -14,23 +14,11 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 #include "oneflow/core/job/id_manager.h"
-#include "oneflow/core/device/cuda_util.h"
 #include "oneflow/core/common/id_util.h"
 #include "oneflow/core/graph/id_serialization.h"
 
 namespace oneflow {
 
-DeviceType IDMgr::GetDeviceTypeFromThrdId(int64_t thrd_id) const {
-  return DeserializeStreamIdFromInt64(thrd_id).device_id().device_type();
-}
-
-int64_t IDMgr::GetGpuPhyIdFromThrdId(int64_t thrd_id) const {
-  StreamId stream_id = DeserializeStreamIdFromInt64(thrd_id);
-  DeviceId device_id = stream_id.device_id();
-  CHECK_EQ(device_id.device_type(), DeviceType::kGPU);
-  return device_id.device_index();
-}
-
 DeviceType IDMgr::GetDeviceTypeFromActorId(int64_t actor_id) const {
   return DeserializeTaskIdFromInt64(actor_id).stream_id().device_id().device_type();
 }
......
......
@@ -34,10 +34,6 @@ class IDMgr final {
   int64_t NewMemBlockId() { return mem_block_id_count_++; }
   int64_t NewChunkId() { return chunk_id_count_++; }
 
-  // GetFromThrdId
-  DeviceType GetDeviceTypeFromThrdId(int64_t thrd_id) const;
-  int64_t GetGpuPhyIdFromThrdId(int64_t thrd_id) const;
-
   // Runtime
   DeviceType GetDeviceTypeFromActorId(int64_t actor_id) const;
   int64_t MachineId4ActorId(int64_t actor_id) const;
......
......
@@ -94,10 +94,11 @@ void InitMemoryChains(Plan* plan,
                       HashMap<int64_t, HashMap<int64_t, MemoryChain>>* device2chain2mem_chain) {
   for (int64_t i = 0; i < plan->task_size(); ++i) {
     TaskProto* task = plan->mutable_task(i);
+    const StreamId stream_id = PlanUtil::GetStreamId(*task);
     int64_t machine_id = task->machine_id();
-    DeviceType device_type = Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(task->thrd_id());
+    DeviceType device_type = stream_id.device_id().device_type();
     if (device_type != DeviceType::kGPU) { continue; }
-    int64_t device_id = Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(task->thrd_id());
+    int64_t device_id = stream_id.device_id().device_index();
     int64_t device_unique_id = GenDeviceUniqueId(machine_id, device_id);
     MemoryChain* mem_chain =
         &((*device2chain2mem_chain)[device_unique_id][task->task_set_info().chain_id()]);
......
......
@@ -18,13 +18,13 @@ limitations under the License.
 #include "oneflow/core/job/env_desc.h"
 #include "oneflow/core/job/global_for.h"
 #include "oneflow/core/common/str_util.h"
-#include "oneflow/core/graph/task_node.h"
 #include "oneflow/core/graph/plan_task_graph.h"
 #include "oneflow/core/graph/boxing/collective_boxing_util.h"
 #include "oneflow/core/memory/chunk_manager.h"
 #include "oneflow/core/memory/memory_case_util.h"
 #include "oneflow/core/register/runtime_register_desc.h"
 #include "oneflow/core/persistence/tee_persistent_log_stream.h"
+#include "oneflow/core/graph/id_serialization.h"
 
 namespace oneflow {
......
@@ -460,10 +460,10 @@ void PlanUtil::ToDotFile(const Plan& plan, const std::string& filepath) {
       return;
     }
     if (pass_tag == kNoPassTag) {
-      if (Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(task_proto.thrd_id()) == DeviceType::kGPU) {
-        int64_t device_id = Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(task_proto.thrd_id());
+      const StreamId stream_id = PlanUtil::GetStreamId(task_proto);
+      if (stream_id.device_id().device_type() == DeviceType::kGPU) {
         machine_id2job_id_device_id2node_list[task_proto.machine_id()][task_proto.job_id()]
-                                             [device_id]
+                                             [stream_id.device_id().device_index()]
                                              .push_back(node_def);
         machine_id2device_id2node_list_job_ids[task_proto.machine_id()].insert(task_proto.job_id());
       } else {
......
@@ -757,13 +757,10 @@ struct CollectiveBoxingRequestInfo {
 
 void GetDeviceDesc(const TaskProto* task_proto, boxing::collective::DeviceDesc* device_desc) {
   device_desc->set_machine_id(task_proto->machine_id());
-  const int64_t thrd_id = Global<IDMgr>::Get()->ThrdId4ActorId(task_proto->task_id());
-  device_desc->set_device_type(Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(thrd_id));
-  if (device_desc->device_type() == DeviceType::kGPU) {
-    device_desc->set_device_id(Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id));
-  } else {
-    UNIMPLEMENTED();
-  }
+  const StreamId stream_id = PlanUtil::GetStreamId(*task_proto);
+  const DeviceId& device_id = stream_id.device_id();
+  device_desc->set_device_type(device_id.device_type());
+  device_desc->set_device_id(device_id.device_index());
 }
 
 } // namespace
......
@@ -981,4 +978,12 @@ void PlanUtil::PopulateOpAttibute(
   }
 }
 
+/*static*/ StreamId PlanUtil::GetStreamId(const TaskProto& task) {
+  return DeserializeStreamIdFromInt64(task.thrd_id());
+}
+
+/*static*/ int64_t PlanUtil::GetDeviceIndex(const TaskProto& task) {
+  return GetStreamId(task).device_id().device_index();
+}
+
 } // namespace oneflow
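
The two new helpers deserialize the TaskProto's thrd_id on each call, so they return StreamId by value, whereas TaskNode::stream_id() can return a reference into its cached TaskId. A sketch of how a plan-level pass would use them, mirroring the InitMemoryChains and GetDeviceDesc call sites above (the function names here are made up):

// Sketch only: classify a TaskProto by device using the new PlanUtil helpers.
bool IsGpuTask(const TaskProto& task) {
  const StreamId stream_id = PlanUtil::GetStreamId(task);
  return stream_id.device_id().device_type() == DeviceType::kGPU;
}

int64_t GpuIndexOfTask(const TaskProto& task) {
  // Equivalent to PlanUtil::GetStreamId(task).device_id().device_index().
  return PlanUtil::GetDeviceIndex(task);
}
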
......
@@ -21,6 +21,7 @@ limitations under the License.
 #include "oneflow/core/common/util.h"
 #include "oneflow/core/job/plan.pb.h"
 #include "oneflow/core/job/job.pb.h"
+#include "oneflow/core/common/id_util.h"
 
 namespace oneflow {
......
@@ -45,6 +46,8 @@ struct PlanUtil {
   static void PopulateOpAttibute(
       Plan* plan,
       const PbMap<int64_t, ::oneflow::OpAttributeRefTable>& job_id2op_attribute_ref_table);
+  static StreamId GetStreamId(const TaskProto& task);
+  static int64_t GetDeviceIndex(const TaskProto& task);
 };
 
 } // namespace oneflow
......
......
@@ -19,6 +19,7 @@ limitations under the License.
 #include "oneflow/core/persistence/tee_persistent_log_stream.h"
 #include "oneflow/core/job/id_manager.h"
 #include "oneflow/core/framework/to_string.h"
+#include "oneflow/core/job/plan_util.h"
 
 namespace oneflow {
......
@@ -54,7 +55,7 @@ void PlanToPhysicalGraphFile(const Plan& plan) {
     node->set_name(task_id2op_name.at(task.task_id()));
     const OperatorConf& op_conf =
         task.exec_sequence().exec_node(0).kernel_conf().op_attribute().op_conf();
-    DeviceType device_type = Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(task.thrd_id());
+    DeviceType device_type = PlanUtil::GetStreamId(task).device_id().device_type();
     node->set_device(*CHECK_JUST(DeviceTag4DeviceType(device_type)));
     if (op_conf.has_user_conf()) {
       const UserOpConf& user_op = op_conf.user_conf();
......