From ddf38a3f806cfae466a0643269548b84162bba54 Mon Sep 17 00:00:00 2001 From: WangXi Date: Thu, 2 Dec 2021 10:43:21 +0800 Subject: [PATCH] [fleet_executor] Add amplifier interceptor and 1F1B scheduler test (#37755) --- .../distributed/fleet_executor/CMakeLists.txt | 3 +- .../fleet_executor/amplifier_interceptor.cc | 82 ++++++++++++++++ .../fleet_executor/amplifier_interceptor.h | 43 +++++++++ .../distributed/fleet_executor/carrier.cc | 1 + .../fleet_executor/compute_interceptor.cc | 15 +-- .../fleet_executor/compute_interceptor.h | 13 ++- .../distributed/fleet_executor/task_node.h | 17 ++++ .../fleet_executor/test/CMakeLists.txt | 6 ++ .../interceptor_pipeline_long_path_test.cc | 94 +++++++++++++++++++ .../interceptor_pipeline_short_path_test.cc | 90 ++++++++++++++++++ 10 files changed, 352 insertions(+), 12 deletions(-) create mode 100644 paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc create mode 100644 paddle/fluid/distributed/fleet_executor/amplifier_interceptor.h create mode 100644 paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_long_path_test.cc create mode 100644 paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index 641110802f1..b615088b3b1 100644 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -11,7 +11,7 @@ else() endif() cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc - interceptor.cc compute_interceptor.cc interceptor_message_service.cc message_bus.cc + interceptor.cc compute_interceptor.cc amplifier_interceptor.cc interceptor_message_service.cc message_bus.cc DEPS proto_desc fleet_executor_desc_proto interceptor_message_proto collective_helper ${BRPC_DEPS}) @@ -19,6 +19,7 @@ if(WITH_DISTRIBUTE) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") set_source_files_properties(interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(compute_interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + set_source_files_properties(amplifier_interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(message_bus.h PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(message_bus.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(fleet_executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) diff --git a/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc b/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc new file mode 100644 index 00000000000..7d71f8e7b22 --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc @@ -0,0 +1,82 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/fleet_executor/amplifier_interceptor.h" + +#include "paddle/fluid/distributed/fleet_executor/task_node.h" +#include "paddle/fluid/framework/operator.h" + +namespace paddle { +namespace distributed { + +AmplifierInterceptor::AmplifierInterceptor(int64_t interceptor_id, + TaskNode* node) + : ComputeInterceptor(interceptor_id, node) { + run_per_steps_ = node->run_per_steps(); + run_at_offset_ = node->run_at_offset(); + reply_up_per_steps_ = node->reply_up_per_steps(); + send_down_per_steps_ = node->send_down_per_steps(); + + PADDLE_ENFORCE_GE( + run_per_steps_, 1, + platform::errors::InvalidArgument( + "run_per_steps must >= 1, but now is %ld", run_per_steps_)); + PADDLE_ENFORCE_GE( + run_at_offset_, 0, + platform::errors::InvalidArgument( + "run_at_offset must >= 0, but now is %ld", run_at_offset_)); + PADDLE_ENFORCE_LT(run_at_offset_, run_per_steps_, + platform::errors::InvalidArgument( + "run_at_offset must < run_per_steps, but now " + "run_at_offset=%ld run_per_steps=%ld", + run_at_offset_, run_per_steps_)); + PADDLE_ENFORCE_GE( + reply_up_per_steps_, 1, + platform::errors::InvalidArgument( + "reply_up_per_steps must >= 1, but now is %ld", reply_up_per_steps_)); + PADDLE_ENFORCE_GE(send_down_per_steps_, 1, + platform::errors::InvalidArgument( + "send_down_per_steps must >= 1, but now is %ld", + send_down_per_steps_)); +} + +void AmplifierInterceptor::RunOps() { + // run_per_steps_, run_at_offset_ + // 4, 0 --> run at step 0, 4, 8, 12 + // 4, 3 --> run at step 3, 7, 11, 15 + if ((step_ % run_per_steps_) == run_at_offset_) { + ComputeInterceptor::RunOps(); + } +} + +void AmplifierInterceptor::SendDataReadyToDownStream() { + // runs multiple times but sends data-ready to downstream only once, i.e. + // consumes input multiple times, produces output once + if (step_ % send_down_per_steps_ == 0) { + ComputeInterceptor::SendDataReadyToDownStream(); + } +} + +void AmplifierInterceptor::ReplyCompletedToUpStream() { + // runs multiple times but replies completed to upstream only once, i.e. + // consumes input once, produces output multiple times + if (step_ % reply_up_per_steps_ == 0) { + ComputeInterceptor::ReplyCompletedToUpStream(); + } +} + +REGISTER_INTERCEPTOR(Amplifier, AmplifierInterceptor); + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.h b/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.h new file mode 100644 index 00000000000..776aa8d3e88 --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/amplifier_interceptor.h @@ -0,0 +1,43 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +#pragma once + +#include + +#include "paddle/fluid/distributed/fleet_executor/compute_interceptor.h" + +namespace paddle { +namespace distributed { + +class AmplifierInterceptor : public ComputeInterceptor { + public: + AmplifierInterceptor(int64_t interceptor_id, TaskNode* node); + + private: + void RunOps() override; + void SendDataReadyToDownStream() override; + void ReplyCompletedToUpStream() override; + + int64_t run_per_steps_{1}; + int64_t run_at_offset_{0}; + + // one input produces output multiple times + int64_t reply_up_per_steps_{1}; + // one output needs input multiple times + int64_t send_down_per_steps_{1}; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc index 73f22592dc3..55878a1000e 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.cc +++ b/paddle/fluid/distributed/fleet_executor/carrier.cc @@ -24,6 +24,7 @@ namespace paddle { namespace distributed { USE_INTERCEPTOR(Compute); +USE_INTERCEPTOR(Amplifier); void Carrier::Init(std::shared_ptr<RuntimeGraph> runtime_graph, framework::Scope* root_scope, diff --git a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc index 3d4078c932f..09275dc10a1 100644 --- a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc @@ -160,15 +160,18 @@ void ComputeInterceptor::ReplyCompletedToUpStream() { } } +void ComputeInterceptor::RunOps() { + VLOG(3) << "ComputeInterceptor " << interceptor_id_ << " running ops."; + for (auto op : node_->ops()) { + op->Run(*microbatch_scopes_[step_ % node_->max_run_times()], place_); + } +} + void ComputeInterceptor::Run() { while (IsInputReady() && CanWriteOutput() && !ShouldReset()) { VLOG(3) << "id=" << GetInterceptorId() << " ComputeInterceptor running"; - // step_ %= node_->max_run_times(); - for (auto op : node_->ops()) { - auto* scope = microbatch_scopes_[step_ % node_->max_run_times()]; - op->Run(*scope, place_); - } + RunOps(); ++step_; // send to downstream and increase buff used @@ -176,7 +179,7 @@ void ComputeInterceptor::Run() { // reply to upstream and decrease ready data ReplyCompletedToUpStream(); // Try to stop Carrier - if (step_ % node_->max_run_times() == 0 && is_last_) { + if (is_last_ && (step_ % node_->max_run_times() == 0)) { StopCarrier(); } } diff --git a/paddle/fluid/distributed/fleet_executor/compute_interceptor.h b/paddle/fluid/distributed/fleet_executor/compute_interceptor.h index 8ed443ca971..ae253f844aa 100644 --- a/paddle/fluid/distributed/fleet_executor/compute_interceptor.h +++ b/paddle/fluid/distributed/fleet_executor/compute_interceptor.h @@ -25,6 +25,14 @@ class ComputeInterceptor : public Interceptor { public: ComputeInterceptor(int64_t interceptor_id, TaskNode* node); + protected: + virtual void RunOps(); + virtual void SendDataReadyToDownStream(); + virtual void ReplyCompletedToUpStream(); + + int64_t step_{0}; + + private: void PrepareDeps(); void IncreaseReady(int64_t up_id); @@ -33,19 +41,14 @@ class ComputeInterceptor : public Interceptor { bool CanWriteOutput(); bool ShouldReset(); - void SendDataReadyToDownStream(); - void ReplyCompletedToUpStream(); - void Run(); void Compute(const InterceptorMessage& msg); void ReceivedStop(int64_t up_id); void TryStop(); - private: bool is_source_{false}; bool is_last_{false}; - int64_t step_{0}; // upstream_id-->(max_ready_size, ready_size) std::map<int64_t, std::pair<int64_t, int64_t>>
in_readys_{}; diff --git a/paddle/fluid/distributed/fleet_executor/task_node.h b/paddle/fluid/distributed/fleet_executor/task_node.h index 8f4f9d80c42..762b46d6230 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.h +++ b/paddle/fluid/distributed/fleet_executor/task_node.h @@ -44,12 +44,22 @@ class TaskNode final { int32_t role() const { return role_; } int64_t max_run_times() const { return max_run_times_; } int64_t max_slot_nums() const { return max_slot_nums_; } + int64_t run_per_steps() const { return run_per_steps_; } + int64_t run_at_offset() const { return run_at_offset_; } + int64_t reply_up_per_steps() const { return reply_up_per_steps_; } + int64_t send_down_per_steps() const { return send_down_per_steps_; } const std::unordered_set<int64_t>& upstream() const { return upstream_; } const std::unordered_set<int64_t>& downstream() const { return downstream_; } const std::string& type() const { return type_; } const paddle::framework::ProgramDesc& program() const { return program_; } const std::vector& ops() const { return ops_; } + void SetRunPerSteps(int64_t value) { run_per_steps_ = value; } + void SetRunAtOffset(int64_t value) { run_at_offset_ = value; } + void SetReplyUpPerSteps(int64_t value) { reply_up_per_steps_ = value; } + void SetSendDownPerSteps(int64_t value) { send_down_per_steps_ = value; } + void SetType(const std::string& type) { type_ = type; } + bool AddUpstreamTask(int64_t task_id); bool AddDownstreamTask(int64_t task_id); std::string DebugString() const; @@ -76,6 +86,13 @@ class TaskNode final { int64_t max_run_times_; int64_t max_slot_nums_; + int64_t run_per_steps_{1}; + int64_t run_at_offset_{0}; + // one input produces output multiple times + int64_t reply_up_per_steps_{1}; + // one output needs input multiple times + int64_t send_down_per_steps_{1}; + std::string type_; }; diff --git a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt index b0f00d70584..d4587b90c87 100644 --- a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt @@ -4,6 +4,12 @@ cc_test(interceptor_ping_pong_test SRCS interceptor_ping_pong_test.cc DEPS fleet set_source_files_properties(compute_interceptor_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(compute_interceptor_test SRCS compute_interceptor_test.cc DEPS fleet_executor ${BRPC_DEPS}) +set_source_files_properties(interceptor_pipeline_short_path_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_test(interceptor_pipeline_short_path_test SRCS interceptor_pipeline_short_path_test.cc DEPS fleet_executor ${BRPC_DEPS}) + +set_source_files_properties(interceptor_pipeline_long_path_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_test(interceptor_pipeline_long_path_test SRCS interceptor_pipeline_long_path_test.cc DEPS fleet_executor ${BRPC_DEPS}) + set_source_files_properties(compute_interceptor_run_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(compute_interceptor_run_op_test SRCS compute_interceptor_run_op_test.cc DEPS fleet_executor ${BRPC_DEPS} op_registry fill_constant_op elementwise_add_op scope device_context) diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_long_path_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_long_path_test.cc new file mode 100644 index 00000000000..b3fdb0b7adf --- /dev/null +++
b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_long_path_test.cc @@ -0,0 +1,94 @@ +/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include + +#include "gtest/gtest.h" + +#include "paddle/fluid/distributed/fleet_executor/carrier.h" +#include "paddle/fluid/distributed/fleet_executor/interceptor.h" +#include "paddle/fluid/distributed/fleet_executor/message_bus.h" +#include "paddle/fluid/distributed/fleet_executor/task_node.h" + +namespace paddle { +namespace distributed { + +void LinkNodes(const std::vector<TaskNode*>& nodes) { + size_t size = nodes.size(); + if (size <= 1) return; + + { // i = 0 + TaskNode* now = nodes[0]; + TaskNode* next = nodes[1]; + now->AddDownstreamTask(next->task_id()); + } + { // i = size - 1 + TaskNode* prev = nodes[size - 2]; + TaskNode* now = nodes[size - 1]; + now->AddUpstreamTask(prev->task_id()); + } + + for (size_t i = 1; i < size - 1; ++i) { + TaskNode* prev = nodes[i - 1]; + TaskNode* now = nodes[i]; + TaskNode* next = nodes[i + 1]; + + now->AddUpstreamTask(prev->task_id()); + now->AddDownstreamTask(next->task_id()); + } +} + +TEST(AmplifierInterceptor, Amplifier) { + Carrier& carrier = Carrier::Instance(); + MessageBus& msg_bus = MessageBus::Instance(); + msg_bus.Init({{0, 0}, {1, 0}, {2, 0}, {3, 0}, {4, 0}, {5, 0}}, + {{0, "127.0.0.0:0"}}, "127.0.0.0:0"); + + int64_t micro_steps = 3; + + // NOTE: don't delete, otherwise interceptor will use undefined node + TaskNode* node_a = new TaskNode(0, 0, 0, 1, 0); // role, rank, task_id + TaskNode* node_b = new TaskNode(0, 0, 1, 1, 0); + TaskNode* node_c = new TaskNode(0, 0, 2, 1, 0); + TaskNode* node_d = new TaskNode(0, 0, 3, 1, 0); + TaskNode* node_e = new TaskNode(0, 0, 4, 1, 0); + TaskNode* node_f = new TaskNode(0, 0, 5, 1, 0); + + // a->b->c->d->e->f + LinkNodes({node_a, node_b, node_c, node_d, node_e, node_f}); + + // LR->b(1:3)->F->B->e(3:1)->U + node_b->SetReplyUpPerSteps(micro_steps); + node_e->SetSendDownPerSteps(micro_steps); + + carrier.SetInterceptor(0, InterceptorFactory::Create("Compute", 0, node_a)); + carrier.SetInterceptor(1, InterceptorFactory::Create("Amplifier", 1, node_b)); + carrier.SetInterceptor(2, InterceptorFactory::Create("Compute", 2, node_c)); + carrier.SetInterceptor(3, InterceptorFactory::Create("Compute", 3, node_d)); + carrier.SetInterceptor(4, InterceptorFactory::Create("Amplifier", 4, node_e)); + carrier.SetInterceptor(5, InterceptorFactory::Create("Compute", 5, node_f)); + + carrier.SetCreatingFlag(false); + + // start + InterceptorMessage msg; + msg.set_message_type(DATA_IS_READY); + msg.set_src_id(-1); + msg.set_dst_id(0); + carrier.EnqueueInterceptorMessage(msg); +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc new file mode 100644 index 00000000000..db42135040a --- /dev/null +++
b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc @@ -0,0 +1,90 @@ +/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include + +#include "gtest/gtest.h" + +#include "paddle/fluid/distributed/fleet_executor/carrier.h" +#include "paddle/fluid/distributed/fleet_executor/interceptor.h" +#include "paddle/fluid/distributed/fleet_executor/message_bus.h" +#include "paddle/fluid/distributed/fleet_executor/task_node.h" + +namespace paddle { +namespace distributed { + +void LinkNodes(const std::vector<TaskNode*>& nodes) { + size_t size = nodes.size(); + if (size <= 1) return; + + { // i = 0 + TaskNode* now = nodes[0]; + TaskNode* next = nodes[1]; + now->AddDownstreamTask(next->task_id()); + } + { // i = size - 1 + TaskNode* prev = nodes[size - 2]; + TaskNode* now = nodes[size - 1]; + now->AddUpstreamTask(prev->task_id()); + } + + for (size_t i = 1; i < size - 1; ++i) { + TaskNode* prev = nodes[i - 1]; + TaskNode* now = nodes[i]; + TaskNode* next = nodes[i + 1]; + + now->AddUpstreamTask(prev->task_id()); + now->AddDownstreamTask(next->task_id()); + } +} + +TEST(AmplifierInterceptor, Amplifier) { + Carrier& carrier = Carrier::Instance(); + MessageBus& msg_bus = MessageBus::Instance(); + msg_bus.Init({{0, 0}, {1, 0}, {2, 0}, {3, 0}}, {{0, ""}}, ""); + + int64_t micro_steps = 3; + + // NOTE: don't delete, otherwise interceptor will use undefined node + TaskNode* node_a = + new TaskNode(0, 0, 0, micro_steps, 0); // role, rank, task_id + TaskNode* node_b = new TaskNode(0, 0, 1, 3, 0); + TaskNode* node_c = new TaskNode(0, 0, 2, 3, 0); + TaskNode* node_d = new TaskNode(0, 0, 3, micro_steps, 0); + + // a->b->c->d + LinkNodes({node_a, node_b, node_c, node_d}); + + node_a->SetRunPerSteps(micro_steps); + node_d->SetRunPerSteps(micro_steps); + node_d->SetRunAtOffset(micro_steps - 1); + + carrier.SetInterceptor(0, InterceptorFactory::Create("Amplifier", 0, node_a)); + carrier.SetInterceptor(1, InterceptorFactory::Create("Compute", 1, node_b)); + carrier.SetInterceptor(2, InterceptorFactory::Create("Compute", 2, node_c)); + carrier.SetInterceptor(3, InterceptorFactory::Create("Amplifier", 3, node_d)); + + carrier.SetCreatingFlag(false); + + // start + InterceptorMessage msg; + msg.set_message_type(DATA_IS_READY); + msg.set_src_id(-1); + msg.set_dst_id(0); + carrier.EnqueueInterceptorMessage(msg); +} + +} // namespace distributed +} // namespace paddle -- GitLab