diff --git a/paddle/fluid/framework/details/all_reduce_deps_pass.cc b/paddle/fluid/framework/details/all_reduce_deps_pass.cc index 878b950858a71ba0e10ab2643667922420d29099..c44793cd11d22b29b4b3422a047d81fe26624982 100644 --- a/paddle/fluid/framework/details/all_reduce_deps_pass.cc +++ b/paddle/fluid/framework/details/all_reduce_deps_pass.cc @@ -13,125 +13,186 @@ // limitations under the License. #include -#include +#include #include #include #include +#include #include -#include "paddle/fluid/framework/details/all_reduce_deps_pass.h" #include "paddle/fluid/framework/details/all_reduce_op_handle.h" +#include "paddle/fluid/framework/details/container_cast.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/op_graph_view.h" -#include "paddle/fluid/framework/details/var_handle.h" +#include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/framework/ir/pass.h" #include "paddle/fluid/framework/op_proto_maker.h" namespace paddle { namespace framework { namespace details { -VarHandle* GetValidInput(const OpHandleBase* a) { - for (auto p : a->Inputs()) { - VarHandle* b = dynamic_cast(p); - if (b) { - return b; +class AllReduceDepsPass : public ir::Pass { + protected: + void ApplyImpl(ir::Graph* graph) const override { + std::vector all_reduce_op_handles = + GetSortedAllReduceOps(*graph); + + for (size_t i = 1; i < all_reduce_op_handles.size(); ++i) { + auto* dep_var = new DummyVarHandle(graph->CreateControlDepVar()); + graph->Get(kGraphDepVars).emplace(dep_var); + all_reduce_op_handles[i - 1]->AddOutput(dep_var); + all_reduce_op_handles[i]->AddInput(dep_var); } - } - return nullptr; -} - -void AllReduceDepsPass::ApplyImpl(ir::Graph* graph) const { - auto graph_ops = ir::FilterByNodeWrapper(*graph); - - // get vars order - int order = 0; - std::unordered_map vars; - // TODO(gongwb): use graph topology sort to find the order of operators. - // Note that must assert topology sort is stable - auto& ops = graph->Get>(kStaleProgramOpDescs); - for (auto* op_desc : ops) { - try { - bool is_bk_op = - static_cast(boost::get(op_desc->GetAttr( - OpProtoAndCheckerMaker::OpRoleAttrName())) & - static_cast(OpRole::kBackward)); - if (!is_bk_op) continue; - - auto backward_vars = - boost::get>(op_desc->GetNullableAttr( - OpProtoAndCheckerMaker::OpRoleVarAttrName())); - PADDLE_ENFORCE_EQ(backward_vars.size() % 2, 0); - - auto outputs = op_desc->Outputs(); - for (auto& o_it : outputs) { - for (auto& v : o_it.second) { // values - vars[v] = order; - VLOG(10) << "in all_reduce_deps_pass:" << v; - } - } - order++; - } catch (boost::bad_get e) { + if (VLOG_IS_ON(10)) { + DebugString(*graph, all_reduce_op_handles); } } - std::vector dist_ops; - // get allreduce ops. - for (auto& op : graph_ops) { - // FIXME(gongwb):add broad cast. - if (op->Name() == "all_reduce" || op->Name() == "reduce") { - dist_ops.push_back(op); + std::vector GetSortedAllReduceOps( + const ir::Graph& graph) const { + std::vector all_reduce_op_handles; + std::unordered_map pending_ops; + std::unordered_set ready_ops; + std::unordered_set next_ready_ops; + + auto op_handles = ir::FilterByNodeWrapper(graph); + size_t num_of_ops = op_handles.size(); + for (OpHandleBase* op : op_handles) { + size_t not_ready_vars = op->NotReadyInputSize(); + if (not_ready_vars) { + pending_ops.insert({op, not_ready_vars}); + } else { + ready_ops.insert(op); + } } - } - - VLOG(10) << "dist_ops size:" << dist_ops.size() - << ", outputs size:" << vars.size() << ", ops size:" << ops.size(); - - std::sort(dist_ops.begin(), dist_ops.end(), [&](OpHandleBase* op1, - OpHandleBase* op2) { - VarHandle* i0 = dynamic_cast(GetValidInput(op1)); - VarHandle* i1 = dynamic_cast(GetValidInput(op2)); - - PADDLE_ENFORCE(i0 != nullptr && i1 != nullptr, "%s convert to %s error", - op1->DebugString(), op2->DebugString()); - auto l_it = vars.find(i0->name()); - auto r_it = vars.find(i1->name()); - - PADDLE_ENFORCE(l_it != vars.end() && r_it != vars.end(), - "can't find var's name %s and %s in opdesc", i0->name(), - i1->name()); - - if (l_it->second < r_it->second) return true; + GetSortedAllReduceOps(ready_ops, &all_reduce_op_handles); + + size_t has_run_ops = ready_ops.size(); + while (has_run_ops != num_of_ops) { + for (auto* op : ready_ops) { + for (auto& ready_var : op->Outputs()) { + for (auto* pend_op : ready_var->PendingOps()) { + auto& deps = --pending_ops[pend_op]; + if (deps == 0) { + next_ready_ops.insert(pend_op); + } + } + } + } - if (l_it->second == r_it->second) { - return i0->name() < i1->name(); + PADDLE_ENFORCE_NE(next_ready_ops.size(), 0, "There maybe have a cycle."); + ready_ops.clear(); + std::swap(ready_ops, next_ready_ops); + GetSortedAllReduceOps(ready_ops, &all_reduce_op_handles); + has_run_ops += ready_ops.size(); } + return all_reduce_op_handles; + } - return false; - }); - - // add dependency. - auto& sorted_ops = dist_ops; - for (size_t i = 1; i < sorted_ops.size(); ++i) { - auto* dep_var = new DummyVarHandle(graph->CreateControlDepVar()); - - auto* pre_op = sorted_ops[i - 1]; - auto* op = sorted_ops[i]; - - pre_op->AddOutput(dep_var); - op->AddInput(dep_var); - graph->Get(kGraphDepVars).emplace(dep_var); + void GetSortedAllReduceOps( + const std::unordered_set& ready_ops, + std::vector* all_reduce_op_handles) const { + std::vector current_all_reduce_op_handles; + for (auto& op_handle : ready_ops) { + auto all_reduce_op_handle = dynamic_cast(op_handle); + if (all_reduce_op_handle) { + current_all_reduce_op_handles.emplace_back(all_reduce_op_handle); + } + } - VLOG(10) << "add all_reduce sequential dependencies between " << pre_op - << " and " << op; + // NOTE(zcd): For distributed training, it is important to keep the order of + // allReduce on each node consistent. Otherwise, hang may occur. + // Sort the current_all_reduce_op_handles according to the name of input. + sort(current_all_reduce_op_handles.begin(), + current_all_reduce_op_handles.end(), + [](const AllReduceOpHandle* left, + const AllReduceOpHandle* right) -> bool { + auto left_in_vars = DynamicCast(left->Inputs()); + auto right_in_vars = DynamicCast(right->Inputs()); + PADDLE_ENFORCE_GT(left_in_vars.size(), 0); + PADDLE_ENFORCE_EQ(left_in_vars.size(), right_in_vars.size()); + return left_in_vars[0]->Name() > right_in_vars[0]->Name(); + }); + + all_reduce_op_handles->insert(all_reduce_op_handles->end(), + current_all_reduce_op_handles.begin(), + current_all_reduce_op_handles.end()); + } - VLOG(10) << "pre_op:" << pre_op->DebugString() - << ", op:" << op->DebugString(); + void DebugString( + const ir::Graph& graph, + const std::vector& all_reduce_op_handles) const { + // get vars order + std::map> vars = + GetSoredGradientsFromStaleProgram(graph); + std::stringstream out; + size_t grads_of_stale_program = 0; + out << "Get Order From kStaleProgramOpDescs: "; + for (auto& var : vars) { + out << "Order " << var.first << " ["; + for (auto& var_name : var.second) { + out << var_name << ", "; + ++grads_of_stale_program; + } + out << "], "; + } + VLOG(10) << out.str(); + + std::stringstream out2; + out2 << "Get Order From Topological order: "; + for (auto& op : all_reduce_op_handles) { + bool find_valid_input = false; + for (auto& in_var : op->Inputs()) { + if (dynamic_cast(in_var)) { + out2 << in_var->Name() << ", "; + find_valid_input = true; + break; + } + } + PADDLE_ENFORCE(find_valid_input, "Doesn't find valid input."); + } + VLOG(10) << out2.str(); + if (grads_of_stale_program != all_reduce_op_handles.size()) { + VLOG(10) + << "The gradients number of stale program and graph is not equal."; + } } -} + std::map> GetSoredGradientsFromStaleProgram( + const ir::Graph& graph) const { + std::map> vars; + auto ops = graph.Get>(kStaleProgramOpDescs); + int order = 0; + for (auto* op_desc : ops) { + try { + bool is_bk_op = + static_cast(boost::get(op_desc->GetAttr( + OpProtoAndCheckerMaker::OpRoleAttrName())) & + static_cast(OpRole::kBackward)); + if (!is_bk_op) continue; + + auto backward_vars = + boost::get>(op_desc->GetNullableAttr( + OpProtoAndCheckerMaker::OpRoleVarAttrName())); + if (backward_vars.empty()) continue; + + PADDLE_ENFORCE_EQ(backward_vars.size() % 2, 0); + for (size_t i = 1; i < backward_vars.size(); i += 2) { + vars[order].emplace_back(backward_vars[i]); + VLOG(1) << "get parameter and gradient: " << backward_vars[i - 1] + << ", " << backward_vars[i]; + } + order++; + } catch (boost::bad_get e) { + } + } + return vars; + } +}; } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/details/all_reduce_deps_pass.h b/paddle/fluid/framework/details/all_reduce_deps_pass.h deleted file mode 100644 index 4ed3736587aa3d45e288e3dc7e6ab3099f935f41..0000000000000000000000000000000000000000 --- a/paddle/fluid/framework/details/all_reduce_deps_pass.h +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include "paddle/fluid/framework/ir/graph.h" -#include "paddle/fluid/framework/ir/pass.h" - -namespace paddle { -namespace framework { -namespace details { - -// TODO(gongwb): overlap allreduce with backward computation. -class AllReduceDepsPass : public ir::Pass { - protected: - void ApplyImpl(ir::Graph* graph) const override; -}; - -} // namespace details -} // namespace framework -} // namespace paddle diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc index 6e477cd2977561ddb914e4a6343f677044fad4be..ed75b48090b27a9c430afe067467a1a39d711938 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc @@ -28,7 +28,7 @@ // asynchronous nccl allreduce or synchronous issue: // https://github.com/PaddlePaddle/Paddle/issues/15049 DEFINE_bool( - sync_nccl_allreduce, false, + sync_nccl_allreduce, true, "If set true, will call `cudaStreamSynchronize(nccl_stream)`" "after allreduce, this mode can get better performance in some scenarios."); diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index df69b11ec6ae3bb08ba03b749c69eb718525de4d..36720b8ad97c6cacfccc106066bcdbe20c39ff47 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -163,15 +163,11 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { "graph_printer", new details::GraphvizSSAGraphPrinter); } - // Verify that the graph is correct for multi-device executor. - AppendPass("multi_devices_check_pass"); - - if (VLOG_IS_ON(2)) { - AppendPass("all_reduce_deps_pass"); - } - - if (SeqOnlyAllReduceOps(strategy_)) { - VLOG(10) << "Add all_reduce_deps_pass"; + // experimental shows that the program will be faster if append + // all_reduce_deps_pass here. + if (!strategy_.enable_parallel_graph_ && + (SeqOnlyAllReduceOps(strategy_) || + strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce)) { AppendPass("all_reduce_deps_pass"); } @@ -179,6 +175,9 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { VLOG(10) << "Add modify_op_lock_and_record_event_pass"; AppendPass("modify_op_lock_and_record_event_pass"); } + + // Verify that the graph is correct for multi-device executor. + AppendPass("multi_devices_check_pass"); } // Convert graph to run on multi-devices. diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc index 413b14961631b3459e0d05af685ad1c5395844c2..69cd84ebf2d678c089141f09a92c46e3a03fe4d9 100644 --- a/paddle/fluid/framework/details/op_handle_base.cc +++ b/paddle/fluid/framework/details/op_handle_base.cc @@ -68,7 +68,7 @@ void OpHandleBase::Run(bool use_cuda) { if (out_var_handle) { PADDLE_ENFORCE( platform::is_same_place(place, out_var_handle->place()), - "The place of input(%s) is not consistent with the " + "The place of output(%s) is not consistent with the " "place of current op(%s).", out_var_handle->Name(), Name()); out_var_handle->SetGenerateEvent(events_.at(dev_id)); diff --git a/paddle/fluid/framework/ir/multi_batch_merge_pass.cc b/paddle/fluid/framework/ir/multi_batch_merge_pass.cc index dcc48fb934e7a06f2e85fa34fde335261f551415..a8720ff4bfb5c7fa7aee6d23949b030c328b90e6 100644 --- a/paddle/fluid/framework/ir/multi_batch_merge_pass.cc +++ b/paddle/fluid/framework/ir/multi_batch_merge_pass.cc @@ -84,7 +84,8 @@ void BatchMergePass::ApplyImpl(ir::Graph* graph) const { // 1. record op nodes of different roles for (auto node : nodes) { - if (node->IsVar()) continue; + if (!node->IsOp()) continue; + PADDLE_ENFORCE(node->Op(), "must find opdesc"); int op_role = boost::get(node->Op()->GetAttr( framework::OpProtoAndCheckerMaker::OpRoleAttrName())); if ((op_role == static_cast(framework::OpRole::kForward)) || diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index ab0947c631fe9a409406b3b092972ae6512beae7..3f10daf56ebe0d7a351236b58514d7ad6bb6352e 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -19,17 +19,14 @@ limitations under the License. */ #include #include #include -#include "paddle/fluid/framework/ir/graph_helper.h" - -#include "paddle/fluid/framework/ir/graph.h" - -#include "paddle/fluid/framework/details/all_reduce_deps_pass.h" #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h" #include "paddle/fluid/framework/details/reference_count_pass_helper.h" #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" +#include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/ir/graph_helper.h" #include "paddle/fluid/platform/profiler.h" #ifdef WITH_GPERFTOOLS diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 9c0efe6d905929f87106f18ecf74a7915e39eba9..e75c4cb30a1b084e44f07119d4590785850c5d9c 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -139,8 +139,7 @@ class TestDistRunnerBase(object): pass_builder = None if args.batch_merge_repeat > 1: pass_builder = build_stra._finalize_strategy_and_create_passes() - mypass = pass_builder.insert_pass( - len(pass_builder.all_passes()) - 3, "multi_batch_merge_pass") + mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass") mypass.set("num_repeats", args.batch_merge_repeat) if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":