diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index 046ec6978a84fa6eba11513860523de5a63a31d8..d4939779a2401c9828e0478f5f5de780907c767e 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -10,7 +10,10 @@ cc_library(fetch_barrier_op_handle SRCS fetch_barrier_op_handle.cc DEPS framewor cc_library(multi_devices_helper SRCS multi_devices_helper.cc DEPS graph graph_helper) cc_library(multi_devices_graph_print_pass SRCS multi_devices_graph_print_pass.cc DEPS multi_devices_helper) cc_library(multi_devices_graph_check_pass SRCS multi_devices_graph_check_pass.cc DEPS multi_devices_helper) + cc_library(alloc_continuous_space_for_grad_pass SRCS alloc_continuous_space_for_grad_pass.cc DEPS graph graph_helper) +cc_library(fuse_adam_op_pass SRCS fuse_adam_op_pass.cc fuse_optimizer_op_pass.cc DEPS graph graph_helper) +cc_library(fuse_sgd_op_pass SRCS fuse_sgd_op_pass.cc fuse_optimizer_op_pass.cc DEPS graph graph_helper) cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows) @@ -104,5 +107,7 @@ cc_library(build_strategy SRCS build_strategy.cc DEPS graph_viz_pass multi_devices_graph_pass multi_devices_graph_print_pass multi_devices_graph_check_pass fuse_elewise_add_act_pass multi_batch_merge_pass - fuse_relu_depthwise_conv_pass - memory_optimize_pass lock_free_optimize_pass alloc_continuous_space_for_grad_pass fuse_all_reduce_op_pass) + fuse_relu_depthwise_conv_pass + memory_optimize_pass lock_free_optimize_pass + alloc_continuous_space_for_grad_pass fuse_all_reduce_op_pass + fuse_adam_op_pass fuse_sgd_op_pass) diff --git a/paddle/fluid/framework/details/alloc_continuous_space_for_grad_pass.cc b/paddle/fluid/framework/details/alloc_continuous_space_for_grad_pass.cc index e195e93fb842ef71d4849aa2995788a91dafe924..8e8258ffb124e5008954a455264f5c0bc5cabc37 100644 --- a/paddle/fluid/framework/details/alloc_continuous_space_for_grad_pass.cc +++ b/paddle/fluid/framework/details/alloc_continuous_space_for_grad_pass.cc @@ -21,6 +21,7 @@ #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/ir/graph_helper.h" #include "paddle/fluid/framework/op_registry.h" + DEFINE_uint32(fuse_parameter_memory_size, 0, // 0 KB "fuse_parameter_memory_size is up limited memory size " "of one group parameters' gradient which is the input " @@ -105,20 +106,29 @@ class AllocContinuousSpaceForGradPass : public ir::Pass { auto ele_dtype = iter->second->Var()->GetDataType(); if (dtype == kDefaultDtype) { dtype = ele_dtype; - PADDLE_ENFORCE_NE(ele_dtype, kDefaultDtype); + PADDLE_ENFORCE_NE(ele_dtype, kDefaultDtype, + "The data type should not be bool."); } - PADDLE_ENFORCE_EQ(ele_dtype, dtype); + PADDLE_ENFORCE_EQ(ele_dtype, dtype, + "The data type of input is not consistent."); } - // Create the fused variable name. + // Create a FusedVarsSet to avoid duplicating names for fused_var in other + // pass. if (!result.Has(kFusedVars)) { result.Set(kFusedVars, new FusedVars); } - const std::string prefix(kFusedVarNamePrefix); - // The fused_var_name should be unique. - auto fused_var_name = prefix + "GRAD@" + params_grads[0].second; + // the kFusedGrads is used be fuse_optimizer_op_pass. + result.Set(kFusedGrads, new FusedGrads); + + // the fused_var_name should be unique, so it appends + // params_grads.begin()->second. 
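+  // For example, assuming the first recorded gradient is named
+  // "fc_0.w_0@GRAD" (a hypothetical name), the fused variable becomes
+  // "@FUSEDVAR@@GRAD@fc_0.w_0@GRAD".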
+  auto fused_var_name = std::string(kFusedVarNamePrefix) + "@GRAD@" +
+                        params_grads.begin()->second;
+  result.Get<FusedGrads>(kFusedGrads) = fused_var_name;
 
   auto &fused_var_set = result.Get<FusedVars>(kFusedVars);
-  PADDLE_ENFORCE_EQ(fused_var_set.count(fused_var_name), 0);
+  PADDLE_ENFORCE_EQ(fused_var_set.count(fused_var_name), 0,
+                    "%s is duplicate in FusedVars.", fused_var_name);
   fused_var_set.insert(fused_var_name);
 
   InitFusedVarsAndAllocSpaceForVars(places, local_scopes, vars,
@@ -295,17 +305,6 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
     return type == proto::VarType::LOD_TENSOR;
   }
 
-  void AppendAllocSpaceForVarsOp(const std::vector<std::string> &params_name,
-                                 const std::vector<std::string> &grads_name,
-                                 const std::string &fused_var_name,
-                                 BlockDesc *global_block) const {
-    auto op_desc = global_block->AppendOp();
-    op_desc->SetType("alloc_continuous_space");
-    op_desc->SetInput("Input", params_name);
-    op_desc->SetOutput("Output", grads_name);
-    op_desc->SetOutput("FusedOutput", {fused_var_name});
-  }
-
   void RecordParamsAndGrads(ir::Node *node,
                             ParamsAndGrads *params_grads) const {
     try {
@@ -358,6 +357,7 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
       }
     }
+
+    // Alloc continuous space for vars.
     std::vector<std::string> grads_name;
     std::vector<std::string> params_name;
     grads_name.reserve(params_grads.size());
@@ -370,7 +370,6 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
     AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name,
                               program_desc.MutableBlock(0));
 
-    // Run Only Once Programs
     for (size_t i = 0; i < local_scopes.size(); ++i) {
       for (auto &op_desc : program_desc.Block(0).AllOps()) {
         auto op = OpRegistry::CreateOp(*op_desc);
@@ -378,6 +377,17 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
       }
     }
   }
+
+  void AppendAllocSpaceForVarsOp(const std::vector<std::string> &params_name,
+                                 const std::vector<std::string> &grads_name,
+                                 const std::string &fused_var_name,
+                                 BlockDesc *global_block) const {
+    auto op_desc = global_block->AppendOp();
+    op_desc->SetType("alloc_continuous_space");
+    op_desc->SetInput("Input", params_name);
+    op_desc->SetOutput("Output", grads_name);
+    op_desc->SetOutput("FusedOutput", {fused_var_name});
+  }
 };
 
 }  // namespace details
diff --git a/paddle/fluid/framework/details/broadcast_op_handle.cc b/paddle/fluid/framework/details/broadcast_op_handle.cc
index fdff83b92819b39974f3b2ce0848710f1ee02a41..752c932a215bad53f47f19f143a8008b66617a51 100644
--- a/paddle/fluid/framework/details/broadcast_op_handle.cc
+++ b/paddle/fluid/framework/details/broadcast_op_handle.cc
@@ -27,20 +27,17 @@ void BroadcastOpHandle::RunImpl() {
   if (places_.size() == 1) return;
 
   // The input and output may have dummy vars.
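+  // (Dummy vars only carry dependency edges; DynamicCast<VarHandle> below
+  // filters them out, which is why exactly one real input is expected.)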
- VarHandle *in_var_handle; - { - auto in_var_handles = DynamicCast(inputs_); - PADDLE_ENFORCE_EQ(in_var_handles.size(), 1UL, - "The number of input should be one."); - in_var_handle = in_var_handles[0]; - } - + auto in_var_handles = DynamicCast(inputs_); auto out_var_handles = DynamicCast(outputs_); + PADDLE_ENFORCE_EQ(in_var_handles.size(), 1UL, + "The number of input should be one."); PADDLE_ENFORCE_EQ( out_var_handles.size(), places_.size(), "The number of output should equal to the number of places."); + VarHandle *in_var_handle = in_var_handles[0]; + WaitInputVarGenerated(); std::vector var_scopes; diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index 078403f30f10072d3cddae89de4f9cea43ed956e..df69b11ec6ae3bb08ba03b749c69eb718525de4d 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -17,7 +17,6 @@ limitations under the License. */ #include #include #include - #include "paddle/fluid/framework/details/memory_optimize_helper.h" #include "paddle/fluid/framework/details/multi_devices_graph_pass.h" #include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h" @@ -82,23 +81,43 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { AppendPass("inplace_pass"); } - if (strategy.fuse_elewise_add_act_ops_) { + if (strategy_.fuse_elewise_add_act_ops_) { VLOG(10) << "Add fuse_elewise_add_act_pass"; AppendPass("fuse_elewise_add_act_pass"); } // for single card training, fuse_all_reduce_ops is unnecessary. // alloc_continuous_space_for_grad_pass should be before of MultiDevPass. - if (strategy.fuse_all_reduce_ops_) { + if (strategy_.fuse_all_reduce_ops_) { VLOG(10) << "Add alloc_continuous_space_for_grad_pass"; AppendPass("alloc_continuous_space_for_grad_pass"); } + if (strategy_.fuse_all_optimizer_ops_) { + if (strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce || + strategy_.is_distribution_) { + VLOG(3) + << "Currently, fuse_all_optimizer_ops only works under AllReduce " + "mode."; + strategy_.fuse_all_optimizer_ops_ = false; + } else { + VLOG(10) << "Add alloc_continuous_space_for_grad_pass"; + AppendPass("alloc_continuous_space_for_grad_pass"); + // NOTE: fuse_all_xx_ops will count the number of xx operator first, + // if the number is zero, fuse_all_reduce_ops will do nothing. + // Currently, only one type of optimization algorithm can be fused. + VLOG(10) << "Add fuse_adam_op_pass"; + AppendPass("fuse_adam_op_pass"); + VLOG(10) << "Add fuse_sgd_op_pass"; + AppendPass("fuse_sgd_op_pass"); + } + } + // Add a graph viz pass to record a graph. if (!strategy.debug_graphviz_path_.empty()) { auto viz_pass = AppendPass("graph_viz_pass"); const std::string graph_path = string::Sprintf( - "%s%s", strategy.debug_graphviz_path_.c_str(), "_fused_graph"); + "%s%s", strategy_.debug_graphviz_path_.c_str(), "_fused_graph"); viz_pass->Set("graph_viz_path", new std::string(graph_path)); } @@ -118,14 +137,14 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { // the de-fact IR, any reuse on Graph is meaningless. // A side-effect of that, memory optimize cannot forsee the fetched vars // , so fetchlist should be set persistable before call the Run interface. 
- if (strategy.memory_optimize_) { + if (strategy_.memory_optimize_) { VLOG(10) << "Add memory_optimize_pass"; AppendPass("memory_optimize_pass"); } - AppendMultiDevPass(strategy); + AppendMultiDevPass(strategy_); - if (strategy.fuse_all_reduce_ops_) { + if (strategy_.fuse_all_reduce_ops_) { // NOTE: fuse_all_reduce_ops will count the number of all_reduce operator // first, if the number is zero, fuse_all_reduce_ops will do nothing. VLOG(10) << "Add fuse_all_reduce_op_pass"; @@ -151,7 +170,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { AppendPass("all_reduce_deps_pass"); } - if (SeqOnlyAllReduceOps(strategy)) { + if (SeqOnlyAllReduceOps(strategy_)) { VLOG(10) << "Add all_reduce_deps_pass"; AppendPass("all_reduce_deps_pass"); } @@ -165,7 +184,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { // Convert graph to run on multi-devices. void AppendMultiDevPass(const BuildStrategy &strategy) { ir::Pass *multi_devices_pass = nullptr; - if (strategy_.is_distribution_) { + if (strategy.is_distribution_) { VLOG(10) << "Add dist_multi_devices_pass"; multi_devices_pass = AppendPass("dist_multi_devices_pass").get(); } else { @@ -235,17 +254,22 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph, pass->Erase(kNCCLCtxs); pass->SetNotOwned(kNCCLCtxs, nctx); #endif - } else if (pass->Type() == "fuse_all_reduce_op_pass") { + } else if (pass->Type() == "alloc_continuous_space_for_grad_pass" || + pass->Type() == "fuse_adam_op_pass" || + pass->Type() == "fuse_sgd_op_pass" || + pass->Type() == "fuse_all_reduce_op_pass") { pass->Erase(kPlaces); pass->SetNotOwned>(kPlaces, &places); pass->Erase(kLocalScopes); pass->SetNotOwned>(kLocalScopes, &local_scopes); + if (pass->Type() == "fuse_all_reduce_op_pass") { #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - platform::NCCLContextMap *nctx = use_cuda ? nccl_ctxs : nullptr; - pass->Erase(kNCCLCtxs); - pass->SetNotOwned(kNCCLCtxs, nctx); + platform::NCCLContextMap *nctx = use_cuda ? nccl_ctxs : nullptr; + pass->Erase(kNCCLCtxs); + pass->SetNotOwned(kNCCLCtxs, nctx); #endif + } } else if (pass->Type() == "alloc_continuous_space_for_grad_pass") { pass->Erase(kPlaces); pass->SetNotOwned>(kPlaces, &places); @@ -294,4 +318,6 @@ USE_PASS(inplace_pass); USE_PASS(lock_free_optimize_pass); USE_PASS(alloc_continuous_space_for_grad_pass); USE_PASS(graph_to_program_pass); +USE_PASS(fuse_adam_op_pass); +USE_PASS(fuse_sgd_op_pass); USE_PASS(fuse_all_reduce_op_pass); diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index 9587a6f0f9318616633a2ca8b69991cdd52b8942..85f328b7c40568cc9246fd4ecab34e8e6778439b 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -18,7 +18,6 @@ #include #include #include - #include "paddle/fluid/framework/ir/pass_builder.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" @@ -76,6 +75,8 @@ struct BuildStrategy { bool fuse_elewise_add_act_ops_{false}; + bool fuse_all_optimizer_ops_{false}; + bool fuse_all_reduce_ops_{false}; bool fuse_relu_depthwise_conv_{false}; diff --git a/paddle/fluid/framework/details/fuse_adam_op_pass.cc b/paddle/fluid/framework/details/fuse_adam_op_pass.cc new file mode 100644 index 0000000000000000000000000000000000000000..0ef75e319244e2ccc63dfa3f93f0cd764cf67633 --- /dev/null +++ b/paddle/fluid/framework/details/fuse_adam_op_pass.cc @@ -0,0 +1,199 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/fuse_adam_op_pass.h" +#include +#include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/framework/op_registry.h" + +namespace paddle { +namespace framework { +namespace details { + +const std::string FuseAdamOpPass::GetOpType() const { return "adam"; } + +const std::vector FuseAdamOpPass::GetAuxiliaryVarNames() const { + return {"Param", "Moment1", "Moment2", "Beta1Pow", "Beta2Pow"}; +} + +void FuseAdamOpPass::FuseOptimizerOps( + const std::unordered_map> + &aux_var_set, + const std::unordered_map &fused_vars_name, + const std::vector &adam_ops, ir::Graph *graph) const { + FuseAdamOps(aux_var_set, fused_vars_name, adam_ops, graph); + FuseScaleOps(aux_var_set.at("Beta1Pow"), fused_vars_name.at("Beta1Pow"), + adam_ops, graph); + FuseScaleOps(aux_var_set.at("Beta2Pow"), fused_vars_name.at("Beta2Pow"), + adam_ops, graph); +} + +void FuseAdamOpPass::FuseAdamOps( + const std::unordered_map> &vars_set, + const std::unordered_map &fused_vars_name, + const std::vector &adam_ops, ir::Graph *graph) const { + PADDLE_ENFORCE_GT(adam_ops.size(), static_cast(0)); + + // Check attributions + // NOTE: If new attribution is added, the following code maybe need change. + int op_role = boost::get( + adam_ops[0]->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())); + float beta1 = boost::get(adam_ops[0]->Op()->GetAttr("beta1")); + float beta2 = boost::get(adam_ops[0]->Op()->GetAttr("beta2")); + float epsilon = boost::get(adam_ops[0]->Op()->GetAttr("epsilon")); + bool lazy_mode = boost::get(adam_ops[0]->Op()->GetAttr("lazy_mode")); + int64_t min_row_size_to_use_multithread = boost::get( + adam_ops[0]->Op()->GetAttr("min_row_size_to_use_multithread")); + for (auto &adam_op : adam_ops) { + PADDLE_ENFORCE_EQ(beta1, + boost::get(adam_op->Op()->GetAttr("beta1"))); + PADDLE_ENFORCE_EQ(beta2, + boost::get(adam_op->Op()->GetAttr("beta2"))); + PADDLE_ENFORCE_EQ(epsilon, + boost::get(adam_op->Op()->GetAttr("epsilon"))); + PADDLE_ENFORCE_EQ(lazy_mode, + boost::get(adam_op->Op()->GetAttr("lazy_mode"))); + PADDLE_ENFORCE_EQ(min_row_size_to_use_multithread, + boost::get(adam_op->Op()->GetAttr( + "min_row_size_to_use_multithread"))); + PADDLE_ENFORCE_EQ(op_role, boost::get(adam_op->Op()->GetAttr( + OpProtoAndCheckerMaker::OpRoleAttrName()))); + } + + // NOTE: fused_var is only exist in scope, so the graph doesn't have fused_var + // node. + + VLOG(10) << "Insert adam to graph "; + OpDesc adam_desc(adam_ops[0]->Op()->Block()); + adam_desc.SetType("adam"); + adam_desc.SetInput("Param", {fused_vars_name.at("Param")}); + adam_desc.SetInput("Grad", {fused_vars_name.at("Grad")}); + adam_desc.SetInput("Moment1", {fused_vars_name.at("Moment1")}); + adam_desc.SetInput("Moment2", {fused_vars_name.at("Moment2")}); + // TODO(zcd): The LearningRate, Beta1Pow, Beta2Pow should be equal. 
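+  // Assumption: every fused adam op shares the same LearningRate, Beta1Pow
+  // and Beta2Pow variables, so the first op's inputs can stand in for all
+  // of them.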
+ adam_desc.SetInput("LearningRate", adam_ops[0]->Op()->Input("LearningRate")); + adam_desc.SetInput("Beta1Pow", adam_ops[0]->Op()->Input("Beta1Pow")); + adam_desc.SetInput("Beta2Pow", adam_ops[0]->Op()->Input("Beta2Pow")); + + adam_desc.SetOutput("ParamOut", {fused_vars_name.at("Param")}); + adam_desc.SetOutput("Moment1Out", {fused_vars_name.at("Moment1")}); + adam_desc.SetOutput("Moment2Out", {fused_vars_name.at("Moment2")}); + adam_desc.SetAttr("beta1", beta1); + adam_desc.SetAttr("beta2", beta2); + adam_desc.SetAttr("epsilon", epsilon); + adam_desc.SetAttr("lazy_mode", lazy_mode); + adam_desc.SetAttr("min_row_size_to_use_multithread", + min_row_size_to_use_multithread); + adam_desc.SetAttr(OpProtoAndCheckerMaker::OpRoleAttrName(), op_role); + + auto adam_node = graph->CreateOpNode(&adam_desc); + + InserInputAndOutputForOptOps(adam_ops, adam_node); +} + +void FuseAdamOpPass::FuseScaleOps(const std::vector &beta_name, + const std::string &fused_var_name, + const std::vector &adam_ops, + ir::Graph *graph) const { + PADDLE_ENFORCE_EQ(beta_name.size(), adam_ops.size()); + const std::string scale_op_name = "scale"; + + // Get the scale_ops of dealing the adam's beta var. + std::vector scale_ops; + scale_ops.reserve(beta_name.size()); + for (size_t i = 0; i < adam_ops.size(); ++i) { + auto &beta_1_pow_name = beta_name[i]; + auto beta_pow_iter = std::find_if( + adam_ops[i]->inputs.begin(), adam_ops[i]->inputs.end(), + [&beta_name, &beta_1_pow_name](ir::Node *var_node) -> bool { + return var_node->Var() && var_node->Var()->Name() == beta_1_pow_name; + }); + PADDLE_ENFORCE(beta_pow_iter != adam_ops[i]->inputs.end()); + + auto beta_pow_node = *beta_pow_iter; + auto scale_op_iter = std::find_if( + beta_pow_node->outputs.begin(), beta_pow_node->outputs.end(), + [&scale_op_name](ir::Node *op_node) -> bool { + return op_node->Op() && op_node->Op()->Type() == scale_op_name; + }); + PADDLE_ENFORCE(scale_op_iter != beta_pow_node->outputs.end()); + + scale_ops.emplace_back(*scale_op_iter); + } + PADDLE_ENFORCE_EQ(scale_ops.size(), beta_name.size()); + + // Check attributions + // NOTE: If new attribution is added, the following code maybe need change. + int op_role = boost::get( + scale_ops[0]->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())); + float scale = boost::get(scale_ops[0]->Op()->GetAttr("scale")); + float bias = boost::get(scale_ops[0]->Op()->GetAttr("bias")); + bool bias_after_scale = + boost::get(scale_ops[0]->Op()->GetAttr("bias_after_scale")); + for (auto &scale_op : scale_ops) { + PADDLE_ENFORCE_EQ(scale, + boost::get(scale_op->Op()->GetAttr("scale"))); + PADDLE_ENFORCE_EQ(bias, boost::get(scale_op->Op()->GetAttr("bias"))); + PADDLE_ENFORCE_EQ( + bias_after_scale, + boost::get(scale_op->Op()->GetAttr("bias_after_scale"))); + PADDLE_ENFORCE_EQ(op_role, boost::get(scale_op->Op()->GetAttr( + OpProtoAndCheckerMaker::OpRoleAttrName()))); + } + + // NOTE: fused_var is only exist in scope, so the graph doesn't have fused_var + // node. 
+ + VLOG(10) << "Insert fused scale to graph."; + OpDesc scale_desc(scale_ops[0]->Op()->Block()); + scale_desc.SetType("scale"); + scale_desc.SetInput("X", {fused_var_name}); + scale_desc.SetOutput("Out", {fused_var_name}); + scale_desc.SetAttr("scale", scale); + scale_desc.SetAttr("bias", bias); + scale_desc.SetAttr("bias_after_scale", bias_after_scale); + scale_desc.SetAttr(OpProtoAndCheckerMaker::OpRoleAttrName(), op_role); + auto scale_node = graph->CreateOpNode(&scale_desc); + + for (auto scale_op : scale_ops) { + // set inputs + scale_node->inputs.insert(scale_node->inputs.begin(), + scale_op->inputs.begin(), scale_op->inputs.end()); + for (auto &input : scale_op->inputs) { + std::replace(input->outputs.begin(), input->outputs.end(), scale_op, + scale_node); + } + // set outputs + scale_node->outputs.insert(scale_node->outputs.begin(), + scale_op->outputs.begin(), + scale_op->outputs.end()); + for (auto &output : scale_op->outputs) { + std::replace(output->inputs.begin(), output->inputs.end(), scale_op, + scale_node); + } + } + + // Delete scale_ops + for (auto &scale_op : scale_ops) { + graph->RemoveNode(scale_op); + } +} + +} // namespace details +} // namespace framework +} // namespace paddle + +REGISTER_PASS(fuse_adam_op_pass, paddle::framework::details::FuseAdamOpPass) + .RequirePassAttr(paddle::framework::details::kPlaces) + .RequirePassAttr(paddle::framework::details::kLocalScopes); diff --git a/paddle/fluid/framework/details/fuse_adam_op_pass.h b/paddle/fluid/framework/details/fuse_adam_op_pass.h new file mode 100644 index 0000000000000000000000000000000000000000..5866c37552e26d9b14fa946e119f20121ecf7cb2 --- /dev/null +++ b/paddle/fluid/framework/details/fuse_adam_op_pass.h @@ -0,0 +1,55 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include "paddle/fluid/framework/details/build_strategy.h" +#include "paddle/fluid/framework/details/fuse_optimizer_op_pass.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" +#include "paddle/fluid/framework/ir/graph.h" + +namespace paddle { +namespace framework { +namespace details { + +class FuseAdamOpPass : public FuseOptimizerOpPass { + private: + virtual const std::string GetOpType() const; + + virtual const std::vector GetAuxiliaryVarNames() const; + + // Fuse Adam Ops and Scale Ops which are used to update "Beta1Pow", "Beta2Pow" + virtual void FuseOptimizerOps( + const std::unordered_map> &vars_set, + const std::unordered_map &fused_vars_name, + const std::vector &adam_ops, ir::Graph *graph) const; + + void FuseAdamOps( + const std::unordered_map> &vars_set, + const std::unordered_map &fused_vars_name, + const std::vector &adam_ops, ir::Graph *graph) const; + + void FuseScaleOps(const std::vector &aux_var_set, + const std::string &fused_var_name, + const std::vector &adam_ops, + ir::Graph *graph) const; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/fuse_optimizer_op_pass.cc b/paddle/fluid/framework/details/fuse_optimizer_op_pass.cc new file mode 100644 index 0000000000000000000000000000000000000000..b49f095d428a017dd1a3bed2788a048af9afa6bb --- /dev/null +++ b/paddle/fluid/framework/details/fuse_optimizer_op_pass.cc @@ -0,0 +1,240 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/fuse_optimizer_op_pass.h" +#include +#include +#include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/framework/op_registry.h" + +namespace paddle { +namespace framework { +namespace details { + +void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const { + ir::Graph &result = *graph; + + auto &places = Get>(kPlaces); + auto &local_scopes = Get>(kLocalScopes); + + const std::string fuse_op_type = GetOpType(); + const std::vector aux_var_names = GetAuxiliaryVarNames(); + + // Step 1: Get the specified op and auxiliary variables. + std::vector topo_nodes = ir::TopologySortOperations(result); + std::unordered_map> aux_var_set; + std::vector opt_ops; + for (auto &node : topo_nodes) { + GetSpecifiedOpsAndVars(fuse_op_type, aux_var_names, node, &opt_ops, + &aux_var_set); + } + + VLOG(10) << "Find " << fuse_op_type << " operators: " << opt_ops.size(); + if (opt_ops.size() == 0) { + return; + } + + if (result.Has(kFusedOptType)) { + VLOG(10) + << "Currently only support fusing one type optimizer op. Has fused " + << result.Get(kFusedOptType); + return; + } else { + result.Set(kFusedOptType, new FusedOptType); + } + result.Get(kFusedOptType) = fuse_op_type; + + // Step 2: Insert fused_var_name to FusedVars, and the FusedVars need be + // initialized in scopes before execution. 
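+  // e.g. for adam's "Param" with a (hypothetical) first parameter named
+  // "fc_0.w_0", the fused variable is "@FUSEDVAR@_adam_Param_fc_0.w_0".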
+  if (!result.Has(kFusedVars)) {
+    result.Set(kFusedVars, new FusedVars);
+  }
+  std::unordered_map<std::string, std::string> fused_vars_name;
+  fused_vars_name.reserve(aux_var_names.size() + 1);
+  auto &fused_var_set = result.Get<FusedVars>(kFusedVars);
+  const std::string prefix(kFusedVarNamePrefix);
+  // NOTE: the fused_var_name should be unique.
+  for (auto &var_name : aux_var_names) {
+    auto fused_var_name = prefix + "_" + fuse_op_type + "_" + var_name + "_" +
+                          aux_var_set[var_name][0];
+    VLOG(10) << fused_var_name;
+    fused_vars_name.emplace(var_name, fused_var_name);
+    PADDLE_ENFORCE_EQ(fused_var_set.count(fused_var_name), 0);
+    fused_var_set.insert(fused_var_name);
+  }
+
+  // Step 3: Get the fused Gradient's name
+  auto &params_grads = result.Get<ParamsAndGrads>(kParamsAndGrads);
+  if (!result.Has(kFusedGrads)) {
+    PADDLE_THROW(
+        "The alloc_continuous_space_for_grad_pass should be called before this "
+        "pass.");
+  }
+  auto &fused_grad = result.Get<FusedGrads>(kFusedGrads);
+  auto &fused_vars = result.Get<FusedVars>(kFusedVars);
+  auto iter = std::find(fused_vars.begin(), fused_vars.end(), fused_grad);
+  PADDLE_ENFORCE(iter != fused_vars.end(),
+                 "Cannot find the fused gradient in FusedVars.");
+  fused_vars_name.emplace("Grad", fused_grad);
+
+  // Step 4: Sort the parameters and auxiliary variables according
+  // to parameters' name to make variables' name correspond correctly.
+  PADDLE_ENFORCE(result.Has(kParamsAndGrads), "Cannot find kParamsAndGrads.");
+  PADDLE_ENFORCE_EQ(params_grads.size(), aux_var_set.begin()->second.size(),
+                    "The sizes of params_grads and aux_var_set are not equal.");
+  SortParametersAndAuxVars(params_grads, &aux_var_set, &opt_ops);
+
+  // Step 5: Alloc continuous space for Parameters and AuxiliaryVar(e.g.
+  // Moment1, Moment2, Beta1Pow, Beta2Pow) of all the optimizer ops separately.
+  InitFusedVarsAndAllocSpaceForVars(places, local_scopes, aux_var_names,
+                                    aux_var_set, fused_vars_name);
+
+  // Step 6: Fuse optimizer Ops and Scale Ops
+  FuseOptimizerOps(aux_var_set, fused_vars_name, opt_ops, &result);
+
+  // Step 7: Remove optimizer Ops
+  for (auto &opt_op : opt_ops) {
+    graph->RemoveNode(opt_op);
+  }
+}
+
+void FuseOptimizerOpPass::InitFusedVarsAndAllocSpaceForVars(
+    const std::vector<platform::Place> &places,
+    const std::vector<Scope *> &local_scopes,
+    const std::vector<std::string> &aux_var_names,
+    const std::unordered_map<std::string, std::vector<std::string>>
+        &aux_var_set,
+    const std::unordered_map<std::string, std::string> &fused_vars_name) const {
+  VLOG(10) << "Init FusedVars.";
+  // Alloc parameters and auxiliary vars in the respective scope.
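+  // There is one local scope per place, so each scope gets its own copy of
+  // every fused variable.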
+  size_t idx = local_scopes.size();
+  for (auto iter = local_scopes.rbegin(); iter != local_scopes.rend();
+       ++iter, --idx) {
+    auto &scope = *iter;
+    for (auto &var_name : aux_var_names) {
+      auto fused_var_name = fused_vars_name.at(var_name);
+      VLOG(10) << "Init " << fused_var_name;
+      PADDLE_ENFORCE(scope->FindVar(fused_var_name) == nullptr,
+                     "%s already exists in scope[%d]", fused_var_name, idx);
+      scope->Var(fused_var_name)->GetMutable<LoDTensor>();
+    }
+  }
+
+  ProgramDesc program_desc;
+  auto *global_block = program_desc.MutableBlock(0);
+  for (auto &var_name : aux_var_names) {
+    AppendAllocContinuousSpace(aux_var_set.at(var_name),
+                               fused_vars_name.at(var_name), true,
+                               global_block);
+  }
+
+  for (size_t i = 0; i < local_scopes.size(); ++i) {
+    for (auto &op_desc : global_block->AllOps()) {
+      auto op = OpRegistry::CreateOp(*op_desc);
+      op->Run(*local_scopes[i], places[i]);
+    }
+  }
+}
+
+void FuseOptimizerOpPass::SortParametersAndAuxVars(
+    const std::vector<std::pair<std::string, std::string>> &params_grads,
+    std::unordered_map<std::string, std::vector<std::string>> *aux_vars_set,
+    std::vector<ir::Node *> *ops) const {
+  PADDLE_ENFORCE_NE(aux_vars_set->count("Param"), static_cast<size_t>(0));
+  auto &param_vec = aux_vars_set->at("Param");
+
+  std::vector<size_t> param_sort_idx;
+  param_sort_idx.reserve(param_vec.size());
+
+  for (auto &p_g : params_grads) {
+    auto iter = std::find(param_vec.begin(), param_vec.end(), p_g.first);
+    PADDLE_ENFORCE(iter != param_vec.end());
+    auto idx = std::distance(param_vec.begin(), iter);
+    param_sort_idx.emplace_back(idx);
+  }
+
+  for (auto &aux_vars : *aux_vars_set) {
+    std::vector<std::string> sorted_vars;
+    sorted_vars.reserve(aux_vars.second.size());
+    for (size_t i = 0; i < aux_vars.second.size(); ++i) {
+      sorted_vars.emplace_back(aux_vars.second.at(param_sort_idx[i]));
+    }
+    std::swap(aux_vars.second, sorted_vars);
+
+    std::stringstream out;
+    for (auto &var_name : aux_vars.second) {
+      out << var_name << " ";
+    }
+    VLOG(10) << aux_vars.first << ": " << out.str();
+  }
+
+  std::vector<ir::Node *> sorted_ops;
+  sorted_ops.reserve(ops->size());
+  for (size_t i = 0; i < ops->size(); ++i) {
+    sorted_ops.emplace_back(ops->at(param_sort_idx[i]));
+  }
+  std::swap(*ops, sorted_ops);
+}
+
+void FuseOptimizerOpPass::GetSpecifiedOpsAndVars(
+    const std::string &op_type, const std::vector<std::string> &aux_vars_name,
+    ir::Node *node, std::vector<ir::Node *> *ops,
+    std::unordered_map<std::string, std::vector<std::string>> *aux_args_name)
+    const {
+  if (node->Op()->Type() != op_type) return;
+
+  for (auto &var_n : aux_vars_name) {
+    auto arg_names = node->Op()->Input(var_n);
+    PADDLE_ENFORCE_EQ(arg_names.size(), static_cast<size_t>(1));
+    (*aux_args_name)[var_n].emplace_back(arg_names[0]);
+    VLOG(10) << var_n << ", " << arg_names[0];
+  }
+  ops->emplace_back(node);
+}
+
+void FuseOptimizerOpPass::AppendAllocContinuousSpace(
+    const std::vector<std::string> &args, const std::string &out_arg,
+    bool copy_data, BlockDesc *global_block) const {
+  auto op_desc = global_block->AppendOp();
+  op_desc->SetType("alloc_continuous_space");
+  op_desc->SetInput("Input", args);
+  op_desc->SetOutput("Output", args);
+  op_desc->SetOutput("FusedOutput", {out_arg});
+  op_desc->SetAttr("copy_data", copy_data);
+  op_desc->SetAttr("check_name", true);
+}
+
+void FuseOptimizerOpPass::InserInputAndOutputForOptOps(
+    const std::vector<ir::Node *> &opt_ops, ir::Node *opt_node) const {
+  std::unordered_set<ir::Node *> inputs;
+  std::unordered_set<ir::Node *> outputs;
+  for (auto opt_op : opt_ops) {
+    // set inputs
+    inputs.insert(opt_op->inputs.begin(), opt_op->inputs.end());
+    for (auto &input : opt_op->inputs) {
+      replace(input->outputs.begin(), input->outputs.end(), opt_op, opt_node);
+    }
+    // set outputs
outputs.insert(opt_op->outputs.begin(), opt_op->outputs.end()); + for (auto &output : opt_op->outputs) { + replace(output->inputs.begin(), output->inputs.end(), opt_op, opt_node); + } + } + opt_node->inputs.insert(opt_node->inputs.begin(), inputs.begin(), + inputs.end()); + opt_node->outputs.insert(opt_node->outputs.begin(), outputs.begin(), + outputs.end()); +} +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/fuse_optimizer_op_pass.h b/paddle/fluid/framework/details/fuse_optimizer_op_pass.h new file mode 100644 index 0000000000000000000000000000000000000000..0240f1594d7ef9d855eb6e96e8e8a32ee1d957ba --- /dev/null +++ b/paddle/fluid/framework/details/fuse_optimizer_op_pass.h @@ -0,0 +1,75 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include "paddle/fluid/framework/details/build_strategy.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" +#include "paddle/fluid/framework/ir/graph.h" + +namespace paddle { +namespace framework { +namespace details { + +class FuseOptimizerOpPass : public ir::Pass { + protected: + void ApplyImpl(ir::Graph *graph) const override; + + protected: + virtual void SortParametersAndAuxVars( + const std::vector> ¶ms_grads, + std::unordered_map> *aux_var_set, + std::vector *ops) const; + + void InserInputAndOutputForOptOps(const std::vector &opt_ops, + ir::Node *opt_node) const; + + private: + virtual const std::string GetOpType() const = 0; + + virtual const std::vector GetAuxiliaryVarNames() const = 0; + + virtual void FuseOptimizerOps( + const std::unordered_map> &vars_set, + const std::unordered_map &fused_vars_name, + const std::vector &adam_ops, ir::Graph *graph) const = 0; + + void GetSpecifiedOpsAndVars( + const std::string &op_type, const std::vector &aux_vars_name, + ir::Node *node, std::vector *ops, + std::unordered_map> *aux_args_name) + const; + + void AppendAllocContinuousSpace(const std::vector &args, + const std::string &out_arg, bool copy_data, + BlockDesc *global_block) const; + + void InitFusedVarsAndAllocSpaceForVars( + const std::vector &places, + const std::vector &local_scopes, + const std::vector &aux_var_names, + const std::unordered_map> + &aux_var_set, + const std::unordered_map &fused_vars_name) + const; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/fuse_sgd_op_pass.cc b/paddle/fluid/framework/details/fuse_sgd_op_pass.cc new file mode 100644 index 0000000000000000000000000000000000000000..f91c21e3cc869de1a6d67146eb99f27a2ca5497c --- /dev/null +++ b/paddle/fluid/framework/details/fuse_sgd_op_pass.cc @@ -0,0 +1,74 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/fuse_sgd_op_pass.h" +#include +#include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/framework/op_registry.h" + +namespace paddle { +namespace framework { +namespace details { + +const std::string FuseSgdOpPass::GetOpType() const { return "sgd"; } + +const std::vector FuseSgdOpPass::GetAuxiliaryVarNames() const { + return {"Param"}; +} + +void FuseSgdOpPass::FuseOptimizerOps( + const std::unordered_map> + &aux_var_set, + const std::unordered_map &fused_vars_name, + const std::vector &sgd_ops, ir::Graph *graph) const { + FuseSgdOps(aux_var_set, fused_vars_name, sgd_ops, graph); +} + +void FuseSgdOpPass::FuseSgdOps( + const std::unordered_map> &vars_set, + const std::unordered_map &fused_vars_name, + const std::vector &sgd_ops, ir::Graph *graph) const { + PADDLE_ENFORCE_GT(sgd_ops.size(), static_cast(0)); + + // NOTE: fused_var is only exist in scope, so the graph doesn't have fused_var + // node. + + int op_role = boost::get( + sgd_ops[0]->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())); + VLOG(10) << "Insert sgd to graph "; + // Add fused scale + OpDesc Sgd_desc(sgd_ops[0]->Op()->Block()); + Sgd_desc.SetType("sgd"); + Sgd_desc.SetInput("Param", {fused_vars_name.at("Param")}); + Sgd_desc.SetInput("Grad", {fused_vars_name.at("Grad")}); + Sgd_desc.SetOutput("ParamOut", {fused_vars_name.at("Param")}); + + // TODO(zcd): The LearningRate, Beta1Pow, Beta2Pow should be equal. + Sgd_desc.SetInput("LearningRate", sgd_ops[0]->Op()->Input("LearningRate")); + + // NOTE: multi_devices_pass requires that every op should have a role. + Sgd_desc.SetAttr(OpProtoAndCheckerMaker::OpRoleAttrName(), op_role); + + auto sgd_node = graph->CreateOpNode(&Sgd_desc); + + InserInputAndOutputForOptOps(sgd_ops, sgd_node); +} + +} // namespace details +} // namespace framework +} // namespace paddle + +REGISTER_PASS(fuse_sgd_op_pass, paddle::framework::details::FuseSgdOpPass) + .RequirePassAttr(paddle::framework::details::kPlaces) + .RequirePassAttr(paddle::framework::details::kLocalScopes); diff --git a/paddle/fluid/framework/details/fuse_sgd_op_pass.h b/paddle/fluid/framework/details/fuse_sgd_op_pass.h new file mode 100644 index 0000000000000000000000000000000000000000..b3aa6a203b726a5a1540ce533c0305d7f579d4a9 --- /dev/null +++ b/paddle/fluid/framework/details/fuse_sgd_op_pass.h @@ -0,0 +1,50 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
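+
+// FuseSgdOpPass follows the same scheme as FuseAdamOpPass; sgd keeps no
+// moment/beta accumulators, so only "Param" (plus the fused "Grad") is
+// needed.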
+ +#pragma once + +#include +#include +#include +#include +#include "paddle/fluid/framework/details/build_strategy.h" +#include "paddle/fluid/framework/details/fuse_optimizer_op_pass.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" +#include "paddle/fluid/framework/ir/graph.h" + +namespace paddle { +namespace framework { +namespace details { + +class FuseSgdOpPass : public FuseOptimizerOpPass { + private: + virtual const std::string GetOpType() const; + + virtual const std::vector GetAuxiliaryVarNames() const; + + // Fuse Sgd Ops + virtual void FuseOptimizerOps( + const std::unordered_map> &vars_set, + const std::unordered_map &fused_vars_name, + const std::vector &sgd_ops, ir::Graph *graph) const; + + void FuseSgdOps( + const std::unordered_map> &vars_set, + const std::unordered_map &fused_vars_name, + const std::vector &sgd_ops, ir::Graph *graph) const; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc b/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc index 644cd4e15083519d6c685ae3e6a0737692018a07..a57d670f118f2eb0bdcbeb7ed080729e4f9e4f2b 100644 --- a/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc @@ -24,6 +24,19 @@ namespace paddle { namespace framework { namespace details { +// Note(zcd): Addresses should be aligned, otherwise, the results may have +// diff. +static size_t Alignment(size_t size, const platform::Place &place) { + // Allow to allocate the minimum chunk size is 4 KB. + size_t alignment = 1 << 12; + if (platform::is_gpu_place(place)) { + // Allow to allocate the minimum chunk size is 256 B. + alignment = 1 << 8; + } + size_t remaining = size % alignment; + return remaining == 0 ? 
size : size + (alignment - remaining); +} + typedef std::vector>> GradientAndLoDTensor; @@ -111,10 +124,11 @@ void FusedAllReduceOpHandle::RunImpl() { return grad1.second->data() < grad2.second->data(); }); + size_t size_of_dtype = framework::SizeOfType(dtype); for (size_t k = 1; k < g_tensor.size(); ++k) { const void *cur_address = g_tensor.at(k - 1).second->data(); int64_t len = g_tensor.at(k - 1).second->numel(); - auto offset = len * framework::SizeOfType(dtype); + auto offset = Alignment(len * size_of_dtype, places_[0]); void *infer_next_address = reinterpret_cast( reinterpret_cast(cur_address) + offset); const void *next_address = g_tensor.at(k).second->data(); @@ -228,18 +242,21 @@ void FusedAllReduceOpHandle::GetDTypeAndNumel( const std::vector> &grad_tensor, proto::VarType::Type *dtype, int64_t *numel) const { *numel = 0; + size_t size_of_dtype = 0; for (size_t i = 0; i < grad_tensor.size(); ++i) { - // Get element number - int64_t len = grad_tensor.at(i).second->numel(); - PADDLE_ENFORCE_GT(len, 0); - *numel += len; - // Get dtype auto ele_type = grad_tensor.at(i).second->type(); if (i == 0) { *dtype = ele_type; + size_of_dtype = framework::SizeOfType(ele_type); } PADDLE_ENFORCE_EQ(ele_type, *dtype); + + // Get element number + int64_t len = grad_tensor.at(i).second->numel(); + PADDLE_ENFORCE_GT(len, 0); + // Alignment(len) + *numel += Alignment(len * size_of_dtype, places_[0]) / size_of_dtype; } } diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index 884089df38de14ff65c9f766ea4c89dbc6721ce9..611693fc7c241f0afed39ab86390df69b9cf4797 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -20,7 +20,6 @@ #include #include #include - #include "paddle/fluid/framework/details/build_strategy.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/ir/graph.h" @@ -34,6 +33,10 @@ namespace framework { class Scope; namespace details { +constexpr char kLossVarName[] = "loss_var_name"; +constexpr char kStrategy[] = "strategy"; +constexpr char kNRanks[] = "nranks"; + class MultiDevSSAGraphBuilderBase : public ir::Pass { protected: void ApplyImpl(ir::Graph *graph) const override; diff --git a/paddle/fluid/framework/details/multi_devices_helper.h b/paddle/fluid/framework/details/multi_devices_helper.h index ab5e099023382c4e28a9613d321ea8dc182d3534..6e6ef074db3450ebbb5567743b908e0aee382c27 100644 --- a/paddle/fluid/framework/details/multi_devices_helper.h +++ b/paddle/fluid/framework/details/multi_devices_helper.h @@ -20,7 +20,6 @@ #include #include #include - #include "paddle/fluid/framework/details/op_handle_base.h" #include "paddle/fluid/framework/details/var_handle.h" @@ -41,22 +40,25 @@ namespace details { // `std::vector` is the version of varaibles. typedef std::vector>> GraphVars; -const char kGraphVars[] = "vars"; - -// aux variables to represent dependency. Useful to resolve data hazard. -typedef std::unordered_set GraphDepVars; -const char kGraphDepVars[] = "dep_vars"; +constexpr char kGraphVars[] = "vars"; -constexpr char kNCCLCtxs[] = "nccl_ctxs"; - -constexpr char kLossVarName[] = "loss_var_name"; constexpr char kPlaces[] = "places"; constexpr char kLocalScopes[] = "local_scopes"; -constexpr char kStrategy[] = "strategy"; -constexpr char kNRanks[] = "nranks"; +constexpr char kNCCLCtxs[] = "nccl_ctxs"; + +// aux variables to represent dependency. Useful to resolve data hazard. 
+typedef std::unordered_set GraphDepVars; +constexpr char kGraphDepVars[] = "dep_vars"; typedef std::unordered_set FusedVars; constexpr char kFusedVars[] = "fused_vars"; +constexpr char kFusedVarNamePrefix[] = "@FUSEDVAR@"; + +typedef std::string FusedOptType; +constexpr char kFusedOptType[] = "fused_opt_type"; + +typedef std::string FusedGrads; +constexpr char kFusedGrads[] = "fused_gradients"; typedef std::vector> ParamsAndGrads; constexpr char kParamsAndGrads[] = "params_grads"; @@ -65,8 +67,6 @@ typedef std::vector>> GroupGradsAndParams; constexpr char kGroupGradsAndParams[] = "group_grads_params"; -constexpr char kFusedVarNamePrefix[] = "@FUSEDVAR@"; - } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/tensor.cc b/paddle/fluid/framework/tensor.cc index ef096c2b810187c50fbcde7d93d9e5a2ecd8b0f3..ea7f8c496a9fc3ff78fce06b69fb21e44e5be9ee 100644 --- a/paddle/fluid/framework/tensor.cc +++ b/paddle/fluid/framework/tensor.cc @@ -70,7 +70,7 @@ Tensor& Tensor::ShareDataWith(const Tensor& src) { return *this; } -Tensor Tensor::Slice(int begin_idx, int end_idx) const { +Tensor Tensor::Slice(int64_t begin_idx, int64_t end_idx) const { check_memory_size(); PADDLE_ENFORCE_GE(begin_idx, 0, "The start row index must be greater than 0."); diff --git a/paddle/fluid/framework/tensor.h b/paddle/fluid/framework/tensor.h index a3c1063ce95a525a9584629091e37d8d00226e0e..0fa76f943ec1417dc712771565f7ff2b263e6365 100644 --- a/paddle/fluid/framework/tensor.h +++ b/paddle/fluid/framework/tensor.h @@ -133,7 +133,7 @@ class Tensor { * @param[in] end_idx The index of the end row(exclusive) to slice. * The index number begins from 0. */ - Tensor Slice(int begin_idx, int end_idx) const; + Tensor Slice(int64_t begin_idx, int64_t end_idx) const; platform::Place place() const { PADDLE_ENFORCE_NOT_NULL( diff --git a/paddle/fluid/operators/alloc_continuous_space_op.cc b/paddle/fluid/operators/alloc_continuous_space_op.cc index df0e9911cf7186e952cfd7fbf7f43889e9098c84..d4bdecff62c016a31011266a0f066076d85fcdef 100644 --- a/paddle/fluid/operators/alloc_continuous_space_op.cc +++ b/paddle/fluid/operators/alloc_continuous_space_op.cc @@ -65,7 +65,8 @@ class AllocContinuousSpaceKernel : public framework::OpKernel { // Get numel and dtype size_t numel = 0; auto dtype = kDefaultDtype; - GetMemSizeAndDtype(in_tensors, in_var_names, &numel, &dtype); + GetMemSizeAndDtype(in_tensors, in_var_names, &numel, &dtype, + context.GetPlace()); // Alloc the continuous space auto fused_tensor = context.Output("FusedOutput"); @@ -74,14 +75,18 @@ class AllocContinuousSpaceKernel : public framework::OpKernel { // Init the continuous space auto out_tensors = context.MultiOutput("Output"); - int64_t offset = 0; + size_t offset = 0; + size_t size_of_dtype = framework::SizeOfType(dtype); if (context.Attr("copy_data")) { for (size_t i = 0; i < in_var_names.size(); ++i) { - int64_t len = out_tensors[i]->numel(); - auto sub_tensor = fused_tensor->Slice(offset, offset + len); - offset += len; - framework::TensorCopy(*out_tensors[i], context.GetPlace(), dev_ctx, + size_t len = static_cast(in_tensors[i]->numel()); + auto sub_tensor = fused_tensor->Slice( + static_cast(offset), static_cast(offset + len)); + framework::TensorCopy(*in_tensors[i], context.GetPlace(), dev_ctx, &sub_tensor); + + offset += + Alignment(len * size_of_dtype, context.GetPlace()) / size_of_dtype; } } else if (context.Attr("set_constant")) { math::SetConstant set_constant; @@ -92,11 +97,13 @@ class AllocContinuousSpaceKernel : public 
framework::OpKernel { // Make the outputs point to the continuous space. offset = 0; for (size_t i = 0; i < out_tensors.size(); ++i) { - int64_t len = out_tensors[i]->numel(); + size_t len = static_cast(out_tensors[i]->numel()); auto dim = out_tensors[i]->dims(); out_tensors[i] - ->ShareDataWith(fused_tensor->Slice(offset, offset + len)) + ->ShareDataWith(fused_tensor->Slice( + static_cast(offset), static_cast(offset + len))) .Resize(dim); + len = Alignment(len * size_of_dtype, context.GetPlace()) / size_of_dtype; offset += len; VLOG(10) << "alloc_space_for_vars: output(" << out_var_names[i] << ") ,dim:(" << dim << ")" @@ -104,12 +111,28 @@ class AllocContinuousSpaceKernel : public framework::OpKernel { } } + private: + // Note(zcd): Addresses should be aligned, otherwise, the results may have + // diff. + size_t Alignment(size_t size, const platform::Place &place) const { + // Allow to allocate the minimum chunk size is 4 KB. + size_t alignment = 1 << 12; + if (platform::is_gpu_place(place)) { + // Allow to allocate the minimum chunk size is 256 B. + alignment = 1 << 8; + } + size_t remaining = size % alignment; + return remaining == 0 ? size : size + (alignment - remaining); + } + void GetMemSizeAndDtype( const std::vector &lod_tensors, const std::vector var_names, size_t *numel, - framework::proto::VarType::Type *dtype) const { + framework::proto::VarType::Type *dtype, + const platform::Place &place) const { PADDLE_ENFORCE_EQ(lod_tensors.size(), var_names.size()); *numel = 0; + size_t size_of_dtype = 0; for (size_t i = 0; i < var_names.size(); ++i) { PADDLE_ENFORCE(lod_tensors[i]->IsInitialized(), "%s is not initialized.", var_names[i]); @@ -119,6 +142,7 @@ class AllocContinuousSpaceKernel : public framework::OpKernel { PADDLE_ENFORCE_NE(p_dtype, kDefaultDtype, "%s's type should not be %s.", var_names[i], kDefaultDtype); *dtype = p_dtype; + size_of_dtype = framework::SizeOfType(p_dtype); } PADDLE_ENFORCE_EQ(p_dtype, *dtype, "Input vars is not equal."); @@ -126,7 +150,8 @@ class AllocContinuousSpaceKernel : public framework::OpKernel { PADDLE_ENFORCE_GT(size, 0); VLOG(10) << "alloc_space_for_vars: input(" << var_names[i] << ") ,dim:(" << lod_tensors[i]->dims() << ")"; - *numel += size; + *numel += Alignment(static_cast(size) * size_of_dtype, place) / + size_of_dtype; } } }; diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index f62531c7bbd4222de89b0a65efef4c2f13878b06..fa978f1c99b144708c660b537142fb56354c9e6b 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1282,6 +1282,15 @@ All parameter, weight, gradient are variables in Paddle. it will save GPU memory and may make the execution faster. This options is only available in GPU devices. 
Default False)DOC") + .def_property("fuse_all_optimizer_ops", + [](const BuildStrategy &self) { + return self.fuse_all_optimizer_ops_; + }, + [](BuildStrategy &self, bool b) { + PADDLE_ENFORCE(!self.IsFinalized(), + "BuildStrategy is finlaized."); + self.fuse_all_optimizer_ops_ = b; + }) .def_property( "sync_batch_norm", [](const BuildStrategy &self) { return self.sync_batch_norm_; }, diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py index 61fd9af1275865f2d03e759199c219b36d3a0b5b..18ed02a72275437fa6106e57c0383e17647d9700 100644 --- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py +++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py @@ -43,6 +43,7 @@ class TestParallelExecutorBase(unittest.TestCase): use_ir_memory_optimize=True, enable_inplace=True, fuse_elewise_add_act_ops=False, + fuse_all_optimizer_ops=False, fuse_all_reduce_ops=False, fuse_relu_depthwise_conv=False, optimizer=fluid.optimizer.Adam, @@ -81,6 +82,7 @@ class TestParallelExecutorBase(unittest.TestCase): build_strategy.fuse_elewise_add_act_ops = fuse_elewise_add_act_ops build_strategy.fuse_relu_depthwise_conv = fuse_relu_depthwise_conv build_strategy.memory_optimize = False if memory_opt else use_ir_memory_optimize + build_strategy.fuse_all_optimizer_ops = fuse_all_optimizer_ops build_strategy.fuse_all_reduce_ops = fuse_all_reduce_ops # python memory optimization is conflict with inplace pass. # Use ir graph memory optimization after inplace pass is the correct way. diff --git a/python/paddle/fluid/tests/unittests/test_alloc_continuous_space_op.py b/python/paddle/fluid/tests/unittests/test_alloc_continuous_space_op.py index 9d5fe114bad2b2bae73cf18e17ebd7af288a91da..29eb0166b771bbea5509de8b7714bc4608a07cd1 100644 --- a/python/paddle/fluid/tests/unittests/test_alloc_continuous_space_op.py +++ b/python/paddle/fluid/tests/unittests/test_alloc_continuous_space_op.py @@ -16,8 +16,10 @@ from __future__ import print_function import unittest import numpy as np - from op_test import OpTest +from paddle.fluid import core + +alignment = 256 class TestAllocContinuousSpace(OpTest): @@ -29,11 +31,11 @@ class TestAllocContinuousSpace(OpTest): self.constant = attrs["constant"] self.set_constant = attrs["set_constant"] self.Inputs = self.init_input() - self.FusedOutput = self.init_output(self.Inputs, self.set_constant, - self.constant) + self.Outputs, self.FusedOutput = self.init_output( + self.Inputs, self.set_constant, self.constant) self.inputs = {'Input': self.Inputs} self.attrs = attrs - self.outputs = {'Output': self.Inputs, 'FusedOutput': self.FusedOutput} + self.outputs = {'Output': self.Outputs, 'FusedOutput': self.FusedOutput} def init_dtype(self): self.dtype = np.float32 @@ -52,14 +54,31 @@ class TestAllocContinuousSpace(OpTest): return {"copy_data": True, "set_constant": False, "constant": 0.0} def init_output(self, input_list, set_constant, constant): - inputs = [input[1].flatten() for input in input_list] - output = np.concatenate(inputs) + inputs = [] + outputs = input_list + + for input in input_list: + length = len(input[1].flatten()) + aligned_len = (length + alignment) / alignment * alignment + out = np.zeros(int(aligned_len)) + out[0:length] = input[1].flatten() + inputs.append(out) + + alloc_continuous_space_var = np.concatenate([input for input in inputs]) if set_constant: - output = np.ones((len(output))) * constant - return output + alloc_continuous_space_var = np.ones( + 
(len(alloc_continuous_space_var))) * constant + outputs = [(out[0], + np.ones(out[1].shape).astype(self.dtype) * constant) + for out in outputs] + return outputs, alloc_continuous_space_var def test_check_output(self): - self.check_output() + if core.is_compiled_with_cuda(): + self.check_output_with_place( + place=core.CUDAPlace(0), + no_check_set=["FusedOutput"], + atol=1e-5) class TestAllocContinuousSpace2(TestAllocContinuousSpace): @@ -67,7 +86,11 @@ class TestAllocContinuousSpace2(TestAllocContinuousSpace): return {"copy_data": False, "set_constant": True, "constant": 0.5} def test_check_output(self): - self.check_output(no_check_set=["Output"]) + if core.is_compiled_with_cuda(): + self.check_output_with_place( + place=core.CUDAPlace(0), + no_check_set=["FusedOutput"], + atol=1e-5) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_fuse_optimizer_pass.py b/python/paddle/fluid/tests/unittests/test_fuse_optimizer_pass.py new file mode 100644 index 0000000000000000000000000000000000000000..93e67deaf3c9f7fe17296049137fbbe00374c6f1 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fuse_optimizer_pass.py @@ -0,0 +1,135 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
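+# This test trains the same network with and without
+# build_strategy.fuse_all_optimizer_ops and checks that the losses match.
+# Minimal usage sketch (hypothetical `main`/`loss`, following the standard
+# CompiledProgram flow):
+#
+#     build_strategy = fluid.BuildStrategy()
+#     build_strategy.fuse_all_optimizer_ops = True
+#     binary = fluid.compiler.CompiledProgram(main).with_data_parallel(
+#         loss_name=loss.name, build_strategy=build_strategy)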
+ +from parallel_executor_test_base import TestParallelExecutorBase +import paddle.fluid as fluid +import paddle.fluid.core as core +import numpy as np +import paddle +import paddle.dataset.mnist as mnist +import unittest +import os + + +def simple_fc_net(use_feed): + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + hidden = img + for _ in range(4): + hidden = fluid.layers.fc( + hidden, + size=200, + act='relu', + bias_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=1.0))) + prediction = fluid.layers.fc(hidden, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + loss = fluid.layers.mean(loss) + return loss + + +def fc_with_batchnorm(use_feed): + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + hidden = img + for _ in range(2): + hidden = fluid.layers.fc( + hidden, + size=200, + act='relu', + bias_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=1.0))) + + hidden = fluid.layers.batch_norm(input=hidden) + + prediction = fluid.layers.fc(hidden, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + loss = fluid.layers.mean(loss) + return loss + + +class TestFuseAdamOps(TestParallelExecutorBase): + @classmethod + def setUpClass(cls): + os.environ['CPU_NUM'] = str(4) + + def _init_data(self, random=True): + np.random.seed(5) + if random: + img = np.random.random(size=[32, 784]).astype(np.float32) + else: + img = np.ones(shape=[32, 784], dtype='float32') + label = np.ones(shape=[32, 1], dtype='int64') + return img, label + + def _compare_fused_optimizer_ops(self, + model, + use_cuda, + random_data=True, + optimizer=fluid.optimizer.Adam): + if use_cuda and not core.is_compiled_with_cuda(): + return + img, label = self._init_data(random_data) + not_fuse_op_first_loss, not_fuse_op_last_loss = self.check_network_convergence( + model, + feed_dict={"image": img, + "label": label}, + use_cuda=use_cuda, + fuse_all_optimizer_ops=False, + memory_opt=False, # avoid the gradient's name changed in Python side. + optimizer=optimizer) + fuse_op_first_loss, fuse_op_last_loss = self.check_network_convergence( + model, + feed_dict={"image": img, + "label": label}, + use_cuda=use_cuda, + fuse_all_optimizer_ops=True, + memory_opt=False, # avoid the gradient's name changed in Python side. 
+ optimizer=optimizer)
+
+ for loss in zip(not_fuse_op_first_loss, fuse_op_first_loss):
+ self.assertAlmostEqual(loss[0], loss[1], delta=1e-6)
+ for loss in zip(not_fuse_op_last_loss, fuse_op_last_loss):
+ self.assertAlmostEqual(loss[0], loss[1], delta=1e-6)
+
+ def test_simple_fc_with_fuse_op(self):
+ self._compare_fused_optimizer_ops(simple_fc_net, True)
+ self._compare_fused_optimizer_ops(simple_fc_net, False)
+
+ def test_batchnorm_fc_with_fuse_op(self):
+ self._compare_fused_optimizer_ops(fc_with_batchnorm, True)
+ # self._compare_fused_optimizer_ops(fc_with_batchnorm, False)
+
+
+class TestFuseSGDOps(TestFuseAdamOps):
+ def sgd_optimizer(self, learning_rate=1e-4):
+ return fluid.optimizer.SGD(learning_rate=learning_rate)
+
+ def test_simple_fc_with_fuse_op(self):
+ self._compare_fused_optimizer_ops(
+ simple_fc_net, True, optimizer=self.sgd_optimizer)
+ self._compare_fused_optimizer_ops(
+ simple_fc_net, False, optimizer=self.sgd_optimizer)
+
+ def test_batchnorm_fc_with_fuse_op(self):
+ self._compare_fused_optimizer_ops(
+ fc_with_batchnorm, True, optimizer=self.sgd_optimizer)
+ self._compare_fused_optimizer_ops(
+ fc_with_batchnorm, False, optimizer=self.sgd_optimizer)
+
+
+if __name__ == '__main__':
+ unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py index ba63213a410b8b2579b6842c5a6ecd720c7957b3..6671a2def3cccd2acd76025e73486b06b4bb1471 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py @@ -61,6 +61,11 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, param_attr=fluid.ParamAttr( name=embedding_name, trainable=False)) for x in word_input ] + # TODO(zcd): if the parameter is not trainable, the
+ # parameter's gradient should not be generated. 
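+ # Until that holds, mark the embedding outputs with stop_gradient=True
+ # below so that no backward ops are generated for these frozen layers.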
+ for emb_layer in emb_layers: + emb_layer.stop_gradient = True + emb_layers.append(predicate_embedding) emb_layers.append(mark_embedding) @@ -113,60 +118,62 @@ class TestCRFModel(unittest.TestCase): os.environ['CPU_NUM'] = str(4) main = fluid.Program() startup = fluid.Program() - with fluid.program_guard(main, startup): - word = fluid.layers.data( - name='word_data', shape=[1], dtype='int64', lod_level=1) - predicate = fluid.layers.data( - name='verb_data', shape=[1], dtype='int64', lod_level=1) - ctx_n2 = fluid.layers.data( - name='ctx_n2_data', shape=[1], dtype='int64', lod_level=1) - ctx_n1 = fluid.layers.data( - name='ctx_n1_data', shape=[1], dtype='int64', lod_level=1) - ctx_0 = fluid.layers.data( - name='ctx_0_data', shape=[1], dtype='int64', lod_level=1) - ctx_p1 = fluid.layers.data( - name='ctx_p1_data', shape=[1], dtype='int64', lod_level=1) - ctx_p2 = fluid.layers.data( - name='ctx_p2_data', shape=[1], dtype='int64', lod_level=1) - mark = fluid.layers.data( - name='mark_data', shape=[1], dtype='int64', lod_level=1) - - feature_out = db_lstm(**locals()) - target = fluid.layers.data( - name='target', shape=[1], dtype='int64', lod_level=1) - crf_cost = fluid.layers.linear_chain_crf( - input=feature_out, - label=target, - param_attr=fluid.ParamAttr( - name='crfw', learning_rate=1e-1)) - avg_cost = fluid.layers.mean(crf_cost) - - sgd_optimizer = fluid.optimizer.SGD( - learning_rate=fluid.layers.exponential_decay( - learning_rate=0.01, - decay_steps=100000, - decay_rate=0.5, - staircase=True)) - sgd_optimizer.minimize(avg_cost) - - train_data = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.conll05.test(), buf_size=8192), - batch_size=16) - - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup) - - train_cp = compiler.CompiledProgram(main).with_data_parallel( - loss_name=avg_cost.name, build_strategy=build_strategy) - - feeder = fluid.DataFeeder( - feed_list=[ - word, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, predicate, - mark, target - ], - place=fluid.CPUPlace()) + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(main, startup): + word = fluid.layers.data( + name='word_data', shape=[1], dtype='int64', lod_level=1) + predicate = fluid.layers.data( + name='verb_data', shape=[1], dtype='int64', lod_level=1) + ctx_n2 = fluid.layers.data( + name='ctx_n2_data', shape=[1], dtype='int64', lod_level=1) + ctx_n1 = fluid.layers.data( + name='ctx_n1_data', shape=[1], dtype='int64', lod_level=1) + ctx_0 = fluid.layers.data( + name='ctx_0_data', shape=[1], dtype='int64', lod_level=1) + ctx_p1 = fluid.layers.data( + name='ctx_p1_data', shape=[1], dtype='int64', lod_level=1) + ctx_p2 = fluid.layers.data( + name='ctx_p2_data', shape=[1], dtype='int64', lod_level=1) + mark = fluid.layers.data( + name='mark_data', shape=[1], dtype='int64', lod_level=1) + + feature_out = db_lstm(**locals()) + target = fluid.layers.data( + name='target', shape=[1], dtype='int64', lod_level=1) + crf_cost = fluid.layers.linear_chain_crf( + input=feature_out, + label=target, + param_attr=fluid.ParamAttr( + name='crfw', learning_rate=1e-1)) + avg_cost = fluid.layers.mean(crf_cost) + + sgd_optimizer = fluid.optimizer.SGD( + learning_rate=fluid.layers.exponential_decay( + learning_rate=0.01, + decay_steps=100000, + decay_rate=0.5, + staircase=True)) + sgd_optimizer.minimize(avg_cost) + + train_data = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.conll05.test(), buf_size=8192), + batch_size=16) + + place = 
fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup) + + train_cp = compiler.CompiledProgram(main).with_data_parallel( + loss_name=avg_cost.name, build_strategy=build_strategy) + + feeder = fluid.DataFeeder( + feed_list=[ + word, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, predicate, + mark, target + ], + place=fluid.CPUPlace()) data = train_data() for i in range(10): diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py index 17f8f5a0b4f753aabe8af3f97c2018cd2cf54dc1..d0eca7d6dfbdf03828125508c798a9bd31f8bbd6 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py @@ -41,14 +41,15 @@ class TestBase(unittest.TestCase): fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()) exe.run(startup_prog) - for _ in six.moves.xrange(iter): - exe_strategy = fluid.ExecutionStrategy() - exe_strategy._dry_run = True - exe_strategy.use_experimental_executor = use_experimental_executor - train_cp = compiler.CompiledProgram(main_prog).with_data_parallel( - loss_name=loss.name, exec_strategy=exe_strategy) - for _ in six.moves.xrange(iter_per_pe): - exe.run(train_cp) + exe_strategy = fluid.ExecutionStrategy() + exe_strategy._dry_run = True + exe_strategy.use_experimental_executor = use_experimental_executor + train_cp = compiler.CompiledProgram( + main_prog).with_data_parallel( + loss_name=loss.name, exec_strategy=exe_strategy) + for _ in six.moves.xrange(iter): + for _ in six.moves.xrange(iter_per_pe): + exe.run(train_cp) class TestMNISTDryRun(TestBase):
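Usage note: the tests above exercise the new passes only through
TestParallelExecutorBase.check_network_convergence. The sketch below shows how
the same behavior might be enabled directly from user code; it assumes the
fuse_all_optimizer_ops flag is exposed on fluid.BuildStrategy (mirroring the
keyword the tests pass), and main_prog, loss, exe and feed_dict are
placeholders for a program, loss variable, executor and feed built as in the
tests above.

    import paddle.fluid as fluid
    import paddle.fluid.compiler as compiler

    build_strategy = fluid.BuildStrategy()
    # Assumption: this flag turns on fuse_adam_op_pass / fuse_sgd_op_pass,
    # fusing the per-parameter optimizer ops into a single fused op over the
    # continuous gradient space allocated by alloc_continuous_space_for_grad.
    build_strategy.fuse_all_optimizer_ops = True

    train_cp = compiler.CompiledProgram(main_prog).with_data_parallel(
        loss_name=loss.name, build_strategy=build_strategy)
    # Fetch the loss once to verify the fused program still converges.
    loss_val, = exe.run(train_cp, feed=feed_dict, fetch_list=[loss.name])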