diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index e9f213ae2cff61ccdd0b87fd84ac1b68706f11b0..748845874377252acffc6ae52586cbeca0e6f078 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -13,7 +13,343 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/parallel_executor.h" +#include "lod_tensor.h" +#include "op_registry.h" namespace paddle { -namespace framework {} // namespace framework +namespace framework { + +struct OpHandle; + +struct VarHandle { + size_t version_; + std::string name_; + platform::Place place_; + + OpHandle *generated_op_; + std::vector deps_ops_; +}; + +struct OpHandle { + std::vector inputs_; + std::vector outputs_; + platform::DeviceContext *dev_ctx_; + + std::string DebugString() { + std::stringstream ss; + ss << "("; + for (auto *var : inputs_) { + ss << var->name_ << ":" << var->place_ << ", "; + } + ss << ") --> ("; + for (auto *var : outputs_) { + ss << var->name_ << ":" << var->place_ << ", "; + } + ss << ")\n"; + return ss.str(); + } + + virtual ~OpHandle() {} +}; + +struct ComputationOpHandle : public OpHandle { + std::unique_ptr op_; + + explicit ComputationOpHandle(const OpDesc &op_desc) + : op_(framework::OpRegistry::CreateOp(op_desc)) {} +}; + +struct ScaleLossGradOpHandle : public OpHandle {}; + +struct NCCLAllReduceOpHandle : public OpHandle {}; + +class ParallelExecutorPrivate { + public: + std::unordered_map + local_scopes_; + std::unordered_map + dev_ctxs_; + platform::Place main_place_; + + std::unordered_map>, + platform::PlaceHash> + vars_; + std::vector> ops_; +}; + +// TODO(yy): Move this function somewhere +ncclDataType_t ToNCCLDataType(std::type_index type) { + // FIXME!! + return ncclFloat; +} + +ParallelExecutor::ParallelExecutor( + const std::vector &places, + const std::unordered_set ¶ms, + const ProgramDesc &startup_program, const ProgramDesc &main_program, + const std::string &loss_var_name, Scope *scope) + : member_(new ParallelExecutorPrivate()) { + // Step 1. RunStartupProgram and Bcast the params to devs. + Executor exe(places[0]); + exe.Run(startup_program, scope, 0); + // Create local scopes + for (auto &place : places) { + member_->local_scopes_[place] = &scope->NewScope(); + } + member_->main_place_ = places[0]; + + // Bcast Parameters to all GPUs + if (platform::is_gpu_place(member_->main_place_)) { // Is CUDA + // BCastParamsToGPUs(startup_program); + } + // Startup Program has been run. All local scopes has correct parameters. + + // Step 2. Convert main_program to SSA form and dependency graph. Also, insert + // ncclOp + ConstructDependencyGraph(params, main_program, loss_var_name); +} + +void ParallelExecutor::ConstructDependencyGraph( + const std::unordered_set ¶ms, + const ProgramDesc &main_program, const std::string &loss_var_name) const { + std::unordered_set grads; + for (auto &each_param : params) { + grads.insert(each_param + "@GRAD"); + } + + bool is_forwarding = true; + for (auto *op : main_program.Block(0).AllOps()) { + bool change_forward = false; + + if (!is_forwarding) { + // FIXME(yy): Do not hard code like this + if (op->OutputArgumentNames().size() == 1 && + op->OutputArgumentNames()[0] == loss_var_name + "@GRAD") { + continue; // Drop fill 1. for backward coeff; + } + } + + for (auto &pair : member_->local_scopes_) { + member_->ops_.emplace_back(new ComputationOpHandle(*op)); + auto *op_handle = member_->ops_.back().get(); + + auto var_names = op->InputArgumentNames(); + + for (auto &each_var_name : var_names) { + auto &place = pair.first; + VarHandle *var = GetVarHandle(each_var_name, place); + op_handle->inputs_.emplace_back(var); + var->deps_ops_.emplace_back(op_handle); + } + var_names = op->OutputArgumentNames(); + + for (auto &each_var_name : var_names) { + auto &place = pair.first; + GenerateVar(op_handle, each_var_name, place); + } + + if (is_forwarding) { + if (var_names.size() == 1 && var_names[0] == loss_var_name) { + // Insert ScaleCost OpHandle + member_->ops_.emplace_back(new ScaleLossGradOpHandle()); + + op_handle = member_->ops_.back().get(); + auto &place = pair.first; + VarHandle *loss = GetVarHandle(loss_var_name, place); + loss->deps_ops_.emplace_back(op_handle); + op_handle->inputs_.emplace_back(loss); + GenerateVar(op_handle, loss_var_name + "@GRAD", place); + change_forward = true; + LOG(INFO) << "Scale Loss " << op_handle->DebugString(); + } + } + } + + if (change_forward) { + is_forwarding = false; + } + + if (!is_forwarding) { + auto var_names = op->OutputArgumentNames(); + for (auto &og : var_names) { + if (grads.count(og) != 0) { // is param grad + // Insert NCCL AllReduce Op + member_->ops_.emplace_back(new NCCLAllReduceOpHandle()); + auto *op_handle = member_->ops_.back().get(); + + for (auto &pair : member_->local_scopes_) { + auto &place = pair.first; + auto &vars = member_->vars_[place][og]; + + if (vars.empty()) { // This device has no data. continue. + continue; + } + auto *prev_grad = &vars[vars.size() - 1]; + op_handle->inputs_.emplace_back(prev_grad); + prev_grad->deps_ops_.emplace_back(op_handle); + auto &var = vars[vars.size()]; + var.place_ = place; + var.generated_op_ = op_handle; + var.name_ = og; + var.version_ = vars.size() - 1; + op_handle->outputs_.emplace_back(&var); + } + } + } + } + } +} + +void ParallelExecutor::GenerateVar(OpHandle *op_handle, + const std::string &each_var_name, + const platform::Place &place) const { + auto &vars = member_->vars_[place][each_var_name]; + size_t version = vars.size(); + auto &var = vars[version]; + var.version_ = version; + var.generated_op_ = op_handle; + var.name_ = each_var_name; + var.place_ = place; + op_handle->outputs_.emplace_back(&var); +} + +VarHandle *ParallelExecutor::GetVarHandle(const std::string &each_var_name, + const platform::Place &place) const { + auto &var_holders = member_->vars_[place]; + auto &var_holder = var_holders[each_var_name]; + VarHandle *var = nullptr; + if (var_holder.empty()) { + auto &init_var = var_holder[0]; + init_var.place_ = place; + init_var.name_ = each_var_name; + init_var.generated_op_ = nullptr; + init_var.version_ = 0; + var = &init_var; + } else { + var = &var_holder.rbegin()->second; + } + return var; +} + +void ParallelExecutor::BCastParamsToGPUs( + const ProgramDesc &startup_program) const { + auto *main_scope = member_->local_scopes_[member_->main_place_]; + for (auto *var_desc : startup_program.Block(0).AllVars()) { + if (var_desc->GetType() == proto::VarType::LOD_TENSOR) { + auto &main_tensor = + main_scope->FindVar(var_desc->Name())->Get(); + + ncclDataType_t data_type = ToNCCLDataType(main_tensor.type()); + auto &dims = main_tensor.dims(); + size_t numel = main_tensor.numel(); + std::vector> mems; + mems.emplace_back( + const_cast(main_tensor.data()), + new platform::CUDADeviceContext( + boost::get(member_->main_place_))); + + for (auto &pair : member_->local_scopes_) { + if (pair.first == member_->main_place_) { + continue; + } + + auto local_scope = pair.second; + auto *t = local_scope->Var(var_desc->Name())->GetMutable(); + t->Resize(dims); + mems.emplace_back(t->mutable_data(pair.first, main_tensor.type()), + new platform::CUDADeviceContext( + boost::get(pair.first))); + } + + // TODO(yy): Invoke ncclBCast here. mems, numel, data_type. The mems[0] + // is the src, rests are dests. + + (void)(data_type); + (void)(numel); + + // Free Communication Ctx + for (auto &pair : mems) { + // Release Communication Ctx + + // FIXME: Store CUDA DevCtx to member. Since NCCL All Reduce will use + // this + delete pair.second; + } + } + } +} + +std::vector ParallelExecutor::Run( + const std::vector &fetch_tensors) { + // Version --> VarHandle + std::unordered_set pending_vars; + std::unordered_map pending_ops; + + for (auto &place_pair : member_->vars_) { + for (auto &name_pair : place_pair.second) { + for (auto &version_pair : name_pair.second) { + pending_vars.insert(&version_pair.second); + } + } + } + + for (auto &op : member_->ops_) { + pending_ops.insert({op.get(), op->inputs_.size()}); + } + + std::unordered_set complete_op; + + size_t num_op = pending_ops.size(); + + while (complete_op.size() != num_op) { + std::vector to_remove; + for (auto &var : pending_vars) { + if (var->generated_op_ == nullptr || + complete_op.count(var->generated_op_) != 0) { + to_remove.push_back(var); + } + } + for (auto *var : to_remove) { + pending_vars.erase(var); + } + + std::vector to_run; + for (auto *var : to_remove) { + for (auto *op : var->deps_ops_) { + if (var->name_ == "mean_0.tmp_0@GRAD") { + LOG(INFO) << op->DebugString(); + } + auto &num = pending_ops[op]; + --num; + if (num == 0) { + to_run.emplace_back(op); + } + } + } + + for (auto *op : to_run) { + pending_ops.erase(op); + complete_op.insert(op); + } + + if (to_run.empty()) break; + + // TODO(yy): Use thead pool to run OpHandle. Operators in ToRun can be + // paralleled. We can also use another schedule method. Just a demo here. + + std::stringstream ss; + ss << "\n"; + for (auto *op : to_run) { + ss << op->DebugString() << "\n"; + } + ss << std::endl; + LOG(INFO) << ss.str(); + } + + PADDLE_ENFORCE_EQ(complete_op.size(), num_op); + return std::vector(); +} +} // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index f67b9266949dedf90cccbd2398e2dda66e8f21c4..ec80f89f0e84a03ca70f7a71073e5c673acd8776 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -28,32 +28,33 @@ limitations under the License. */ namespace paddle { namespace framework { -struct AllReduceCallBack { - void operator()(framework::OperatorBase* op); - - std::unordered_set param_grad_names_; - platform::DeviceContext dev_ctx; -}; - +class ParallelExecutorPrivate; +class VarHandle; +class OpHandle; class ParallelExecutor { + public: explicit ParallelExecutor(const std::vector& places, - const std::unordered_set& params); - - /* @Brief - * Runtime evaluation of the given ProgramDesc under certain Scope - * - * @param - * ProgramDesc - * Scope - */ - void Run(const ProgramDesc& prog, Scope* scope, int block_id, - bool create_local_scope = true, bool create_vars = true); + const std::unordered_set& params, + const ProgramDesc& startup_program, + const ProgramDesc& main_program, + const std::string& loss_var_name, Scope* scope); + + std::vector Run(const std::vector& fetch_tensors); private: - std::vector exes_; - std::vector scopes_; - std::vector all_reduce_callbacks_; - platform::Communicator nccl_com_; + ParallelExecutorPrivate* member_; + + void BCastParamsToGPUs(const ProgramDesc& startup_program) const; + + VarHandle* GetVarHandle(const std::string& each_var_name, + const platform::Place& place) const; + + void GenerateVar(OpHandle* op_handle, const std::string& each_var_name, + const platform::Place& place) const; + + void ConstructDependencyGraph(const std::unordered_set& params, + const ProgramDesc& main_program, + const std::string& loss_var_name) const; }; } // namespace framework diff --git a/paddle/fluid/platform/place.h b/paddle/fluid/platform/place.h index 501bddfc6ec8b5d0bf554b0911c32e47fd51ec15..633251eb474274ba526d60c8d4bddfb964495d69 100644 --- a/paddle/fluid/platform/place.h +++ b/paddle/fluid/platform/place.h @@ -65,6 +65,17 @@ bool is_cpu_place(const Place &); bool places_are_same_class(const Place &, const Place &); bool is_same_place(const Place &, const Place &); +struct PlaceHash { + std::size_t operator()(const Place &p) const { + std::hash ihash; + size_t dev_id = 0; + if (is_gpu_place(p)) { + dev_id = boost::get(p).device; + } + return ihash(dev_id << 2 | p.which()); + } +}; + std::ostream &operator<<(std::ostream &, const Place &); template diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 8942b5c9430ffa4e499b0ad1d2b5acf6d18ec0ab..ecf9e47884990e1e8a07fe7ab0e884f0c94438c9 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -2,6 +2,7 @@ if(WITH_PYTHON) cc_library(paddle_pybind SHARED SRCS pybind.cc exception.cc protobuf.cc const_value.cc recordio.cc DEPS pybind python backward proto_desc paddle_memory executor prune init profiler feed_fetch_method + parallel_executor ${GLOB_OP_LIB}) if(NOT APPLE AND NOT ANDROID) target_link_libraries(paddle_pybind rt) diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index d2e883caccdd34a9d662f06b83cf9a71d3d4a51e..8b752c4efbcd0d7c3b5a8d2be73e7e7aed37f09d 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -25,6 +25,7 @@ limitations under the License. */ #include "paddle/fluid/framework/lod_rank_table.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor_array.h" +#include "paddle/fluid/framework/parallel_executor.h" #include "paddle/fluid/framework/prune.h" #include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/selected_rows.h" @@ -488,6 +489,19 @@ All parameter, weight, gradient are variables in Paddle. m.def("disable_profiler", platform::DisableProfiler); m.def("reset_profiler", platform::ResetProfiler); + py::class_(m, "ParallelExecutor") + .def( + "__init__", + [](ParallelExecutor &self, const std::vector &places, + const std::unordered_set ¶ms, + const ProgramDesc &startup_program, + const ProgramDesc &main_program, const std::string &loss_var_name, + Scope *scope) { + new (&self) ParallelExecutor(places, params, startup_program, + main_program, loss_var_name, scope); + }) + .def("run", [](ParallelExecutor &self) { self.Run({}); }); + BindRecordIOWriter(m); return m.ptr(); } diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py new file mode 100644 index 0000000000000000000000000000000000000000..2b41b2c9b4e7b9788880b950712ac14f4aa4a023 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -0,0 +1,47 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle.fluid as fluid + + +class ParallelExecutor(unittest.TestCase): + def test_main(self): + main = fluid.Program() + startup = fluid.Program() + + with fluid.program_guard(main, startup): + reader = fluid.layers.open_recordio_file( + filename='tmp', + shapes=[[-1, 784], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + img, label = fluid.layers.read_file(reader) + hidden = fluid.layers.fc(img, size=200, act='tanh') + prediction = fluid.layers.fc(hidden, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + loss = fluid.layers.mean(loss) + adam = fluid.optimizer.Adam() + adam.minimize(loss) + act_places = [] + for each in [fluid.CUDAPlace(0), fluid.CUDAPlace(1)]: + p = fluid.core.Place() + p.set_place(each) + act_places.append(p) + + exe = fluid.core.ParallelExecutor( + act_places, + set([p.name for p in main.global_block().iter_parameters()]), + startup.desc, main.desc, loss.name, fluid.global_scope()) + exe.run()