Unverified commit fbbdc9cc, authored by gongweibao, committed by GitHub

Add backward and optimizer operator dependency pass. (#17746)

Parent 4cb7d32c
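This change adds a backward_optimizer_op_deps_pass and exposes it through a new BuildStrategy option, enable_backward_optimizer_op_deps (see the build_strategy.cc, pybind, and unit-test hunks below). A minimal usage sketch, assuming the Paddle 1.x fluid API of this era; the toy network and SGD optimizer are illustrative only, not taken from this commit:

import paddle.fluid as fluid

# Toy network so the sketch is self-contained (illustrative, not from this commit).
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
pred = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.mean(fluid.layers.square_error_cost(input=pred, label=y))
fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

build_strategy = fluid.BuildStrategy()
# Option added by this commit: chain optimizer ops behind the last backward op
# so they run in the order their gradients were generated.
build_strategy.enable_backward_optimizer_op_deps = True

compiled = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(
    loss_name=loss.name, build_strategy=build_strategy)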
......@@ -93,6 +93,6 @@ cc_library(build_strategy SRCS build_strategy.cc DEPS
fuse_elewise_add_act_pass multi_batch_merge_pass
fuse_relu_depthwise_conv_pass
memory_optimize_pass lock_free_optimize_pass
alloc_continuous_space_for_grad_pass fuse_all_reduce_op_pass
alloc_continuous_space_for_grad_pass fuse_all_reduce_op_pass backward_optimizer_op_deps_pass
fuse_adam_op_pass fuse_sgd_op_pass fuse_momentum_op_pass
record_skip_memory_opt_vars_pass)
......@@ -134,6 +134,7 @@ void AllReduceOpHandle::RunImpl() {
static_cast<ncclDataType_t>(dtype), ncclSum);
});
}
VLOG(10) << "allreduce size:" << numel * SizeOfType(lod_tensors[0]->type());
RunAllReduceFuncs(all_reduce_calls);
#else
PADDLE_THROW("Not compiled with CUDA");
......
......@@ -49,6 +49,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
: ir::PassBuilder(), strategy_(strategy) {
// Add a graph viz pass to record a graph.
if (!strategy_.debug_graphviz_path_.empty()) {
VLOG(1) << "Add graph_viz_pass";
auto viz_pass = AppendPass("graph_viz_pass");
const std::string graph_path = string::Sprintf(
"%s%s", strategy_.debug_graphviz_path_.c_str(), "_original_graph");
......@@ -56,11 +57,12 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
}
// Note(zcd): record_skip_memory_opt_vars_pass should be the first pass.
VLOG(1) << "Add record_skip_memory_opt_vars_pass";
AppendPass("record_skip_memory_opt_vars_pass");
#ifdef PADDLE_WITH_MKLDNN
if (FLAGS_use_mkldnn) {
VLOG(5) << "Add mkldnn_placement_pass";
VLOG(1) << "Add mkldnn_placement_pass";
AppendPass("mkldnn_placement_pass");
} else if (!strategy_.mkldnn_enabled_op_types_.empty()) {
LOG(WARNING)
......@@ -75,7 +77,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
"Please compile with MKLDNN first to use MKLDNN");
#endif
if (strategy_.enable_sequential_execution_) {
VLOG(5) << "Add sequential_execution_pass";
VLOG(1) << "Add sequential_execution_pass";
AppendPass("sequential_execution_pass");
}
......@@ -86,7 +88,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
// Add op fusion.
if (strategy.fuse_relu_depthwise_conv_) {
VLOG(5) << "Add fuse_relu_depthwise_conv_pass";
VLOG(1) << "Add fuse_relu_depthwise_conv_pass";
AppendPass("fuse_relu_depthwise_conv_pass");
}
......@@ -98,19 +100,19 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
// Add automatically inplace.
if (strategy_.enable_inplace_) {
VLOG(5) << "Add inplace_pass";
VLOG(1) << "Add inplace_pass";
AppendPass("inplace_pass");
}
if (strategy_.fuse_elewise_add_act_ops_) {
VLOG(5) << "Add fuse_elewise_add_act_pass";
VLOG(1) << "Add fuse_elewise_add_act_pass";
AppendPass("fuse_elewise_add_act_pass");
}
// For single-card training, fuse_all_reduce_ops is unnecessary.
// alloc_continuous_space_for_grad_pass should run before MultiDevPass.
if (strategy_.fuse_all_reduce_ops_) {
VLOG(5) << "Add alloc_continuous_space_for_grad_pass";
VLOG(1) << "Add alloc_continuous_space_for_grad_pass";
AppendPass("alloc_continuous_space_for_grad_pass");
}
......@@ -125,11 +127,11 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
// NOTE: fuse_all_xx_ops will count the number of xx operator first,
// if the number is zero, fuse_all_reduce_ops will do nothing.
// Currently, only one type of optimization algorithm can be fused.
VLOG(5) << "Add fuse_adam_op_pass";
VLOG(1) << "Add fuse_adam_op_pass";
AppendPass("fuse_adam_op_pass");
VLOG(5) << "Add fuse_sgd_op_pass";
VLOG(1) << "Add fuse_sgd_op_pass";
AppendPass("fuse_sgd_op_pass");
VLOG(5) << "Add fuse_momentum_op_pass";
VLOG(1) << "Add fuse_momentum_op_pass";
AppendPass("fuse_momentum_op_pass");
}
}
......@@ -159,7 +161,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
// A side effect of that is that memory optimize cannot foresee the fetched
// vars, so the fetch list should be set persistable before calling the Run
// interface.
if (strategy_.memory_optimize_) {
VLOG(5) << "Add memory_optimize_pass";
VLOG(1) << "Add memory_optimize_pass";
AppendPass("memory_optimize_pass");
}
......@@ -167,7 +169,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
// all original and fused operators. But no operators can be enabled this
// attr if putting it after MultiDevPass.
if (strategy_.cache_runtime_context_) {
VLOG(5) << "Add runtime_context_cache_pass";
VLOG(1) << "Add runtime_context_cache_pass";
AppendPass("runtime_context_cache_pass");
}
......@@ -176,12 +178,13 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
if (strategy_.fuse_all_reduce_ops_) {
// NOTE: fuse_all_reduce_ops will count the number of all_reduce operator
// first, if the number is zero, fuse_all_reduce_ops will do nothing.
VLOG(5) << "Add fuse_all_reduce_op_pass";
VLOG(1) << "Add fuse_all_reduce_op_pass";
AppendPass("fuse_all_reduce_op_pass");
}
// Add a graph print pass to record a graph with device info.
if (!strategy_.debug_graphviz_path_.empty()) {
VLOG(1) << "Add multi_devices_print_pass";
auto multi_devices_print_pass = AppendPass("multi_devices_print_pass");
const std::string graph_path =
string::Sprintf("%s%s", strategy_.debug_graphviz_path_.c_str(),
......@@ -197,16 +200,22 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
if (!strategy_.enable_parallel_graph_ &&
(SeqOnlyAllReduceOps(strategy_) ||
strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce)) {
VLOG(5) << "Add all_reduce_deps_pass";
VLOG(1) << "Add all_reduce_deps_pass";
AppendPass("all_reduce_deps_pass");
}
if (strategy_.enable_backward_optimizer_op_deps_) {
VLOG(1) << "Add backward_op_deps_pass";
AppendPass("backward_optimizer_op_deps_pass");
}
if (strategy_.remove_unnecessary_lock_) {
VLOG(5) << "Add modify_op_lock_and_record_event_pass";
VLOG(1) << "Add modify_op_lock_and_record_event_pass";
AppendPass("modify_op_lock_and_record_event_pass");
}
// Verify that the graph is correct for multi-device executor.
VLOG(1) << "Add multi_devices_check_pass";
AppendPass("multi_devices_check_pass");
}
......@@ -215,18 +224,19 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
ir::Pass *multi_devices_pass = nullptr;
if (strategy_.async_mode_) {
VLOG(1) << "Add async_multi_devices_pass";
multi_devices_pass = AppendPass("async_multi_devices_pass").get();
} else if (strategy_.is_distribution_) {
VLOG(5)
VLOG(1)
<< "Add dist_multi_devices_pass, multi device parameter server mode";
multi_devices_pass = AppendPass("dist_multi_devices_pass").get();
} else {
if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) {
VLOG(5) << "Add all_reduce_mode_multi_devices_pass";
VLOG(1) << "Add all_reduce_mode_multi_devices_pass";
multi_devices_pass =
AppendPass("all_reduce_mode_multi_devices_pass").get();
} else if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kReduce) {
VLOG(5) << "Add reduce_mode_multi_devices_pass";
VLOG(1) << "Add reduce_mode_multi_devices_pass";
multi_devices_pass = AppendPass("reduce_mode_multi_devices_pass").get();
} else {
PADDLE_THROW("Unknown reduce strategy.");
......@@ -365,6 +375,7 @@ USE_PASS(multi_devices_print_pass);
USE_PASS(memory_optimize_pass);
USE_PASS(sequential_execution_pass);
USE_PASS(all_reduce_deps_pass);
USE_PASS(backward_optimizer_op_deps_pass);
USE_PASS(modify_op_lock_and_record_event_pass);
USE_PASS(inplace_pass);
USE_PASS(lock_free_optimize_pass);
......
......@@ -80,6 +80,8 @@ struct BuildStrategy {
bool fuse_all_reduce_ops_{false};
bool enable_backward_optimizer_op_deps_{false};
bool fuse_relu_depthwise_conv_{false};
bool sync_batch_norm_{false};
......
......@@ -166,6 +166,8 @@ void FusedAllReduceOpHandle::RunImpl() {
});
}
VLOG(10) << "fusedallreduce size:" << numel * SizeOfType(dtype);
this->RunAndRecordEvent([&] {
if (all_reduce_calls.size() == 1UL) {
// Do not use NCCLGroup when manage NCCL by per thread per device
......
......@@ -20,7 +20,7 @@ namespace framework {
namespace details {
std::string OpHandleBase::DebugString() const {
std::stringstream ss;
ss << "(";
ss << Name() << "(";
for (auto *var : inputs_) {
ss << var->DebugString() << ", ";
}
......
......@@ -23,15 +23,16 @@
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/op_registry.h"
DEFINE_uint64(fuse_parameter_memory_size, 0, // 0 KB
DEFINE_uint64(fuse_parameter_memory_size, 0, // Bytes
"fuse_parameter_memory_size is up limited memory size "
"of one group parameters' gradient which is the input "
"of communication calling(e.g NCCLAllReduce). "
"The default value is 0, it means that "
"not set group according to memory_size.");
DEFINE_int32(
fuse_parameter_groups_size, 3,
"fuse_parameter_groups_size is the size of one group parameters' gradient. "
fuse_parameter_groups_size, 1,
"fuse_parameter_groups_size is the up limited size of one group "
"parameters' gradient. "
"The default value is a experimental result. If the "
"fuse_parameter_groups_size is 1, it means that the groups size is "
"the number of parameters' gradient. If the fuse_parameter_groups_size is "
......@@ -58,7 +59,6 @@ uint64_t GetFuseParameterMemorySize() {
return FLAGS_fuse_parameter_memory_size;
}
static const char kUnKnow[] = "@UNKNOW@";
static framework::proto::VarType::Type kDefaultDtype =
framework::proto::VarType::Type::VarType_Type_BOOL;
......@@ -83,7 +83,7 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
}
if (params_grads.size() == 0) {
VLOG(10) << "Doesn't find gradients";
LOG(WARNING) << "Doesn't find gradients";
return;
}
......@@ -169,7 +169,6 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
details::GroupGradsAndParams *group_grads_params) const {
SetGroupAccordingToLayers(var_nodes, params_grads, group_grads_params);
SetGroupAccordingToMemorySize(var_nodes, group_grads_params);
SetGroupAccordingToGroupSize(var_nodes, group_grads_params);
}
void SetGroupAccordingToLayers(
......@@ -181,7 +180,7 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
for (size_t i = 0; i < params_grads.size(); ++i) {
auto pos = params_grads[i].first.find_first_of(".");
if (pos == std::string::npos) {
layer_params[std::string(kUnKnow)].emplace_back(i);
layer_params[params_grads[i].first].emplace_back(i);
} else {
layer_params[params_grads[i].first.substr(0, pos)].emplace_back(i);
}
......@@ -190,7 +189,7 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
group_grads_params->reserve(layer_params.size());
for (size_t i = 0; i < params_grads.size(); ++i) {
auto pos = params_grads[i].first.find_first_of(".");
std::string key = kUnKnow;
std::string key = params_grads[i].first;
if (pos != std::string::npos) {
key = params_grads[i].first.substr(0, pos);
}
......@@ -207,13 +206,31 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
}
VLOG(10) << "SetGroupAccordingToLayers: ";
if (VLOG_IS_ON(10)) {
PrintGroupInfo(var_nodes, group_grads_params);
}
}
void PrintGroupInfo(
const std::unordered_map<std::string, ir::Node *> &var_nodes,
details::GroupGradsAndParams *group_grads_params) const {
for (size_t i = 0; i < group_grads_params->size(); ++i) {
VLOG(10) << "group " << i;
std::stringstream out;
for (auto &p_g : group_grads_params->at(i)) {
out << "(" << p_g.second << ", " << p_g.first << "), ";
size_t gps_size = 0;
for (auto &g_p : group_grads_params->at(i)) {
auto iter = var_nodes.find(g_p.second);
PADDLE_ENFORCE(iter != var_nodes.end(), "%s is not found.", g_p.second);
auto shape = iter->second->Var()->GetShape();
size_t size = framework::SizeOfType(iter->second->Var()->GetDataType());
std::for_each(shape.begin(), shape.end(),
[&size](const int64_t &n) { size *= n; });
gps_size += size;
out << string::Sprintf("(%s(%d), %s)", g_p.second, size, g_p.first);
}
VLOG(10) << out.str();
VLOG(10) << out.str()
<< ", group size:" << group_grads_params->at(i).size()
<< ", group memory size:" << gps_size;
}
}
......@@ -248,6 +265,12 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
group_p_g.insert(group_p_g.end(), group_grads_params->at(j).begin(),
group_grads_params->at(j).end());
++j;
if (GetFuseParameterGroupsSize() > 1 &&
group_p_g.size() >
static_cast<size_t>(GetFuseParameterGroupsSize())) {
break;
}
if (local_group_memory_size >= group_memory_size) {
break;
}
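The two break conditions above replace the separate SetGroupAccordingToGroupSize step (removed further down): a group is closed once it holds more than FLAGS_fuse_parameter_groups_size entries (when that flag is set above 1) or once its accumulated bytes reach FLAGS_fuse_parameter_memory_size. A small Python sketch of this grouping rule with made-up sizes; it is an illustration of the heuristic only, operates on individual gradients rather than the per-layer groups the pass actually merges, and ignores the flags' special default handling:

def group_gradients(grad_sizes, memory_cap, groups_cap):
    """Greedily pack gradients into fusion groups.

    A group is closed once its total byte size reaches memory_cap, or once
    it holds more than groups_cap gradients (when groups_cap > 1). This
    mirrors the combined break conditions shown above.
    """
    groups, current, current_bytes = [], [], 0
    for name, size in grad_sizes:
        current.append(name)
        current_bytes += size
        if (groups_cap > 1 and len(current) > groups_cap) \
                or current_bytes >= memory_cap:
            groups.append(current)
            current, current_bytes = [], 0
    if current:
        groups.append(current)
    return groups

# Example with made-up gradient sizes: 1 MB memory cap, group-size cap of 3.
grads = [("fc_0.w@GRAD", 400000), ("fc_0.b@GRAD", 4000),
         ("fc_1.w@GRAD", 800000), ("fc_1.b@GRAD", 4000)]
print(group_gradients(grads, memory_cap=1000000, groups_cap=3))
# -> [['fc_0.w@GRAD', 'fc_0.b@GRAD', 'fc_1.w@GRAD'], ['fc_1.b@GRAD']]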
......@@ -258,59 +281,9 @@ class AllocContinuousSpaceForGradPass : public ir::Pass {
VLOG(10) << string::Sprintf(
"SetGroupAccordingToMemorySize(memory_size: %d):", group_memory_size);
for (size_t i = 0; i < group_grads_params->size(); ++i) {
VLOG(10) << "group " << i;
std::stringstream out;
for (auto &g_p : group_grads_params->at(i)) {
auto iter = var_nodes.find(g_p.second);
PADDLE_ENFORCE(iter != var_nodes.end(), "%s is not found.", g_p.second);
auto shape = iter->second->Var()->GetShape();
size_t size = framework::SizeOfType(iter->second->Var()->GetDataType());
std::for_each(shape.begin(), shape.end(),
[&size](const int64_t &n) { size *= n; });
out << string::Sprintf("(%s(%d), %s)", g_p.second, size, g_p.first);
}
VLOG(10) << out.str();
}
}
void SetGroupAccordingToGroupSize(
const std::unordered_map<std::string, ir::Node *> &var_nodes,
details::GroupGradsAndParams *group_grads_params) const {
if (GetFuseParameterGroupsSize() == 1) {
return;
}
const int group_size = GetFuseParameterGroupsSize() == -1
? static_cast<int>(group_grads_params->size())
: GetFuseParameterGroupsSize();
PADDLE_ENFORCE_GT(group_size, 1);
size_t groups = (group_grads_params->size() + group_size - 1) / group_size;
details::GroupGradsAndParams local_group_grads_params;
local_group_grads_params.reserve(groups);
size_t j = 0;
for (size_t i = 0; i < groups; ++i) {
local_group_grads_params.emplace_back();
auto &group_p_g = local_group_grads_params.back();
group_p_g.reserve(group_size);
while (j < group_grads_params->size()) {
group_p_g.insert(group_p_g.end(), group_grads_params->at(j).begin(),
group_grads_params->at(j).end());
++j;
if (j % group_size == 0) break;
}
}
std::swap(*group_grads_params, local_group_grads_params);
VLOG(10) << string::Sprintf("SetGroupAccordingToGroupSize(group_size: %d):",
group_size);
for (size_t i = 0; i < group_grads_params->size(); ++i) {
VLOG(10) << "group " << i;
std::stringstream out;
for (auto &p_g : group_grads_params->at(i)) {
out << "(" << p_g.second << ", " << p_g.first << "), ";
}
VLOG(10) << out.str();
if (VLOG_IS_ON(10)) {
PrintGroupInfo(var_nodes, group_grads_params);
}
}
......
......@@ -134,6 +134,7 @@ void Graph::ResolveHazard(
ir::Node *dep_var = CreateControlDepVar();
write_op->inputs.push_back(dep_var);
upstream_op->outputs.push_back(dep_var);
VLOG(10) << "add dep_var:" << dep_var->Name();
dep_var->outputs.push_back(write_op);
dep_var->inputs.push_back(upstream_op);
}
......@@ -157,6 +158,7 @@ void Graph::ResolveHazard(
if (has_dep) continue;
ir::Node *dep_var = CreateControlDepVar();
VLOG(10) << "add dep_var:" << dep_var->Name();
read_op->outputs.push_back(dep_var);
dep_var->inputs.push_back(read_op);
write_op->inputs.push_back(dep_var);
......
......@@ -14,3 +14,4 @@ cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS grap
cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle)
cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS all_reduce_op_handle graph graph_helper pass)
cc_library(backward_optimizer_op_deps_pass SRCS backward_optimizer_op_deps_pass.cc DEPS graph graph_helper pass)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include <map>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/ir/pass.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_proto_maker.h"
#include "paddle/fluid/framework/scope.h"
namespace paddle {
namespace framework {
namespace ir {
class BackWardOpDepsPass : public ir::Pass {
protected:
void AddDep(ir::Graph* graph, details::OpHandleBase* l,
details::OpHandleBase* r) const {
auto* dep_var = new details::DummyVarHandle(graph->CreateControlDepVar());
graph->Get<details::GraphDepVars>(details::kGraphDepVars).emplace(dep_var);
l->AddOutput(dep_var);
r->AddInput(dep_var);
VLOG(10) << "add deps:" << l->DebugString() << " and " << r->DebugString();
}
void ApplyImpl(ir::Graph* graph) const override {
// NOTE: The operator nodes should be in topology order.
std::vector<details::OpHandleBase*> backward_op_handles;
std::vector<details::OpHandleBase*> all_opt_handles;
details::ParamsAndGrads params_grads;
std::vector<ir::Node*> topo_nodes = ir::TopologySortOperations(*graph);
for (auto& node : topo_nodes) {
if (!node->Op()) continue;
GetBackWardOpHandles(node, &backward_op_handles, &params_grads);
GetOptimizerOpHandles(node, &all_opt_handles);
}
VLOG(10) << "backward_op_handles size:" << backward_op_handles.size()
<< ", opt_handles size:" << all_opt_handles.size();
if (backward_op_handles.size() <= 1 || all_opt_handles.size() <= 1) {
VLOG(10) << "need not backward_op_deps_pass";
return;
}
std::vector<details::OpHandleBase*> opt_handles;
GetOptimizerHandlesRoot(all_opt_handles, &opt_handles, params_grads);
if (opt_handles.size() <= 1) {
VLOG(10) << "need not backward_op_deps_pass";
return;
}
VLOG(10) << "add optimize deps";
for (size_t i = 1; i < opt_handles.size(); ++i) {
AddDep(graph, opt_handles[i - 1], opt_handles[i]);
}
VLOG(10) << "add deps between backward and optimze:";
AddDep(graph, backward_op_handles[backward_op_handles.size() - 1],
opt_handles[0]);
}
/*
* When the backward op handles complete, the optimizer op handles' input
* vars are ready. Since the optimizer op handles form subgraphs that are not
* connected to each other, they can run in parallel or in a specified order,
* such as the order in which the gradients were generated. This function
* finds the root of each of these subgraphs.
*/
void GetOptimizerHandlesRoot(
const std::vector<details::OpHandleBase*>& ops,
std::vector<details::OpHandleBase*>* result,
const details::ParamsAndGrads& params_grads) const {
std::unordered_set<details::OpHandleBase*> visit;
for (auto op : ops) {
if (visit.find(op) != visit.end()) {
continue;
}
VLOG(10) << "visiting all_opt_handles:" << op->DebugString();
result->emplace_back(op);
visit.insert(op);
VisitChildrens(op, &visit);
}
for (size_t i = 0; i < result->size(); i++) {
VLOG(10) << "get potential head op:" << (*result)[i]->DebugString();
}
// sort by param_grad order
std::unordered_map<std::string, int> pg_order;
int order = 0;
for (auto& p_g : params_grads) {
pg_order[p_g.second] = order++;
}
std::vector<std::pair<details::OpHandleBase*, int>> op_handles;
for (auto op : *result) {
int order = 0;
for (auto input : op->Inputs()) {
if (dynamic_cast<details::VarHandle*>(input) == nullptr) continue;
if (pg_order.find(input->Name()) == pg_order.end()) {
VLOG(10) << "not find input " << input->Name() << " in grad";
continue;
}
if (order < pg_order.at(input->Name())) {
order = pg_order.at(input->Name());
}
}
op_handles.emplace_back(std::make_pair(op, order));
}
sort(op_handles.begin(), op_handles.end(),
[](const std::pair<details::OpHandleBase*, int>& left,
const std::pair<details::OpHandleBase*, int>& right) -> bool {
return left.second < right.second;
});
result->clear();
for (auto p : op_handles) {
result->emplace_back(p.first);
}
for (size_t i = 0; i < result->size(); i++) {
VLOG(10) << "get head op:" << (*result)[i]->DebugString();
}
}
void VisitChildrens(details::OpHandleBase* op,
std::unordered_set<details::OpHandleBase*>* visit) const {
for (auto out : op->Outputs()) {
for (auto* pending_op : out->PendingOps()) {
if (visit->find(pending_op) != visit->end()) {
continue;
}
VLOG(10) << "visiting:" << pending_op->DebugString();
visit->insert(pending_op);
VisitChildrens(pending_op, visit);
}
}
}
void GetBackWardOpHandles(
ir::Node* node, std::vector<details::OpHandleBase*>* backward_op_handles,
details::ParamsAndGrads* params_grads) const {
try {
bool is_bk_op =
static_cast<bool>(boost::get<int>(node->Op()->GetAttr(
OpProtoAndCheckerMaker::OpRoleAttrName())) &
static_cast<int>(OpRole::kBackward));
if (!is_bk_op) return;
// Currently, we assume that once a gradient is generated, it can be
// broadcast, and that each gradient is broadcast only once.
auto backward_vars =
boost::get<std::vector<std::string>>(node->Op()->GetNullableAttr(
OpProtoAndCheckerMaker::OpRoleVarAttrName()));
PADDLE_ENFORCE_EQ(backward_vars.size() % 2, static_cast<size_t>(0));
PADDLE_ENFORCE(node->IsWrappedBy<details::OpHandleBase>());
backward_op_handles->emplace_back(
&node->Wrapper<details::OpHandleBase>());
for (size_t i = 0; i < backward_vars.size(); i += 2) {
VLOG(10) << "Trainable parameter: " << backward_vars[i]
<< ", gradient: " << backward_vars[i + 1];
params_grads->emplace_back(std::make_pair(
backward_vars[i] /*param*/, backward_vars[i + 1] /*grad*/));
}
} catch (boost::bad_get e) {
}
}
void GetOptimizerOpHandles(
ir::Node* node, std::vector<details::OpHandleBase*>* opt_handles) const {
try {
bool is_opt_op =
static_cast<bool>(boost::get<int>(node->Op()->GetAttr(
OpProtoAndCheckerMaker::OpRoleAttrName())) &
static_cast<int>(OpRole::kOptimize));
if (!is_opt_op) return;
opt_handles->emplace_back(&node->Wrapper<details::OpHandleBase>());
} catch (boost::bad_get e) {
}
}
};
} // namespace ir
} // namespace framework
} // namespace paddle
REGISTER_PASS(backward_optimizer_op_deps_pass,
paddle::framework::ir::BackWardOpDepsPass);
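For readers skimming the new pass: GetOptimizerHandlesRoot keys each optimizer root op by the largest generation index among its gradient inputs and sorts the roots by that key; AddDep then chains consecutive roots with control-dependency variables and hooks the first root behind the last backward op. A small Python sketch of the keying and sorting step on toy data (an illustration only, not the Paddle op-handle API):

def order_optimizer_roots(roots, grad_order):
    """Sort optimizer root ops by the generation order of their grad inputs.

    roots maps an op name to the list of its input variable names;
    grad_order maps a gradient name to the index at which it is produced
    during the backward pass. Inputs that are not gradients are ignored,
    mirroring GetOptimizerHandlesRoot above.
    """
    def key(op_name):
        return max((grad_order[v] for v in roots[op_name] if v in grad_order),
                   default=0)
    return sorted(roots, key=key)

# Toy example: gradients are produced in reverse layer order during backward.
grad_order = {"fc_2.w@GRAD": 0, "fc_1.w@GRAD": 1, "fc_0.w@GRAD": 2}
roots = {
    "sgd_fc_0": ["fc_0.w", "fc_0.w@GRAD", "learning_rate"],
    "sgd_fc_1": ["fc_1.w", "fc_1.w@GRAD", "learning_rate"],
    "sgd_fc_2": ["fc_2.w", "fc_2.w@GRAD", "learning_rate"],
}
print(order_optimizer_roots(roots, grad_order))
# -> ['sgd_fc_2', 'sgd_fc_1', 'sgd_fc_0']

After this ordering, the pass adds one control-dependency variable between each pair of consecutive roots, plus one between the last backward op handle and the first root, which is exactly what the AddDep calls in ApplyImpl do.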
......@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sstream>
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
......@@ -96,6 +97,8 @@ class AllocContinuousSpaceKernel : public framework::OpKernel<T> {
// Make the outputs point to the continuous space.
offset = 0;
std::stringstream ss;
ss << "alloc_space_for_vars: ";
for (size_t i = 0; i < out_tensors.size(); ++i) {
size_t len = static_cast<size_t>(out_tensors[i]->numel());
auto dim = out_tensors[i]->dims();
......@@ -105,10 +108,10 @@ class AllocContinuousSpaceKernel : public framework::OpKernel<T> {
.Resize(dim);
len = Alignment(len * size_of_dtype, context.GetPlace()) / size_of_dtype;
offset += len;
VLOG(10) << "alloc_space_for_vars: output(" << out_var_names[i]
<< ") ,dim:(" << dim << ")"
<< " Address: " << out_tensors[i]->data<void>();
ss << "output(" << out_var_names[i] << ") dim:(" << dim << ")"
<< " address: " << out_tensors[i]->data<void>() << ", ";
}
VLOG(10) << ss.str();
}
private:
......@@ -133,6 +136,9 @@ class AllocContinuousSpaceKernel : public framework::OpKernel<T> {
PADDLE_ENFORCE_EQ(lod_tensors.size(), var_names.size());
*numel = 0;
size_t size_of_dtype = 0;
std::stringstream ss;
ss << "alloc_space_for_vars: ";
for (size_t i = 0; i < var_names.size(); ++i) {
PADDLE_ENFORCE(lod_tensors[i]->IsInitialized(), "%s is not initialized.",
var_names[i]);
......@@ -148,11 +154,13 @@ class AllocContinuousSpaceKernel : public framework::OpKernel<T> {
auto size = lod_tensors[i]->numel();
PADDLE_ENFORCE_GT(size, 0);
VLOG(10) << "alloc_space_for_vars: input(" << var_names[i] << ") ,dim:("
<< lod_tensors[i]->dims() << ")";
ss << "input(" << var_names[i] << ") dim:(" << lod_tensors[i]->dims()
<< "), ";
*numel += Alignment(static_cast<size_t>(size) * size_of_dtype, place) /
size_of_dtype;
}
VLOG(10) << ss.str();
}
};
......
......@@ -1509,6 +1509,13 @@ All parameter, weight, gradient are variables in Paddle.
"fuse_all_reduce_ops",
[](const BuildStrategy &self) { return self.fuse_all_reduce_ops_; },
[](BuildStrategy &self, bool b) { self.fuse_all_reduce_ops_ = b; })
.def_property("enable_backward_optimizer_op_deps",
[](const BuildStrategy &self) {
return self.enable_backward_optimizer_op_deps_;
},
[](BuildStrategy &self, bool b) {
self.enable_backward_optimizer_op_deps_ = b;
})
.def_property(
"cache_runtime_context",
[](const BuildStrategy &self) { return self.cache_runtime_context_; },
......
......@@ -180,7 +180,7 @@ if(WITH_DISTRIBUTE)
endif()
if(NOT APPLE)
set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 200)
set_tests_properties(test_dist_mnist_nccl PROPERTIES TIMEOUT 200)
set_tests_properties(test_dist_mnist_nccl PROPERTIES TIMEOUT 250)
set_tests_properties(test_dist_mnist_lars PROPERTIES TIMEOUT 200)
set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200)
py_test_modules(test_dist_se_resnext MODULES test_dist_se_resnext)
......
......@@ -140,6 +140,9 @@ class TestDistRunnerBase(object):
build_stra.enable_inplace = False
build_stra.memory_optimize = False
if args.enable_backward_deps:
build_stra.enable_backward_optimizer_op_deps = True
if args.use_reduce:
build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
else:
......@@ -274,6 +277,8 @@ def runtime_main(test_class):
parser.add_argument('--trainer_id', type=int, required=False, default=0)
parser.add_argument('--trainers', type=int, required=False, default=1)
parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
parser.add_argument(
'--enable_backward_deps', type=bool, required=False, default=1)
parser.add_argument(
'--current_endpoint', type=str, required=False, default="")
parser.add_argument('--sync_mode', action='store_true')
......@@ -354,6 +359,7 @@ class TestDistBase(unittest.TestCase):
self._nccl_comm_num = 1
self._setup_config()
self._after_setup_config()
self._enable_backward_deps = False
def _find_free_port(self):
def __free_port():
......@@ -606,6 +612,10 @@ class TestDistBase(unittest.TestCase):
env0 = {"FLAGS_selected_gpus": "0"}
env1 = {"FLAGS_selected_gpus": "1"}
if self._enable_backward_deps:
tr0_cmd += " --enable_backward_deps 1"
tr1_cmd += " --enable_backward_deps 1"
env0.update(envs)
env1.update(envs)
......
......@@ -58,5 +58,19 @@ class TestDistMnistNCCL2DGC(TestDistBase):
self.check_with_place("dist_mnist.py", delta=1e-5)
class TestDistMnistNCCL2BackWardDeps(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_reduce = False
self._use_reader_alloc = False
self._nccl2_mode = True
self._enable_backward_deps = True
def test_dist_train(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place("dist_mnist.py", delta=1e-5)
if __name__ == "__main__":
unittest.main()