未验证 提交 4cc61441 编写于 作者: C chengduo 提交者: GitHub

[Cherry-pick]Fix the bug of all_reduce_deps_pass (#16648)

* fix the bug of all_reduce_deps_pass
test=release/1.4
上级 d3b62910
......@@ -13,125 +13,186 @@
// limitations under the License.
#include <algorithm>
#include <memory>
#include <map>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/details/all_reduce_deps_pass.h"
#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/op_graph_view.h"
#include "paddle/fluid/framework/details/var_handle.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/ir/pass.h"
#include "paddle/fluid/framework/op_proto_maker.h"
namespace paddle {
namespace framework {
namespace details {
VarHandle* GetValidInput(const OpHandleBase* a) {
for (auto p : a->Inputs()) {
VarHandle* b = dynamic_cast<VarHandle*>(p);
if (b) {
return b;
class AllReduceDepsPass : public ir::Pass {
protected:
void ApplyImpl(ir::Graph* graph) const override {
std::vector<AllReduceOpHandle*> all_reduce_op_handles =
GetSortedAllReduceOps(*graph);
for (size_t i = 1; i < all_reduce_op_handles.size(); ++i) {
auto* dep_var = new DummyVarHandle(graph->CreateControlDepVar());
graph->Get<GraphDepVars>(kGraphDepVars).emplace(dep_var);
all_reduce_op_handles[i - 1]->AddOutput(dep_var);
all_reduce_op_handles[i]->AddInput(dep_var);
}
}
return nullptr;
}
void AllReduceDepsPass::ApplyImpl(ir::Graph* graph) const {
auto graph_ops = ir::FilterByNodeWrapper<OpHandleBase>(*graph);
// get vars order
int order = 0;
std::unordered_map<std::string, int> vars;
// TODO(gongwb): use graph topology sort to find the order of operators.
// Note that must assert topology sort is stable
auto& ops = graph->Get<const std::vector<OpDesc*>>(kStaleProgramOpDescs);
for (auto* op_desc : ops) {
try {
bool is_bk_op =
static_cast<bool>(boost::get<int>(op_desc->GetAttr(
OpProtoAndCheckerMaker::OpRoleAttrName())) &
static_cast<int>(OpRole::kBackward));
if (!is_bk_op) continue;
auto backward_vars =
boost::get<std::vector<std::string>>(op_desc->GetNullableAttr(
OpProtoAndCheckerMaker::OpRoleVarAttrName()));
PADDLE_ENFORCE_EQ(backward_vars.size() % 2, 0);
auto outputs = op_desc->Outputs();
for (auto& o_it : outputs) {
for (auto& v : o_it.second) { // values
vars[v] = order;
VLOG(10) << "in all_reduce_deps_pass:" << v;
}
}
order++;
} catch (boost::bad_get e) {
if (VLOG_IS_ON(10)) {
DebugString(*graph, all_reduce_op_handles);
}
}
std::vector<OpHandleBase*> dist_ops;
// get allreduce ops.
for (auto& op : graph_ops) {
// FIXME(gongwb):add broad cast.
if (op->Name() == "all_reduce" || op->Name() == "reduce") {
dist_ops.push_back(op);
std::vector<AllReduceOpHandle*> GetSortedAllReduceOps(
const ir::Graph& graph) const {
std::vector<AllReduceOpHandle*> all_reduce_op_handles;
std::unordered_map<OpHandleBase*, size_t> pending_ops;
std::unordered_set<OpHandleBase*> ready_ops;
std::unordered_set<OpHandleBase*> next_ready_ops;
auto op_handles = ir::FilterByNodeWrapper<OpHandleBase>(graph);
size_t num_of_ops = op_handles.size();
for (OpHandleBase* op : op_handles) {
size_t not_ready_vars = op->NotReadyInputSize();
if (not_ready_vars) {
pending_ops.insert({op, not_ready_vars});
} else {
ready_ops.insert(op);
}
}
}
VLOG(10) << "dist_ops size:" << dist_ops.size()
<< ", outputs size:" << vars.size() << ", ops size:" << ops.size();
std::sort(dist_ops.begin(), dist_ops.end(), [&](OpHandleBase* op1,
OpHandleBase* op2) {
VarHandle* i0 = dynamic_cast<VarHandle*>(GetValidInput(op1));
VarHandle* i1 = dynamic_cast<VarHandle*>(GetValidInput(op2));
PADDLE_ENFORCE(i0 != nullptr && i1 != nullptr, "%s convert to %s error",
op1->DebugString(), op2->DebugString());
auto l_it = vars.find(i0->name());
auto r_it = vars.find(i1->name());
PADDLE_ENFORCE(l_it != vars.end() && r_it != vars.end(),
"can't find var's name %s and %s in opdesc", i0->name(),
i1->name());
if (l_it->second < r_it->second) return true;
GetSortedAllReduceOps(ready_ops, &all_reduce_op_handles);
size_t has_run_ops = ready_ops.size();
while (has_run_ops != num_of_ops) {
for (auto* op : ready_ops) {
for (auto& ready_var : op->Outputs()) {
for (auto* pend_op : ready_var->PendingOps()) {
auto& deps = --pending_ops[pend_op];
if (deps == 0) {
next_ready_ops.insert(pend_op);
}
}
}
}
if (l_it->second == r_it->second) {
return i0->name() < i1->name();
PADDLE_ENFORCE_NE(next_ready_ops.size(), 0, "There maybe have a cycle.");
ready_ops.clear();
std::swap(ready_ops, next_ready_ops);
GetSortedAllReduceOps(ready_ops, &all_reduce_op_handles);
has_run_ops += ready_ops.size();
}
return all_reduce_op_handles;
}
return false;
});
// add dependency.
auto& sorted_ops = dist_ops;
for (size_t i = 1; i < sorted_ops.size(); ++i) {
auto* dep_var = new DummyVarHandle(graph->CreateControlDepVar());
auto* pre_op = sorted_ops[i - 1];
auto* op = sorted_ops[i];
pre_op->AddOutput(dep_var);
op->AddInput(dep_var);
graph->Get<GraphDepVars>(kGraphDepVars).emplace(dep_var);
void GetSortedAllReduceOps(
const std::unordered_set<OpHandleBase*>& ready_ops,
std::vector<AllReduceOpHandle*>* all_reduce_op_handles) const {
std::vector<AllReduceOpHandle*> current_all_reduce_op_handles;
for (auto& op_handle : ready_ops) {
auto all_reduce_op_handle = dynamic_cast<AllReduceOpHandle*>(op_handle);
if (all_reduce_op_handle) {
current_all_reduce_op_handles.emplace_back(all_reduce_op_handle);
}
}
VLOG(10) << "add all_reduce sequential dependencies between " << pre_op
<< " and " << op;
// NOTE(zcd): For distributed training, it is important to keep the order of
// allReduce on each node consistent. Otherwise, hang may occur.
// Sort the current_all_reduce_op_handles according to the name of input.
sort(current_all_reduce_op_handles.begin(),
current_all_reduce_op_handles.end(),
[](const AllReduceOpHandle* left,
const AllReduceOpHandle* right) -> bool {
auto left_in_vars = DynamicCast<VarHandle>(left->Inputs());
auto right_in_vars = DynamicCast<VarHandle>(right->Inputs());
PADDLE_ENFORCE_GT(left_in_vars.size(), 0);
PADDLE_ENFORCE_EQ(left_in_vars.size(), right_in_vars.size());
return left_in_vars[0]->Name() > right_in_vars[0]->Name();
});
all_reduce_op_handles->insert(all_reduce_op_handles->end(),
current_all_reduce_op_handles.begin(),
current_all_reduce_op_handles.end());
}
VLOG(10) << "pre_op:" << pre_op->DebugString()
<< ", op:" << op->DebugString();
void DebugString(
const ir::Graph& graph,
const std::vector<AllReduceOpHandle*>& all_reduce_op_handles) const {
// get vars order
std::map<int, std::vector<std::string>> vars =
GetSoredGradientsFromStaleProgram(graph);
std::stringstream out;
size_t grads_of_stale_program = 0;
out << "Get Order From kStaleProgramOpDescs: ";
for (auto& var : vars) {
out << "Order " << var.first << " [";
for (auto& var_name : var.second) {
out << var_name << ", ";
++grads_of_stale_program;
}
out << "], ";
}
VLOG(10) << out.str();
std::stringstream out2;
out2 << "Get Order From Topological order: ";
for (auto& op : all_reduce_op_handles) {
bool find_valid_input = false;
for (auto& in_var : op->Inputs()) {
if (dynamic_cast<VarHandle*>(in_var)) {
out2 << in_var->Name() << ", ";
find_valid_input = true;
break;
}
}
PADDLE_ENFORCE(find_valid_input, "Doesn't find valid input.");
}
VLOG(10) << out2.str();
if (grads_of_stale_program != all_reduce_op_handles.size()) {
VLOG(10)
<< "The gradients number of stale program and graph is not equal.";
}
}
}
std::map<int, std::vector<std::string>> GetSoredGradientsFromStaleProgram(
const ir::Graph& graph) const {
std::map<int, std::vector<std::string>> vars;
auto ops = graph.Get<const std::vector<OpDesc*>>(kStaleProgramOpDescs);
int order = 0;
for (auto* op_desc : ops) {
try {
bool is_bk_op =
static_cast<bool>(boost::get<int>(op_desc->GetAttr(
OpProtoAndCheckerMaker::OpRoleAttrName())) &
static_cast<int>(OpRole::kBackward));
if (!is_bk_op) continue;
auto backward_vars =
boost::get<std::vector<std::string>>(op_desc->GetNullableAttr(
OpProtoAndCheckerMaker::OpRoleVarAttrName()));
if (backward_vars.empty()) continue;
PADDLE_ENFORCE_EQ(backward_vars.size() % 2, 0);
for (size_t i = 1; i < backward_vars.size(); i += 2) {
vars[order].emplace_back(backward_vars[i]);
VLOG(1) << "get parameter and gradient: " << backward_vars[i - 1]
<< ", " << backward_vars[i];
}
order++;
} catch (boost::bad_get e) {
}
}
return vars;
}
};
} // namespace details
} // namespace framework
} // namespace paddle
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/pass.h"
namespace paddle {
namespace framework {
namespace details {
// TODO(gongwb): overlap allreduce with backward computation.
class AllReduceDepsPass : public ir::Pass {
protected:
void ApplyImpl(ir::Graph* graph) const override;
};
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -28,7 +28,7 @@
// asynchronous nccl allreduce or synchronous issue:
// https://github.com/PaddlePaddle/Paddle/issues/15049
DEFINE_bool(
sync_nccl_allreduce, false,
sync_nccl_allreduce, true,
"If set true, will call `cudaStreamSynchronize(nccl_stream)`"
"after allreduce, this mode can get better performance in some scenarios.");
......
......@@ -163,15 +163,11 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
"graph_printer", new details::GraphvizSSAGraphPrinter);
}
// Verify that the graph is correct for multi-device executor.
AppendPass("multi_devices_check_pass");
if (VLOG_IS_ON(2)) {
AppendPass("all_reduce_deps_pass");
}
if (SeqOnlyAllReduceOps(strategy_)) {
VLOG(10) << "Add all_reduce_deps_pass";
// experimental shows that the program will be faster if append
// all_reduce_deps_pass here.
if (!strategy_.enable_parallel_graph_ &&
(SeqOnlyAllReduceOps(strategy_) ||
strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce)) {
AppendPass("all_reduce_deps_pass");
}
......@@ -179,6 +175,9 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
VLOG(10) << "Add modify_op_lock_and_record_event_pass";
AppendPass("modify_op_lock_and_record_event_pass");
}
// Verify that the graph is correct for multi-device executor.
AppendPass("multi_devices_check_pass");
}
// Convert graph to run on multi-devices.
......
......@@ -68,7 +68,7 @@ void OpHandleBase::Run(bool use_cuda) {
if (out_var_handle) {
PADDLE_ENFORCE(
platform::is_same_place(place, out_var_handle->place()),
"The place of input(%s) is not consistent with the "
"The place of output(%s) is not consistent with the "
"place of current op(%s).",
out_var_handle->Name(), Name());
out_var_handle->SetGenerateEvent(events_.at(dev_id));
......
......@@ -84,7 +84,8 @@ void BatchMergePass::ApplyImpl(ir::Graph* graph) const {
// 1. record op nodes of different roles
for (auto node : nodes) {
if (node->IsVar()) continue;
if (!node->IsOp()) continue;
PADDLE_ENFORCE(node->Op(), "must find opdesc");
int op_role = boost::get<int>(node->Op()->GetAttr(
framework::OpProtoAndCheckerMaker::OpRoleAttrName()));
if ((op_role == static_cast<int>(framework::OpRole::kForward)) ||
......
......@@ -19,17 +19,14 @@ limitations under the License. */
#include <tuple>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/details/all_reduce_deps_pass.h"
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/platform/profiler.h"
#ifdef WITH_GPERFTOOLS
......
......@@ -139,8 +139,7 @@ class TestDistRunnerBase(object):
pass_builder = None
if args.batch_merge_repeat > 1:
pass_builder = build_stra._finalize_strategy_and_create_passes()
mypass = pass_builder.insert_pass(
len(pass_builder.all_passes()) - 3, "multi_batch_merge_pass")
mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass")
mypass.set("num_repeats", args.batch_merge_repeat)
if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer":
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册