diff --git a/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc b/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc index 7d71f8e7b2242ead75d93294b007070409a828ef..72c689732b5b7df5f61d28d93a3bef6e305f426d 100644 --- a/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc @@ -27,28 +27,6 @@ AmplifierInterceptor::AmplifierInterceptor(int64_t interceptor_id, run_at_offset_ = node->run_at_offset(); reply_up_per_steps_ = node->reply_up_per_steps(); send_down_per_steps_ = node->send_down_per_steps(); - - PADDLE_ENFORCE_GE( - run_per_steps_, 1, - platform::errors::InvalidArgument( - "run_per_steps must >= 1, but now is %ld", run_per_steps_)); - PADDLE_ENFORCE_GE( - run_at_offset_, 0, - platform::errors::InvalidArgument( - "run_at_offset must >= 0, but now is %ld", run_at_offset_)); - PADDLE_ENFORCE_LT(run_at_offset_, run_per_steps_, - platform::errors::InvalidArgument( - "run_at_offset must < run_per_steps, must now " - "run_at_offset=%ld run_per_steps=%ld", - run_at_offset_, run_per_steps_)); - PADDLE_ENFORCE_GE( - reply_up_per_steps_, 1, - platform::errors::InvalidArgument( - "reply_up_per_steps must >= 1, but now is %ld", reply_up_per_steps_)); - PADDLE_ENFORCE_GE(send_down_per_steps_, 1, - platform::errors::InvalidArgument( - "send_down_per_steps must >= 1, but now is %ld", - send_down_per_steps_)); } void AmplifierInterceptor::RunOps() { diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 55878a1000ec453e0a358ee752a0ee7ae5ba31d1..e3af0de2c89d76ded0f1ecda555c17945aace597 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -199,6 +199,13 @@ void Carrier::CreateInterceptors() { int64_t interceptor_id = item.first; TaskNode* task_node = item.second; + PADDLE_ENFORCE_LT( + task_node->run_at_offset(), 
task_node->run_per_steps(), + platform::errors::InvalidArgument( + "Interceptor's run_at_offset must < run_per_steps, but now " + "run_at_offset=%ld run_per_steps=%ld", + task_node->run_at_offset(), task_node->run_per_steps())); + + std::unique_ptr<Interceptor> interceptor; if (task_node->type().empty()) { // TODO(wangxi): delete this in future @@ -214,7 +221,7 @@ SetInterceptor(interceptor_id, std::move(interceptor)); VLOG(3) << "Create Interceptor with interceptor id: " << interceptor_id - << "."; + << " with type: " << task_node->type() << "."; if (task_node->upstream().empty()) { source_interceptor_ids_.emplace_back(interceptor_id); diff --git a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc index 09275dc10a136d2aaeb82805e3efb568e9593451..0c0411a035fb36532c0abf602f59f4041646268b 100644 --- a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc @@ -161,7 +161,8 @@ void ComputeInterceptor::ReplyCompletedToUpStream() { } void ComputeInterceptor::RunOps() { - VLOG(3) << "ComputeInterceptor " << interceptor_id_ << " running ops."; + VLOG(3) << "ComputeInterceptor " << interceptor_id_ << " running ops for the " + << step_ << " time."; for (auto op : node_->ops()) { op->Run(*microbatch_scopes_[step_ % node_->max_run_times()], place_); } @@ -180,6 +181,8 @@ void ComputeInterceptor::Run() { ReplyCompletedToUpStream(); // Try to stop Carrier if (is_last_ && (step_ % node_->max_run_times() == 0)) { + VLOG(3) << "Interceptor " << GetInterceptorId() + << " is stopping carrier."; StopCarrier(); } } diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc index 21026ee3f973b7a7214a26b5a462292107984dd3..19afdf7441257fdd71de2627d148bf3aab335893 100644 --- a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc +++
b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc @@ -161,22 +161,30 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) { int64_t num_micro_batches = exe_desc_.num_micro_batches(); int64_t task_id = cur_rank * functionality_order.size(); for (std::size_t i = 0; i < functionality_order.size(); ++i) { + VLOG(3) << "Runtime graph is creating task node for: " << task_id << "."; OpRole role = functionality_order[i]; int32_t role_id = static_cast<int32_t>(role); int64_t max_run_times = num_micro_batches; int64_t max_slot_nums = start_up_steps; - if (IsLRSched(role_id) || IsOptimize(role_id)) { - max_run_times = 1; - max_slot_nums = 1; + // NOTE: use short path, each interceptor should run for max_run_times + std::vector<OperatorBase*> task_ops{}; + if (role_to_ops.find(role_id) != role_to_ops.end()) { + task_ops = role_to_ops.at(role_id); } - if (role_to_ops.find(role_id) == role_to_ops.end()) { - task_nodes_.emplace_back(TaskNode::CreateEmptyTaskNode( - role_id, cur_rank, task_id, max_run_times, max_slot_nums)); + std::unique_ptr<TaskNode> task_node = std::make_unique<TaskNode>( + role_id, task_ops, cur_rank, task_id, max_run_times, max_slot_nums); + if (IsLRSched(role_id) || IsOptimize(role_id)) { + task_node->SetType("Amplifier"); + if (IsLRSched(role_id)) { + task_node->SetRunPerSteps(max_run_times); + } else { + task_node->SetRunAtOffset(max_run_times - 1); + task_node->SetRunPerSteps(max_run_times); + } } else { - task_nodes_.emplace_back( - TaskNode::CreateTaskNode(role_id, role_to_ops.at(role_id), cur_rank, - task_id, max_run_times, max_slot_nums)); + task_node->SetType("Compute"); } + task_nodes_.emplace_back(std::move(task_node)); ++task_id; } } @@ -227,6 +235,8 @@ void RuntimeGraph::FakeDependence() { void RuntimeGraph::AssignTaskToIntercepter() { for (const auto& task : task_nodes_) { int64_t intercepter_id = task->task_id(); + VLOG(3) << "Runtime graph is assigning task to interceptor: " + << intercepter_id << " with type: " << task->type() << "."; if
(intercepter_id_to_node_.find(intercepter_id) != intercepter_id_to_node_.end()) { PADDLE_THROW(platform::errors::PreconditionNotMet( diff --git a/paddle/fluid/distributed/fleet_executor/task_node.cc b/paddle/fluid/distributed/fleet_executor/task_node.cc index 00b256da6af3831dd6c2c1545a961e0355a0016c..f2e785010b72632d87f4543a139038fa1518613d 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.cc +++ b/paddle/fluid/distributed/fleet_executor/task_node.cc @@ -57,22 +57,6 @@ TaskNode::TaskNode(int32_t role, int64_t rank, int64_t task_id, max_run_times_(max_run_times), max_slot_nums_(max_slot_nums) {} -std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int32_t role, - int64_t rank, - int64_t task_id, - int64_t max_run_times, - int64_t max_slot_nums) { - return std::make_unique<TaskNode>(role, rank, task_id, max_run_times, - max_slot_nums); -} - -std::unique_ptr<TaskNode> TaskNode::CreateTaskNode( - int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank, - int64_t task_id, int64_t max_run_times, int64_t max_slot_nums) { - return std::make_unique<TaskNode>(role, ops, rank, task_id, max_run_times, - max_slot_nums); -} - bool TaskNode::AddUpstreamTask(int64_t task_id) { const auto& ret = upstream_.insert(task_id); return *ret.first == task_id; @@ -92,5 +76,34 @@ std::string TaskNode::DebugString() const { os << "\n"; return os.str(); } + +void TaskNode::SetRunPerSteps(int64_t value) { + PADDLE_ENFORCE_GE(value, 1, + platform::errors::InvalidArgument( + "run_per_steps must >= 1, but received %ld", value)); + run_per_steps_ = value; +} + +void TaskNode::SetRunAtOffset(int64_t value) { + PADDLE_ENFORCE_GE(value, 0, + platform::errors::InvalidArgument( + "run_at_offset must >= 0, but received %ld", value)); + run_at_offset_ = value; +} + +void TaskNode::SetReplyUpPerSteps(int64_t value) { + PADDLE_ENFORCE_GE( + value, 1, platform::errors::InvalidArgument( + "reply_up_per_steps must >= 1, but received %ld", value)); + reply_up_per_steps_ = value; +} + +void TaskNode::SetSendDownPerSteps(int64_t value) { +
PADDLE_ENFORCE_GE( + value, 1, platform::errors::InvalidArgument( + "send_down_per_steps must >= 1, but received %ld", value)); + send_down_per_steps_ = value; +} + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/fleet_executor/task_node.h b/paddle/fluid/distributed/fleet_executor/task_node.h index f5704e6ae0cccbd2ca686f3527a4336fa977c0b0..23fb4c0a7dbfcdf801afebd593a495d0322d78f1 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.h +++ b/paddle/fluid/distributed/fleet_executor/task_node.h @@ -54,25 +54,16 @@ class TaskNode final { const paddle::framework::ProgramDesc& program() const { return program_; } const std::vector<OperatorBase*>& ops() const { return ops_; } - void SetRunPerSteps(int64_t value) { run_per_steps_ = value; } - void SetRunAtOffset(int64_t value) { run_at_offset_ = value; } - void SetReplyUpPerSteps(int64_t value) { reply_up_per_steps_ = value; } - void SetSendDownPerSteps(int64_t value) { send_down_per_steps_ = value; } + void SetRunPerSteps(int64_t value); + void SetRunAtOffset(int64_t value); + void SetReplyUpPerSteps(int64_t value); + void SetSendDownPerSteps(int64_t value); void SetType(const std::string& type) { type_ = type; } bool AddUpstreamTask(int64_t task_id); bool AddDownstreamTask(int64_t task_id); std::string DebugString() const; - static std::unique_ptr<TaskNode> CreateEmptyTaskNode(int32_t role, - int64_t rank, - int64_t task_id, - int64_t max_run_times, - int64_t max_slot_nums); - static std::unique_ptr<TaskNode> CreateTaskNode( - int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank, - int64_t task_id, int64_t max_run_times, int64_t max_slot_nums); - private: DISABLE_COPY_AND_ASSIGN(TaskNode); TaskNode() = default;