diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt
index 615cfaa4f31a2411685652c2a7581da6f361eaf3..4eba8177c56b818d5890504c7a5c69e3e317d559 100644
--- a/paddle/fluid/framework/details/CMakeLists.txt
+++ b/paddle/fluid/framework/details/CMakeLists.txt
@@ -93,6 +93,6 @@ cc_library(build_strategy SRCS build_strategy.cc DEPS
             fuse_elewise_add_act_pass multi_batch_merge_pass
             fuse_relu_depthwise_conv_pass
             memory_optimize_pass lock_free_optimize_pass
-            alloc_continuous_space_for_grad_pass fuse_all_reduce_op_pass
+            alloc_continuous_space_for_grad_pass fuse_all_reduce_op_pass backward_optimizer_op_deps_pass
             fuse_adam_op_pass fuse_sgd_op_pass fuse_momentum_op_pass
             record_skip_memory_opt_vars_pass)
diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc
index f1bd26e26dc970d637b7ba025c1b5291e918780c..04ab58947af8f992714fd9e8e12a7a275696250b 100644
--- a/paddle/fluid/framework/details/all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc
@@ -134,6 +134,7 @@ void AllReduceOpHandle::RunImpl() {
                       static_cast<ncclDataType_t>(dtype), ncclSum);
       });
     }
+    VLOG(10) << "allreduce size:" << numel * SizeOfType(lod_tensors[0]->type());
     RunAllReduceFuncs(all_reduce_calls);
 #else
   PADDLE_THROW("Not compiled with CUDA");
diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc
index 6e8be9553a49b429554ee6c74e896ff5d53d1ee8..10cead16ea044e73c63ebba5b57915ed023ca777 100644
--- a/paddle/fluid/framework/details/build_strategy.cc
+++ b/paddle/fluid/framework/details/build_strategy.cc
@@ -49,6 +49,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
       : ir::PassBuilder(), strategy_(strategy) {
     // Add a graph viz pass to record a graph.
     if (!strategy_.debug_graphviz_path_.empty()) {
+      VLOG(1) << "Add graph_viz_pass";
       auto viz_pass = AppendPass("graph_viz_pass");
       const std::string graph_path = string::Sprintf(
           "%s%s", strategy_.debug_graphviz_path_.c_str(), "_original_graph");
@@ -56,11 +57,12 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     }
 
     // Note(zcd): record_skip_memory_opt_vars_pass should be the first pass.
+    VLOG(1) << "Add record_skip_memory_opt_vars_pass";
     AppendPass("record_skip_memory_opt_vars_pass");
 
 #ifdef PADDLE_WITH_MKLDNN
     if (FLAGS_use_mkldnn) {
-      VLOG(5) << "Add mkldnn_placement_pass";
+      VLOG(1) << "Add mkldnn_placement_pass";
       AppendPass("mkldnn_placement_pass");
     } else if (!strategy_.mkldnn_enabled_op_types_.empty()) {
       LOG(WARNING)
@@ -75,7 +77,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
           "Please compile with MKLDNN first to use MKLDNN");
 #endif
     if (strategy_.enable_sequential_execution_) {
-      VLOG(5) << "Add sequential_execution_pass";
+      VLOG(1) << "Add sequential_execution_pass";
       AppendPass("sequential_execution_pass");
     }
 
@@ -86,7 +88,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
 
     // Add op fusion.
     if (strategy.fuse_relu_depthwise_conv_) {
-      VLOG(5) << "Add fuse_relu_depthwise_conv_pass";
+      VLOG(1) << "Add fuse_relu_depthwise_conv_pass";
       AppendPass("fuse_relu_depthwise_conv_pass");
     }
 
@@ -98,19 +100,19 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     // Add automatically inplace.
     if (strategy_.enable_inplace_) {
-      VLOG(5) << "Add inplace_pass";
+      VLOG(1) << "Add inplace_pass";
       AppendPass("inplace_pass");
     }
 
     if (strategy_.fuse_elewise_add_act_ops_) {
-      VLOG(5) << "Add fuse_elewise_add_act_pass";
+      VLOG(1) << "Add fuse_elewise_add_act_pass";
       AppendPass("fuse_elewise_add_act_pass");
     }
 
     // for single card training, fuse_all_reduce_ops is unnecessary.
     // alloc_continuous_space_for_grad_pass should be before of MultiDevPass.
     if (strategy_.fuse_all_reduce_ops_) {
-      VLOG(5) << "Add alloc_continuous_space_for_grad_pass";
+      VLOG(1) << "Add alloc_continuous_space_for_grad_pass";
       AppendPass("alloc_continuous_space_for_grad_pass");
     }
 
@@ -125,11 +127,11 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
       // NOTE: fuse_all_xx_ops will count the number of xx operator first,
       // if the number is zero, fuse_all_reduce_ops will do nothing.
       // Currently, only one type of optimization algorithm can be fused.
-      VLOG(5) << "Add fuse_adam_op_pass";
+      VLOG(1) << "Add fuse_adam_op_pass";
       AppendPass("fuse_adam_op_pass");
-      VLOG(5) << "Add fuse_sgd_op_pass";
+      VLOG(1) << "Add fuse_sgd_op_pass";
       AppendPass("fuse_sgd_op_pass");
-      VLOG(5) << "Add fuse_momentum_op_pass";
+      VLOG(1) << "Add fuse_momentum_op_pass";
       AppendPass("fuse_momentum_op_pass");
     }
   }
 
@@ -159,7 +161,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     // A side-effect of that, memory optimize cannot forsee the fetched vars
     // , so fetchlist should be set persistable before call the Run interface.
     if (strategy_.memory_optimize_) {
-      VLOG(5) << "Add memory_optimize_pass";
+      VLOG(1) << "Add memory_optimize_pass";
       AppendPass("memory_optimize_pass");
     }
 
@@ -167,7 +169,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     // all original and fused operators. But no operators can be enabled this
    // attr if putting it after MultiDevPass.
     if (strategy_.cache_runtime_context_) {
-      VLOG(5) << "Add runtime_context_cache_pass";
+      VLOG(1) << "Add runtime_context_cache_pass";
       AppendPass("runtime_context_cache_pass");
     }
 
@@ -176,12 +178,13 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     if (strategy_.fuse_all_reduce_ops_) {
       // NOTE: fuse_all_reduce_ops will count the number of all_reduce operator
       // first, if the number is zero, fuse_all_reduce_ops will do nothing.
-      VLOG(5) << "Add fuse_all_reduce_op_pass";
+      VLOG(1) << "Add fuse_all_reduce_op_pass";
       AppendPass("fuse_all_reduce_op_pass");
     }
 
     // Add a graph print pass to record a graph with device info.
     if (!strategy_.debug_graphviz_path_.empty()) {
+      VLOG(1) << "Add multi_devices_print_pass";
       auto multi_devices_print_pass = AppendPass("multi_devices_print_pass");
       const std::string graph_path =
           string::Sprintf("%s%s", strategy_.debug_graphviz_path_.c_str(),
@@ -197,16 +200,22 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     if (!strategy_.enable_parallel_graph_ &&
         (SeqOnlyAllReduceOps(strategy_) ||
          strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce)) {
-      VLOG(5) << "Add all_reduce_deps_pass";
+      VLOG(1) << "Add all_reduce_deps_pass";
       AppendPass("all_reduce_deps_pass");
     }
 
+    if (strategy_.enable_backward_optimizer_op_deps_) {
+      VLOG(1) << "Add backward_optimizer_op_deps_pass";
+      AppendPass("backward_optimizer_op_deps_pass");
+    }
+
     if (strategy_.remove_unnecessary_lock_) {
-      VLOG(5) << "Add modify_op_lock_and_record_event_pass";
+      VLOG(1) << "Add modify_op_lock_and_record_event_pass";
       AppendPass("modify_op_lock_and_record_event_pass");
     }
 
     // Verify that the graph is correct for multi-device executor.
+ VLOG(1) << "Add multi_devices_check_pass"; AppendPass("multi_devices_check_pass"); } @@ -215,18 +224,19 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { ir::Pass *multi_devices_pass = nullptr; if (strategy_.async_mode_) { + VLOG(1) << "Add async_multi_devices_pass"; multi_devices_pass = AppendPass("async_multi_devices_pass").get(); } else if (strategy_.is_distribution_) { - VLOG(5) + VLOG(1) << "Add dist_multi_devices_pass, multi device parameter server mode"; multi_devices_pass = AppendPass("dist_multi_devices_pass").get(); } else { if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { - VLOG(5) << "Add all_reduce_mode_multi_devices_pass"; + VLOG(1) << "Add all_reduce_mode_multi_devices_pass"; multi_devices_pass = AppendPass("all_reduce_mode_multi_devices_pass").get(); } else if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { - VLOG(5) << "Add reduce_mode_multi_devices_pass"; + VLOG(1) << "Add reduce_mode_multi_devices_pass"; multi_devices_pass = AppendPass("reduce_mode_multi_devices_pass").get(); } else { PADDLE_THROW("Unknown reduce strategy."); @@ -365,6 +375,7 @@ USE_PASS(multi_devices_print_pass); USE_PASS(memory_optimize_pass); USE_PASS(sequential_execution_pass); USE_PASS(all_reduce_deps_pass); +USE_PASS(backward_optimizer_op_deps_pass); USE_PASS(modify_op_lock_and_record_event_pass); USE_PASS(inplace_pass); USE_PASS(lock_free_optimize_pass); diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index 96a0fce12a1fd79f743381e4b4f12647b3fa5e5c..bf698edaff5151819a4953ce288f60da6466153b 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -80,6 +80,8 @@ struct BuildStrategy { bool fuse_all_reduce_ops_{false}; + bool enable_backward_optimizer_op_deps_{false}; + bool fuse_relu_depthwise_conv_{false}; bool sync_batch_norm_{false}; diff --git a/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc b/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc index b6cc9866b161fe025d1ed587b1d787fac03386c5..4f27b7acff63170958f2d2e83399279ca7b340b2 100644 --- a/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc @@ -166,6 +166,8 @@ void FusedAllReduceOpHandle::RunImpl() { }); } + VLOG(10) << "fusedallreduce size:" << numel * SizeOfType(dtype); + this->RunAndRecordEvent([&] { if (all_reduce_calls.size() == 1UL) { // Do not use NCCLGroup when manage NCCL by per thread per device diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc index a3676ac0ca97909330b5f626e538f46a60c014ad..b0e6a87bddeecda4f13e1081efeabb1c70be76cf 100644 --- a/paddle/fluid/framework/details/op_handle_base.cc +++ b/paddle/fluid/framework/details/op_handle_base.cc @@ -20,7 +20,7 @@ namespace framework { namespace details { std::string OpHandleBase::DebugString() const { std::stringstream ss; - ss << "("; + ss << Name() << "("; for (auto *var : inputs_) { ss << var->DebugString() << ", "; } diff --git a/paddle/fluid/framework/ir/alloc_continuous_space_for_grad_pass.cc b/paddle/fluid/framework/ir/alloc_continuous_space_for_grad_pass.cc index 12d5ad7ed8ccbe3db925ce59dacf935dad158e5c..9e1cf5832bdc11089c0aacd9b9005dfb788dc764 100644 --- a/paddle/fluid/framework/ir/alloc_continuous_space_for_grad_pass.cc +++ b/paddle/fluid/framework/ir/alloc_continuous_space_for_grad_pass.cc @@ -23,15 +23,16 @@ #include 
"paddle/fluid/framework/ir/graph_helper.h" #include "paddle/fluid/framework/op_registry.h" -DEFINE_uint64(fuse_parameter_memory_size, 0, // 0 KB +DEFINE_uint64(fuse_parameter_memory_size, 0, // Bytes "fuse_parameter_memory_size is up limited memory size " "of one group parameters' gradient which is the input " "of communication calling(e.g NCCLAllReduce). " "The default value is 0, it means that " "not set group according to memory_size."); DEFINE_int32( - fuse_parameter_groups_size, 3, - "fuse_parameter_groups_size is the size of one group parameters' gradient. " + fuse_parameter_groups_size, 1, + "fuse_parameter_groups_size is the up limited size of one group " + "parameters' gradient. " "The default value is a experimental result. If the " "fuse_parameter_groups_size is 1, it means that the groups size is " "the number of parameters' gradient. If the fuse_parameter_groups_size is " @@ -58,7 +59,6 @@ uint64_t GetFuseParameterMemorySize() { return FLAGS_fuse_parameter_memory_size; } -static const char kUnKnow[] = "@UNKNOW@"; static framework::proto::VarType::Type kDefaultDtype = framework::proto::VarType::Type::VarType_Type_BOOL; @@ -83,7 +83,7 @@ class AllocContinuousSpaceForGradPass : public ir::Pass { } if (params_grads.size() == 0) { - VLOG(10) << "Doesn't find gradients"; + LOG(WARNING) << "Doesn't find gradients"; return; } @@ -169,7 +169,6 @@ class AllocContinuousSpaceForGradPass : public ir::Pass { details::GroupGradsAndParams *group_grads_params) const { SetGroupAccordingToLayers(var_nodes, params_grads, group_grads_params); SetGroupAccordingToMemorySize(var_nodes, group_grads_params); - SetGroupAccordingToGroupSize(var_nodes, group_grads_params); } void SetGroupAccordingToLayers( @@ -181,7 +180,7 @@ class AllocContinuousSpaceForGradPass : public ir::Pass { for (size_t i = 0; i < params_grads.size(); ++i) { auto pos = params_grads[i].first.find_first_of("."); if (pos == std::string::npos) { - layer_params[std::string(kUnKnow)].emplace_back(i); + layer_params[params_grads[i].first].emplace_back(i); } else { layer_params[params_grads[i].first.substr(0, pos)].emplace_back(i); } @@ -190,7 +189,7 @@ class AllocContinuousSpaceForGradPass : public ir::Pass { group_grads_params->reserve(layer_params.size()); for (size_t i = 0; i < params_grads.size(); ++i) { auto pos = params_grads[i].first.find_first_of("."); - std::string key = kUnKnow; + std::string key = params_grads[i].first; if (pos != std::string::npos) { key = params_grads[i].first.substr(0, pos); } @@ -207,13 +206,31 @@ class AllocContinuousSpaceForGradPass : public ir::Pass { } VLOG(10) << "SetGroupAccordingToLayers: "; + if (VLOG_IS_ON(10)) { + PrintGroupInfo(var_nodes, group_grads_params); + } + } + + void PrintGroupInfo( + const std::unordered_map &var_nodes, + details::GroupGradsAndParams *group_grads_params) const { for (size_t i = 0; i < group_grads_params->size(); ++i) { VLOG(10) << "group " << i; std::stringstream out; - for (auto &p_g : group_grads_params->at(i)) { - out << "(" << p_g.second << ", " << p_g.first << "), "; + size_t gps_size = 0; + for (auto &g_p : group_grads_params->at(i)) { + auto iter = var_nodes.find(g_p.second); + PADDLE_ENFORCE(iter != var_nodes.end(), "%s is not found.", g_p.second); + auto shape = iter->second->Var()->GetShape(); + size_t size = framework::SizeOfType(iter->second->Var()->GetDataType()); + std::for_each(shape.begin(), shape.end(), + [&size](const int64_t &n) { size *= n; }); + gps_size += size; + out << string::Sprintf("(%s(%d), %s)", g_p.second, size, g_p.first); } - VLOG(10) << 
-      VLOG(10) << out.str();
+      VLOG(10) << out.str()
+               << ", group size:" << group_grads_params->at(i).size()
+               << ", group memory size:" << gps_size;
     }
   }
 
@@ -248,6 +265,12 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
         group_p_g.insert(group_p_g.end(), group_grads_params->at(j).begin(),
                          group_grads_params->at(j).end());
         ++j;
+        if (GetFuseParameterGroupsSize() > 1 &&
+            group_p_g.size() >
+                static_cast<size_t>(GetFuseParameterGroupsSize())) {
+          break;
+        }
+
         if (local_group_memory_size >= group_memory_size) {
           break;
         }
@@ -258,59 +281,9 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
 
     VLOG(10) << string::Sprintf(
        "SetGroupAccordingToMemorySize(memory_size: %d):", group_memory_size);
-    for (size_t i = 0; i < group_grads_params->size(); ++i) {
-      VLOG(10) << "group " << i;
-      std::stringstream out;
-      for (auto &g_p : group_grads_params->at(i)) {
-        auto iter = var_nodes.find(g_p.second);
-        PADDLE_ENFORCE(iter != var_nodes.end(), "%s is not found.", g_p.second);
-        auto shape = iter->second->Var()->GetShape();
-        size_t size = framework::SizeOfType(iter->second->Var()->GetDataType());
-        std::for_each(shape.begin(), shape.end(),
-                      [&size](const int64_t &n) { size *= n; });
-        out << string::Sprintf("(%s(%d), %s)", g_p.second, size, g_p.first);
-      }
-      VLOG(10) << out.str();
-    }
-  }
-
-  void SetGroupAccordingToGroupSize(
-      const std::unordered_map<std::string, ir::Node *> &var_nodes,
-      details::GroupGradsAndParams *group_grads_params) const {
-    if (GetFuseParameterGroupsSize() == 1) {
-      return;
-    }
-    const int group_size = GetFuseParameterGroupsSize() == -1
-                               ? static_cast<int>(group_grads_params->size())
-                               : GetFuseParameterGroupsSize();
-    PADDLE_ENFORCE_GT(group_size, 1);
-    size_t groups = (group_grads_params->size() + group_size - 1) / group_size;
-    details::GroupGradsAndParams local_group_grads_params;
-    local_group_grads_params.reserve(groups);
-
-    size_t j = 0;
-    for (size_t i = 0; i < groups; ++i) {
-      local_group_grads_params.emplace_back();
-      auto &group_p_g = local_group_grads_params.back();
-      group_p_g.reserve(group_size);
-      while (j < group_grads_params->size()) {
-        group_p_g.insert(group_p_g.end(), group_grads_params->at(j).begin(),
-                         group_grads_params->at(j).end());
-        ++j;
-        if (j % group_size == 0) break;
-      }
-    }
-    std::swap(*group_grads_params, local_group_grads_params);
-
-    VLOG(10) << string::Sprintf("SetGroupAccordingToGroupSize(group_size: %d):",
-                                group_size);
-    for (size_t i = 0; i < group_grads_params->size(); ++i) {
-      VLOG(10) << "group " << i;
-      std::stringstream out;
-      for (auto &p_g : group_grads_params->at(i)) {
-        out << "(" << p_g.second << ", " << p_g.first << "), ";
-      }
-      VLOG(10) << out.str();
+    if (VLOG_IS_ON(10)) {
+      PrintGroupInfo(var_nodes, group_grads_params);
     }
   }
diff --git a/paddle/fluid/framework/ir/graph.cc b/paddle/fluid/framework/ir/graph.cc
index 5eba32c4f3a846183d9bbad51b77a29cfca677f0..8ba0e8b80b1c69cad8f8796974828575da343ce8 100644
--- a/paddle/fluid/framework/ir/graph.cc
+++ b/paddle/fluid/framework/ir/graph.cc
@@ -134,6 +134,7 @@ void Graph::ResolveHazard(
         ir::Node *dep_var = CreateControlDepVar();
         write_op->inputs.push_back(dep_var);
         upstream_op->outputs.push_back(dep_var);
+        VLOG(10) << "add dep_var:" << dep_var->Name();
         dep_var->outputs.push_back(write_op);
         dep_var->inputs.push_back(upstream_op);
       }
@@ -157,6 +158,7 @@ void Graph::ResolveHazard(
         if (has_dep) continue;
 
         ir::Node *dep_var = CreateControlDepVar();
+        VLOG(10) << "add dep_var:" << dep_var->Name();
         read_op->outputs.push_back(dep_var);
         dep_var->inputs.push_back(read_op);
         write_op->inputs.push_back(dep_var);
diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt b/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt
index 096428e58ab17deda14e70229ef033dbdd7bd04b..4cdb6a7d30882d095a2666ccc45ed7716954c37c 100644
--- a/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt
+++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt
@@ -14,3 +14,4 @@ cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS grap
 cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle)
 cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS all_reduce_op_handle graph graph_helper pass)
+cc_library(backward_optimizer_op_deps_pass SRCS backward_optimizer_op_deps_pass.cc DEPS graph graph_helper pass)
diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/backward_optimizer_op_deps_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/backward_optimizer_op_deps_pass.cc
new file mode 100644
index 0000000000000000000000000000000000000000..c7ab32a2c1a91216c6ffc9c2d8e8dc812bd38cd4
--- /dev/null
+++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/backward_optimizer_op_deps_pass.cc
@@ -0,0 +1,223 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "paddle/fluid/framework/details/container_cast.h"
+#include "paddle/fluid/framework/details/multi_devices_helper.h"
+#include "paddle/fluid/framework/details/op_handle_base.h"
+#include "paddle/fluid/framework/ir/graph.h"
+#include "paddle/fluid/framework/ir/graph_helper.h"
+#include "paddle/fluid/framework/ir/pass.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/op_proto_maker.h"
+#include "paddle/fluid/framework/scope.h"
+
+namespace paddle {
+namespace framework {
+namespace ir {
+
+class BackWardOpDepsPass : public ir::Pass {
+ protected:
+  void AddDep(ir::Graph* graph, details::OpHandleBase* l,
+              details::OpHandleBase* r) const {
+    auto* dep_var = new details::DummyVarHandle(graph->CreateControlDepVar());
+    graph->Get<details::GraphDepVars>(details::kGraphDepVars).emplace(dep_var);
+    l->AddOutput(dep_var);
+    r->AddInput(dep_var);
+    VLOG(10) << "add deps:" << l->DebugString() << " and " << r->DebugString();
+  }
+
+  void ApplyImpl(ir::Graph* graph) const override {
+    // NOTE: The operator nodes should be in topology order.
+    std::vector<details::OpHandleBase*> backward_op_handles;
+    std::vector<details::OpHandleBase*> all_opt_handles;
+    details::ParamsAndGrads params_grads;
+    std::vector<ir::Node*> topo_nodes = ir::TopologySortOperations(*graph);
+    for (auto& node : topo_nodes) {
+      if (!node->Op()) continue;
+
+      GetBackWardOpHandles(node, &backward_op_handles, &params_grads);
+      GetOptimizerOpHandles(node, &all_opt_handles);
+    }
+
+    VLOG(10) << "backward_op_handles size:" << backward_op_handles.size()
+             << ", opt_handles size:" << all_opt_handles.size();
+
+    if (backward_op_handles.size() <= 1 || all_opt_handles.size() <= 1) {
+      VLOG(10) << "no need to apply backward_optimizer_op_deps_pass";
+      return;
+    }
+
+    std::vector<details::OpHandleBase*> opt_handles;
+    GetOptimizerHandlesRoot(all_opt_handles, &opt_handles, params_grads);
+
+    if (opt_handles.size() <= 1) {
+      VLOG(10) << "no need to apply backward_optimizer_op_deps_pass";
+      return;
+    }
+
+    VLOG(10) << "add optimize deps";
+    for (size_t i = 1; i < opt_handles.size(); ++i) {
+      AddDep(graph, opt_handles[i - 1], opt_handles[i]);
+    }
+
+    VLOG(10) << "add deps between backward and optimizer ops:";
+    AddDep(graph, backward_op_handles[backward_op_handles.size() - 1],
+           opt_handles[0]);
+  }
+
+  /*
+   * Once the backward op handles have finished, the input variables of the
+   * optimizer op handles are ready. Since the optimizer op handles can be
+   * seen as sub-graphs that do not connect to each other, they can run in
+   * parallel or in a specified order, such as the order in which the
+   * gradients are generated. This function collects the root of each of
+   * these sub-graphs.
+   */
+  void GetOptimizerHandlesRoot(
+      const std::vector<details::OpHandleBase*>& ops,
+      std::vector<details::OpHandleBase*>* result,
+      const details::ParamsAndGrads& params_grads) const {
+    std::unordered_set<details::OpHandleBase*> visit;
+    for (auto op : ops) {
+      if (visit.find(op) != visit.end()) {
+        continue;
+      }
+
+      VLOG(10) << "visiting all_opt_handles:" << op->DebugString();
+
+      result->emplace_back(op);
+      visit.insert(op);
+      VisitChildrens(op, &visit);
+    }
+
+    for (size_t i = 0; i < result->size(); i++) {
+      VLOG(10) << "get potential head op:" << (*result)[i]->DebugString();
+    }
+
+    // sort by param_grad order
+    std::unordered_map<std::string, int> pg_order;
+    int order = 0;
+    for (auto& p_g : params_grads) {
+      pg_order[p_g.second] = order++;
+    }
+
+    std::vector<std::pair<details::OpHandleBase*, int>> op_handles;
+    for (auto op : *result) {
+      int order = 0;
+      for (auto input : op->Inputs()) {
+        if (dynamic_cast<details::VarHandle*>(input) == nullptr) continue;
+
+        if (pg_order.find(input->Name()) == pg_order.end()) {
+          VLOG(10) << "cannot find input " << input->Name() << " in grads";
+          continue;
+        }
+
+        if (order < pg_order.at(input->Name())) {
+          order = pg_order.at(input->Name());
+        }
+      }
+      op_handles.emplace_back(std::make_pair(op, order));
+    }
+
+    sort(op_handles.begin(), op_handles.end(),
+         [](const std::pair<details::OpHandleBase*, int>& left,
+            const std::pair<details::OpHandleBase*, int>& right) -> bool {
+           return left.second < right.second;
+         });
+
+    result->clear();
+    for (auto p : op_handles) {
+      result->emplace_back(p.first);
+    }
+
+    for (size_t i = 0; i < result->size(); i++) {
+      VLOG(10) << "get head op:" << (*result)[i]->DebugString();
+    }
+  }
+
+  void VisitChildrens(details::OpHandleBase* op,
+                      std::unordered_set<details::OpHandleBase*>* visit) const {
+    for (auto out : op->Outputs()) {
+      for (auto* pending_op : out->PendingOps()) {
+        if (visit->find(pending_op) != visit->end()) {
+          continue;
+        }
+
+        VLOG(10) << "visiting:" << pending_op->DebugString();
+
+        visit->insert(pending_op);
+        VisitChildrens(pending_op, visit);
+      }
+    }
+  }
+
+  void GetBackWardOpHandles(
+      ir::Node* node, std::vector<details::OpHandleBase*>* backward_op_handles,
+      details::ParamsAndGrads* params_grads) const {
+    try {
+      bool is_bk_op =
+          static_cast<bool>(boost::get<int>(node->Op()->GetAttr(
+                                OpProtoAndCheckerMaker::OpRoleAttrName())) &
+                            static_cast<int>(OpRole::kBackward));
+      if (!is_bk_op) return;
+
+      // Currently, we assume that once gradient is generated, it can be
+      // broadcast, and each gradient is only broadcast once.
+      auto backward_vars =
+          boost::get<std::vector<std::string>>(node->Op()->GetNullableAttr(
+              OpProtoAndCheckerMaker::OpRoleVarAttrName()));
+      PADDLE_ENFORCE_EQ(backward_vars.size() % 2, static_cast<size_t>(0));
+      PADDLE_ENFORCE(node->IsWrappedBy<details::OpHandleBase>());
+
+      backward_op_handles->emplace_back(
+          &node->Wrapper<details::OpHandleBase>());
+
+      for (size_t i = 0; i < backward_vars.size(); i += 2) {
+        VLOG(10) << "Trainable parameter: " << backward_vars[i]
+                 << ", gradient: " << backward_vars[i + 1];
+
+        params_grads->emplace_back(std::make_pair(
+            backward_vars[i] /*param*/, backward_vars[i + 1] /*grad*/));
+      }
+    } catch (boost::bad_get e) {
+    }
+  }
+
+  void GetOptimizerOpHandles(
+      ir::Node* node, std::vector<details::OpHandleBase*>* opt_handles) const {
+    try {
+      bool is_opt_op =
+          static_cast<bool>(boost::get<int>(node->Op()->GetAttr(
+                                OpProtoAndCheckerMaker::OpRoleAttrName())) &
+                            static_cast<int>(OpRole::kOptimize));
+      if (!is_opt_op) return;
+
+      opt_handles->emplace_back(&node->Wrapper<details::OpHandleBase>());
+    } catch (boost::bad_get e) {
+    }
+  }
+};
+}  // namespace ir
+}  // namespace framework
+}  // namespace paddle
+
+REGISTER_PASS(backward_optimizer_op_deps_pass,
+              paddle::framework::ir::BackWardOpDepsPass);
diff --git a/paddle/fluid/operators/alloc_continuous_space_op.cc b/paddle/fluid/operators/alloc_continuous_space_op.cc
index d4bdecff62c016a31011266a0f066076d85fcdef..85da8a827f715456340cb9d0ba689235ea47095a 100644
--- a/paddle/fluid/operators/alloc_continuous_space_op.cc
+++ b/paddle/fluid/operators/alloc_continuous_space_op.cc
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <sstream>
 #include <vector>
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/operator.h"
@@ -96,6 +97,8 @@ class AllocContinuousSpaceKernel : public framework::OpKernel {
     // Make the outputs point to the continuous space.
     offset = 0;
+    std::stringstream ss;
+    ss << "alloc_space_for_vars: ";
     for (size_t i = 0; i < out_tensors.size(); ++i) {
       size_t len = static_cast<size_t>(out_tensors[i]->numel());
       auto dim = out_tensors[i]->dims();
@@ -105,10 +108,10 @@ class AllocContinuousSpaceKernel : public framework::OpKernel {
               .Resize(dim);
       len = Alignment(len * size_of_dtype, context.GetPlace()) / size_of_dtype;
       offset += len;
-      VLOG(10) << "alloc_space_for_vars: output(" << out_var_names[i]
-               << ") ,dim:(" << dim << ")"
-               << " Address: " << out_tensors[i]->data<void>();
+      ss << "output(" << out_var_names[i] << ") dim:(" << dim << ")"
+         << " address: " << out_tensors[i]->data<void>() << ", ";
     }
+    VLOG(10) << ss.str();
   }
 
  private:
@@ -133,6 +136,9 @@ class AllocContinuousSpaceKernel : public framework::OpKernel {
     PADDLE_ENFORCE_EQ(lod_tensors.size(), var_names.size());
     *numel = 0;
     size_t size_of_dtype = 0;
+
+    std::stringstream ss;
+    ss << "alloc_space_for_vars: ";
     for (size_t i = 0; i < var_names.size(); ++i) {
       PADDLE_ENFORCE(lod_tensors[i]->IsInitialized(), "%s is not initialized.",
                      var_names[i]);
@@ -148,11 +154,13 @@ class AllocContinuousSpaceKernel : public framework::OpKernel {
       auto size = lod_tensors[i]->numel();
       PADDLE_ENFORCE_GT(size, 0);
 
-      VLOG(10) << "alloc_space_for_vars: input(" << var_names[i] << ") ,dim:("
-               << lod_tensors[i]->dims() << ")";
+      ss << "input(" << var_names[i] << ") dim:(" << lod_tensors[i]->dims()
+         << "), ";
       *numel += Alignment(static_cast<size_t>(size) * size_of_dtype, place) /
                 size_of_dtype;
     }
+
+    VLOG(10) << ss.str();
   }
 };
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index fcd4168787cc5f27e85dac2d903442b14bd46667..f0702e5a00c42b20e7621c92e866ae7033621103 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1509,6 +1509,13 @@ All parameter, weight, gradient are variables in Paddle.
"fuse_all_reduce_ops", [](const BuildStrategy &self) { return self.fuse_all_reduce_ops_; }, [](BuildStrategy &self, bool b) { self.fuse_all_reduce_ops_ = b; }) + .def_property("enable_backward_optimizer_op_deps", + [](const BuildStrategy &self) { + return self.enable_backward_optimizer_op_deps_; + }, + [](BuildStrategy &self, bool b) { + self.enable_backward_optimizer_op_deps_ = b; + }) .def_property( "cache_runtime_context", [](const BuildStrategy &self) { return self.cache_runtime_context_; }, diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 7ad87ef8261a5f80364f834618b7b73f681c301b..2051207d326a595ea0503510276601c6862fdc07 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -180,7 +180,7 @@ if(WITH_DISTRIBUTE) endif() if(NOT APPLE) set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 200) - set_tests_properties(test_dist_mnist_nccl PROPERTIES TIMEOUT 200) + set_tests_properties(test_dist_mnist_nccl PROPERTIES TIMEOUT 250) set_tests_properties(test_dist_mnist_lars PROPERTIES TIMEOUT 200) set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200) py_test_modules(test_dist_se_resnext MODULES test_dist_se_resnext) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 45192e40566e60773856616f5f393153a92b7050..985215f9dc08c4ec8ea4f5410b72d24a0138df6d 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -140,6 +140,9 @@ class TestDistRunnerBase(object): build_stra.enable_inplace = False build_stra.memory_optimize = False + if args.enable_backward_deps: + build_stra.enable_backward_optimizer_op_deps = True + if args.use_reduce: build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce else: @@ -274,6 +277,8 @@ def runtime_main(test_class): parser.add_argument('--trainer_id', type=int, required=False, default=0) parser.add_argument('--trainers', type=int, required=False, default=1) parser.add_argument('--nccl_comm_num', type=int, required=False, default=1) + parser.add_argument( + '--enable_backward_deps', type=bool, required=False, default=1) parser.add_argument( '--current_endpoint', type=str, required=False, default="") parser.add_argument('--sync_mode', action='store_true') @@ -354,6 +359,7 @@ class TestDistBase(unittest.TestCase): self._nccl_comm_num = 1 self._setup_config() self._after_setup_config() + self._enable_backward_deps = False def _find_free_port(self): def __free_port(): @@ -606,6 +612,10 @@ class TestDistBase(unittest.TestCase): env0 = {"FLAGS_selected_gpus": "0"} env1 = {"FLAGS_selected_gpus": "1"} + if self._enable_backward_deps: + tr0_cmd += " --enable_backward_deps 1" + tr1_cmd += " --enable_backward_deps 1" + env0.update(envs) env1.update(envs) diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist_nccl.py b/python/paddle/fluid/tests/unittests/test_dist_mnist_nccl.py index f511fd6974056e127ecb53b7404cff6c61dd53dc..65df03b402ac6caa30866471797783dcb94f7f04 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_mnist_nccl.py +++ b/python/paddle/fluid/tests/unittests/test_dist_mnist_nccl.py @@ -58,5 +58,19 @@ class TestDistMnistNCCL2DGC(TestDistBase): self.check_with_place("dist_mnist.py", delta=1e-5) +class TestDistMnistNCCL2BackWardDeps(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._use_reduce = False + 
+        self._use_reader_alloc = False
+        self._nccl2_mode = True
+        self._enable_backward_deps = True
+
+    def test_dist_train(self):
+        import paddle.fluid as fluid
+        if fluid.core.is_compiled_with_cuda():
+            self.check_with_place("dist_mnist.py", delta=1e-5)
+
+
 if __name__ == "__main__":
     unittest.main()
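
Note (not part of the diff): the patch exposes the new pass to Python through the
BuildStrategy property enable_backward_optimizer_op_deps added in pybind.cc above.
A minimal usage sketch, assuming an ordinary fluid 1.x training program; the tiny
network below is only a placeholder, not taken from this PR:

    import paddle.fluid as fluid

    # Placeholder program: any network with a backward pass and an optimizer works.
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    pred = fluid.layers.fc(input=x, size=1)
    loss = fluid.layers.mean(fluid.layers.square_error_cost(input=pred, label=y))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

    build_strategy = fluid.BuildStrategy()
    # Option added by this patch: backward_optimizer_op_deps_pass chains the
    # optimizer op handles (in gradient-generation order) behind the last
    # backward op handle via control-dependency variables.
    build_strategy.enable_backward_optimizer_op_deps = True
    # Optional: fuse all_reduce ops; grouping is governed by the
    # FLAGS_fuse_parameter_memory_size / FLAGS_fuse_parameter_groups_size flags
    # adjusted earlier in this diff.
    build_strategy.fuse_all_reduce_ops = True

    compiled_prog = fluid.CompiledProgram(
        fluid.default_main_program()).with_data_parallel(
            loss_name=loss.name, build_strategy=build_strategy)

The new TestDistMnistNCCL2BackWardDeps case above exercises the same switch in
NCCL2 distributed mode.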