提交 ea2a2f77 编写于 作者: C chengduo 提交者: gongweibao

Fix the bug of AllReduceDepPass (#16393)

上级 ec2750b3
...@@ -13,44 +13,160 @@ ...@@ -13,44 +13,160 @@
// limitations under the License. // limitations under the License.
#include <algorithm> #include <algorithm>
#include <memory> #include <map>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <utility>
#include <vector> #include <vector>
#include "paddle/fluid/framework/details/all_reduce_deps_pass.h"
#include "paddle/fluid/framework/details/all_reduce_op_handle.h" #include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/op_graph_view.h" #include "paddle/fluid/framework/details/op_graph_view.h"
#include "paddle/fluid/framework/details/var_handle.h" #include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h" #include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/ir/pass.h"
#include "paddle/fluid/framework/op_proto_maker.h" #include "paddle/fluid/framework/op_proto_maker.h"
namespace paddle { namespace paddle {
namespace framework { namespace framework {
namespace details { namespace details {
VarHandle* GetValidInput(const OpHandleBase* a) { class AllReduceDepsPass : public ir::Pass {
for (auto p : a->Inputs()) { protected:
VarHandle* b = dynamic_cast<VarHandle*>(p); void ApplyImpl(ir::Graph* graph) const override {
if (b) { std::vector<AllReduceOpHandle*> all_reduce_op_handles =
return b; GetSortedAllReduceOps(*graph);
for (size_t i = 1; i < all_reduce_op_handles.size(); ++i) {
auto* dep_var = new DummyVarHandle(graph->CreateControlDepVar());
graph->Get<GraphDepVars>(kGraphDepVars).emplace(dep_var);
all_reduce_op_handles[i - 1]->AddOutput(dep_var);
all_reduce_op_handles[i]->AddInput(dep_var);
}
if (VLOG_IS_ON(10)) {
DebugString(*graph, all_reduce_op_handles);
}
}
std::vector<AllReduceOpHandle*> GetSortedAllReduceOps(
const ir::Graph& graph) const {
std::vector<AllReduceOpHandle*> all_reduce_op_handles;
std::unordered_map<OpHandleBase*, size_t> pending_ops;
std::unordered_set<OpHandleBase*> ready_ops;
std::unordered_set<OpHandleBase*> next_ready_ops;
auto op_handles = ir::FilterByNodeWrapper<OpHandleBase>(graph);
size_t num_of_ops = op_handles.size();
for (OpHandleBase* op : op_handles) {
size_t not_ready_vars = op->NotReadyInputSize();
if (not_ready_vars) {
pending_ops.insert({op, not_ready_vars});
} else {
ready_ops.insert(op);
} }
} }
return nullptr; GetSortedAllReduceOps(ready_ops, &all_reduce_op_handles);
}
void AllReduceDepsPass::ApplyImpl(ir::Graph* graph) const { size_t has_run_ops = ready_ops.size();
auto graph_ops = ir::FilterByNodeWrapper<OpHandleBase>(*graph); while (has_run_ops != num_of_ops) {
for (auto* op : ready_ops) {
for (auto& ready_var : op->Outputs()) {
for (auto* pend_op : ready_var->PendingOps()) {
auto& deps = --pending_ops[pend_op];
if (deps == 0) {
next_ready_ops.insert(pend_op);
}
}
}
}
PADDLE_ENFORCE_NE(next_ready_ops.size(), 0, "There maybe have a cycle.");
ready_ops.clear();
std::swap(ready_ops, next_ready_ops);
GetSortedAllReduceOps(ready_ops, &all_reduce_op_handles);
has_run_ops += ready_ops.size();
}
return all_reduce_op_handles;
}
void GetSortedAllReduceOps(
const std::unordered_set<OpHandleBase*>& ready_ops,
std::vector<AllReduceOpHandle*>* all_reduce_op_handles) const {
std::vector<AllReduceOpHandle*> current_all_reduce_op_handles;
for (auto& op_handle : ready_ops) {
auto all_reduce_op_handle = dynamic_cast<AllReduceOpHandle*>(op_handle);
if (all_reduce_op_handle) {
current_all_reduce_op_handles.emplace_back(all_reduce_op_handle);
}
}
// NOTE(zcd): For distributed training, it is important to keep the order of
// allReduce on each node consistent. Otherwise, hang may occur.
// Sort the current_all_reduce_op_handles according to the name of input.
sort(current_all_reduce_op_handles.begin(),
current_all_reduce_op_handles.end(),
[](const AllReduceOpHandle* left,
const AllReduceOpHandle* right) -> bool {
auto left_in_vars = DynamicCast<VarHandle>(left->Inputs());
auto right_in_vars = DynamicCast<VarHandle>(right->Inputs());
PADDLE_ENFORCE_GT(left_in_vars.size(), 0);
PADDLE_ENFORCE_EQ(left_in_vars.size(), right_in_vars.size());
return left_in_vars[0]->Name() > right_in_vars[0]->Name();
});
all_reduce_op_handles->insert(all_reduce_op_handles->end(),
current_all_reduce_op_handles.begin(),
current_all_reduce_op_handles.end());
}
void DebugString(
const ir::Graph& graph,
const std::vector<AllReduceOpHandle*>& all_reduce_op_handles) const {
// get vars order // get vars order
std::map<int, std::vector<std::string>> vars =
GetSoredGradientsFromStaleProgram(graph);
std::stringstream out;
size_t grads_of_stale_program = 0;
out << "Get Order From kStaleProgramOpDescs: ";
for (auto& var : vars) {
out << "Order " << var.first << " [";
for (auto& var_name : var.second) {
out << var_name << ", ";
++grads_of_stale_program;
}
out << "], ";
}
VLOG(10) << out.str();
std::stringstream out2;
out2 << "Get Order From Topological order: ";
for (auto& op : all_reduce_op_handles) {
bool find_valid_input = false;
for (auto& in_var : op->Inputs()) {
if (dynamic_cast<VarHandle*>(in_var)) {
out2 << in_var->Name() << ", ";
find_valid_input = true;
break;
}
}
PADDLE_ENFORCE(find_valid_input, "Doesn't find valid input.");
}
VLOG(10) << out2.str();
if (grads_of_stale_program != all_reduce_op_handles.size()) {
VLOG(10)
<< "The gradients number of stale program and graph is not equal.";
}
}
std::map<int, std::vector<std::string>> GetSoredGradientsFromStaleProgram(
const ir::Graph& graph) const {
std::map<int, std::vector<std::string>> vars;
auto ops = graph.Get<const std::vector<OpDesc*>>(kStaleProgramOpDescs);
int order = 0; int order = 0;
std::unordered_map<std::string, int> vars;
// TODO(gongwb): use graph topology sort to find the order of operators.
// Note that must assert topology sort is stable
auto& ops = graph->Get<const std::vector<OpDesc*>>(kStaleProgramOpDescs);
for (auto* op_desc : ops) { for (auto* op_desc : ops) {
try { try {
bool is_bk_op = bool is_bk_op =
...@@ -62,76 +178,21 @@ void AllReduceDepsPass::ApplyImpl(ir::Graph* graph) const { ...@@ -62,76 +178,21 @@ void AllReduceDepsPass::ApplyImpl(ir::Graph* graph) const {
auto backward_vars = auto backward_vars =
boost::get<std::vector<std::string>>(op_desc->GetNullableAttr( boost::get<std::vector<std::string>>(op_desc->GetNullableAttr(
OpProtoAndCheckerMaker::OpRoleVarAttrName())); OpProtoAndCheckerMaker::OpRoleVarAttrName()));
PADDLE_ENFORCE_EQ(backward_vars.size() % 2, 0); if (backward_vars.empty()) continue;
auto outputs = op_desc->Outputs(); PADDLE_ENFORCE_EQ(backward_vars.size() % 2, 0);
for (auto& o_it : outputs) { for (size_t i = 1; i < backward_vars.size(); i += 2) {
for (auto& v : o_it.second) { // values vars[order].emplace_back(backward_vars[i]);
vars[v] = order; VLOG(1) << "get parameter and gradient: " << backward_vars[i - 1]
VLOG(10) << "in all_reduce_deps_pass:" << v; << ", " << backward_vars[i];
}
} }
order++; order++;
} catch (boost::bad_get e) { } catch (boost::bad_get e) {
} }
} }
return vars;
std::vector<OpHandleBase*> dist_ops;
// get allreduce ops.
for (auto& op : graph_ops) {
// FIXME(gongwb):add broad cast.
if (op->Name() == "all_reduce" || op->Name() == "reduce") {
dist_ops.push_back(op);
}
} }
};
VLOG(10) << "dist_ops size:" << dist_ops.size()
<< ", outputs size:" << vars.size() << ", ops size:" << ops.size();
std::sort(dist_ops.begin(), dist_ops.end(), [&](OpHandleBase* op1,
OpHandleBase* op2) {
VarHandle* i0 = dynamic_cast<VarHandle*>(GetValidInput(op1));
VarHandle* i1 = dynamic_cast<VarHandle*>(GetValidInput(op2));
PADDLE_ENFORCE(i0 != nullptr && i1 != nullptr, "%s convert to %s error",
op1->DebugString(), op2->DebugString());
auto l_it = vars.find(i0->name());
auto r_it = vars.find(i1->name());
PADDLE_ENFORCE(l_it != vars.end() && r_it != vars.end(),
"can't find var's name %s and %s in opdesc", i0->name(),
i1->name());
if (l_it->second < r_it->second) return true;
if (l_it->second == r_it->second) {
return i0->name() < i1->name();
}
return false;
});
// add dependency.
auto& sorted_ops = dist_ops;
for (size_t i = 1; i < sorted_ops.size(); ++i) {
auto* dep_var = new DummyVarHandle(graph->CreateControlDepVar());
auto* pre_op = sorted_ops[i - 1];
auto* op = sorted_ops[i];
pre_op->AddOutput(dep_var);
op->AddInput(dep_var);
graph->Get<GraphDepVars>(kGraphDepVars).emplace(dep_var);
VLOG(10) << "add all_reduce sequential dependencies between " << pre_op
<< " and " << op;
VLOG(10) << "pre_op:" << pre_op->DebugString()
<< ", op:" << op->DebugString();
}
}
} // namespace details } // namespace details
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/pass.h"
namespace paddle {
namespace framework {
namespace details {
// TODO(gongwb): overlap allreduce with backward computation.
class AllReduceDepsPass : public ir::Pass {
protected:
void ApplyImpl(ir::Graph* graph) const override;
};
} // namespace details
} // namespace framework
} // namespace paddle
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
// asynchronous nccl allreduce or synchronous issue: // asynchronous nccl allreduce or synchronous issue:
// https://github.com/PaddlePaddle/Paddle/issues/15049 // https://github.com/PaddlePaddle/Paddle/issues/15049
DEFINE_bool( DEFINE_bool(
sync_nccl_allreduce, false, sync_nccl_allreduce, true,
"If set true, will call `cudaStreamSynchronize(nccl_stream)`" "If set true, will call `cudaStreamSynchronize(nccl_stream)`"
"after allreduce, this mode can get better performance in some scenarios."); "after allreduce, this mode can get better performance in some scenarios.");
......
...@@ -163,14 +163,11 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { ...@@ -163,14 +163,11 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
"graph_printer", new details::GraphvizSSAGraphPrinter); "graph_printer", new details::GraphvizSSAGraphPrinter);
} }
// Verify that the graph is correct for multi-device executor. // experimental shows that the program will be faster if append
AppendPass("multi_devices_check_pass"); // all_reduce_deps_pass here.
if (!strategy_.enable_parallel_graph_ &&
if (VLOG_IS_ON(2)) { (SeqOnlyAllReduceOps(strategy_) ||
AppendPass("all_reduce_deps_pass"); strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce)) {
}
if (SeqOnlyAllReduceOps(strategy_)) {
VLOG(10) << "Add all_reduce_deps_pass"; VLOG(10) << "Add all_reduce_deps_pass";
AppendPass("all_reduce_deps_pass"); AppendPass("all_reduce_deps_pass");
} }
...@@ -179,6 +176,9 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { ...@@ -179,6 +176,9 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
VLOG(10) << "Add modify_op_lock_and_record_event_pass"; VLOG(10) << "Add modify_op_lock_and_record_event_pass";
AppendPass("modify_op_lock_and_record_event_pass"); AppendPass("modify_op_lock_and_record_event_pass");
} }
// Verify that the graph is correct for multi-device executor.
AppendPass("multi_devices_check_pass");
} }
// Convert graph to run on multi-devices. // Convert graph to run on multi-devices.
......
...@@ -68,7 +68,7 @@ void OpHandleBase::Run(bool use_cuda) { ...@@ -68,7 +68,7 @@ void OpHandleBase::Run(bool use_cuda) {
if (out_var_handle) { if (out_var_handle) {
PADDLE_ENFORCE( PADDLE_ENFORCE(
platform::is_same_place(place, out_var_handle->place()), platform::is_same_place(place, out_var_handle->place()),
"The place of input(%s) is not consistent with the " "The place of output(%s) is not consistent with the "
"place of current op(%s).", "place of current op(%s).",
out_var_handle->Name(), Name()); out_var_handle->Name(), Name());
out_var_handle->SetGenerateEvent(events_.at(dev_id)); out_var_handle->SetGenerateEvent(events_.at(dev_id));
......
...@@ -19,11 +19,6 @@ limitations under the License. */ ...@@ -19,11 +19,6 @@ limitations under the License. */
#include <tuple> #include <tuple>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/details/all_reduce_deps_pass.h"
#include "paddle/fluid/framework/details/async_ssa_graph_executor.h" #include "paddle/fluid/framework/details/async_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/multi_devices_helper.h"
...@@ -31,6 +26,8 @@ limitations under the License. */ ...@@ -31,6 +26,8 @@ limitations under the License. */
#include "paddle/fluid/framework/details/reference_count_pass_helper.h" #include "paddle/fluid/framework/details/reference_count_pass_helper.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
#ifdef WITH_GPERFTOOLS #ifdef WITH_GPERFTOOLS
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册