diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index f19943178b056999883aff4cbab6309657250122..b38abde25401df932a73cfa21fe52fcd586c5755 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -28,6 +28,8 @@ if(WITH_GPU) dynload_cuda variable_visitor) nv_library(fused_all_reduce_op_handle SRCS fused_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory dynload_cuda variable_visitor place device_memory_aligment) + nv_library(grad_merge_all_reduce_op_handle SRCS grad_merge_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor + ddim memory dynload_cuda variable_visitor place device_memory_aligment all_reduce_op_handle fused_all_reduce_op_handle) if(WITH_DGC) nv_library(sparse_all_reduce_op_handle SRCS sparse_all_reduce_op_handle.cc DEPS op_handle_base scope @@ -50,6 +52,8 @@ else() variable_visitor) cc_library(fused_all_reduce_op_handle SRCS fused_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory variable_visitor place device_memory_aligment) + cc_library(grad_merge_all_reduce_op_handle SRCS grad_merge_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor + ddim memory variable_visitor place device_memory_aligment all_reduce_op_handle fused_all_reduce_op_handle) if(WITH_DISTRIBUTE) cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim selected_rows_functor) diff --git a/paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.cc b/paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.cc new file mode 100644 index 0000000000000000000000000000000000000000..c010b9e640d62c201f15853ca8714efdd7f9d216 --- /dev/null +++ b/paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.cc @@ -0,0 +1,132 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h" + +#include + +#include "paddle/fluid/framework/details/container_cast.h" +#include "paddle/fluid/framework/details/reduce_and_gather.h" +#include "paddle/fluid/framework/details/variable_visitor.h" +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/platform/gpu_info.h" +#include "paddle/fluid/platform/profiler.h" + +#ifdef PADDLE_WITH_NCCL +DECLARE_bool(sync_nccl_allreduce); +#endif + +namespace paddle { +namespace framework { +namespace details { + +#if defined(PADDLE_WITH_NCCL) +GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle( + ir::Node *node, const std::vector &local_scopes, + const std::vector &places, + const std::string &grad_merge_cond_name, + const platform::NCCLCommunicator *ctxs) + : AllReduceOpHandle(node, local_scopes, places, ctxs), + grad_merge_cond_name_(grad_merge_cond_name) {} +#elif defined(PADDLE_WITH_XPU_BKCL) +GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle( + ir::Node *node, const std::vector &local_scopes, + const std::vector &places, + const std::string &grad_merge_cond_name, + const platform::BKCLCommunicator *ctxs) + : AllReduceOpHandle(node, local_scopes, places, ctxs), + grad_merge_cond_name_(grad_merge_cond_name) {} +#else +GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle( + ir::Node *node, const std::vector &local_scopes, + const std::vector &places, + const std::string &grad_merge_cond_name) + : AllReduceOpHandle(node, local_scopes, places), + grad_merge_cond_name_(grad_merge_cond_name) {} +#endif + +void GradMergeAllReduceOpHandle::RunImpl() { + PADDLE_ENFORCE_GT(local_scopes_.size(), 0, + platform::errors::PreconditionNotMet( + "The number of local scope should be > 0, but got %zu.", + local_scopes_.size())); + + auto *local_scope = local_exec_scopes_[0]; + auto cond_var = local_scope->FindVar(grad_merge_cond_name_); + PADDLE_ENFORCE_NOT_NULL( + cond_var, platform::errors::NotFound("Variable %s is not found in scope.", + cond_var)); + bool cond = *cond_var->Get().data(); + + if (cond) { + AllReduceOpHandle::RunImpl(); + } +} + +std::string GradMergeAllReduceOpHandle::Name() const { + return "grad_merge_all_reduce"; +} + +#if defined(PADDLE_WITH_NCCL) +FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle( + ir::Node *node, const std::vector &local_scopes, + const std::vector &places, const size_t num_of_all_reduce, + const std::string &grad_merge_cond_name, + const platform::NCCLCommunicator *ctxs) + : FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce, + ctxs), + grad_merge_cond_name_(grad_merge_cond_name) {} +#elif defined(PADDLE_WITH_XPU_BKCL) +FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle( + ir::Node *node, const std::vector &local_scopes, + const std::vector &places, const size_t num_of_all_reduce, + const std::string &grad_merge_cond_name, + const platform::BKCLCommunicator *ctxs) + : FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce, + ctxs), + grad_merge_cond_name_(grad_merge_cond_name) {} +#else +FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle( + ir::Node *node, const std::vector &local_scopes, + const std::vector &places, const size_t num_of_all_reduce, + const std::string &grad_merge_cond_name) + : FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce), + grad_merge_cond_name_(grad_merge_cond_name) {} +#endif + +void FusedGradMergeAllReduceOpHandle::RunImpl() { + PADDLE_ENFORCE_GT(local_scopes_.size(), 0, + platform::errors::PreconditionNotMet( + "The number of local scope should be > 0, but got %zu.", + local_scopes_.size())); + + auto *local_scope = local_exec_scopes_[0]; + auto cond_var = local_scope->FindVar(grad_merge_cond_name_); + PADDLE_ENFORCE_NOT_NULL( + cond_var, platform::errors::NotFound("Variable %s is not found in scope.", + cond_var)); + bool cond = *cond_var->Get().data(); + + if (cond) { + VLOG(10) << "run fused grad merge all reduce"; + FusedAllReduceOpHandle::RunImpl(); + } +} + +std::string FusedGradMergeAllReduceOpHandle::Name() const { + return "fused_grad_merge_all_reduce"; +} + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h b/paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h new file mode 100644 index 0000000000000000000000000000000000000000..5c18f8fef11f02fefa3fc52fa5016d613ba414fa --- /dev/null +++ b/paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h @@ -0,0 +1,111 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "paddle/fluid/framework/details/all_reduce_op_handle.h" +#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" + +namespace paddle { +namespace framework { +namespace ir { +class Node; +} // namespace ir +} // namespace framework +namespace platform { +class NCCLCommunicator; +} // namespace platform +} // namespace paddle +#if defined(PADDLE_WITH_NCCL) +#include "paddle/fluid/framework/details/nccl_op_handle.h" +#include "paddle/fluid/platform/nccl_helper.h" +#endif + +namespace paddle { +namespace framework { +namespace details { + +class GradMergeAllReduceOpHandle : public AllReduceOpHandle { + public: +#if defined(PADDLE_WITH_NCCL) + GradMergeAllReduceOpHandle(ir::Node *node, + const std::vector &local_scopes, + const std::vector &places, + const std::string &grad_merge_cond_name, + const platform::NCCLCommunicator *ctxs); +#elif defined(PADDLE_WITH_XPU_BKCL) + GradMergeAllReduceOpHandle(ir::Node *node, + const std::vector &local_scopes, + const std::vector &places, + const std::string &grad_merge_cond_name, + const platform::BKCLCommunicator *ctxs); +#else + GradMergeAllReduceOpHandle(ir::Node *node, + const std::vector &local_scopes, + const std::vector &places, + const std::string &grad_merge_cond_name); +#endif + std::string Name() const override; + + std::string GradMergeCondName() { return grad_merge_cond_name_; } + + protected: + void RunImpl() override; + + private: + std::string grad_merge_cond_name_; +}; + +class FusedGradMergeAllReduceOpHandle : public FusedAllReduceOpHandle { + public: +#if defined(PADDLE_WITH_NCCL) + FusedGradMergeAllReduceOpHandle(ir::Node *node, + const std::vector &local_scopes, + const std::vector &places, + const size_t num_of_all_reduce, + const std::string &grad_merge_cond_name, + const platform::NCCLCommunicator *ctxs); +#elif defined(PADDLE_WITH_XPU_BKCL) + FusedGradMergeAllReduceOpHandle(ir::Node *node, + const std::vector &local_scopes, + const std::vector &places, + const size_t num_of_all_reduce, + const std::string &grad_merge_cond_name, + const platform::BKCLCommunicator *ctxs); +#else + FusedGradMergeAllReduceOpHandle(ir::Node *node, + const std::vector &local_scopes, + const std::vector &places, + const size_t num_of_all_reduce, + const std::string &grad_merge_cond_name); +#endif + + std::string Name() const override; + + protected: + void RunImpl() override; + + private: + std::string grad_merge_cond_name_; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/multi_devices_helper.h b/paddle/fluid/framework/details/multi_devices_helper.h index 304e7f037520a7bb1de04a9210f478463cdf60be..7e2c41dd4f7950c304619243bc5c3db179407236 100644 --- a/paddle/fluid/framework/details/multi_devices_helper.h +++ b/paddle/fluid/framework/details/multi_devices_helper.h @@ -22,6 +22,7 @@ #include #include "paddle/fluid/framework/details/op_handle_base.h" +#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" #include "paddle/fluid/framework/details/var_handle.h" #include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/ir/pass.h" @@ -62,7 +63,7 @@ constexpr char kUseHierarchicalAllReduce[] = "use_hierarchical_allreduce"; typedef std::unordered_set GraphDepVars; constexpr char kGraphDepVars[] = "dep_vars"; -typedef std::unordered_set FusedVars; +typedef std::unordered_map FusedVars; constexpr char kFusedVars[] = "fused_vars"; constexpr char kFusedVarNamePrefix[] = "@FUSEDVAR@"; @@ -78,6 +79,7 @@ constexpr char kParamsAndSparseGrads[] = "params_and_sparse_grads"; typedef std::vector ProgramDescs; constexpr char kProgramDescs[] = "program_descs"; +constexpr char kStartupProgramDescs[] = "startup_program_descs"; typedef std::unordered_set PinnedVars; constexpr char kPinnedVars[] = "pinned_vars"; diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc index 7cc1f54131416ed454846c75c8c8a6849ec20e6c..ad47846c59a05bf92c6f05d8b71cfb04fb3b0f07 100644 --- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc @@ -123,17 +123,27 @@ void ScopeBufferedSSAGraphExecutor::InitVariables() { } const ir::Graph &graph = Graph(); + if (!is_initialized_) { + // startup_program_descs only need to be executed once + if (graph.Has(details::kStartupProgramDescs)) { + auto &program_descs = + graph.Get(details::kStartupProgramDescs); + + for (auto &program_desc : program_descs) { + for (auto &op_desc : program_desc.Block(0).AllOps()) { + for (size_t i = 0; i < local_exec_scopes_.size(); ++i) { + auto op = OpRegistry::CreateOp(*op_desc); + op->Run(*local_exec_scopes_[i], places_[i]); + } + } + } + } + is_initialized_ = true; + } + if (graph.Has(details::kProgramDescs)) { auto &program_descs = graph.Get(details::kProgramDescs); - // Init vars - auto &fused_grad_vars = graph.Get(details::kFusedVars); - for (size_t i = 0; i < local_exec_scopes_.size(); ++i) { - for (auto &var_name : fused_grad_vars) { - auto var = local_exec_scopes_[i]->Var(var_name); - var->GetMutable(); - } - } for (auto &program_desc : program_descs) { for (auto &op_desc : program_desc.Block(0).AllOps()) { diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h index f5d0ffe109501e86ac8b604f714fe402349b678f..aa2b113c960a38689223bf5d80c57b469d954019 100644 --- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h @@ -64,6 +64,7 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor { bool DropScopeOrNot() const; + bool is_initialized_{false}; size_t drop_scope_counter_{0}; ExecutionStrategy strategy_; std::unique_ptr underlying_executor_; diff --git a/paddle/fluid/framework/ir/coalesce_grad_tensor_pass.cc b/paddle/fluid/framework/ir/coalesce_grad_tensor_pass.cc index f3634f90e6c6984f494d0f571d0b11ecc713696d..d93841a42544d148b1755602c28c298007839dc6 100644 --- a/paddle/fluid/framework/ir/coalesce_grad_tensor_pass.cc +++ b/paddle/fluid/framework/ir/coalesce_grad_tensor_pass.cc @@ -199,19 +199,42 @@ class CoalesceGradTensorPass : public ir::Pass { if (!result->Has(details::kFusedGrads)) { result->Set(details::kFusedGrads, new details::FusedGrads); } + if (!result->Has(details::kStartupProgramDescs)) { + result->Set(details::kStartupProgramDescs, new details::ProgramDescs); + } if (!result->Has(details::kProgramDescs)) { result->Set(details::kProgramDescs, new details::ProgramDescs); } + + auto type = GetTypeOfVar(vars_info, params_grads.front().second); + + bool persistable = false; + for (auto &p_g : params_grads) { + if (IsPersistableVar(vars_info, p_g.second)) { + // NOTE. If one of the grads is persistable, then the fused_grad_var + // should be set to persistable. + persistable = true; + break; + } + } + // the fused_var_name should be unique, so it appends // params_grads.begin()->second. auto fused_grad_var_name = std::string(details::kFusedVarNamePrefix) + "@GRAD@" + params_grads.begin()->second; + // what a pity, visual c++ unsupport {.type_ = type} + details::VariableInfo var_info; + var_info.name_ = fused_grad_var_name; + var_info.type_ = type; + var_info.persistable_ = persistable; + auto &fused_var_set = result->Get(details::kFusedVars); PADDLE_ENFORCE_EQ( fused_var_set.count(fused_grad_var_name), 0, platform::errors::AlreadyExists("Var(%s) is duplicate in FusedVars.", fused_grad_var_name)); - fused_var_set.insert(fused_grad_var_name); + fused_var_set.insert({fused_grad_var_name, var_info}); + result->Get(details::kFusedGrads) .emplace_back(fused_grad_var_name); @@ -414,6 +437,13 @@ class CoalesceGradTensorPass : public ir::Pass { return var_desc->GetType(); } + bool IsPersistableVar( + const std::unordered_map> &vars_info, + const std::string &name) const { + auto var_desc = GetVarDescFromVarsInfo(vars_info, name); + return var_desc->Persistable(); + } + private: bool IsLoDTensorType(const proto::VarType::Type &type) const { // Current only support LOD_TENSOR. @@ -494,18 +524,46 @@ class CoalesceGradTensorPass : public ir::Pass { DataTypeToString(next_dtype), DataTypeToString(dtype))); } - result->Get(details::kProgramDescs).emplace_back(); - ProgramDesc &program_desc = - result->Get(details::kProgramDescs).back(); - auto *global_block = program_desc.MutableBlock(0); - AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype, - global_block); + bool any_persistable = false; + bool all_persistable = true; + for (auto &p_g : params_grads) { + if (IsPersistableVar(vars_info, p_g.second)) { + any_persistable = true; + } else { + all_persistable = false; + } + } + + if (all_persistable) { + // All grads are persistable, only need to be executed once at the + // beginning. + result->Get(details::kStartupProgramDescs) + .emplace_back(); + ProgramDesc &program_desc = + result->Get(details::kStartupProgramDescs) + .back(); + auto *global_block = program_desc.MutableBlock(0); + AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype, + all_persistable, global_block); + } else { + // NOTE. In scope_buffered_ssa_graph_executor, after each execution of + // DropScope(), non persistable vars will be Erase or Clear. So + // coalesce_tensor op needs to be executed again after the execution + // of DropScope(). + result->Get(details::kProgramDescs).emplace_back(); + ProgramDesc &program_desc = + result->Get(details::kProgramDescs).back(); + auto *global_block = program_desc.MutableBlock(0); + AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype, + any_persistable, global_block); + } } void AppendAllocSpaceForVarsOp(const std::vector ¶ms_name, const std::vector &grads_name, const std::string &fused_var_name, const proto::VarType::Type &dtype, + bool persistable, BlockDesc *global_block) const { auto op_desc = global_block->AppendOp(); op_desc->SetType("coalesce_tensor"); @@ -513,6 +571,8 @@ class CoalesceGradTensorPass : public ir::Pass { op_desc->SetOutput("Output", grads_name); op_desc->SetOutput("FusedOutput", {fused_var_name}); op_desc->SetAttr("dtype", static_cast(dtype)); + + op_desc->SetAttr("persist_output", persistable); } }; } // namespace ir diff --git a/paddle/fluid/framework/ir/fuse_optimizer_ops_pass/fuse_optimizer_op_pass.cc b/paddle/fluid/framework/ir/fuse_optimizer_ops_pass/fuse_optimizer_op_pass.cc index fa86db891f88108f96d42ca3f1640a5b878d16aa..ebc9f37d1db0f462196d5c0d255e344b1239c1b8 100644 --- a/paddle/fluid/framework/ir/fuse_optimizer_ops_pass/fuse_optimizer_op_pass.cc +++ b/paddle/fluid/framework/ir/fuse_optimizer_ops_pass/fuse_optimizer_op_pass.cc @@ -76,6 +76,9 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const { result.Set(details::kFusedOptType, new details::FusedOptType); result.Get(details::kFusedOptType) = fuse_op_type; + if (!result.Has(details::kStartupProgramDescs)) { + result.Set(details::kStartupProgramDescs, new details::ProgramDescs); + } if (!result.Has(details::kProgramDescs)) { result.Set(details::kProgramDescs, new details::ProgramDescs); } @@ -100,7 +103,12 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const { fused_var_set.count(fused_var_name), 0, platform::errors::AlreadyExists( "The fused variable(%s) already exists.", fused_var_name)); - fused_var_set.insert(fused_var_name); + // FIXME(wangxi). update persistable + details::VariableInfo var_info; + var_info.name_ = fused_var_name; + var_info.type_ = proto::VarType::LOD_TENSOR; + var_info.persistable_ = false; + fused_var_set.insert({fused_var_name, var_info}); fused_vars_name.emplace(var_name, fused_var_name); } @@ -151,8 +159,8 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const { return; } auto &fused_vars = result.Get(details::kFusedVars); - auto iter = - std::find(fused_vars.begin(), fused_vars.end(), fused_grad.front()); + + auto iter = fused_vars.find(fused_grad.front()); PADDLE_ENFORCE_EQ( iter != fused_vars.end(), true, platform::errors::NotFound("Not found the fused gradient variable.")); diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt b/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt index 6eab32ab9202404fd3ab5e6056c2bfe768ebc1dd..2f79c425e1d164dd6fde523ad4825dcf483cc19c 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt @@ -4,6 +4,7 @@ cc_library(multi_devices_graph_print_pass SRCS multi_devices_graph_print_pass.cc cc_library(multi_devices_graph_check_pass SRCS multi_devices_graph_check_pass.cc DEPS multi_devices_helper) set(ALL_REDUCE_OP_HANDLES all_reduce_op_handle) +set(ALL_REDUCE_OP_HANDLES grad_merge_all_reduce_op_handle) if(WITH_GPU AND WITH_DGC) list(APPEND ALL_REDUCE_OP_HANDLES sparse_all_reduce_op_handle) endif() @@ -13,7 +14,7 @@ cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_ cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass) cc_library(set_reader_device_info_utils SRCS set_reader_device_info_utils.cc DEPS graph graph_helper pass multi_devices_graph_pass) -cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle) +cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle grad_merge_all_reduce_op_handle) cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS all_reduce_op_handle graph graph_helper pass) cc_library(backward_optimizer_op_deps_pass SRCS backward_optimizer_op_deps_pass.cc DEPS graph graph_helper pass) cc_library(add_reader_dependency_pass SRCS add_reader_dependency_pass.cc DEPS graph graph_helper pass) diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc index b0ab6d23afb840af42234aabea27bafc3d3017a0..dfd275d9bc5b026ba05ec57fb49b73b6de02bfd0 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc @@ -19,6 +19,7 @@ #include "paddle/fluid/framework/details/all_reduce_op_handle.h" #include "paddle/fluid/framework/details/container_cast.h" #include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h" +#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/ir/graph_helper.h" @@ -164,6 +165,38 @@ class FuseAllReduceOpPass : public ir::Pass { const platform::BKCLCommunicator *multi_bkcl_ctxs, #endif ir::Graph *result) const { + bool is_grad_merge = false; + std::string grad_merge_cond_name; + for (auto &op : all_reduce_ops) { + auto *grad_merge_all_reduce_op_handle = + dynamic_cast( + &op->Wrapper()); + if (grad_merge_all_reduce_op_handle) { + if (is_grad_merge) { + auto this_grad_merge_cond_name = + grad_merge_all_reduce_op_handle->GradMergeCondName(); + + PADDLE_ENFORCE_EQ( + grad_merge_cond_name, this_grad_merge_cond_name, + platform::errors::InvalidArgument( + "grad_merge_cond_name is not same in different all_reduce, " + "prev_grad_merge_cond_name is %s, this_grad_merge_cond_name " + "is %s", + grad_merge_cond_name, this_grad_merge_cond_name)); + } else { + is_grad_merge = true; + grad_merge_cond_name = + grad_merge_all_reduce_op_handle->GradMergeCondName(); + } + } else { + PADDLE_ENFORCE_EQ(is_grad_merge, false, + platform::errors::InvalidArgument( + "if use grad_merge, all of allreduce must be " + "grad_merge_allreduce")); + } + } + VLOG(6) << "fused allreduce use_grad_merge=" << is_grad_merge; + std::vector inputs; std::vector outputs; for (auto &op : all_reduce_ops) { @@ -189,13 +222,16 @@ class FuseAllReduceOpPass : public ir::Pass { #if defined(PADDLE_WITH_NCCL) CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places, - local_scopes, multi_nccl_ctxs, result); + local_scopes, is_grad_merge, grad_merge_cond_name, + multi_nccl_ctxs, result); #elif defined(PADDLE_WITH_XPU_BKCL) CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places, - local_scopes, multi_bkcl_ctxs, result); + local_scopes, is_grad_merge, grad_merge_cond_name, + multi_bkcl_ctxs, result); #else CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places, - local_scopes, result); + local_scopes, is_grad_merge, grad_merge_cond_name, + result); #endif } @@ -205,26 +241,52 @@ class FuseAllReduceOpPass : public ir::Pass { const std::vector &outputs, const size_t num_of_all_reduce, const std::vector &places, - const std::vector &local_scopes, + const std::vector &local_scopes, bool is_grad_merge, + const std::string &grad_merge_cond_name, #if defined(PADDLE_WITH_NCCL) const platform::NCCLCommunicator *multi_nccl_ctxs, #elif defined(PADDLE_WITH_XPU_BKCL) const platform::BKCLCommunicator *multi_bkcl_ctxs, #endif ir::Graph *result) const { + details::FusedAllReduceOpHandle *op_handle = NULL; + if (is_grad_merge) { +#if defined(PADDLE_WITH_NCCL) + op_handle = new details::FusedGradMergeAllReduceOpHandle( + result->CreateEmptyNode("fused_all_reduce", + ir::Node::Type::kOperation), + local_scopes, places, num_of_all_reduce, grad_merge_cond_name, + multi_nccl_ctxs); +#elif defined(PADDLE_WITH_XPU_BKCL) + op_handle = new details::FusedGradMergeAllReduceOpHandle( + result->CreateEmptyNode("fused_all_reduce", + ir::Node::Type::kOperation), + local_scopes, places, num_of_all_reduce, grad_merge_cond_name, + multi_bkcl_ctxs); +#else + op_handle = new details::FusedGradMergeAllReduceOpHandle( + result->CreateEmptyNode("fused_all_reduce", + ir::Node::Type::kOperation), + local_scopes, places, num_of_all_reduce, grad_merge_cond_name); +#endif + } else { #if defined(PADDLE_WITH_NCCL) - auto *op_handle = new details::FusedAllReduceOpHandle( - result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation), - local_scopes, places, num_of_all_reduce, multi_nccl_ctxs); + op_handle = new details::FusedAllReduceOpHandle( + result->CreateEmptyNode("fused_all_reduce", + ir::Node::Type::kOperation), + local_scopes, places, num_of_all_reduce, multi_nccl_ctxs); #elif defined(PADDLE_WITH_XPU_BKCL) - auto *op_handle = new details::FusedAllReduceOpHandle( - result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation), - local_scopes, places, num_of_all_reduce, multi_bkcl_ctxs); + auto *op_handle = new details::FusedAllReduceOpHandle( + result->CreateEmptyNode("fused_all_reduce", + ir::Node::Type::kOperation), + local_scopes, places, num_of_all_reduce, multi_bkcl_ctxs); #else - auto *op_handle = new details::FusedAllReduceOpHandle( - result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation), - local_scopes, places, num_of_all_reduce); + op_handle = new details::FusedAllReduceOpHandle( + result->CreateEmptyNode("fused_all_reduce", + ir::Node::Type::kOperation), + local_scopes, places, num_of_all_reduce); #endif + } for (auto in : inputs) { op_handle->AddInput(in); diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc index 6fe1fcdada273f9ba30f919647a706f2650c0998..c23d357b17ef162e2cab515273fded187c12ccfb 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc @@ -25,6 +25,7 @@ #include "paddle/fluid/framework/details/computation_op_handle.h" #include "paddle/fluid/framework/details/fetch_barrier_op_handle.h" #include "paddle/fluid/framework/details/fused_broadcast_op_handle.h" +#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h" #include "paddle/fluid/framework/details/reduce_op_handle.h" #include "paddle/fluid/framework/details/rpc_op_handle.h" #include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h" @@ -255,7 +256,7 @@ void MultiDevSSAGraphBuilderBase::ApplyImpl(ir::Graph *graph) const { VLOG(10) << "Bcast " << g_name << " for parameter " << p_name << " op_type " << node->Op()->Type(); if (NeedCollectiveForGrad(g_name, sorted_ops)) { - InsertCollectiveOp(&result, p_name, g_name); + InsertCollectiveOp(&result, node, p_name, g_name); } } } @@ -481,45 +482,77 @@ void MultiDevSSAGraphBuilderBase::CreateComputationalOp(ir::Graph *result, } void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(ir::Graph *result, + ir::Node *node, const std::string &og, bool is_encoded) const { - details::OpHandleBase *op_handle = nullptr; + const std::string GRAD_MERGE_COND_NAME = "grad_merge_cond_name"; + + bool is_grad_merge = node->Op()->HasAttr(GRAD_MERGE_COND_NAME); + std::string grad_merge_cond_name; + PADDLE_ENFORCE_EQ((is_encoded && is_grad_merge), false, + platform::errors::InvalidArgument( + "DGC and GradMerge cannot use at same time, while " + "use_dgc=%d, use_grad_merge=%d", + is_encoded, is_grad_merge)); auto append_allreduce_op = [&]( const std::vector &scopes, const std::vector &places) -> details::OpHandleBase * { -#if defined(PADDLE_WITH_DGC) && defined(PADDLE_WITH_NCCL) if (is_encoded) { +#if defined(PADDLE_WITH_DGC) && defined(PADDLE_WITH_NCCL) result->Get(kGraphOps).emplace_back( new details::SparseAllReduceOpHandle( result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), scopes, places, multi_nccl_ctxs_, is_encoded, strategy_.num_trainers_ * places_.size())); +#else + PADDLE_THROW(platform::errors::PreconditionNotMet( + "This version of PaddlePaddle does NOT support DGC, " + "but got DGC grad in CreateAllReduceOp. " + "Please compile PaddlePaddle WITH_DGC first.")); +#endif + } else if (is_grad_merge) { + grad_merge_cond_name = BOOST_GET_CONST( + std::string, node->Op()->GetAttr(GRAD_MERGE_COND_NAME)); + VLOG(10) << "og=" << og << " use grad_merge_allreduce"; +#if defined(PADDLE_WITH_NCCL) + result->Get(kGraphOps).emplace_back( + new details::GradMergeAllReduceOpHandle( + result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), + scopes, places, grad_merge_cond_name, multi_nccl_ctxs_)); +#elif defined(PADDLE_WITH_XPU_BKCL) + result->Get(kGraphOps).emplace_back( + new datails::GradMergeAllReduceOpHandle( + result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), + scopes, places, grad_merge_cond_name, multi_bkcl_ctxs_)); +#else + result->Get(kGraphOps).emplace_back( + new details::GradMergeAllReduceOpHandle( + result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), + scopes, places, grad_merge_cond_name)); +#endif } else { +#ifdef PADDLE_WITH_NCCL result->Get(kGraphOps).emplace_back( new details::AllReduceOpHandle( result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), scopes, places, multi_nccl_ctxs_)); - } -#elif defined(PADDLE_WITH_NCCL) - result->Get(kGraphOps).emplace_back( - new details::AllReduceOpHandle( - result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), - scopes, places, multi_nccl_ctxs_)); #elif defined(PADDLE_WITH_XPU_BKCL) - result->Get(kGraphOps).emplace_back( - new details::AllReduceOpHandle( - result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), - scopes, places, multi_bkcl_ctxs_)); + result->Get(kGraphOps).emplace_back( + new details::AllReduceOpHandle( + result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), + scopes, places, multi_bkcl_ctxs_)); #else - result->Get(kGraphOps).emplace_back( - new details::AllReduceOpHandle( - result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), - scopes, places)); + result->Get(kGraphOps).emplace_back( + new details::AllReduceOpHandle( + result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), + scopes, places)); #endif + } return result->Get(kGraphOps).back(); }; + details::OpHandleBase *op_handle = nullptr; if (!strategy_.enable_parallel_graph_) op_handle = append_allreduce_op(local_scopes_, places_); @@ -546,6 +579,36 @@ void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(ir::Graph *result, op_handle->AddOutput(var); VLOG(10) << "all_reduce_op_handle add output " << og << ", handle:" << var->DebugString(); + + if (is_grad_merge) { + // NOTE(wangxi). grad_merge_cond_var is used by + // GradMergeAllReduceOpHandle, but it is not the input of + // grad_merge_all_reduce_op_handle. So we must add dep_var to resolve + // WAR data hazard, for grad_merge_all_reduce_op_handle may be + // executed before grad_merge_cond_op. + auto &grad_merge_cond_vars = result->Get( + details::kGraphVars)[i][grad_merge_cond_name]; + PADDLE_ENFORCE_EQ( + grad_merge_cond_vars.empty(), false, + platform::errors::InvalidArgument( + "Can not find Var(%s) in Place[%d] " + "Paddle Can not add GradMergeAllReduce OP for Var(%s).", + grad_merge_cond_name, i, og)); + auto &grad_merge_cond_var = grad_merge_cond_vars.back(); + auto *cond_op = grad_merge_cond_var->GeneratedOp(); + PADDLE_ENFORCE_NOT_NULL( + cond_op, + platform::errors::Fatal( + "grad_merge_cond_var(%s)'s generated op handle must not be NULL", + grad_merge_cond_name)); + + auto *dep_var = + new details::DummyVarHandle(result->CreateControlDepVar()); + result->Get(details::kGraphDepVars) + .emplace(dep_var); + cond_op->AddOutput(dep_var); + op_handle->AddInput(dep_var); + } } } @@ -650,16 +713,16 @@ void MultiDevSSAGraphBuilderBase::CreateIsolatedVarNode( } void AllReduceSSAGraphBuilder::InsertCollectiveOp( - ir::Graph *result, const std::string &p_name, + ir::Graph *result, ir::Node *node, const std::string &p_name, const std::string &g_name) const { if (IsSparseGradient(g_name)) { CreateReduceOp(result, g_name, 0); CreateBroadcastOp(result, g_name, 0); } else { #if defined(PADDLE_WITH_DGC) - CreateAllReduceOp(result, g_name, IsEncoded(p_name)); + CreateAllReduceOp(result, node, g_name, IsEncoded(p_name)); #else - CreateAllReduceOp(result, g_name); + CreateAllReduceOp(result, node, g_name); #endif } } @@ -750,7 +813,7 @@ void ReduceSSAGraphBuilder::ResetState() const { } void ReduceSSAGraphBuilder::InsertCollectiveOp( - ir::Graph *result, const std::string &p_name, + ir::Graph *result, ir::Node *node, const std::string &p_name, const std::string &g_name) const { size_t cur_device_id = GetAppropriateDeviceID({g_name}); CreateReduceOp(result, g_name, cur_device_id); @@ -1128,7 +1191,7 @@ bool AllReduceSSAGraphBuilder::IsEncoded(const std::string &p_name) const { } #endif -void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, +void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, ir::Node *node, const std::string &p_name, const std::string &g_name) const { // collective gradient to each device @@ -1144,7 +1207,7 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, CreateReduceOp(result, g_name, 0); CreateBroadcastOp(result, g_name, 0); } else { - CreateAllReduceOp(result, g_name); + CreateAllReduceOp(result, node, g_name); } break; default: diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h index 97d3a40874b31c7de80da7b5fefddd6542c96d3e..32c7119ce3c4a7482a5b892b518efed80dafd98f 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h @@ -66,7 +66,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass { virtual std::vector SortOperations(const ir::Graph &graph) const; - virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, + virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node, + const std::string &p_name, const std::string &g_name) const = 0; virtual bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const; @@ -96,8 +97,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass { bool IsSparseGradient(const std::string &og) const; - void CreateAllReduceOp(ir::Graph *result, const std::string &og, - bool is_encoded = false) const; + void CreateAllReduceOp(ir::Graph *result, ir::Node *node, + const std::string &og, bool is_encoded = false) const; void CreateBroadcastOp(ir::Graph *result, const std::string &p_name, size_t src_dev_id) const; @@ -134,7 +135,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass { class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { protected: - virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, + virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node, + const std::string &p_name, const std::string &g_name) const; virtual void InsertPostprocessOps(ir::Graph *result) const {} @@ -144,7 +146,8 @@ class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { protected: - void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, + void InsertCollectiveOp(ir::Graph *result, ir::Node *node, + const std::string &p_name, const std::string &g_name) const override {} bool NeedCollectiveForGrad(const std::string &grad_name, @@ -183,7 +186,8 @@ class ReduceSSAGraphBuilder : public BalanceVarSSAGraphBuilder { protected: virtual void Init() const; - virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, + virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node, + const std::string &p_name, const std::string &g_name) const; virtual bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const; @@ -212,7 +216,8 @@ class DistSSAGraphBuilder : public BalanceVarSSAGraphBuilder { virtual void InsertPostprocessOps(ir::Graph *result) const; - virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, + virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node, + const std::string &p_name, const std::string &g_name) const; virtual void ResetState() const; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 947a3c9455f1c71f59b8f129ea800d44282cbe61..e7a2fadf4705ee97938f05e7218f65cf324d46a1 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -847,6 +847,17 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, } } + if (graph->Has(details::kFusedVars)) { + auto &fused_vars = graph->Get(details::kFusedVars); + for (auto &fused_var : fused_vars) { + var_infos.emplace_back(); + var_infos.back() = fused_var.second; + + member_->is_persistable_.emplace(fused_var.first, + fused_var.second.persistable_); + } + } + std::unordered_map scope_map; for (auto *scope : member_->local_scopes_) { auto &local_exec_scope = scope->NewScope(); diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 628657d4e49f8520731731bceaffa6315198d2bf..464d8c8d56f5c425706c01ea01d14f7ac2aed0ab 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -64,7 +64,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel { platform::errors::InvalidArgument( "The output variable %s of CoalesceTensor operator " "is not LoDTensor.", - in_var_names[i])); + out_var_names[i])); } auto in_tensors = context.MultiInput("Input"); @@ -123,6 +123,22 @@ class CoalesceTensorOpKernel : public framework::OpKernel { math::SetConstant set_constant; set_constant(dev_ctx, fused_tensor, static_cast(context.Attr("constant"))); + } else if (context.Attr("persist_output")) { + for (size_t i = 0; i < out_var_names.size(); ++i) { + size_t len = static_cast(out_tensors[i]->numel()); + auto sub_tensor = fused_tensor->Slice( + static_cast(offset), static_cast(offset + len)); + // some var may not persistable, or persistable var may not init + if (out_tensors[i]->IsInitialized()) { + framework::TensorCopy(*out_tensors[i], context.GetPlace(), dev_ctx, + &sub_tensor); + } + offset += + use_align + ? platform::Alignment(len * size_of_dtype, context.GetPlace()) / + size_of_dtype + : len; + } } // Make the outputs point to the continuous space. @@ -225,6 +241,9 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker { AddAttr("set_constant", "Whether to set the Output with a constant value.") .SetDefault(false); + AddAttr("persist_output", + "Whether to persist the original Output value.") + .SetDefault(false); AddAttr("constant", "If set_constant is true, the constant value will be used " "to set the Output.") @@ -250,7 +269,8 @@ Note that, the dtype of Input should be the same, and the dim of Input and Output should equal. The tensors of Input and Output could be the same or different. And coalesce_tensor allows copying the value of Input to Output, or -setting the Output with a constant value. +setting the Output with a constant value, or persist the original Output +value. )DOC"); } diff --git a/python/paddle/fluid/layers/control_flow.py b/python/paddle/fluid/layers/control_flow.py index 0e49c743fe3d6929100b97206dc3c2a2c40bab32..2ab807d1cf56d52ce7c8489018dc3f39282bda28 100755 --- a/python/paddle/fluid/layers/control_flow.py +++ b/python/paddle/fluid/layers/control_flow.py @@ -2188,8 +2188,11 @@ class ConditionalBlock(object): def need_append_conditional_block_grad(self, inside_block): grad_sub_block_idx = inside_block.backward_block_idx + inside_block_idx = inside_block.idx - return grad_sub_block_idx != -1 + # if inside_block have grad_block and grad_block is not itself, + # we will append conditional block grad. + return grad_sub_block_idx != -1 and grad_sub_block_idx != inside_block_idx def append_conditional_block_grad(self, parent_block, inside_block, conditional_block_op): diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index e9d48d8562927d9c77ec59366e927912492f00dc..a7d6ef871749874f2aaf50662ef2ba53158aa767 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5063,6 +5063,8 @@ class GradientMergeOptimizer(object): print("step=%d, cost=%f" % (i, cost_val[0])) """ + GRAD_MERGE_COND_NAME = "grad_merge_cond_name" + def __init__(self, inner_optimizer, k_steps=1, avg=True): if framework.in_dygraph_mode(): raise Exception( @@ -5078,6 +5080,7 @@ class GradientMergeOptimizer(object): self.k_steps = k_steps self.type = "gradient_merge" self.avg = avg + self._optimize_ops = None def _set_k_steps(self, k_steps): self.k_steps = k_steps @@ -5085,12 +5088,12 @@ class GradientMergeOptimizer(object): def _set_avg(self, avg): self.avg = avg - def minimize(self, + def backward(self, loss, startup_program=None, parameter_list=None, - no_grad_set=None): - + no_grad_set=None, + callbacks=None): assert isinstance(loss, Variable), "The loss should be an Variable." assert ( parameter_list is None @@ -5101,26 +5104,142 @@ class GradientMergeOptimizer(object): params_grads = self.inner_optimizer.backward( loss, startup_program=startup_program) + return params_grads + + def apply_optimize(self, loss, startup_program, params_grads): + program = loss.block.program + with program_guard(program, startup_program): + optimize_ops = self.apply_gradients(params_grads) + return optimize_ops + + def _is_the_backward_op(self, op): + op_maker = core.op_proto_and_checker_maker + backward = core.op_proto_and_checker_maker.OpRole.Backward + if op_maker.kOpRoleVarAttrName() in op.attr_names and \ + int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(backward): + return True + return False + + def _remove_op_role_var(self, param, grad): + op_maker = core.op_proto_and_checker_maker + op = grad.op + assert self._is_the_backward_op(op), \ + 'grad.op={} is not the backward op which produces the grad={}' \ + .format(op, grad.name) + + block = grad.block + var_attr = op.all_attrs()[op_maker.kOpRoleVarAttrName()] + assert param.name in var_attr, \ + 'when using GradientMergeOptimizer, param={} must be in var_attr={}' \ + .format(param.name, var_attr) + assert grad.name in var_attr, \ + 'when using GradientMergeOptimizer, grad={} must be in var_attr={}' \ + .format(param.name, var_attr) + + # remove (param, grad) from op_role_var + var_attr.remove(param.name) + var_attr.remove(grad.name) + if len(var_attr) > 1: + op._set_attr(op_maker.kOpRoleVarAttrName(), var_attr) + else: + op._remove_attr(op_maker.kOpRoleVarAttrName()) + + def _add_gm_op_role_var(self, op, param, grad, cond): + grad.op = op + op_maker = core.op_proto_and_checker_maker + backward = op_maker.OpRole.Backward + + # NOTE(wangxi). When distributed, we will insert grad_merge_all_reduce_op_handle + # in multi_devices_graph_pass, which will allreduce(grad) if cond is True, else + # do nothing. + # In this way, the gradient can be merged first, and then communicate when the + # condition is met, reducing the number of communications to increase the + # speed. + op._set_attr(self.GRAD_MERGE_COND_NAME, cond.name) + op._set_attr(op_maker.kOpRoleAttrName(), backward) + op._set_attr(op_maker.kOpRoleVarAttrName(), [param.name, grad.name]) + + def _get_gm_cond_var(self, main_block): + # Add const var + k_step_var = layers.create_global_var( + name="gradient_merge_k", + shape=[1], + value=int(self.k_steps), + dtype='int32', + persistable=True, + force_cpu=True) + + zero_var = layers.create_global_var( + name="gradient_merge_zero", + shape=[1], + value=int(0), + dtype='int32', + persistable=True, + force_cpu=True) + + # Add step var & cond var + step_var = layers.create_global_var( + name="gradient_merge_step", + shape=[1], + value=int(0), + dtype='int32', + persistable=True, + force_cpu=True) + + cond_var = layers.create_global_var( + name="gradient_merge_cond", + shape=[1], + value=bool(0), + dtype='bool', + persistable=True, + force_cpu=True) + + with device_guard("cpu"): + # step_var = (step_var + 1) % k_step + layers.increment(x=step_var, value=1.0, in_place=True) + main_block.append_op( + type='elementwise_mod', + inputs={'X': step_var, + 'Y': k_step_var}, + outputs={'Out': step_var}, + attrs={'axis': -1, + 'use_mkldnn': False}) + + # cond_var = (step_var == 0) + main_block.append_op( + type='equal', + inputs={'X': step_var, + 'Y': zero_var}, + outputs={'Out': cond_var}) + + return cond_var + + def apply_gradients(self, params_grads): + main_program = default_main_program() + startup_program = default_startup_program() + main_block = main_program.global_block() + startup_block = startup_program.global_block() + + cond = self._get_gm_cond_var(main_block) #TODO(mapingshuo) support sparse embedding - for k, v in params_grads: + # step1: remove grad.op's op_role_var + for param, grad in params_grads: assert ( - v.type != core.VarDesc.VarType.SELECTED_ROWS + param.type != core.VarDesc.VarType.SELECTED_ROWS ), "SELECTED_ROWS is not supported in GradientMergeOptimizer for now" - param_to_grad = {k.name: v for (k, v) in params_grads} - - # Get startup_program and main_program - if startup_program is None: - startup_program = default_startup_program() - main_block = loss.block + self._remove_op_role_var(param, grad) - # add some vars to the main_program and startup_program - startup_block = startup_program.global_block() + param_to_grad = {k.name: v for (k, v) in params_grads} param_names = param_to_grad.keys() param_to_gradient_merge = {} - for param_name in param_names: + new_params_grads = [] + # step2: create gradient_merge var and init with 0 + # and update op_role_var + for param, grad in params_grads: + param_name = param.name param_var = main_block.var(param_name) assert (param_var is not None) gradient_merge_var = main_block.create_var( @@ -5129,6 +5248,7 @@ class GradientMergeOptimizer(object): dtype=param_var.dtype, persistable=True) param_to_gradient_merge[param_name] = gradient_merge_var + startup_gradient_merge_var = startup_block.create_var( name=param_name + "@GRAD@GradientMerge", shape=param_var.shape, @@ -5143,92 +5263,75 @@ class GradientMergeOptimizer(object): "value": float(0), }) - with framework.program_guard(main_block.program, startup_program): - # Add Var k to main prog and startup prog - gradient_merge_k = layers.create_global_var( - name="gradient_merge_k", - shape=[1], - value=int(self.k_steps), - dtype='int32', - persistable=True) + # grad_merge += grad + new_grad_op = main_block.append_op( + type="elementwise_add", + inputs={'X': grad, + 'Y': gradient_merge_var}, + outputs={'Out': gradient_merge_var}, + attrs={'axis': -1, + 'use_mkldnn': False}) + self._add_gm_op_role_var(new_grad_op, param, gradient_merge_var, + cond) + new_params_grads.append([param, gradient_merge_var]) + + def true_apply_gradient(): + cur_block_idx = main_program.current_block_idx + cur_block = main_program.current_block() + + # cur_block's forward_block & backward_block is itself + cur_block._set_forward_block_idx(cur_block_idx) + + if self.avg: + for param, new_grad in new_params_grads: + # grad /= k_steps + cur_block.append_op( + type='scale', + inputs={'X': new_grad}, + outputs={'Out': new_grad}, + attrs={ + 'scale': 1.0 / self.k_steps, + 'bias': 0.0, + 'bias_after_scale': False + }) - # Add Var step - gradient_merge_step = layers.create_global_var( - name="gradient_merge_step", - shape=[1], - value=int(0), - dtype='int32', - persistable=True) - layers.increment(x=gradient_merge_step, value=1.0, in_place=True) + for param, new_grad in new_params_grads: + # NOTE. regularization will append ops to grad.block, + # while new_grad's real block is global_block, + # but we want append regularization ops to cur_block, + # so we set new_grad.block = cur_block + new_grad.block = cur_block - # gradient merge - zero_var = layers.fill_constant( - shape=[1], dtype='float32', value=0.0) - one_var = layers.fill_constant( - shape=[1], dtype='float32', value=1.0) + self._optimize_ops = self.inner_optimizer.apply_gradients( + new_params_grads) - mod = layers.elementwise_mod(gradient_merge_step, gradient_merge_k) - with layers.control_flow.Switch() as switch: - with switch.case(mod != zero_var): - # 1. update the gradient_merge_vars - # gradient_merge_vars += gradient_vars - cur_block = main_block.program.current_block() - for param_name in param_names: - grad = param_to_grad[param_name] - grad_merge = param_to_gradient_merge[param_name] - cur_block.append_op( - type="elementwise_add", - inputs={'X': grad, - 'Y': grad_merge}, - outputs={'Out': grad_merge}, - attrs={'axis': -1, - 'use_mkldnn': False}) + # clear gradient_merge_vars + for param, new_grad in new_params_grads: + layers.fill_constant( + shape=new_grad.shape, + dtype=new_grad.dtype, + value=0.0, + out=new_grad) + + # step3. apply gradient + layers.cond(cond, true_fn=true_apply_gradient, false_fn=None) + + return self._optimize_ops + + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + assert isinstance(loss, Variable), "The loss should be an Variable." + + params_grads = self.backward( + loss, + startup_program=startup_program, + parameter_list=parameter_list, + no_grad_set=no_grad_set) + + optimize_ops = self.apply_optimize( + loss, startup_program=startup_program, params_grads=params_grads) - with switch.default(): - # 1. update the graient_vars - # gradient_vars += gradient_merge_vars - cur_block_idx = main_block.program.current_block_idx - cur_block = main_block.program.current_block() - for param_name in param_names: - grad = param_to_grad[param_name] - grad_merge = param_to_gradient_merge[param_name] - if self.avg: - tmp_var = layers.elementwise_add(grad, grad_merge) - cur_block.append_op( - type='scale', - inputs={'X': tmp_var}, - outputs={'Out': grad}, - attrs={ - 'scale': 1.0 / self.k_steps, - 'bias': 0.0, - 'bias_after_scale': False - }) - else: - cur_block.append_op( - type="elementwise_add", - inputs={'X': grad, - 'Y': grad_merge}, - outputs={'Out': grad}, - attrs={'axis': -1, - 'use_mkldnn': False}) - - # 2. apply_optimize - target_grad_block = main_block.program._create_block( - parent_idx=cur_block.parent_idx) - target_grad_block._set_forward_block_idx(cur_block_idx) - main_block.program.current_block_idx = cur_block_idx - - optimize_ops = self.inner_optimizer.apply_optimize( - loss, - startup_program=startup_program, - params_grads=params_grads) - - # 3. clear gradient_merge_vars - for param_name in param_names: - grad_merge = param_to_gradient_merge[param_name] - layers.fill_constant( - shape=grad_merge.shape, - dtype=grad_merge.dtype, - value=0.0, - out=grad_merge) return optimize_ops, params_grads diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 2f67cdd4514d36f5f233ed80051a564b7941e886..365e8ed48473e30a4cefaf5dbafd2dd36d14cde8 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -658,6 +658,7 @@ if (WITH_DISTRIBUTE AND NOT APPLE) if(WITH_GPU) set_tests_properties(test_c_comm_init_op PROPERTIES TIMEOUT 120) set_tests_properties(test_fleet_checkpoint PROPERTIES TIMEOUT 120) + set_tests_properties(test_dist_mnist_gradient_merge PROPERTIES TIMEOUT 120) endif() endif() diff --git a/python/paddle/fluid/tests/unittests/dist_mnist_gradient_merge.py b/python/paddle/fluid/tests/unittests/dist_mnist_gradient_merge.py new file mode 100644 index 0000000000000000000000000000000000000000..66ea24e0bde2d71bff590b439797c104ce55975c --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dist_mnist_gradient_merge.py @@ -0,0 +1,62 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import paddle +import paddle.fluid as fluid +from test_dist_base import TestDistRunnerBase, runtime_main +from dist_mnist import cnn_model + +DTYPE = "float32" +paddle.dataset.mnist.fetch() + +# Fix seed for test +fluid.default_startup_program().random_seed = 1 +fluid.default_main_program().random_seed = 1 + + +class TestDistMnist2x2(TestDistRunnerBase): + def get_model(self, batch_size=2): + # Input data + images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + # Train program + predict = cnn_model(images) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + # Evaluator + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy( + input=predict, label=label, total=batch_size_tensor) + + inference_program = fluid.default_main_program().clone() + # Optimization + opt = fluid.optimizer.MomentumOptimizer( + learning_rate=0.001, momentum=0.9) + opt = fluid.optimizer.GradientMergeOptimizer(opt, 2) + + # Reader + train_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=batch_size) + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=batch_size) + opt.minimize(avg_cost) + return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict + + +if __name__ == "__main__": + runtime_main(TestDistMnist2x2) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 29ac46e81d85db122cf9bdc13e944b10660dccc9..d30de1020209c8f6837db11038ffcd785d62013b 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -15,6 +15,7 @@ from __future__ import print_function import time +import ast import unittest import os import sys @@ -373,6 +374,10 @@ class TestDistRunnerBase(object): build_stra.enable_inplace = False build_stra.memory_optimize = False + if args.fuse_all_reduce is not None: + sys.stderr.write('fuse_all_reduce={}'.format(args.fuse_all_reduce)) + build_stra.fuse_all_reduce_ops = args.fuse_all_reduce + if args.hogwild: build_stra.async_mode = True @@ -620,6 +625,11 @@ def runtime_main(test_class): type=bool, default=False) parser.add_argument('--sync_batch_norm', action='store_true') + parser.add_argument( + '--fuse_all_reduce', + required=False, + type=ast.literal_eval, + default=None) args = parser.parse_args() @@ -688,6 +698,7 @@ class TestDistBase(unittest.TestCase): self._ut4grad_allreduce = False self._use_hallreduce = False self._save_model = False + self._fuse_all_reduce = None self._setup_config() global DIST_UT_PORT @@ -971,6 +982,9 @@ class TestDistBase(unittest.TestCase): if self._enable_backward_deps: tr_cmd += " --enable_backward_deps" + if self._fuse_all_reduce is not None: + tr_cmd += " --fuse_all_reduce {}".format(self._fuse_all_reduce) + if self._gpu_fleet_api: tr_cmd += " --gpu_fleet_api" if self._use_local_sgd: diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist_gradient_merge.py b/python/paddle/fluid/tests/unittests/test_dist_mnist_gradient_merge.py new file mode 100644 index 0000000000000000000000000000000000000000..a5610caa52e19cc60556c95b824434bcdf3ec4d8 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_mnist_gradient_merge.py @@ -0,0 +1,57 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import os +import unittest +from test_dist_base import TestDistBase + +flag_name = os.path.splitext(__file__)[0] + + +class TestDistMnistGradMerge(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._use_reduce = False + self._nccl2_mode = True + + def test_dist_train(self): + import paddle.fluid as fluid + if fluid.core.is_compiled_with_cuda(): + self.check_with_place( + "dist_mnist_gradient_merge.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +class TestDistMnistGradMergeNoFuse(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._use_reduce = False + self._nccl2_mode = True + self._fuse_all_reduce = False + + def test_dist_train(self): + import paddle.fluid as fluid + if fluid.core.is_compiled_with_cuda(): + self.check_with_place( + "dist_mnist_gradient_merge.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name + "_no_fuse") + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_optimizer.py b/python/paddle/fluid/tests/unittests/test_optimizer.py index 369a5bdae046f26d6cc0e20205701965e9918421..ffecec1815b1596b6366b95983b6c8e3e7e3b7b8 100644 --- a/python/paddle/fluid/tests/unittests/test_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_optimizer.py @@ -25,6 +25,7 @@ import numpy as np from paddle.fluid.backward import append_backward from paddle.fluid.framework import Program, program_guard, convert_np_dtype_to_dtype_ import paddle +paddle.enable_static() class TestOptimizer(unittest.TestCase): @@ -1011,37 +1012,34 @@ class TestGradientMergeOptimizer(unittest.TestCase): with framework.program_guard(main_program, init_program): ops, params_grads = opt.minimize(cost) - self.assertEqual(main_program.num_blocks, 4) + self.assertEqual(main_program.num_blocks, 2) # main block - self.assertEqual(len(cost.block.ops), 17) - self.assertEqual([op.type for op in cost.block.ops], [ - 'mul', 'elementwise_add', 'mean', 'fill_constant', 'mean_grad', - 'elementwise_add_grad', 'mul_grad', 'increment', 'fill_constant', - 'fill_constant', 'elementwise_mod', 'cast', 'not_equal', - 'logical_not', 'conditional_block', 'conditional_block', - 'conditional_block_grad' - ]) + self.assertEqual(len(cost.block.ops), 13) + self.assertEqual( + [op.type for op in cost.block.ops], + [ + 'mul', + 'elementwise_add', + 'mean', + 'fill_constant', + 'mean_grad', + 'elementwise_add_grad', + 'mul_grad', + 'increment', # step += 1 + 'elementwise_mod', # step %= k_steps + 'equal', # cond_var == (step == 0) + 'elementwise_add', + 'elementwise_add', + 'conditional_block', + ]) - # merge block - self.assertEqual(len(main_program.block(1).ops), 2) + # optimize block + self.assertEqual(len(main_program.block(1).ops), 6) self.assertEqual([op.type for op in main_program.block(1).ops], [ - 'elementwise_add', - 'elementwise_add', + 'scale', 'scale', 'sgd', 'sgd', 'fill_constant', 'fill_constant' ]) - # reset block - self.assertEqual(len(main_program.block(2).ops), 6) - self.assertEqual([op.type for op in main_program.block(2).ops], [ - 'elementwise_add', 'scale', 'elementwise_add', 'scale', - 'fill_constant', 'fill_constant' - ]) - - # optimize block - self.assertEqual(len(main_program.block(3).ops), 2) - self.assertEqual([op.type for op in main_program.block(3).ops], - ['sgd', 'sgd']) - class TestOptimizerDtype(unittest.TestCase): '''