Commit baef1124 authored by Yu Yang

ParallelExecutor And dependency engine

Parent 8f061e43
......@@ -13,7 +13,343 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/parallel_executor.h"
#include "lod_tensor.h"
#include "op_registry.h"
namespace paddle {
namespace framework {
struct OpHandle;
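// A VarHandle is one SSA version of a variable on a device: generated_op_ is
// the op that writes it, and deps_ops_ are the ops that read it.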
struct VarHandle {
size_t version_;
std::string name_;
platform::Place place_;
OpHandle *generated_op_;
std::vector<OpHandle *> deps_ops_;
};
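// An OpHandle is a node in the dependency graph; it records the variable
// versions it reads (inputs_) and writes (outputs_).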
struct OpHandle {
std::vector<VarHandle *> inputs_;
std::vector<VarHandle *> outputs_;
platform::DeviceContext *dev_ctx_;
std::string DebugString() {
std::stringstream ss;
ss << "(";
for (auto *var : inputs_) {
ss << var->name_ << ":" << var->place_ << ", ";
}
ss << ") --> (";
for (auto *var : outputs_) {
ss << var->name_ << ":" << var->place_ << ", ";
}
ss << ")\n";
return ss.str();
}
virtual ~OpHandle() {}
};
struct ComputationOpHandle : public OpHandle {
std::unique_ptr<OperatorBase> op_;
explicit ComputationOpHandle(const OpDesc &op_desc)
: op_(framework::OpRegistry::CreateOp(op_desc)) {}
};
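// Placeholder handles for scaling the loss gradient and for NCCL all-reduce;
// both are empty in this commit and only participate in the dependency graph.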
struct ScaleLossGradOpHandle : public OpHandle {};
struct NCCLAllReduceOpHandle : public OpHandle {};
class ParallelExecutorPrivate {
public:
std::unordered_map<platform::Place, Scope *, platform::PlaceHash>
local_scopes_;
std::unordered_map<platform::Place, platform::CUDADeviceContext,
platform::PlaceHash>
dev_ctxs_;
platform::Place main_place_;
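// SSA table: Place -> variable name -> version -> VarHandle.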
std::unordered_map<platform::Place,
std::unordered_map<std::string, std::map<int, VarHandle>>,
platform::PlaceHash>
vars_;
std::vector<std::unique_ptr<OpHandle>> ops_;
};
// TODO(yy): Move this function somewhere
ncclDataType_t ToNCCLDataType(std::type_index type) {
// FIXME!!
return ncclFloat;
}
ParallelExecutor::ParallelExecutor(
const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &params,
const ProgramDesc &startup_program, const ProgramDesc &main_program,
const std::string &loss_var_name, Scope *scope)
: member_(new ParallelExecutorPrivate()) {
// Step 1. RunStartupProgram and Bcast the params to devs.
Executor exe(places[0]);
exe.Run(startup_program, scope, 0);
// Create local scopes
for (auto &place : places) {
member_->local_scopes_[place] = &scope->NewScope();
}
member_->main_place_ = places[0];
// Bcast Parameters to all GPUs
if (platform::is_gpu_place(member_->main_place_)) { // Is CUDA
// BCastParamsToGPUs(startup_program);
}
// The startup program has been run; all local scopes now hold correct parameters.
// Step 2. Convert main_program to SSA form and build the dependency graph. Also,
// insert NCCL ops.
ConstructDependencyGraph(params, main_program, loss_var_name);
}
void ParallelExecutor::ConstructDependencyGraph(
const std::unordered_set<std::string> &params,
const ProgramDesc &main_program, const std::string &loss_var_name) const {
std::unordered_set<std::string> grads;
for (auto &each_param : params) {
grads.insert(each_param + "@GRAD");
}
bool is_forwarding = true;
for (auto *op : main_program.Block(0).AllOps()) {
bool change_forward = false;
if (!is_forwarding) {
// FIXME(yy): Do not hard code like this
if (op->OutputArgumentNames().size() == 1 &&
op->OutputArgumentNames()[0] == loss_var_name + "@GRAD") {
continue;  // Skip the op that fills 1.0 into the loss gradient (backward coefficient).
}
}
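// Create one ComputationOpHandle per device (local scope). Each handle reads
// the latest version of its inputs and produces a new version of its outputs.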
for (auto &pair : member_->local_scopes_) {
member_->ops_.emplace_back(new ComputationOpHandle(*op));
auto *op_handle = member_->ops_.back().get();
auto var_names = op->InputArgumentNames();
for (auto &each_var_name : var_names) {
auto &place = pair.first;
VarHandle *var = GetVarHandle(each_var_name, place);
op_handle->inputs_.emplace_back(var);
var->deps_ops_.emplace_back(op_handle);
}
var_names = op->OutputArgumentNames();
for (auto &each_var_name : var_names) {
auto &place = pair.first;
GenerateVar(op_handle, each_var_name, place);
}
if (is_forwarding) {
if (var_names.size() == 1 && var_names[0] == loss_var_name) {
// Insert ScaleCost OpHandle
member_->ops_.emplace_back(new ScaleLossGradOpHandle());
op_handle = member_->ops_.back().get();
auto &place = pair.first;
VarHandle *loss = GetVarHandle(loss_var_name, place);
loss->deps_ops_.emplace_back(op_handle);
op_handle->inputs_.emplace_back(loss);
GenerateVar(op_handle, loss_var_name + "@GRAD", place);
change_forward = true;
LOG(INFO) << "Scale Loss " << op_handle->DebugString();
}
}
}
if (change_forward) {
is_forwarding = false;
}
if (!is_forwarding) {
auto var_names = op->OutputArgumentNames();
for (auto &og : var_names) {
if (grads.count(og) != 0) { // is param grad
// Insert NCCL AllReduce Op
member_->ops_.emplace_back(new NCCLAllReduceOpHandle());
auto *op_handle = member_->ops_.back().get();
for (auto &pair : member_->local_scopes_) {
auto &place = pair.first;
auto &vars = member_->vars_[place][og];
if (vars.empty()) { // This device has no data. continue.
continue;
}
auto *prev_grad = &vars[vars.size() - 1];
op_handle->inputs_.emplace_back(prev_grad);
prev_grad->deps_ops_.emplace_back(op_handle);
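// Indexing the map with its current size default-constructs the next SSA
// version of the gradient, which the all-reduce writes.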
auto &var = vars[vars.size()];
var.place_ = place;
var.generated_op_ = op_handle;
var.name_ = og;
var.version_ = vars.size() - 1;
op_handle->outputs_.emplace_back(&var);
}
}
}
}
}
}
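// GenerateVar appends a new version of each_var_name on the given place and
// records op_handle as its writer.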
void ParallelExecutor::GenerateVar(OpHandle *op_handle,
const std::string &each_var_name,
const platform::Place &place) const {
auto &vars = member_->vars_[place][each_var_name];
size_t version = vars.size();
auto &var = vars[version];
var.version_ = version;
var.generated_op_ = op_handle;
var.name_ = each_var_name;
var.place_ = place;
op_handle->outputs_.emplace_back(&var);
}
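// GetVarHandle returns the latest version of a variable on the given place,
// creating version 0 (with no generating op) if none exists yet.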
VarHandle *ParallelExecutor::GetVarHandle(const std::string &each_var_name,
const platform::Place &place) const {
auto &var_holders = member_->vars_[place];
auto &var_holder = var_holders[each_var_name];
VarHandle *var = nullptr;
if (var_holder.empty()) {
auto &init_var = var_holder[0];
init_var.place_ = place;
init_var.name_ = each_var_name;
init_var.generated_op_ = nullptr;
init_var.version_ = 0;
var = &init_var;
} else {
var = &var_holder.rbegin()->second;
}
return var;
}
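// Broadcast parameters from main_place_ to every other device. The actual
// ncclBcast call is still a TODO in this commit.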
void ParallelExecutor::BCastParamsToGPUs(
const ProgramDesc &startup_program) const {
auto *main_scope = member_->local_scopes_[member_->main_place_];
for (auto *var_desc : startup_program.Block(0).AllVars()) {
if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
auto &main_tensor =
main_scope->FindVar(var_desc->Name())->Get<LoDTensor>();
ncclDataType_t data_type = ToNCCLDataType(main_tensor.type());
auto &dims = main_tensor.dims();
size_t numel = main_tensor.numel();
std::vector<std::pair<void *, const platform::DeviceContext *>> mems;
mems.emplace_back(
const_cast<void *>(main_tensor.data<void>()),
new platform::CUDADeviceContext(
boost::get<platform::CUDAPlace>(member_->main_place_)));
for (auto &pair : member_->local_scopes_) {
if (pair.first == member_->main_place_) {
continue;
}
auto local_scope = pair.second;
auto *t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
t->Resize(dims);
mems.emplace_back(t->mutable_data(pair.first, main_tensor.type()),
new platform::CUDADeviceContext(
boost::get<platform::CUDAPlace>(pair.first)));
}
// TODO(yy): Invoke ncclBCast here. mems, numel, data_type. The mems[0]
// is the src, rests are dests.
(void)(data_type);
(void)(numel);
// Free Communication Ctx
for (auto &pair : mems) {
// Release Communication Ctx
// FIXME: Store the CUDADeviceContext in a member, since NCCL AllReduce will
// reuse it.
delete pair.second;
}
}
}
}
std::vector<LoDTensor> ParallelExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
// Version --> VarHandle
std::unordered_set<VarHandle *> pending_vars;
std::unordered_map<OpHandle *, size_t> pending_ops;
for (auto &place_pair : member_->vars_) {
for (auto &name_pair : place_pair.second) {
for (auto &version_pair : name_pair.second) {
pending_vars.insert(&version_pair.second);
}
}
}
for (auto &op : member_->ops_) {
pending_ops.insert({op.get(), op->inputs_.size()});
}
std::unordered_set<OpHandle *> complete_op;
size_t num_op = pending_ops.size();
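// Dependency-driven scheduling: a variable becomes ready once its generating
// op completes, and an op becomes runnable once all its input variables are ready.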
while (complete_op.size() != num_op) {
std::vector<VarHandle *> to_remove;
for (auto &var : pending_vars) {
if (var->generated_op_ == nullptr ||
complete_op.count(var->generated_op_) != 0) {
to_remove.push_back(var);
}
}
for (auto *var : to_remove) {
pending_vars.erase(var);
}
std::vector<OpHandle *> to_run;
for (auto *var : to_remove) {
for (auto *op : var->deps_ops_) {
if (var->name_ == "mean_0.tmp_0@GRAD") {
LOG(INFO) << op->DebugString();
}
auto &num = pending_ops[op];
--num;
if (num == 0) {
to_run.emplace_back(op);
}
}
}
for (auto *op : to_run) {
pending_ops.erase(op);
complete_op.insert(op);
}
if (to_run.empty()) break;
// TODO(yy): Use a thread pool to run the OpHandles. Operators in to_run can be
// executed in parallel. We could also use another scheduling method. This is just a demo.
std::stringstream ss;
ss << "\n";
for (auto *op : to_run) {
ss << op->DebugString() << "\n";
}
ss << std::endl;
LOG(INFO) << ss.str();
}
PADDLE_ENFORCE_EQ(complete_op.size(), num_op);
return std::vector<LoDTensor>();
}
} // namespace framework
} // namespace paddle
......@@ -28,32 +28,33 @@ limitations under the License. */
namespace paddle {
namespace framework {
struct AllReduceCallBack {
void operator()(framework::OperatorBase* op);
std::unordered_set<std::string> param_grad_names_;
platform::DeviceContext dev_ctx;
};
class ParallelExecutorPrivate;
class VarHandle;
class OpHandle;
class ParallelExecutor {
 public:
  explicit ParallelExecutor(const std::vector<platform::Place>& places,
                            const std::unordered_set<std::string>& params,
                            const ProgramDesc& startup_program,
                            const ProgramDesc& main_program,
                            const std::string& loss_var_name, Scope* scope);

  std::vector<LoDTensor> Run(const std::vector<std::string>& fetch_tensors);

 private:
  ParallelExecutorPrivate* member_;

  void BCastParamsToGPUs(const ProgramDesc& startup_program) const;

  VarHandle* GetVarHandle(const std::string& each_var_name,
                          const platform::Place& place) const;

  void GenerateVar(OpHandle* op_handle, const std::string& each_var_name,
                   const platform::Place& place) const;

  void ConstructDependencyGraph(const std::unordered_set<std::string>& params,
                                const ProgramDesc& main_program,
                                const std::string& loss_var_name) const;
};
} // namespace framework
......
......@@ -65,6 +65,17 @@ bool is_cpu_place(const Place &);
bool places_are_same_class(const Place &, const Place &);
bool is_same_place(const Place &, const Place &);
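// Hash a Place by combining its device id with the variant index returned by which().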
struct PlaceHash {
std::size_t operator()(const Place &p) const {
std::hash<int> ihash;
size_t dev_id = 0;
if (is_gpu_place(p)) {
dev_id = boost::get<CUDAPlace>(p).device;
}
return ihash(dev_id << 2 | p.which());
}
};
std::ostream &operator<<(std::ostream &, const Place &);
template <typename Visitor>
......
......@@ -2,6 +2,7 @@ if(WITH_PYTHON)
cc_library(paddle_pybind SHARED
SRCS pybind.cc exception.cc protobuf.cc const_value.cc recordio.cc
DEPS pybind python backward proto_desc paddle_memory executor prune init profiler feed_fetch_method
parallel_executor
${GLOB_OP_LIB})
if(NOT APPLE AND NOT ANDROID)
target_link_libraries(paddle_pybind rt)
......
......@@ -25,6 +25,7 @@ limitations under the License. */
#include "paddle/fluid/framework/lod_rank_table.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/framework/parallel_executor.h"
#include "paddle/fluid/framework/prune.h"
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/selected_rows.h"
......@@ -488,6 +489,19 @@ All parameter, weight, gradient are variables in Paddle.
m.def("disable_profiler", platform::DisableProfiler);
m.def("reset_profiler", platform::ResetProfiler);
py::class_<ParallelExecutor>(m, "ParallelExecutor")
.def(
"__init__",
[](ParallelExecutor &self, const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &params,
const ProgramDesc &startup_program,
const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope) {
new (&self) ParallelExecutor(places, params, startup_program,
main_program, loss_var_name, scope);
})
.def("run", [](ParallelExecutor &self) { self.Run({}); });
BindRecordIOWriter(m);
return m.ptr();
}
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
class ParallelExecutor(unittest.TestCase):
def test_main(self):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
reader = fluid.layers.open_recordio_file(
filename='tmp',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
hidden = fluid.layers.fc(img, size=200, act='tanh')
prediction = fluid.layers.fc(hidden, size=10, act='softmax')
loss = fluid.layers.cross_entropy(input=prediction, label=label)
loss = fluid.layers.mean(loss)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
act_places = []
for each in [fluid.CUDAPlace(0), fluid.CUDAPlace(1)]:
p = fluid.core.Place()
p.set_place(each)
act_places.append(p)
exe = fluid.core.ParallelExecutor(
act_places,
set([p.name for p in main.global_block().iter_parameters()]),
startup.desc, main.desc, loss.name, fluid.global_scope())
exe.run()