Unverified commit e283dc6f authored by WangXi, committed by GitHub

[cherry-pick 2.0] optimize gradient merge (#30185)

* Optimization grad merge performance (#29784)

* [fleet] combine amp and gradient merge, test=develop (#30086)

* fix assign_op_xpu concat_op_xpu warining (#30120)
Co-authored-by: liuyuhui <liuyuhui@baidu.com>
Parent 1fa98c5d
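For context, a minimal usage sketch (not part of this diff, and assuming the Paddle 2.0 fleet API spellings shown below): the combination this cherry-pick exercises is gradient merge together with AMP, enabled through fleet's DistributedStrategy.

```python
# Hedged sketch: enable gradient merge + AMP via fleet DistributedStrategy.
# Attribute names below are assumed from the Paddle 2.0 fleet API.
import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()
fleet.init(is_collective=True)

strategy = fleet.DistributedStrategy()
strategy.amp = True                      # mixed-precision training
strategy.gradient_merge = True           # accumulate grads for k steps
strategy.gradient_merge_configs = {"k_steps": 4, "avg": True}

optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
# build the network, then call optimizer.minimize(loss) inside a program_guard
```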
......@@ -28,6 +28,8 @@ if(WITH_GPU)
dynload_cuda variable_visitor)
nv_library(fused_all_reduce_op_handle SRCS fused_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
dynload_cuda variable_visitor place device_memory_aligment)
nv_library(grad_merge_all_reduce_op_handle SRCS grad_merge_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor
ddim memory dynload_cuda variable_visitor place device_memory_aligment all_reduce_op_handle fused_all_reduce_op_handle)
if(WITH_DGC)
nv_library(sparse_all_reduce_op_handle SRCS sparse_all_reduce_op_handle.cc DEPS op_handle_base scope
......@@ -50,6 +52,8 @@ else()
variable_visitor)
cc_library(fused_all_reduce_op_handle SRCS fused_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
variable_visitor place device_memory_aligment)
cc_library(grad_merge_all_reduce_op_handle SRCS grad_merge_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor
ddim memory variable_visitor place device_memory_aligment all_reduce_op_handle fused_all_reduce_op_handle)
if(WITH_DISTRIBUTE)
cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope
ddim selected_rows_functor)
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h"
#include <algorithm>
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/gpu_info.h"
#include "paddle/fluid/platform/profiler.h"
#ifdef PADDLE_WITH_NCCL
DECLARE_bool(sync_nccl_allreduce);
#endif
namespace paddle {
namespace framework {
namespace details {
#if defined(PADDLE_WITH_NCCL)
GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle(
ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const std::string &grad_merge_cond_name,
const platform::NCCLCommunicator *ctxs)
: AllReduceOpHandle(node, local_scopes, places, ctxs),
grad_merge_cond_name_(grad_merge_cond_name) {}
#elif defined(PADDLE_WITH_XPU_BKCL)
GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle(
ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const std::string &grad_merge_cond_name,
const platform::BKCLCommunicator *ctxs)
: AllReduceOpHandle(node, local_scopes, places, ctxs),
grad_merge_cond_name_(grad_merge_cond_name) {}
#else
GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle(
ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const std::string &grad_merge_cond_name)
: AllReduceOpHandle(node, local_scopes, places),
grad_merge_cond_name_(grad_merge_cond_name) {}
#endif
void GradMergeAllReduceOpHandle::RunImpl() {
PADDLE_ENFORCE_GT(local_scopes_.size(), 0,
platform::errors::PreconditionNotMet(
"The number of local scope should be > 0, but got %zu.",
local_scopes_.size()));
auto *local_scope = local_exec_scopes_[0];
auto cond_var = local_scope->FindVar(grad_merge_cond_name_);
PADDLE_ENFORCE_NOT_NULL(
cond_var, platform::errors::NotFound("Variable %s is not found in scope.",
grad_merge_cond_name_));
bool cond = *cond_var->Get<LoDTensor>().data<bool>();
if (cond) {
AllReduceOpHandle::RunImpl();
}
}
std::string GradMergeAllReduceOpHandle::Name() const {
return "grad_merge_all_reduce";
}
#if defined(PADDLE_WITH_NCCL)
FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle(
ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
const std::string &grad_merge_cond_name,
const platform::NCCLCommunicator *ctxs)
: FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce,
ctxs),
grad_merge_cond_name_(grad_merge_cond_name) {}
#elif defined(PADDLE_WITH_XPU_BKCL)
FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle(
ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
const std::string &grad_merge_cond_name,
const platform::BKCLCommunicator *ctxs)
: FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce,
ctxs),
grad_merge_cond_name_(grad_merge_cond_name) {}
#else
FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle(
ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
const std::string &grad_merge_cond_name)
: FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce),
grad_merge_cond_name_(grad_merge_cond_name) {}
#endif
void FusedGradMergeAllReduceOpHandle::RunImpl() {
PADDLE_ENFORCE_GT(local_scopes_.size(), 0,
platform::errors::PreconditionNotMet(
"The number of local scope should be > 0, but got %zu.",
local_scopes_.size()));
auto *local_scope = local_exec_scopes_[0];
auto cond_var = local_scope->FindVar(grad_merge_cond_name_);
PADDLE_ENFORCE_NOT_NULL(
cond_var, platform::errors::NotFound("Variable %s is not found in scope.",
grad_merge_cond_name_));
bool cond = *cond_var->Get<LoDTensor>().data<bool>();
if (cond) {
VLOG(10) << "run fused grad merge all reduce";
FusedAllReduceOpHandle::RunImpl();
}
}
std::string FusedGradMergeAllReduceOpHandle::Name() const {
return "fused_grad_merge_all_reduce";
}
} // namespace details
} // namespace framework
} // namespace paddle
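Both handles above gate the collective on a boolean condition variable maintained by the gradient-merge logic. A rough Python-level sketch of that control flow (hypothetical helper names, not the actual C++ classes):

```python
# Conceptual sketch of GradMergeAllReduceOpHandle::RunImpl (hypothetical names).
def run_grad_merge_all_reduce(local_scope, grad_merge_cond_name, all_reduce_fn):
    cond = bool(local_scope[grad_merge_cond_name])  # scalar bool LoDTensor
    if cond:
        all_reduce_fn()  # communicate the accumulated (merged) gradients
    # otherwise skip communication; gradients keep accumulating locally
```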
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
namespace paddle {
namespace framework {
namespace ir {
class Node;
} // namespace ir
} // namespace framework
namespace platform {
class NCCLCommunicator;
} // namespace platform
} // namespace paddle
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/framework/details/nccl_op_handle.h"
#include "paddle/fluid/platform/nccl_helper.h"
#endif
namespace paddle {
namespace framework {
namespace details {
class GradMergeAllReduceOpHandle : public AllReduceOpHandle {
public:
#if defined(PADDLE_WITH_NCCL)
GradMergeAllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const std::string &grad_merge_cond_name,
const platform::NCCLCommunicator *ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
GradMergeAllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const std::string &grad_merge_cond_name,
const platform::BKCLCommunicator *ctxs);
#else
GradMergeAllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const std::string &grad_merge_cond_name);
#endif
std::string Name() const override;
std::string GradMergeCondName() { return grad_merge_cond_name_; }
protected:
void RunImpl() override;
private:
std::string grad_merge_cond_name_;
};
class FusedGradMergeAllReduceOpHandle : public FusedAllReduceOpHandle {
public:
#if defined(PADDLE_WITH_NCCL)
FusedGradMergeAllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const size_t num_of_all_reduce,
const std::string &grad_merge_cond_name,
const platform::NCCLCommunicator *ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
FusedGradMergeAllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const size_t num_of_all_reduce,
const std::string &grad_merge_cond_name,
const platform::BKCLCommunicator *ctxs);
#else
FusedGradMergeAllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const size_t num_of_all_reduce,
const std::string &grad_merge_cond_name);
#endif
std::string Name() const override;
protected:
void RunImpl() override;
private:
std::string grad_merge_cond_name_;
};
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -22,6 +22,7 @@
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/var_handle.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/pass.h"
......@@ -62,7 +63,7 @@ constexpr char kUseHierarchicalAllReduce[] = "use_hierarchical_allreduce";
typedef std::unordered_set<VarHandleBase *> GraphDepVars;
constexpr char kGraphDepVars[] = "dep_vars";
typedef std::unordered_set<std::string> FusedVars;
typedef std::unordered_map<std::string, details::VariableInfo> FusedVars;
constexpr char kFusedVars[] = "fused_vars";
constexpr char kFusedVarNamePrefix[] = "@FUSEDVAR@";
......@@ -78,6 +79,7 @@ constexpr char kParamsAndSparseGrads[] = "params_and_sparse_grads";
typedef std::vector<ProgramDesc> ProgramDescs;
constexpr char kProgramDescs[] = "program_descs";
constexpr char kStartupProgramDescs[] = "startup_program_descs";
typedef std::unordered_set<std::string> PinnedVars;
constexpr char kPinnedVars[] = "pinned_vars";
......
......@@ -123,17 +123,27 @@ void ScopeBufferedSSAGraphExecutor::InitVariables() {
}
const ir::Graph &graph = Graph();
if (!is_initialized_) {
// startup_program_descs only need to be executed once
if (graph.Has(details::kStartupProgramDescs)) {
auto &program_descs =
graph.Get<details::ProgramDescs>(details::kStartupProgramDescs);
for (auto &program_desc : program_descs) {
for (auto &op_desc : program_desc.Block(0).AllOps()) {
for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
auto op = OpRegistry::CreateOp(*op_desc);
op->Run(*local_exec_scopes_[i], places_[i]);
}
}
}
}
is_initialized_ = true;
}
if (graph.Has(details::kProgramDescs)) {
auto &program_descs =
graph.Get<details::ProgramDescs>(details::kProgramDescs);
// Init vars
auto &fused_grad_vars = graph.Get<details::FusedVars>(details::kFusedVars);
for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
for (auto &var_name : fused_grad_vars) {
auto var = local_exec_scopes_[i]->Var(var_name);
var->GetMutable<LoDTensor>();
}
}
for (auto &program_desc : program_descs) {
for (auto &op_desc : program_desc.Block(0).AllOps()) {
......
......@@ -64,6 +64,7 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
bool DropScopeOrNot() const;
bool is_initialized_{false};
size_t drop_scope_counter_{0};
ExecutionStrategy strategy_;
std::unique_ptr<SSAGraphExecutor> underlying_executor_;
......
......@@ -199,19 +199,42 @@ class CoalesceGradTensorPass : public ir::Pass {
if (!result->Has(details::kFusedGrads)) {
result->Set(details::kFusedGrads, new details::FusedGrads);
}
if (!result->Has(details::kStartupProgramDescs)) {
result->Set(details::kStartupProgramDescs, new details::ProgramDescs);
}
if (!result->Has(details::kProgramDescs)) {
result->Set(details::kProgramDescs, new details::ProgramDescs);
}
auto type = GetTypeOfVar(vars_info, params_grads.front().second);
bool persistable = false;
for (auto &p_g : params_grads) {
if (IsPersistableVar(vars_info, p_g.second)) {
// NOTE. If one of the grads is persistable, then the fused_grad_var
// should be set to persistable.
persistable = true;
break;
}
}
// the fused_var_name should be unique, so params_grads.begin()->second is
// appended to it.
auto fused_grad_var_name = std::string(details::kFusedVarNamePrefix) +
"@GRAD@" + params_grads.begin()->second;
// unfortunately, Visual C++ does not support designated initializers like {.type_ = type}
details::VariableInfo var_info;
var_info.name_ = fused_grad_var_name;
var_info.type_ = type;
var_info.persistable_ = persistable;
auto &fused_var_set = result->Get<details::FusedVars>(details::kFusedVars);
PADDLE_ENFORCE_EQ(
fused_var_set.count(fused_grad_var_name), 0,
platform::errors::AlreadyExists("Var(%s) is duplicate in FusedVars.",
fused_grad_var_name));
fused_var_set.insert(fused_grad_var_name);
fused_var_set.insert({fused_grad_var_name, var_info});
result->Get<details::FusedGrads>(details::kFusedGrads)
.emplace_back(fused_grad_var_name);
......@@ -414,6 +437,13 @@ class CoalesceGradTensorPass : public ir::Pass {
return var_desc->GetType();
}
bool IsPersistableVar(
const std::unordered_map<std::string, std::vector<ir::Node *>> &vars_info,
const std::string &name) const {
auto var_desc = GetVarDescFromVarsInfo(vars_info, name);
return var_desc->Persistable();
}
private:
bool IsLoDTensorType(const proto::VarType::Type &type) const {
// Current only support LOD_TENSOR.
......@@ -494,18 +524,46 @@ class CoalesceGradTensorPass : public ir::Pass {
DataTypeToString(next_dtype), DataTypeToString(dtype)));
}
result->Get<details::ProgramDescs>(details::kProgramDescs).emplace_back();
ProgramDesc &program_desc =
result->Get<details::ProgramDescs>(details::kProgramDescs).back();
auto *global_block = program_desc.MutableBlock(0);
AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype,
global_block);
bool any_persistable = false;
bool all_persistable = true;
for (auto &p_g : params_grads) {
if (IsPersistableVar(vars_info, p_g.second)) {
any_persistable = true;
} else {
all_persistable = false;
}
}
if (all_persistable) {
// All grads are persistable, so this only needs to be executed once at
// the beginning.
result->Get<details::ProgramDescs>(details::kStartupProgramDescs)
.emplace_back();
ProgramDesc &program_desc =
result->Get<details::ProgramDescs>(details::kStartupProgramDescs)
.back();
auto *global_block = program_desc.MutableBlock(0);
AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype,
all_persistable, global_block);
} else {
// NOTE. In scope_buffered_ssa_graph_executor, after each execution of
// DropScope(), non-persistable vars will be erased or cleared, so the
// coalesce_tensor op needs to be executed again after each DropScope().
result->Get<details::ProgramDescs>(details::kProgramDescs).emplace_back();
ProgramDesc &program_desc =
result->Get<details::ProgramDescs>(details::kProgramDescs).back();
auto *global_block = program_desc.MutableBlock(0);
AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype,
any_persistable, global_block);
}
}
void AppendAllocSpaceForVarsOp(const std::vector<std::string> &params_name,
const std::vector<std::string> &grads_name,
const std::string &fused_var_name,
const proto::VarType::Type &dtype,
bool persistable,
BlockDesc *global_block) const {
auto op_desc = global_block->AppendOp();
op_desc->SetType("coalesce_tensor");
......@@ -513,6 +571,8 @@ class CoalesceGradTensorPass : public ir::Pass {
op_desc->SetOutput("Output", grads_name);
op_desc->SetOutput("FusedOutput", {fused_var_name});
op_desc->SetAttr("dtype", static_cast<int>(dtype));
op_desc->SetAttr("persist_output", persistable);
}
};
} // namespace ir
......
......@@ -76,6 +76,9 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const {
result.Set(details::kFusedOptType, new details::FusedOptType);
result.Get<details::FusedOptType>(details::kFusedOptType) = fuse_op_type;
if (!result.Has(details::kStartupProgramDescs)) {
result.Set(details::kStartupProgramDescs, new details::ProgramDescs);
}
if (!result.Has(details::kProgramDescs)) {
result.Set(details::kProgramDescs, new details::ProgramDescs);
}
......@@ -100,7 +103,12 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const {
fused_var_set.count(fused_var_name), 0,
platform::errors::AlreadyExists(
"The fused variable(%s) already exists.", fused_var_name));
fused_var_set.insert(fused_var_name);
// FIXME(wangxi). update persistable
details::VariableInfo var_info;
var_info.name_ = fused_var_name;
var_info.type_ = proto::VarType::LOD_TENSOR;
var_info.persistable_ = false;
fused_var_set.insert({fused_var_name, var_info});
fused_vars_name.emplace(var_name, fused_var_name);
}
......@@ -151,8 +159,8 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const {
return;
}
auto &fused_vars = result.Get<details::FusedVars>(details::kFusedVars);
auto iter =
std::find(fused_vars.begin(), fused_vars.end(), fused_grad.front());
auto iter = fused_vars.find(fused_grad.front());
PADDLE_ENFORCE_EQ(
iter != fused_vars.end(), true,
platform::errors::NotFound("Not found the fused gradient variable."));
......
......@@ -4,6 +4,7 @@ cc_library(multi_devices_graph_print_pass SRCS multi_devices_graph_print_pass.cc
cc_library(multi_devices_graph_check_pass SRCS multi_devices_graph_check_pass.cc DEPS multi_devices_helper)
set(ALL_REDUCE_OP_HANDLES all_reduce_op_handle)
set(ALL_REDUCE_OP_HANDLES grad_merge_all_reduce_op_handle)
if(WITH_GPU AND WITH_DGC)
list(APPEND ALL_REDUCE_OP_HANDLES sparse_all_reduce_op_handle)
endif()
......@@ -13,7 +14,7 @@ cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_
cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass)
cc_library(set_reader_device_info_utils SRCS set_reader_device_info_utils.cc DEPS graph graph_helper pass multi_devices_graph_pass)
cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle)
cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle grad_merge_all_reduce_op_handle)
cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS all_reduce_op_handle graph graph_helper pass)
cc_library(backward_optimizer_op_deps_pass SRCS backward_optimizer_op_deps_pass.cc DEPS graph graph_helper pass)
cc_library(add_reader_dependency_pass SRCS add_reader_dependency_pass.cc DEPS graph graph_helper pass)
......@@ -19,6 +19,7 @@
#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
......@@ -164,6 +165,38 @@ class FuseAllReduceOpPass : public ir::Pass {
const platform::BKCLCommunicator *multi_bkcl_ctxs,
#endif
ir::Graph *result) const {
bool is_grad_merge = false;
std::string grad_merge_cond_name;
for (auto &op : all_reduce_ops) {
auto *grad_merge_all_reduce_op_handle =
dynamic_cast<details::GradMergeAllReduceOpHandle *>(
&op->Wrapper<details::OpHandleBase>());
if (grad_merge_all_reduce_op_handle) {
if (is_grad_merge) {
auto this_grad_merge_cond_name =
grad_merge_all_reduce_op_handle->GradMergeCondName();
PADDLE_ENFORCE_EQ(
grad_merge_cond_name, this_grad_merge_cond_name,
platform::errors::InvalidArgument(
"grad_merge_cond_name is not same in different all_reduce, "
"prev_grad_merge_cond_name is %s, this_grad_merge_cond_name "
"is %s",
grad_merge_cond_name, this_grad_merge_cond_name));
} else {
is_grad_merge = true;
grad_merge_cond_name =
grad_merge_all_reduce_op_handle->GradMergeCondName();
}
} else {
PADDLE_ENFORCE_EQ(is_grad_merge, false,
platform::errors::InvalidArgument(
"if use grad_merge, all of allreduce must be "
"grad_merge_allreduce"));
}
}
VLOG(6) << "fused allreduce use_grad_merge=" << is_grad_merge;
std::vector<details::VarHandleBase *> inputs;
std::vector<details::VarHandleBase *> outputs;
for (auto &op : all_reduce_ops) {
......@@ -189,13 +222,16 @@ class FuseAllReduceOpPass : public ir::Pass {
#if defined(PADDLE_WITH_NCCL)
CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
local_scopes, multi_nccl_ctxs, result);
local_scopes, is_grad_merge, grad_merge_cond_name,
multi_nccl_ctxs, result);
#elif defined(PADDLE_WITH_XPU_BKCL)
CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
local_scopes, multi_bkcl_ctxs, result);
local_scopes, is_grad_merge, grad_merge_cond_name,
multi_bkcl_ctxs, result);
#else
CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
local_scopes, result);
local_scopes, is_grad_merge, grad_merge_cond_name,
result);
#endif
}
......@@ -205,26 +241,52 @@ class FuseAllReduceOpPass : public ir::Pass {
const std::vector<details::VarHandleBase *> &outputs,
const size_t num_of_all_reduce,
const std::vector<platform::Place> &places,
const std::vector<Scope *> &local_scopes,
const std::vector<Scope *> &local_scopes, bool is_grad_merge,
const std::string &grad_merge_cond_name,
#if defined(PADDLE_WITH_NCCL)
const platform::NCCLCommunicator *multi_nccl_ctxs,
#elif defined(PADDLE_WITH_XPU_BKCL)
const platform::BKCLCommunicator *multi_bkcl_ctxs,
#endif
ir::Graph *result) const {
details::FusedAllReduceOpHandle *op_handle = NULL;
if (is_grad_merge) {
#if defined(PADDLE_WITH_NCCL)
op_handle = new details::FusedGradMergeAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce",
ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, grad_merge_cond_name,
multi_nccl_ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
op_handle = new details::FusedGradMergeAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce",
ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, grad_merge_cond_name,
multi_bkcl_ctxs);
#else
op_handle = new details::FusedGradMergeAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce",
ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, grad_merge_cond_name);
#endif
} else {
#if defined(PADDLE_WITH_NCCL)
auto *op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, multi_nccl_ctxs);
op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce",
ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, multi_nccl_ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
auto *op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, multi_bkcl_ctxs);
op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce",
ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, multi_bkcl_ctxs);
#else
auto *op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce);
op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce",
ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce);
#endif
}
for (auto in : inputs) {
op_handle->AddInput(in);
......
......@@ -25,6 +25,7 @@
#include "paddle/fluid/framework/details/computation_op_handle.h"
#include "paddle/fluid/framework/details/fetch_barrier_op_handle.h"
#include "paddle/fluid/framework/details/fused_broadcast_op_handle.h"
#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/reduce_op_handle.h"
#include "paddle/fluid/framework/details/rpc_op_handle.h"
#include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h"
......@@ -255,7 +256,7 @@ void MultiDevSSAGraphBuilderBase::ApplyImpl(ir::Graph *graph) const {
VLOG(10) << "Bcast " << g_name << " for parameter " << p_name
<< " op_type " << node->Op()->Type();
if (NeedCollectiveForGrad(g_name, sorted_ops)) {
InsertCollectiveOp(&result, p_name, g_name);
InsertCollectiveOp(&result, node, p_name, g_name);
}
}
}
......@@ -481,45 +482,77 @@ void MultiDevSSAGraphBuilderBase::CreateComputationalOp(ir::Graph *result,
}
void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(ir::Graph *result,
ir::Node *node,
const std::string &og,
bool is_encoded) const {
details::OpHandleBase *op_handle = nullptr;
const std::string GRAD_MERGE_COND_NAME = "grad_merge_cond_name";
bool is_grad_merge = node->Op()->HasAttr(GRAD_MERGE_COND_NAME);
std::string grad_merge_cond_name;
PADDLE_ENFORCE_EQ((is_encoded && is_grad_merge), false,
platform::errors::InvalidArgument(
"DGC and GradMerge cannot use at same time, while "
"use_dgc=%d, use_grad_merge=%d",
is_encoded, is_grad_merge));
auto append_allreduce_op = [&](
const std::vector<Scope *> &scopes,
const std::vector<platform::Place> &places) -> details::OpHandleBase * {
#if defined(PADDLE_WITH_DGC) && defined(PADDLE_WITH_NCCL)
if (is_encoded) {
#if defined(PADDLE_WITH_DGC) && defined(PADDLE_WITH_NCCL)
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::SparseAllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, multi_nccl_ctxs_, is_encoded,
strategy_.num_trainers_ * places_.size()));
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"This version of PaddlePaddle does NOT support DGC, "
"but got DGC grad in CreateAllReduceOp. "
"Please compile PaddlePaddle WITH_DGC first."));
#endif
} else if (is_grad_merge) {
grad_merge_cond_name = BOOST_GET_CONST(
std::string, node->Op()->GetAttr(GRAD_MERGE_COND_NAME));
VLOG(10) << "og=" << og << " use grad_merge_allreduce";
#if defined(PADDLE_WITH_NCCL)
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::GradMergeAllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, grad_merge_cond_name, multi_nccl_ctxs_));
#elif defined(PADDLE_WITH_XPU_BKCL)
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::GradMergeAllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, grad_merge_cond_name, multi_bkcl_ctxs_));
#else
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::GradMergeAllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, grad_merge_cond_name));
#endif
} else {
#ifdef PADDLE_WITH_NCCL
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, multi_nccl_ctxs_));
}
#elif defined(PADDLE_WITH_NCCL)
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, multi_nccl_ctxs_));
#elif defined(PADDLE_WITH_XPU_BKCL)
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, multi_bkcl_ctxs_));
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, multi_bkcl_ctxs_));
#else
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places));
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places));
#endif
}
return result->Get<GraphOps>(kGraphOps).back();
};
details::OpHandleBase *op_handle = nullptr;
if (!strategy_.enable_parallel_graph_)
op_handle = append_allreduce_op(local_scopes_, places_);
......@@ -546,6 +579,36 @@ void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(ir::Graph *result,
op_handle->AddOutput(var);
VLOG(10) << "all_reduce_op_handle add output " << og
<< ", handle:" << var->DebugString();
if (is_grad_merge) {
// NOTE(wangxi). grad_merge_cond_var is used by
// GradMergeAllReduceOpHandle, but it is not an input of
// grad_merge_all_reduce_op_handle. So we must add a dep_var to resolve
// the WAR data hazard, since grad_merge_all_reduce_op_handle may otherwise
// be executed before grad_merge_cond_op.
auto &grad_merge_cond_vars = result->Get<details::GraphVars>(
details::kGraphVars)[i][grad_merge_cond_name];
PADDLE_ENFORCE_EQ(
grad_merge_cond_vars.empty(), false,
platform::errors::InvalidArgument(
"Can not find Var(%s) in Place[%d] "
"Paddle Can not add GradMergeAllReduce OP for Var(%s).",
grad_merge_cond_name, i, og));
auto &grad_merge_cond_var = grad_merge_cond_vars.back();
auto *cond_op = grad_merge_cond_var->GeneratedOp();
PADDLE_ENFORCE_NOT_NULL(
cond_op,
platform::errors::Fatal(
"grad_merge_cond_var(%s)'s generated op handle must not be NULL",
grad_merge_cond_name));
auto *dep_var =
new details::DummyVarHandle(result->CreateControlDepVar());
result->Get<details::GraphDepVars>(details::kGraphDepVars)
.emplace(dep_var);
cond_op->AddOutput(dep_var);
op_handle->AddInput(dep_var);
}
}
}
......@@ -650,16 +713,16 @@ void MultiDevSSAGraphBuilderBase::CreateIsolatedVarNode(
}
void AllReduceSSAGraphBuilder::InsertCollectiveOp(
ir::Graph *result, const std::string &p_name,
ir::Graph *result, ir::Node *node, const std::string &p_name,
const std::string &g_name) const {
if (IsSparseGradient(g_name)) {
CreateReduceOp(result, g_name, 0);
CreateBroadcastOp(result, g_name, 0);
} else {
#if defined(PADDLE_WITH_DGC)
CreateAllReduceOp(result, g_name, IsEncoded(p_name));
CreateAllReduceOp(result, node, g_name, IsEncoded(p_name));
#else
CreateAllReduceOp(result, g_name);
CreateAllReduceOp(result, node, g_name);
#endif
}
}
......@@ -750,7 +813,7 @@ void ReduceSSAGraphBuilder::ResetState() const {
}
void ReduceSSAGraphBuilder::InsertCollectiveOp(
ir::Graph *result, const std::string &p_name,
ir::Graph *result, ir::Node *node, const std::string &p_name,
const std::string &g_name) const {
size_t cur_device_id = GetAppropriateDeviceID({g_name});
CreateReduceOp(result, g_name, cur_device_id);
......@@ -1128,7 +1191,7 @@ bool AllReduceSSAGraphBuilder::IsEncoded(const std::string &p_name) const {
}
#endif
void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result,
void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, ir::Node *node,
const std::string &p_name,
const std::string &g_name) const {
// collective gradient to each device
......@@ -1144,7 +1207,7 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result,
CreateReduceOp(result, g_name, 0);
CreateBroadcastOp(result, g_name, 0);
} else {
CreateAllReduceOp(result, g_name);
CreateAllReduceOp(result, node, g_name);
}
break;
default:
......
......@@ -66,7 +66,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass {
virtual std::vector<ir::Node *> SortOperations(const ir::Graph &graph) const;
virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
const std::string &p_name,
const std::string &g_name) const = 0;
virtual bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const;
......@@ -96,8 +97,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass {
bool IsSparseGradient(const std::string &og) const;
void CreateAllReduceOp(ir::Graph *result, const std::string &og,
bool is_encoded = false) const;
void CreateAllReduceOp(ir::Graph *result, ir::Node *node,
const std::string &og, bool is_encoded = false) const;
void CreateBroadcastOp(ir::Graph *result, const std::string &p_name,
size_t src_dev_id) const;
......@@ -134,7 +135,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass {
class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
protected:
virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
const std::string &p_name,
const std::string &g_name) const;
virtual void InsertPostprocessOps(ir::Graph *result) const {}
......@@ -144,7 +146,8 @@ class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
protected:
void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
const std::string &p_name,
const std::string &g_name) const override {}
bool NeedCollectiveForGrad(const std::string &grad_name,
......@@ -183,7 +186,8 @@ class ReduceSSAGraphBuilder : public BalanceVarSSAGraphBuilder {
protected:
virtual void Init() const;
virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
const std::string &p_name,
const std::string &g_name) const;
virtual bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const;
......@@ -212,7 +216,8 @@ class DistSSAGraphBuilder : public BalanceVarSSAGraphBuilder {
virtual void InsertPostprocessOps(ir::Graph *result) const;
virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
const std::string &p_name,
const std::string &g_name) const;
virtual void ResetState() const;
......
......@@ -847,6 +847,17 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
}
}
if (graph->Has(details::kFusedVars)) {
auto &fused_vars = graph->Get<details::FusedVars>(details::kFusedVars);
for (auto &fused_var : fused_vars) {
var_infos.emplace_back();
var_infos.back() = fused_var.second;
member_->is_persistable_.emplace(fused_var.first,
fused_var.second.persistable_);
}
}
std::unordered_map<Scope *, Scope *> scope_map;
for (auto *scope : member_->local_scopes_) {
auto &local_exec_scope = scope->NewScope();
......
......@@ -63,7 +63,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
platform::errors::InvalidArgument(
"The output variable %s of CoalesceTensor operator "
"is not LoDTensor.",
in_var_names[i]));
out_var_names[i]));
}
auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
......@@ -122,6 +122,22 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx, fused_tensor,
static_cast<T>(context.Attr<float>("constant")));
} else if (context.Attr<bool>("persist_output")) {
for (size_t i = 0; i < out_var_names.size(); ++i) {
size_t len = static_cast<size_t>(out_tensors[i]->numel());
auto sub_tensor = fused_tensor->Slice(
static_cast<int64_t>(offset), static_cast<int64_t>(offset + len));
// some vars may not be persistable, or a persistable var may not be initialized
if (out_tensors[i]->IsInitialized()) {
framework::TensorCopy(*out_tensors[i], context.GetPlace(), dev_ctx,
&sub_tensor);
}
offset +=
use_align
? platform::Alignment(len * size_of_dtype, context.GetPlace()) /
size_of_dtype
: len;
}
}
// Make the outputs point to the continuous space.
......@@ -224,6 +240,9 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker {
AddAttr<bool>("set_constant",
"Whether to set the Output with a constant value.")
.SetDefault(false);
AddAttr<bool>("persist_output",
"Whether to persist the original Output value.")
.SetDefault(false);
AddAttr<float>("constant",
"If set_constant is true, the constant value will be used "
"to set the Output.")
......@@ -249,7 +268,8 @@ Note that, the dtype of Input should be the same, and the dim of Input
and Output should equal.
The tensors of Input and Output could be the same or different. And
coalesce_tensor allows copying the value of Input to Output, or
setting the Output with a constant value.
setting the Output with a constant value, or persisting the original Output
value.
)DOC");
}
......
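The new persist_output path above copies each already-initialized output tensor into its slice of the fused buffer before the outputs are repointed at that buffer, so previously accumulated values (for example gradient-merge buffers) survive re-coalescing. A minimal NumPy sketch of that idea (hypothetical helper, alignment and dtype checks omitted):

```python
# Sketch of the persist_output behaviour: keep old output values by copying
# them into the fused buffer, then alias the outputs to slices of it.
import numpy as np

def coalesce_persist(outputs):
    total = sum(o.size for o in outputs)
    fused = np.empty(total, dtype=outputs[0].dtype)
    views, offset = [], 0
    for o in outputs:
        n = o.size
        fused[offset:offset + n] = o.ravel()               # persist old values
        views.append(fused[offset:offset + n].reshape(o.shape))
        offset += n
    return fused, views  # outputs now share one contiguous allocation
```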
......@@ -36,11 +36,16 @@ class ConcatXPUKernel : public framework::OpKernel<T> {
"XPU donot surpport AxisTensor for now"));
axis = ComputeAxis(static_cast<int64_t>(axis),
static_cast<int64_t>(ins[0]->dims().size()));
PADDLE_ENFORCE_GE(
axis, 0, platform::errors::InvalidArgument("concat: axis shoud >= 0!"));
PADDLE_ENFORCE_GE(axis, 0, platform::errors::InvalidArgument(
"concat: axis should be larger than or "
"equal to 0, but received axis is %d.",
axis));
PADDLE_ENFORCE_LT(axis, ins[0]->dims().size(),
platform::errors::InvalidArgument(
"concat: axis shoud < ins[0]->dims()!"));
"concat: axis should be less than ins[0]->dims()!"
"But received axis is %d, while ins[0]->dims()"
"size is %d.",
axis, ins[0]->dims().size()));
auto place = ctx.GetPlace();
out->mutable_data<T>(place);
......@@ -151,10 +156,16 @@ class ConcatGradXPUKernel : public framework::OpKernel<T> {
}
}
PADDLE_ENFORCE_GE(axis, 0, platform::errors::InvalidArgument(
"concat_grad: axis shoud >= 0!"));
PADDLE_ENFORCE_LT(axis, out_grad->dims().size(),
platform::errors::InvalidArgument(
"concat_grad: axis shoud < ins[0]->dims()!"));
"concat_grad: axis should be larger than or "
"equal to 0, but received axis is %d.",
axis));
PADDLE_ENFORCE_LT(
axis, out_grad->dims().size(),
platform::errors::InvalidArgument(
"concat_grad: axis should be less than ins[0]->dims()!"
"But received axis is %d, while ins[0]->dims()"
"size is %d.",
axis, out_grad->dims().size()));
auto input_dims = ins[0]->dims();
std::vector<int> split_list(n);
......
......@@ -25,7 +25,6 @@ class AMPOptimizer(MetaOptimizerBase):
"LarsOptimizer",
"LambOptimizer",
"RecomputeOptimizer",
"GradientMergeOptimizer",
"GraphExecutionOptimizer",
]
self.meta_optimizers_black_list = ["DGCOptimizer"]
......
......@@ -21,6 +21,7 @@ class GradientMergeOptimizer(MetaOptimizerBase):
self.inner_opt = optimizer
self.wrapped_opt = None
self.meta_optimizers_white_list = [
"AMPOptimizer",
"LarsOptimizer",
"LambOptimizer",
"GraphExecutionOptimizer",
......
......@@ -159,9 +159,6 @@ class OptimizerWithMixedPrecision(object):
params_grads = self._optimizer.backward(
self._scaled_loss, startup_program, parameter_list, no_grad_set,
callbacks)
# Change the op_role_var attr for some ops, so that gradients
# transferred across GPUs can be FP16.
update_role_var_grad(train_program, params_grads)
return params_grads
def apply_gradients(self, params_grads):
......@@ -176,6 +173,10 @@ class OptimizerWithMixedPrecision(object):
A list of optimize operators.
"""
# Change the op_role_var attr for some ops, so that gradients
# transferred across GPUs can be FP16.
update_role_var_grad(self._train_program, params_grads)
grads = [g for _, g in params_grads]
if not self._is_distributed:
with self._train_program._optimized_guard(grads):
......
......@@ -2188,8 +2188,11 @@ class ConditionalBlock(object):
def need_append_conditional_block_grad(self, inside_block):
grad_sub_block_idx = inside_block.backward_block_idx
inside_block_idx = inside_block.idx
return grad_sub_block_idx != -1
# if inside_block has a grad_block and that grad_block is not itself,
# we will append the conditional block grad.
return grad_sub_block_idx != -1 and grad_sub_block_idx != inside_block_idx
def append_conditional_block_grad(self, parent_block, inside_block,
conditional_block_op):
......
......@@ -5056,6 +5056,8 @@ class GradientMergeOptimizer(object):
print("step=%d, cost=%f" % (i, cost_val[0]))
"""
GRAD_MERGE_COND_NAME = "grad_merge_cond_name"
def __init__(self, inner_optimizer, k_steps=1, avg=True):
if framework.in_dygraph_mode():
raise Exception(
......@@ -5071,6 +5073,7 @@ class GradientMergeOptimizer(object):
self.k_steps = k_steps
self.type = "gradient_merge"
self.avg = avg
self._optimize_ops = None
def _set_k_steps(self, k_steps):
self.k_steps = k_steps
......@@ -5078,12 +5081,12 @@ class GradientMergeOptimizer(object):
def _set_avg(self, avg):
self.avg = avg
def minimize(self,
def backward(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
no_grad_set=None,
callbacks=None):
assert isinstance(loss, Variable), "The loss should be an Variable."
assert (
parameter_list is None
......@@ -5094,26 +5097,142 @@ class GradientMergeOptimizer(object):
params_grads = self.inner_optimizer.backward(
loss, startup_program=startup_program)
return params_grads
def apply_optimize(self, loss, startup_program, params_grads):
program = loss.block.program
with program_guard(program, startup_program):
optimize_ops = self.apply_gradients(params_grads)
return optimize_ops
def _is_the_backward_op(self, op):
op_maker = core.op_proto_and_checker_maker
backward = core.op_proto_and_checker_maker.OpRole.Backward
if op_maker.kOpRoleVarAttrName() in op.attr_names and \
int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(backward):
return True
return False
def _remove_op_role_var(self, param, grad):
op_maker = core.op_proto_and_checker_maker
op = grad.op
assert self._is_the_backward_op(op), \
'grad.op={} is not the backward op which produces the grad={}' \
.format(op, grad.name)
block = grad.block
var_attr = op.all_attrs()[op_maker.kOpRoleVarAttrName()]
assert param.name in var_attr, \
'when using GradientMergeOptimizer, param={} must be in var_attr={}' \
.format(param.name, var_attr)
assert grad.name in var_attr, \
'when using GradientMergeOptimizer, grad={} must be in var_attr={}' \
.format(param.name, var_attr)
# remove (param, grad) from op_role_var
var_attr.remove(param.name)
var_attr.remove(grad.name)
if len(var_attr) > 1:
op._set_attr(op_maker.kOpRoleVarAttrName(), var_attr)
else:
op._remove_attr(op_maker.kOpRoleVarAttrName())
def _add_gm_op_role_var(self, op, param, grad, cond):
grad.op = op
op_maker = core.op_proto_and_checker_maker
backward = op_maker.OpRole.Backward
# NOTE(wangxi). When distributed, multi_devices_graph_pass inserts a
# grad_merge_all_reduce_op_handle, which allreduces the grad if cond is True
# and otherwise does nothing.
# In this way, gradients are merged locally first and only communicated when
# the condition is met, reducing the number of communications and thus
# increasing speed.
op._set_attr(self.GRAD_MERGE_COND_NAME, cond.name)
op._set_attr(op_maker.kOpRoleAttrName(), backward)
op._set_attr(op_maker.kOpRoleVarAttrName(), [param.name, grad.name])
def _get_gm_cond_var(self, main_block):
# Add const var
k_step_var = layers.create_global_var(
name="gradient_merge_k",
shape=[1],
value=int(self.k_steps),
dtype='int32',
persistable=True,
force_cpu=True)
zero_var = layers.create_global_var(
name="gradient_merge_zero",
shape=[1],
value=int(0),
dtype='int32',
persistable=True,
force_cpu=True)
# Add step var & cond var
step_var = layers.create_global_var(
name="gradient_merge_step",
shape=[1],
value=int(0),
dtype='int32',
persistable=True,
force_cpu=True)
cond_var = layers.create_global_var(
name="gradient_merge_cond",
shape=[1],
value=bool(0),
dtype='bool',
persistable=True,
force_cpu=True)
with device_guard("cpu"):
# step_var = (step_var + 1) % k_step
layers.increment(x=step_var, value=1.0, in_place=True)
main_block.append_op(
type='elementwise_mod',
inputs={'X': step_var,
'Y': k_step_var},
outputs={'Out': step_var},
attrs={'axis': -1,
'use_mkldnn': False})
# cond_var = (step_var == 0)
main_block.append_op(
type='equal',
inputs={'X': step_var,
'Y': zero_var},
outputs={'Out': cond_var})
return cond_var
def apply_gradients(self, params_grads):
main_program = default_main_program()
startup_program = default_startup_program()
main_block = main_program.global_block()
startup_block = startup_program.global_block()
cond = self._get_gm_cond_var(main_block)
#TODO(mapingshuo) support sparse embedding
for k, v in params_grads:
# step1: remove grad.op's op_role_var
for param, grad in params_grads:
assert (
v.type != core.VarDesc.VarType.SELECTED_ROWS
param.type != core.VarDesc.VarType.SELECTED_ROWS
), "SELECTED_ROWS is not supported in GradientMergeOptimizer for now"
param_to_grad = {k.name: v for (k, v) in params_grads}
# Get startup_program and main_program
if startup_program is None:
startup_program = default_startup_program()
main_block = loss.block
self._remove_op_role_var(param, grad)
# add some vars to the main_program and startup_program
startup_block = startup_program.global_block()
param_to_grad = {k.name: v for (k, v) in params_grads}
param_names = param_to_grad.keys()
param_to_gradient_merge = {}
for param_name in param_names:
new_params_grads = []
# step2: create gradient_merge var and init with 0
# and update op_role_var
for param, grad in params_grads:
param_name = param.name
param_var = main_block.var(param_name)
assert (param_var is not None)
gradient_merge_var = main_block.create_var(
......@@ -5122,6 +5241,7 @@ class GradientMergeOptimizer(object):
dtype=param_var.dtype,
persistable=True)
param_to_gradient_merge[param_name] = gradient_merge_var
startup_gradient_merge_var = startup_block.create_var(
name=param_name + "@GRAD@GradientMerge",
shape=param_var.shape,
......@@ -5136,92 +5256,75 @@ class GradientMergeOptimizer(object):
"value": float(0),
})
with framework.program_guard(main_block.program, startup_program):
# Add Var k to main prog and startup prog
gradient_merge_k = layers.create_global_var(
name="gradient_merge_k",
shape=[1],
value=int(self.k_steps),
dtype='int32',
persistable=True)
# grad_merge += grad
new_grad_op = main_block.append_op(
type="elementwise_add",
inputs={'X': grad,
'Y': gradient_merge_var},
outputs={'Out': gradient_merge_var},
attrs={'axis': -1,
'use_mkldnn': False})
self._add_gm_op_role_var(new_grad_op, param, gradient_merge_var,
cond)
new_params_grads.append([param, gradient_merge_var])
def true_apply_gradient():
cur_block_idx = main_program.current_block_idx
cur_block = main_program.current_block()
# cur_block's forward_block & backward_block is itself
cur_block._set_forward_block_idx(cur_block_idx)
if self.avg:
for param, new_grad in new_params_grads:
# grad /= k_steps
cur_block.append_op(
type='scale',
inputs={'X': new_grad},
outputs={'Out': new_grad},
attrs={
'scale': 1.0 / self.k_steps,
'bias': 0.0,
'bias_after_scale': False
})
# Add Var step
gradient_merge_step = layers.create_global_var(
name="gradient_merge_step",
shape=[1],
value=int(0),
dtype='int32',
persistable=True)
layers.increment(x=gradient_merge_step, value=1.0, in_place=True)
for param, new_grad in new_params_grads:
# NOTE. regularization will append ops to grad.block,
# while new_grad's real block is global_block,
# but we want to append regularization ops to cur_block,
# so we set new_grad.block = cur_block
new_grad.block = cur_block
# gradient merge
zero_var = layers.fill_constant(
shape=[1], dtype='float32', value=0.0)
one_var = layers.fill_constant(
shape=[1], dtype='float32', value=1.0)
self._optimize_ops = self.inner_optimizer.apply_gradients(
new_params_grads)
mod = layers.elementwise_mod(gradient_merge_step, gradient_merge_k)
with layers.control_flow.Switch() as switch:
with switch.case(mod != zero_var):
# 1. update the gradient_merge_vars
# gradient_merge_vars += gradient_vars
cur_block = main_block.program.current_block()
for param_name in param_names:
grad = param_to_grad[param_name]
grad_merge = param_to_gradient_merge[param_name]
cur_block.append_op(
type="elementwise_add",
inputs={'X': grad,
'Y': grad_merge},
outputs={'Out': grad_merge},
attrs={'axis': -1,
'use_mkldnn': False})
# clear gradient_merge_vars
for param, new_grad in new_params_grads:
layers.fill_constant(
shape=new_grad.shape,
dtype=new_grad.dtype,
value=0.0,
out=new_grad)
# step3. apply gradient
layers.cond(cond, true_fn=true_apply_gradient, false_fn=None)
return self._optimize_ops
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
assert isinstance(loss, Variable), "The loss should be an Variable."
params_grads = self.backward(
loss,
startup_program=startup_program,
parameter_list=parameter_list,
no_grad_set=no_grad_set)
optimize_ops = self.apply_optimize(
loss, startup_program=startup_program, params_grads=params_grads)
with switch.default():
# 1. update the gradient_vars
# gradient_vars += gradient_merge_vars
cur_block_idx = main_block.program.current_block_idx
cur_block = main_block.program.current_block()
for param_name in param_names:
grad = param_to_grad[param_name]
grad_merge = param_to_gradient_merge[param_name]
if self.avg:
tmp_var = layers.elementwise_add(grad, grad_merge)
cur_block.append_op(
type='scale',
inputs={'X': tmp_var},
outputs={'Out': grad},
attrs={
'scale': 1.0 / self.k_steps,
'bias': 0.0,
'bias_after_scale': False
})
else:
cur_block.append_op(
type="elementwise_add",
inputs={'X': grad,
'Y': grad_merge},
outputs={'Out': grad},
attrs={'axis': -1,
'use_mkldnn': False})
# 2. apply_optimize
target_grad_block = main_block.program._create_block(
parent_idx=cur_block.parent_idx)
target_grad_block._set_forward_block_idx(cur_block_idx)
main_block.program.current_block_idx = cur_block_idx
optimize_ops = self.inner_optimizer.apply_optimize(
loss,
startup_program=startup_program,
params_grads=params_grads)
# 3. clear gradient_merge_vars
for param_name in param_names:
grad_merge = param_to_gradient_merge[param_name]
layers.fill_constant(
shape=grad_merge.shape,
dtype=grad_merge.dtype,
value=0.0,
out=grad_merge)
return optimize_ops, params_grads
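The rewritten optimizer above drives everything off one condition variable: each iteration increments a CPU-side step counter, reduces it modulo k_steps, and only when the result is zero does the conditional block average the merged gradients (if avg=True), apply the inner optimizer, and reset the buffers. A small worked sketch of that counter arithmetic (plain Python, not the actual Paddle ops):

```python
# With k_steps = 4, the inner optimizer runs on iterations 4, 8, 12, ...
k_steps, step = 4, 0
for it in range(1, 9):
    step = (step + 1) % k_steps   # increment + elementwise_mod
    cond = (step == 0)            # equal(step, zero)
    # grad_merge += grad happens on every iteration
    if cond:
        # scale merged grads by 1/k_steps (avg=True), run the inner optimizer,
        # then reset the merge buffers to zero
        print("apply optimizer at iteration", it)
```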
......@@ -664,6 +664,7 @@ if (WITH_DISTRIBUTE AND NOT APPLE)
if(WITH_GPU)
set_tests_properties(test_c_comm_init_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_fleet_checkpoint PROPERTIES TIMEOUT 120)
set_tests_properties(test_dist_mnist_gradient_merge PROPERTIES TIMEOUT 120)
endif()
endif()
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import paddle
import paddle.fluid as fluid
from test_dist_base import TestDistRunnerBase, runtime_main
from dist_mnist import cnn_model
DTYPE = "float32"
paddle.dataset.mnist.fetch()
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
class TestDistMnist2x2(TestDistRunnerBase):
def get_model(self, batch_size=2):
# Input data
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# Train program
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(
input=predict, label=label, total=batch_size_tensor)
inference_program = fluid.default_main_program().clone()
# Optimization
opt = fluid.optimizer.MomentumOptimizer(
learning_rate=0.001, momentum=0.9)
opt = fluid.optimizer.GradientMergeOptimizer(opt, 2)
# Reader
train_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
opt.minimize(avg_cost)
return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict
if __name__ == "__main__":
runtime_main(TestDistMnist2x2)
......@@ -15,6 +15,7 @@
from __future__ import print_function
import time
import ast
import unittest
import os
import sys
......@@ -373,6 +374,10 @@ class TestDistRunnerBase(object):
build_stra.enable_inplace = False
build_stra.memory_optimize = False
if args.fuse_all_reduce is not None:
sys.stderr.write('fuse_all_reduce={}'.format(args.fuse_all_reduce))
build_stra.fuse_all_reduce_ops = args.fuse_all_reduce
if args.hogwild:
build_stra.async_mode = True
......@@ -620,6 +625,11 @@ def runtime_main(test_class):
type=bool,
default=False)
parser.add_argument('--sync_batch_norm', action='store_true')
parser.add_argument(
'--fuse_all_reduce',
required=False,
type=ast.literal_eval,
default=None)
args = parser.parse_args()
......@@ -688,6 +698,7 @@ class TestDistBase(unittest.TestCase):
self._ut4grad_allreduce = False
self._use_hallreduce = False
self._save_model = False
self._fuse_all_reduce = None
self._setup_config()
global DIST_UT_PORT
......@@ -971,6 +982,9 @@ class TestDistBase(unittest.TestCase):
if self._enable_backward_deps:
tr_cmd += " --enable_backward_deps"
if self._fuse_all_reduce is not None:
tr_cmd += " --fuse_all_reduce {}".format(self._fuse_all_reduce)
if self._gpu_fleet_api:
tr_cmd += " --gpu_fleet_api"
if self._use_local_sgd:
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import unittest
from test_dist_base import TestDistBase
flag_name = os.path.splitext(__file__)[0]
class TestDistMnistGradMerge(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_reduce = False
self._nccl2_mode = True
def test_dist_train(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"dist_mnist_gradient_merge.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
class TestDistMnistGradMergeNoFuse(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_reduce = False
self._nccl2_mode = True
self._fuse_all_reduce = False
def test_dist_train(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"dist_mnist_gradient_merge.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name + "_no_fuse")
if __name__ == "__main__":
unittest.main()
......@@ -46,6 +46,19 @@ class TestFleetGradientMergeMetaOptimizer(TestFleetMetaOptimizer):
self.assertIn('@GradientMerge', ''.join(vars))
self.assertIn('subprog', ''.join(vars))
def test_gm_amp_optimizer(self):
train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
)
avg_cost, strategy = self.net(train_prog, startup_prog)
self.set_strategy(strategy, 'gradient_merge')
self.set_strategy(strategy, 'amp')
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
print(train_prog)
vars = [x.name for x in train_prog.list_vars()]
self.assertIn('@GradientMerge', ''.join(vars))
self.assertIn('cast', ''.join(vars))
if __name__ == "__main__":
unittest.main()
......@@ -23,7 +23,9 @@ import paddle.fluid.core as core
import paddle.compat as cpt
import numpy as np
from paddle.fluid.backward import append_backward
from paddle.fluid.framework import Program, program_guard
from paddle.fluid.framework import Program, program_guard, convert_np_dtype_to_dtype_
import paddle
paddle.enable_static()
class TestOptimizer(unittest.TestCase):
......@@ -1010,37 +1012,34 @@ class TestGradientMergeOptimizer(unittest.TestCase):
with framework.program_guard(main_program, init_program):
ops, params_grads = opt.minimize(cost)
self.assertEqual(main_program.num_blocks, 4)
self.assertEqual(main_program.num_blocks, 2)
# main block
self.assertEqual(len(cost.block.ops), 17)
self.assertEqual([op.type for op in cost.block.ops], [
'mul', 'elementwise_add', 'mean', 'fill_constant', 'mean_grad',
'elementwise_add_grad', 'mul_grad', 'increment', 'fill_constant',
'fill_constant', 'elementwise_mod', 'cast', 'not_equal',
'logical_not', 'conditional_block', 'conditional_block',
'conditional_block_grad'
])
self.assertEqual(len(cost.block.ops), 13)
self.assertEqual(
[op.type for op in cost.block.ops],
[
'mul',
'elementwise_add',
'mean',
'fill_constant',
'mean_grad',
'elementwise_add_grad',
'mul_grad',
'increment', # step += 1
'elementwise_mod', # step %= k_steps
'equal', # cond_var == (step == 0)
'elementwise_add',
'elementwise_add',
'conditional_block',
])
# merge block
self.assertEqual(len(main_program.block(1).ops), 2)
# optimize block
self.assertEqual(len(main_program.block(1).ops), 6)
self.assertEqual([op.type for op in main_program.block(1).ops], [
'elementwise_add',
'elementwise_add',
'scale', 'scale', 'sgd', 'sgd', 'fill_constant', 'fill_constant'
])
# reset block
self.assertEqual(len(main_program.block(2).ops), 6)
self.assertEqual([op.type for op in main_program.block(2).ops], [
'elementwise_add', 'scale', 'elementwise_add', 'scale',
'fill_constant', 'fill_constant'
])
# optimize block
self.assertEqual(len(main_program.block(3).ops), 2)
self.assertEqual([op.type for op in main_program.block(3).ops],
['sgd', 'sgd'])
if __name__ == '__main__':
unittest.main()