diff --git a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc index 0c0411a035fb36532c0abf602f59f4041646268b..084e91c11caa7372fc079999aed279871cd96792 100644 --- a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc @@ -27,19 +27,15 @@ ComputeInterceptor::ComputeInterceptor(int64_t interceptor_id, TaskNode* node) } void ComputeInterceptor::PrepareDeps() { - auto& upstream = GetTaskNode()->upstream(); - auto& downstream = GetTaskNode()->downstream(); + auto& upstream = node_->upstream(); + auto& downstream = node_->downstream(); - // TODO(wangxi): get from task node - int64_t in_buff_size = std::numeric_limits::max(); - int64_t out_buff_size = 2; - - for (auto up_id : upstream) { - in_readys_.emplace(up_id, std::make_pair(in_buff_size, 0)); - in_stops_.emplace(up_id, false); + for (auto up : upstream) { + in_readys_.emplace(up.first, std::make_pair(up.second, 0)); + in_stops_.emplace(up.first, false); } - for (auto down_id : downstream) { - out_buffs_.emplace(down_id, std::make_pair(out_buff_size, 0)); + for (auto down : downstream) { + out_buffs_.emplace(down.first, std::make_pair(down.second, 0)); } // source compute node, should we add a new SourceInterceptor? @@ -114,8 +110,7 @@ bool ComputeInterceptor::CanWriteOutput() { // only source node need reset bool ComputeInterceptor::ShouldReset() { - if (is_source_ && step_ == node_->max_run_times()) return true; - return false; + return is_source_ && (step_ == node_->max_run_times()); } void ComputeInterceptor::SendDataReadyToDownStream() { diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc index 19afdf7441257fdd71de2627d148bf3aab335893..9999956223ab1538d13f5494be7b25cc46cd5fb0 100644 --- a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc +++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc @@ -150,12 +150,14 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) { } role_to_ops.at(new_op_role_id).emplace_back(op.get()); } + int64_t cur_rank = exe_desc_.cur_rank(); DistCoordSys coord_sys(exe_desc_.dp_degree(), exe_desc_.pp_degree(), exe_desc_.mp_degree()); const auto& coord = coord_sys.RankToCoord(cur_rank); int pipeline_stage = coord.pp_idx; int64_t num_pipeline_stages = exe_desc_.pp_degree(); + // TODO(fleet_executor dev): start up steps should be a config `num_slots` int64_t start_up_steps = num_pipeline_stages - pipeline_stage; int64_t num_micro_batches = exe_desc_.num_micro_batches(); @@ -199,36 +201,69 @@ void RuntimeGraph::FakeDependence() { downstream_coord.pp_idx += 1; int64_t pp_upstream = coord_sys.CoordToRank(upstream_coord); int64_t pp_downstream = coord_sys.CoordToRank(downstream_coord); + bool is_first_stage = (pp_upstream == -1); + bool is_last_stage = (pp_downstream == -1); + int32_t num_of_functionality = functionality_order.size(); - // lr -> forward -> backward -> optimize - // | | - // lr -> forward -> backward -> optimize + // lr(1:m) -> forward -> backward -> (m:1)optimize + // ↑ ↓ + // lr(1:m) -> forward -> backward -> (m:1)optimize + // ↑ ↓ + // lr(1:m) -> forward -> backward -> (m:1)optimize for (std::size_t i = 0; i < task_nodes_.size(); ++i) { - if (i != 0) { - task_nodes_[i]->AddUpstreamTask(cur_rank * num_of_functionality + i - 1); + auto& node = task_nodes_[i]; + bool is_forward = IsForward(node->role()); + bool is_backward = IsBackward(node->role()); + + int64_t cur_id = cur_rank * num_of_functionality + i; + int64_t prev_id = cur_id - 1; + int64_t next_id = cur_id + 1; + + int64_t upstream_id = pp_upstream * num_of_functionality + i; + int64_t downstream_id = pp_downstream * num_of_functionality + i; + + // 1F1B, last stage pp_buff_size should be 1, while first stage + // pp_buff_size should be pp_degree + int64_t pp_buff_size = exe_desc_.pp_degree() - coord.pp_idx; + + std::vector> ups; + std::vector> downs; + + if (i != 0) { // not lr + int64_t buff_size = is_backward ? pp_buff_size : 2; + ups.emplace_back(prev_id, buff_size); } - if (i != task_nodes_.size() - 1) { - task_nodes_[i]->AddDownstreamTask(cur_rank * num_of_functionality + i + - 1); + if (i != task_nodes_.size() - 1) { // not optimize + int64_t buff_size = is_forward ? pp_buff_size : 2; + downs.emplace_back(next_id, buff_size); } - if (IsForward(task_nodes_[i]->role())) { - if (pp_upstream != -1) { - task_nodes_[i]->AddUpstreamTask(pp_upstream * num_of_functionality + i); + + if (is_forward) { + if (!is_first_stage) { + ups.emplace_back(upstream_id, 2); } - if (pp_downstream != -1) { - task_nodes_[i]->AddDownstreamTask(pp_downstream * num_of_functionality + - i); + if (!is_last_stage) { + downs.emplace_back(downstream_id, 2); } - } else if (IsBackward(task_nodes_[i]->role())) { - if (pp_downstream != -1) { - task_nodes_[i]->AddUpstreamTask(pp_downstream * num_of_functionality + - i); + } else if (is_backward) { + if (!is_last_stage) { + ups.emplace_back(downstream_id, 2); } - if (pp_upstream != -1) { - task_nodes_[i]->AddDownstreamTask(pp_upstream * num_of_functionality + - i); + if (!is_first_stage) { + downs.emplace_back(upstream_id, 2); } } + + for (auto up : ups) { + VLOG(3) << "Task(" << cur_id << ") AddUpstream Task(" << up.first + << ") with buff_size=" << up.second; + node->AddUpstreamTask(up.first, up.second); + } + for (auto down : downs) { + VLOG(3) << "Task(" << cur_id << ") AddDownstream Task(" << down.first + << ") with buff_size=" << down.second; + node->AddDownstreamTask(down.first, down.second); + } } } diff --git a/paddle/fluid/distributed/fleet_executor/task_node.cc b/paddle/fluid/distributed/fleet_executor/task_node.cc index f2e785010b72632d87f4543a139038fa1518613d..e92ab09d481e8fe4e013b44ea7817d6dff305e8e 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.cc +++ b/paddle/fluid/distributed/fleet_executor/task_node.cc @@ -57,14 +57,14 @@ TaskNode::TaskNode(int32_t role, int64_t rank, int64_t task_id, max_run_times_(max_run_times), max_slot_nums_(max_slot_nums) {} -bool TaskNode::AddUpstreamTask(int64_t task_id) { - const auto& ret = upstream_.insert(task_id); - return *ret.first == task_id; +bool TaskNode::AddUpstreamTask(int64_t task_id, int64_t buff_size) { + const auto& ret = upstream_.emplace(task_id, buff_size); + return ret.second; } -bool TaskNode::AddDownstreamTask(int64_t task_id) { - const auto& ret = downstream_.insert(task_id); - return *ret.first == task_id; +bool TaskNode::AddDownstreamTask(int64_t task_id, int64_t buff_size) { + const auto& ret = downstream_.emplace(task_id, buff_size); + return ret.second; } std::string TaskNode::DebugString() const { diff --git a/paddle/fluid/distributed/fleet_executor/task_node.h b/paddle/fluid/distributed/fleet_executor/task_node.h index 23fb4c0a7dbfcdf801afebd593a495d0322d78f1..a03ccd4cded18e5aab86ad126d41ec710decfa26 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.h +++ b/paddle/fluid/distributed/fleet_executor/task_node.h @@ -48,8 +48,12 @@ class TaskNode final { int64_t run_at_offset() const { return run_at_offset_; } int64_t reply_up_per_steps() const { return reply_up_per_steps_; } int64_t send_down_per_steps() const { return send_down_per_steps_; } - const std::unordered_set& upstream() const { return upstream_; } - const std::unordered_set& downstream() const { return downstream_; } + const std::unordered_map& upstream() const { + return upstream_; + } + const std::unordered_map& downstream() const { + return downstream_; + } const std::string& type() const { return type_; } const paddle::framework::ProgramDesc& program() const { return program_; } const std::vector& ops() const { return ops_; } @@ -60,8 +64,9 @@ class TaskNode final { void SetSendDownPerSteps(int64_t value); void SetType(const std::string& type) { type_ = type; } - bool AddUpstreamTask(int64_t task_id); - bool AddDownstreamTask(int64_t task_id); + // upstream need buffs? + bool AddUpstreamTask(int64_t task_id, int64_t buff_size = 1); + bool AddDownstreamTask(int64_t task_id, int64_t buff_size = 1); std::string DebugString() const; private: @@ -69,8 +74,9 @@ class TaskNode final { TaskNode() = default; // ops_ will be removed in the future std::vector ops_; - std::unordered_set upstream_; - std::unordered_set downstream_; + // task_id-->buff_size + std::unordered_map upstream_; + std::unordered_map downstream_; framework::ProgramDesc program_; std::vector> ops_vec_; int32_t role_; diff --git a/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_test.cc b/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_test.cc index 8f44b2035aea02f36205f6c9d1af0d490979370e..44dc0c9bc9b0c9b1010ab20d8696e97e13165173 100644 --- a/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_test.cc @@ -56,8 +56,8 @@ TEST(ComputeInterceptor, Compute) { TaskNode* node_c = new TaskNode(0, 0, 2, 3, 0); // a->b->c - node_a->AddDownstreamTask(1); - node_b->AddUpstreamTask(0); + node_a->AddDownstreamTask(1, 3); + node_b->AddUpstreamTask(0, 3); node_b->AddDownstreamTask(2); node_c->AddUpstreamTask(1); diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc index db42135040ae744f9a1946aa6123a6104621f55e..936a970c05f7c507a30fc9434fa3c8013f2d0362 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc @@ -25,19 +25,34 @@ limitations under the License. */ namespace paddle { namespace distributed { -void LinkNodes(const std::vector& nodes) { +int64_t GetBuffSize( + const std::map, int64_t> buffs, + TaskNode* from, TaskNode* to) { + if (buffs.find({from, to}) != buffs.end()) { + return buffs.at({from, to}); + } + if (buffs.find({to, from}) != buffs.end()) { + return buffs.at({to, from}); + } + return 2; // set default 2 +} + +void LinkNodes(const std::vector& nodes, + const std::map, int64_t> buffs) { size_t size = nodes.size(); if (size <= 1) return; { // i = 0 TaskNode* now = nodes[0]; TaskNode* next = nodes[1]; - now->AddDownstreamTask(next->task_id()); + auto buff_size = GetBuffSize(buffs, now, next); + now->AddDownstreamTask(next->task_id(), buff_size); } { // i = size - 1 TaskNode* prev = nodes[size - 2]; TaskNode* now = nodes[size - 1]; - now->AddUpstreamTask(prev->task_id()); + auto buff_size = GetBuffSize(buffs, prev, now); + now->AddUpstreamTask(prev->task_id(), buff_size); } for (size_t i = 1; i < size - 1; ++i) { @@ -45,8 +60,11 @@ void LinkNodes(const std::vector& nodes) { TaskNode* now = nodes[i]; TaskNode* next = nodes[i + 1]; - now->AddUpstreamTask(prev->task_id()); - now->AddDownstreamTask(next->task_id()); + auto buff_size = GetBuffSize(buffs, prev, now); + now->AddUpstreamTask(prev->task_id(), buff_size); + + buff_size = GetBuffSize(buffs, now, next); + now->AddDownstreamTask(next->task_id(), buff_size); } } @@ -55,7 +73,7 @@ TEST(AmplifierInterceptor, Amplifier) { MessageBus& msg_bus = MessageBus::Instance(); msg_bus.Init({{0, 0}, {1, 0}, {2, 0}, {3, 0}}, {{0, ""}}, ""); - int64_t micro_steps = 3; + int64_t micro_steps = 6; // NOTE: don't delete, otherwise interceptor will use undefined node TaskNode* node_a = @@ -65,7 +83,8 @@ TEST(AmplifierInterceptor, Amplifier) { TaskNode* node_d = new TaskNode(0, 0, 3, micro_steps, 0); // a->b->c->d - LinkNodes({node_a, node_b, node_c, node_d}); + // LR->F->B->U + LinkNodes({node_a, node_b, node_c, node_d}, {{{node_b, node_c}, 1}}); node_a->SetRunPerSteps(micro_steps); node_d->SetRunPerSteps(micro_steps); diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor_task_node.py b/python/paddle/fluid/tests/unittests/test_fleet_executor_task_node.py index 2c944aa5dbc471b3bd42cdf756fa4e4e3aed9155..3dae8a5bf6b95814b00cf4a3ddfa514f8a425505 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_executor_task_node.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_executor_task_node.py @@ -28,8 +28,9 @@ class TestFleetExecutorTaskNode(unittest.TestCase): self.assertEqual(task_node_0.task_id(), 0) self.assertEqual(task_node_1.task_id(), 1) self.assertEqual(task_node_2.task_id(), 2) - self.assertTrue(task_node_0.add_downstream_task(task_node_1.task_id())) - self.assertTrue(task_node_1.add_upstream_task(task_node_0.task_id())) + self.assertTrue( + task_node_0.add_downstream_task(task_node_1.task_id(), 1)) + self.assertTrue(task_node_1.add_upstream_task(task_node_0.task_id(), 1)) if __name__ == "__main__":