diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index d6b5ad4570c1d8402dedb8596cc75d9eae5a91c7..93288936fea1fae897dc26e6d8850da612960333 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -39,11 +39,12 @@ if (WITH_GPU) endif() cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass) +cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS graph graph_helper pass) cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_devices_helper computation_op_handle scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle fused_broadcast_op_handle) -set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto sequential_execution_pass modify_op_lock_and_record_event_pass) +set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto sequential_execution_pass modify_op_lock_and_record_event_pass all_reduce_deps_pass) if (WITH_GPU) list(APPEND SSA_GRAPH_EXECUTOR_DEPS reference_count_pass) endif() diff --git a/paddle/fluid/framework/details/all_reduce_deps_pass.cc b/paddle/fluid/framework/details/all_reduce_deps_pass.cc new file mode 100644 index 0000000000000000000000000000000000000000..fe21e21bcfc42bfb3251a7d0d15aa5926f56813f --- /dev/null +++ b/paddle/fluid/framework/details/all_reduce_deps_pass.cc @@ -0,0 +1,125 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "paddle/fluid/framework/details/all_reduce_deps_pass.h" +#include "paddle/fluid/framework/details/all_reduce_op_handle.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" +#include "paddle/fluid/framework/details/op_graph_view.h" +#include "paddle/fluid/framework/details/var_handle.h" +#include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/framework/op_proto_maker.h" + +namespace paddle { +namespace framework { +namespace details { + +static constexpr char kAllOpDescs[] = "all_op_descs"; + +VarHandle* GetValidInput(const OpHandleBase* a) { + for (auto p : a->Inputs()) { + VarHandle* b = dynamic_cast(p); + if (b) { + return b; + } + } + + return nullptr; +} + +std::unique_ptr AllReduceDepsPass::ApplyImpl( + std::unique_ptr graph) const { + auto graph_ops = ir::FilterByNodeWrapper(*graph); + + // get vars order + int order = 0; + std::unordered_map vars; + // TODO(gongwb): use graph topology sort to find the order of operators. + // Note that must assert topology sort is stable + auto& ops = Get>(kAllOpDescs); + for (auto* op_desc : ops) { + auto outputs = op_desc->Outputs(); + for (auto& o_it : outputs) { + for (auto& v : o_it.second) { // values + vars[v] = order; + } + } + order++; + } + + std::vector dist_ops; + // get allreduce ops. + for (auto& op : graph_ops) { + // FIXME(gongwb):add broad cast. + if (op->Name() == "all_reduce" || op->Name() == "reduce") { + dist_ops.push_back(op); + } + } + + VLOG(10) << "dist_ops size:" << dist_ops.size() << std::endl; + + std::sort(dist_ops.begin(), dist_ops.end(), [&](OpHandleBase* op1, + OpHandleBase* op2) { + VarHandle* i0 = dynamic_cast(GetValidInput(op1)); + VarHandle* i1 = dynamic_cast(GetValidInput(op2)); + + PADDLE_ENFORCE(i0 != nullptr && i1 != nullptr, "%s convert to %s error", + op1->DebugString(), op2->DebugString()); + + auto l_it = vars.find(i0->name_); + auto r_it = vars.find(i1->name_); + + if (l_it->second < r_it->second) return true; + + if (l_it->second == r_it->second) { + return i0->name_ < i1->name_; + } + + return false; + }); + + // add dependency. + auto& sorted_ops = dist_ops; + for (size_t i = 1; i < sorted_ops.size(); ++i) { + auto* dep_var = new DummyVarHandle(graph->CreateControlDepVar()); + + auto* pre_op = sorted_ops[i - 1]; + auto* op = sorted_ops[i]; + + pre_op->AddOutput(dep_var); + op->AddInput(dep_var); + graph->Get(kGraphDepVars).emplace(dep_var); + + VLOG(10) << "add all_reduce sequential dependencies between " << pre_op + << " and " << op; + + VLOG(10) << "pre_op:" << pre_op->DebugString() + << ", op:" << op->DebugString(); + } + + return graph; +} + +} // namespace details +} // namespace framework +} // namespace paddle + +REGISTER_PASS(all_reduce_deps_pass, + paddle::framework::details::AllReduceDepsPass) + .RequirePassAttr(paddle::framework::details::kAllOpDescs); diff --git a/paddle/fluid/framework/details/all_reduce_deps_pass.h b/paddle/fluid/framework/details/all_reduce_deps_pass.h new file mode 100644 index 0000000000000000000000000000000000000000..e8b91089816c71bc56ba7dba0105e85d73eb52ad --- /dev/null +++ b/paddle/fluid/framework/details/all_reduce_deps_pass.h @@ -0,0 +1,33 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/ir/pass.h" + +namespace paddle { +namespace framework { +namespace details { + +// TODO(gongwb): overlap allreduce with backward computation. +class AllReduceDepsPass : public ir::Pass { + protected: + std::unique_ptr ApplyImpl( + std::unique_ptr graph) const override; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index 70baced0ada33c23ba05cd2722e607edf847585a..523f9eadf2d7e2e08504c5920372fb7cdb0d7aba 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -16,6 +16,7 @@ limitations under the License. */ #include "paddle/fluid/framework/details/multi_devices_graph_check_pass.h" #include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h" +#include "paddle/fluid/framework/details/reduce_op_handle.h" #include "paddle/fluid/framework/details/sequential_execution_pass.h" #include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/ir/graph_viz_pass.h" @@ -24,6 +25,10 @@ namespace paddle { namespace framework { namespace details { +static inline bool SeqOnlyAllReduceOps(const BuildStrategy &strategy) { + return (!strategy.enable_sequential_execution_ && strategy.num_trainers_ > 1); +} + class ParallelExecutorPassBuilder : public ir::PassBuilder { public: explicit ParallelExecutorPassBuilder(const BuildStrategy &strategy) @@ -70,6 +75,10 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { // Verify that the graph is correct for multi-device executor. AppendPass("multi_devices_check_pass"); + if (SeqOnlyAllReduceOps(strategy)) { + AppendPass("all_reduce_deps_pass"); + } + if (strategy_.remove_unnecessary_lock_) { AppendPass("modify_op_lock_and_record_event_pass"); } @@ -124,6 +133,17 @@ std::unique_ptr BuildStrategy::Apply( pass->SetNotOwned("nccl_ctxs", nctx); #endif } else if (pass->Type() == "sequential_execution_pass") { + VLOG(1) << "set enable_sequential_execution:" + << enable_sequential_execution_; + + pass->Erase(kAllOpDescs); + pass->Set>( + kAllOpDescs, + new std::vector(main_program.Block(0).AllOps())); + } else if (pass->Type() == "all_reduce_deps_pass") { + VLOG(1) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this) + << ", num_trainers:" << num_trainers_; + pass->Erase(kAllOpDescs); pass->Set>( kAllOpDescs, @@ -144,4 +164,5 @@ USE_PASS(multi_devices_pass); USE_PASS(multi_devices_check_pass); USE_PASS(multi_devices_print_pass); USE_PASS(sequential_execution_pass); +USE_PASS(all_reduce_deps_pass); USE_PASS(modify_op_lock_and_record_event_pass); diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index 3236c35efdbf1175c3d06e531fc551f202ae17ad..9f0a25912886cea7a1f287125cfe8612e4b336eb 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -73,6 +73,7 @@ struct BuildStrategy { bool fuse_broadcast_op_{false}; + int num_trainers_{1}; bool remove_unnecessary_lock_{false}; // NOTE: diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index a2a629acdfe65ae250164dad4c2367d525887acf..e31c2f217322be8ef8b131189504b54cf6b4ad80 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -860,6 +860,12 @@ All parameter, weight, gradient are variables in Paddle. self.remove_unnecessary_lock_ = b; }, R"DOC(The type is BOOL. If set True, some locks in GPU ops would be released and ParallelExecutor would run faster. Default False.)DOC") + .def_property( + "num_trainers", + [](const BuildStrategy &self) { return self.num_trainers_; }, + [](BuildStrategy &self, int num_trainers) { + self.num_trainers_ = num_trainers; + }) .def_property( "fuse_elewise_add_act_ops", [](const BuildStrategy &self) { diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 3f4dd5eb712e738bbee8f93c062375033b8ab2f6..bdcd045341212d6cf9dbfbc3cebc72f320e37e9d 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -124,16 +124,11 @@ class ParallelExecutor(object): os.environ.get('CPU_NUM', multiprocessing.cpu_count())) exec_strategy.num_threads = cpu_num * 2 - # Set 1 thread num under nccl2 distribute - # env to make sure all gpus run ops in same order. - if num_trainers > 1: - assert (use_cuda) - # FIXME(gongwb): avoid this set. - exec_strategy.num_threads = 1 - if build_strategy is None: build_strategy = BuildStrategy() + build_strategy.num_trainers = num_trainers + main = main_program main = main if main else framework.default_main_program() if scope == None: