未验证 提交 19fdde6d 编写于 作者: J Juncheng 提交者: GitHub

Remove GlobalWorkStreamId/GlobalThrdId (#5917)

* Remove GlobalWorkStreamId/GlobalThrdId

* refine
Co-authored-by: Noneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
上级 476920fc
......@@ -41,7 +41,7 @@ void Actor::Init(const JobDesc* job_desc, const TaskProto& task_proto,
const ThreadCtx& thread_ctx) {
job_desc_ = job_desc;
actor_id_ = task_proto.task_id();
global_work_stream_id_ = Global<IDMgr>::Get()->GlobalWorkStreamId4ActorId(actor_id_);
thrd_id_ = Global<IDMgr>::Get()->ThrdId4ActorId(actor_id_);
job_id_ = task_proto.job_id();
InitDeviceCtx(thread_ctx);
if (task_proto.has_parallel_ctx()) {
......@@ -585,8 +585,7 @@ int Actor::TryUpdtStateAsProducedRegst(Regst* regst) {
void Actor::EnqueueAsyncMsg(const ActorMsg& msg) {
if (is_kernel_launch_synchronized_
&& global_work_stream_id_
== Global<IDMgr>::Get()->GlobalWorkStreamId4ActorId(msg.dst_actor_id())) {
&& thrd_id_ == Global<IDMgr>::Get()->ThrdId4ActorId(msg.dst_actor_id())) {
Global<ActorMsgBus>::Get()->SendMsg(msg);
} else {
async_msg_queue_.push_back(msg);
......
......@@ -43,7 +43,7 @@ class Actor {
int ProcessMsg(const ActorMsg& msg) { return (this->*msg_handler_)(msg); }
int64_t machine_id() const { return Global<IDMgr>::Get()->MachineId4ActorId(actor_id_); }
int64_t thrd_id() const { return Global<IDMgr>::Get()->ThrdId4ActorId(actor_id_); }
int64_t thrd_id() const { return thrd_id_; }
int64_t actor_id() const { return actor_id_; }
int64_t job_id() const { return job_id_; }
......@@ -195,7 +195,7 @@ class Actor {
const JobDesc* job_desc_;
int64_t actor_id_;
int64_t global_work_stream_id_;
int64_t thrd_id_;
int64_t job_id_;
std::unique_ptr<ParallelContext> parallel_ctx_;
std::vector<ExecKernel> exec_kernel_vec_;
......
......@@ -144,7 +144,7 @@ void TraverseConnectedSubGraphMergeInThisChain(TaskNode* this_node, const int64_
cur_node->ForEachNodeOnInOutDataEdge([&](TaskNode* next_node) {
if (visited_nodes.find(next_node) == visited_nodes.end() && CanBeMergedInChain(next_node)
&& this_node->GlobalWorkStreamId() == next_node->GlobalWorkStreamId()
&& this_node->thrd_id() == next_node->thrd_id()
&& (*GetTaskNodeTimeShape(next_node)) == (*seed_time_shape)) {
if (next_node->chain_id() == -1) {
queued_nodes.push(next_node);
......
......@@ -340,11 +340,6 @@ void TaskNode::UpdateTaskId() {
task_id_ = SerializeTaskIdToInt64(task_id);
}
int64_t TaskNode::GlobalWorkStreamId() const {
CHECK_NE(task_id_, -1);
return Global<IDMgr>::Get()->GlobalWorkStreamId4TaskId(task_id_);
}
void TaskNode::EraseConsumedRegstsByName(const std::string& name) {
if (consumed_regsts_.find(name) != consumed_regsts_.end()) {
for (auto& regst : consumed_regsts_[name]) { regst->DeleteConsumer(this); }
......
......@@ -67,7 +67,6 @@ class TaskNode : public Node<TaskNode, TaskEdge> {
}
DeviceType device_type() const;
virtual const ParallelContext* parallel_ctx() const { return nullptr; }
int64_t GlobalWorkStreamId() const;
int64_t GpuPhyId() const { return Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id_); }
// Setters
......
......@@ -44,18 +44,6 @@ int64_t IDMgr::ThrdId4ActorId(int64_t actor_id) const {
return SerializeStreamIdToInt64(DeserializeTaskIdFromInt64(actor_id).stream_id());
}
int64_t IDMgr::GlobalWorkStreamId4TaskId(int64_t task_id) const {
return SerializeStreamIdToInt64(DeserializeTaskIdFromInt64(task_id).stream_id());
}
int64_t IDMgr::GlobalWorkStreamId4ActorId(int64_t actor_id) const {
return GlobalWorkStreamId4TaskId(actor_id);
}
int64_t IDMgr::GlobalThrdId4TaskId(int64_t task_id) const {
return SerializeStreamIdToInt64(DeserializeTaskIdFromInt64(task_id).stream_id());
}
int64_t IDMgr::PickCpuThrdIdEvenly(int64_t machine_id) {
DeviceId device_id{static_cast<DeviceId::rank_t>(machine_id), DeviceType::kCPU,
DeviceId::kCPUDeviceIndex};
......
......@@ -43,15 +43,6 @@ class IDMgr final {
int64_t MachineId4ActorId(int64_t actor_id) const;
int64_t ThrdId4ActorId(int64_t actor_id) const;
// global_thread_id
// sign | machine_id | thrd_id | 0 | 0
// 1 | 10 | 11 | 21 | 21
int64_t GlobalThrdId4TaskId(int64_t task_id) const;
// global_work_stream_id
// sign | machine_id | thrd_id | local_work_stream_id | 0
// 1 | 10 | 11 | 21 | 21
int64_t GlobalWorkStreamId4ActorId(int64_t actor_id) const;
int64_t GlobalWorkStreamId4TaskId(int64_t task_id) const;
int64_t PickCpuThrdIdEvenly(int64_t machine_id);
StreamIndexGeneratorManager* GetStreamIndexGeneratorManager() { return &stream_index_gen_mgr_; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册