diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
index 13c6a4e8c39d783f60542918d6f6efd5725a80b4..868e23280be86d3f5bb6e7cf9f5be8747dabe0fc 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -35,6 +35,7 @@ FleetExecutor::~FleetExecutor() {
 
 void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) {
   runtime_graph_ = std::make_unique<RuntimeGraph>(program_desc, exe_desc_);
+  VLOG(5) << runtime_graph_->DebugString();
   InitCarrier();
   InitMessageBus();
 }
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.h b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
index 6343e21f7dcb836758b479d43227b2fd85eb5704..3ef3e5345492abc62e648a70a8986766a84c0464 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.h
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -14,6 +14,7 @@
 #pragma once
 
 #include <memory>
+#include <string>
 
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
 #include "paddle/fluid/platform/macros.h"
diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc
index 5a98704fe9c0155909170517241b37546fbdbace..c9455ffef492d5d25e9179832e7f452e7cfe6c2c 100644
--- a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc
+++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc
@@ -27,24 +27,24 @@
 using OpRole = paddle::framework::OpRole;
 using OpRegistry = paddle::framework::OpRegistry;
 using ProgramDesc = paddle::framework::ProgramDesc;
-bool IsForward(int64_t op_role) {
-  return (op_role == static_cast<int64_t>(OpRole::kForward)) ||
-         (op_role == (static_cast<int64_t>(OpRole::kForward) |
-                      static_cast<int64_t>(OpRole::kLoss)));
+bool IsForward(int32_t op_role) {
+  return (op_role == static_cast<int32_t>(OpRole::kForward)) ||
+         (op_role == (static_cast<int32_t>(OpRole::kForward) |
+                      static_cast<int32_t>(OpRole::kLoss)));
 }
 
-bool IsLRSched(int64_t op_role) {
-  return op_role == static_cast<int64_t>(OpRole::kLRSched);
+bool IsLRSched(int32_t op_role) {
+  return op_role == static_cast<int32_t>(OpRole::kLRSched);
 }
 
-bool IsBackward(int64_t op_role) {
-  return (op_role == static_cast<int64_t>(OpRole::kBackward)) ||
-         (op_role == (static_cast<int64_t>(OpRole::kBackward) |
-                      static_cast<int64_t>(OpRole::kLoss)));
+bool IsBackward(int32_t op_role) {
+  return (op_role == static_cast<int32_t>(OpRole::kBackward)) ||
+         (op_role == (static_cast<int32_t>(OpRole::kBackward) |
+                      static_cast<int32_t>(OpRole::kLoss)));
 }
 
-bool IsOptimize(int64_t op_role) {
-  return op_role == static_cast<int64_t>(OpRole::kOptimize);
+bool IsOptimize(int32_t op_role) {
+  return op_role == static_cast<int32_t>(OpRole::kOptimize);
 }
 
 struct DistCoord {
@@ -112,9 +112,9 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
   for (const auto& op_desc : program.Block(0).AllOps()) {
     ops_.emplace_back(OpRegistry::CreateOp(*op_desc));
   }
-  std::unordered_map<int64_t, std::vector<OperatorBase*>> role_to_ops;
+  std::unordered_map<int32_t, std::vector<OperatorBase*>> role_to_ops;
   for (const auto& op : ops_) {
-    int64_t op_role = op->Attr<int64_t>("op_role");
+    int32_t op_role = op->Attr<int32_t>("op_role");
     OpRole new_op_role;
     if (IsLRSched(op_role)) {
       new_op_role = OpRole::kLRSched;
@@ -129,7 +129,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
           "The op %s is None of LRSched, Forward, Backward or Optimize.",
           op->Type()));
     }
-    int64_t new_op_role_id = static_cast<int64_t>(new_op_role);
+    int32_t new_op_role_id = static_cast<int32_t>(new_op_role);
     if (role_to_ops.find(new_op_role_id) == role_to_ops.end()) {
       role_to_ops.insert({new_op_role_id, {}});
     }
@@ -147,7 +147,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
   int64_t task_id = cur_rank * functionality_order.size();
   for (std::size_t i = 0; i < functionality_order.size(); ++i) {
     OpRole role = functionality_order[i];
-    int64_t role_id = static_cast<int64_t>(role);
+    int32_t role_id = static_cast<int32_t>(role);
     int64_t max_run_times = num_micro_batches;
     int64_t max_slot_nums = start_up_steps;
     if (IsLRSched(role_id) || IsOptimize(role_id)) {
@@ -225,12 +225,22 @@ void RuntimeGraph::FakeRuntimeInfo() {
   int64_t nrank = exe_desc_.cluster_info().size();
   int32_t num_of_functionality = functionality_order.size();
   for (int64_t i = 0; i < nrank; ++i) {
-    for (int64_t j = 0; j < num_of_functionality; ++j) {
+    for (int32_t j = 0; j < num_of_functionality; ++j) {
       int64_t intercepter_id = i * num_of_functionality + j;
       intercepter_id_to_rank_.insert({intercepter_id, i});
     }
   }
 }
 
+std::string RuntimeGraph::DebugString() const {
+  std::ostringstream os;
+  os << "\nRuntime Graph Debug: \n";
+  for (const auto& task : task_nodes_) {
+    os << task->DebugString();
+    os << "\n";
+  }
+  return os.str();
+}
+
 }  // namespace distributed
 }  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.h b/paddle/fluid/distributed/fleet_executor/runtime_graph.h
index b25a93080da256e393ce7bedd8db92704a3dfd2a..b19456962d63161f06aeafca22f6b8bd6596ad25 100644
--- a/paddle/fluid/distributed/fleet_executor/runtime_graph.h
+++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.h
@@ -14,6 +14,7 @@
 #pragma once
 
 #include <memory>
+#include <string>
 #include <unordered_map>
 #include <vector>
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
@@ -43,6 +44,7 @@ class RuntimeGraph final {
   const std::unordered_map<int64_t, int64_t>& intercepter_id_to_rank() const {
     return intercepter_id_to_rank_;
   }
+  std::string DebugString() const;
 
  private:
   DISABLE_COPY_AND_ASSIGN(RuntimeGraph);
diff --git a/paddle/fluid/distributed/fleet_executor/task_node.cc b/paddle/fluid/distributed/fleet_executor/task_node.cc
index 1a20b4c32b5058fc63c51068f41297973b574fe7..3eee0fa3cb0719df18048cba9c6ecd74043f9280 100644
--- a/paddle/fluid/distributed/fleet_executor/task_node.cc
+++ b/paddle/fluid/distributed/fleet_executor/task_node.cc
@@ -21,7 +21,7 @@ namespace {
 using OperatorBase = TaskNode::OperatorBase;
 }
 
-TaskNode::TaskNode(int64_t role, const std::vector<OperatorBase*>& ops,
+TaskNode::TaskNode(int32_t role, const std::vector<OperatorBase*>& ops,
                    int64_t rank, int64_t task_id, int64_t max_run_times,
                    int64_t max_slot_nums)
     : ops_(ops),
@@ -31,7 +31,7 @@ TaskNode::TaskNode(int64_t role, const std::vector<OperatorBase*>& ops,
       max_run_times_(max_run_times),
      max_slot_nums_(max_slot_nums) {}
 
-TaskNode::TaskNode(int64_t role, int64_t rank, int64_t task_id,
+TaskNode::TaskNode(int32_t role, int64_t rank, int64_t task_id,
                    int64_t max_run_times, int64_t max_slot_nums)
     : role_(role),
       rank_(rank),
@@ -39,7 +39,7 @@ TaskNode::TaskNode(int64_t role, int64_t rank, int64_t task_id,
       max_run_times_(max_run_times),
       max_slot_nums_(max_slot_nums) {}
 
-std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int64_t role,
+std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int32_t role,
                                                         int64_t rank,
                                                         int64_t task_id,
                                                         int64_t max_run_times,
@@ -49,7 +49,7 @@ std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int64_t role,
 }
 
 std::unique_ptr<TaskNode> TaskNode::CreateTaskNode(
-    int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
+    int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
     int64_t task_id, int64_t max_run_times, int64_t max_slot_nums) {
   return std::make_unique<TaskNode>(role, ops, rank, task_id, max_run_times,
                                     max_slot_nums);
@@ -60,5 +60,15 @@ void TaskNode::AddUpstreamTask(int64_t task_id) { upstream_.insert(task_id); }
 void TaskNode::AddDownstreamTask(int64_t task_id) {
   downstream_.insert(task_id);
 }
+
+std::string TaskNode::DebugString() const {
+  std::ostringstream os;
+  os << "role: " << role_ << ", task_id: " << task_id_ << "\n";
+  for (std::size_t i = 0; i < ops_.size(); ++i) {
+    os << ops_[i]->Type() << " ";
+  }
+  os << "\n";
+  return os.str();
+}
 }  // namespace distributed
 }  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/task_node.h b/paddle/fluid/distributed/fleet_executor/task_node.h
index a90106d01d26dc2087b14f3c06cc03ff43fddb42..ea7f43eb82bb9726404f3d34312d0d41cb78c230 100644
--- a/paddle/fluid/distributed/fleet_executor/task_node.h
+++ b/paddle/fluid/distributed/fleet_executor/task_node.h
@@ -28,27 +28,28 @@ namespace distributed {
 class TaskNode final {
  public:
   using OperatorBase = paddle::framework::OperatorBase;
-  TaskNode(int64_t role, int64_t rank, int64_t task_id, int64_t max_run_times,
+  TaskNode(int32_t role, int64_t rank, int64_t task_id, int64_t max_run_times,
            int64_t max_slot_nums);
-  TaskNode(int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
+  TaskNode(int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
            int64_t task_id, int64_t max_run_times, int64_t max_slot_nums);
   ~TaskNode() = default;
   int64_t rank() const { return rank_; }
   int64_t task_id() const { return task_id_; }
-  int64_t role() const { return role_; }
+  int32_t role() const { return role_; }
   int64_t max_run_times() const { return max_run_times_; }
   int64_t max_slot_nums() const { return max_slot_nums_; }
   const std::unordered_set<int64_t>& upstream() const { return upstream_; }
   const std::unordered_set<int64_t>& downstream() const { return downstream_; }
   void AddUpstreamTask(int64_t task_id);
   void AddDownstreamTask(int64_t task_id);
-  static std::unique_ptr<TaskNode> CreateEmptyTaskNode(int64_t role,
+  std::string DebugString() const;
+  static std::unique_ptr<TaskNode> CreateEmptyTaskNode(int32_t role,
                                                        int64_t rank,
                                                        int64_t task_id,
                                                        int64_t max_run_times,
                                                        int64_t max_slot_nums);
   static std::unique_ptr<TaskNode> CreateTaskNode(
-      int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
+      int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
       int64_t task_id, int64_t max_run_times, int64_t max_slot_nums);
 
  private:
@@ -57,7 +58,7 @@ class TaskNode final {
   std::vector<OperatorBase*> ops_;
   std::unordered_set<int64_t> upstream_;
   std::unordered_set<int64_t> downstream_;
-  int64_t role_;
+  int32_t role_;
   int64_t rank_;
   int64_t task_id_;
   int64_t max_run_times_;