From 9cb8f503026c6d3d25fa80e34b8fa2ca0bea6d2f Mon Sep 17 00:00:00 2001
From: Yu Yang
Date: Mon, 19 Mar 2018 14:58:50 +0800
Subject: [PATCH] Complete fetch op

---
 paddle/fluid/framework/CMakeLists.txt             |   2 +-
 paddle/fluid/framework/parallel_executor.cc       | 123 +++++++++++++++---
 paddle/fluid/framework/parallel_executor.h        |   3 +-
 paddle/fluid/operators/math/concat.h              |   1 +
 paddle/fluid/pybind/pybind.cc                     |   2 +-
 .../tests/unittests/test_parallel_executor.py     |  15 ++-
 6 files changed, 124 insertions(+), 22 deletions(-)

diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index fadc24ae5..6522a7a69 100644
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -87,7 +87,7 @@ cc_library(feed_fetch_method SRCS feed_fetch_method.cc DEPS lod_tensor scope glo
 cc_library(executor SRCS executor.cc DEPS op_registry device_context scope framework_proto backward glog lod_rank_table feed_fetch_method)
 
 cc_library(parallel_executor SRCS parallel_executor.cc DEPS op_registry device_context scope
-           framework_proto backward glog lod_rank_table feed_fetch_method executor simple_threadpool)
+           framework_proto backward glog lod_rank_table feed_fetch_method executor simple_threadpool concat)
 
 cc_library(prune SRCS prune.cc DEPS framework_proto)
 cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index edc24cc13..cfaa2dbd1 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -16,7 +16,9 @@ limitations under the License. */
 #include "ThreadPool.h"
 #include "executor.h"
 #include "lod_tensor.h"
+#include "lod_tensor_array.h"
 #include "op_registry.h"
+#include "paddle/fluid/operators/math/concat.h"
 
 namespace paddle {
 namespace framework {
@@ -34,7 +36,7 @@ struct VarHandleBase {
   virtual std::string DebugString() const = 0;
 
   OpHandle *generated_op_;
-  std::vector<OpHandle *> pending_ops_;
+  std::unordered_set<OpHandle *> pending_ops_;
 };
 
 struct VarHandle : public VarHandleBase {
@@ -93,7 +95,6 @@ struct ComputationOpHandle : public OpHandle {
 
   void Run() override {
     // Wait other op if necessary
-    LOG(INFO) << "Run " << this << " " << DebugString();
     auto *cur_ctx = dev_ctx_[place_];
     for (auto *in : inputs_) {
       if (in->generated_op_ && in->generated_op_->dev_ctx_[place_] != cur_ctx) {
@@ -102,7 +103,6 @@ struct ComputationOpHandle : public OpHandle {
     }
 
     op_->Run(*scope_, place_);
-    LOG(INFO) << "Done " << this;
   }
 
   void Wait(platform::DeviceContext *waited_dev) override {
@@ -122,8 +122,6 @@ struct ScaleLossGradOpHandle : public OpHandle {
         place_(place) {}
 
   void Run() override {
-    LOG(INFO) << "Run Scale Loss Grad";
-
     std::string var_name = static_cast<VarHandle *>(this->outputs_[0])->name_;
 
     float *tmp = scope_->FindVar(var_name)
@@ -146,6 +144,64 @@ struct ScaleLossGradOpHandle : public OpHandle {
   }
 };
 
+struct FetchedData {
+ public:
+  std::vector<LoDTensor> tensors_;
+
+  explicit FetchedData(size_t num_fetched) { tensors_.resize(num_fetched); }
+};
+
+struct FetchOpHandle : public OpHandle {
+  std::shared_ptr<FetchedData> data_;
+  size_t offset_;
+  std::vector<Scope *> *local_scopes_;
+  std::vector<LoDTensor> tensors_;
+
+  ~FetchOpHandle() {
+    for (auto *input_var : inputs_) {
+      input_var->pending_ops_.erase(this);
+    }
+    for (auto &pair : dev_ctx_) {
+      pair.second->Wait();
+    }
+
+    // Lazily merge tensors. This makes the code faster.
+    MergeTensors();
+  }
+
+  void Run() override {
+    tensors_.resize(inputs_.size());
+    auto *var = static_cast<VarHandle *>(inputs_[0]);
+    auto &var_name = var->name_;
+    platform::CPUPlace cpu;
+    auto &scopes = *local_scopes_;
+
+    for (size_t i = 0; i < scopes.size(); ++i) {
+      auto &scope = scopes[i];
+      auto &t = scope->FindVar(var_name)->Get<LoDTensor>();
+      if (platform::is_gpu_place(var->place_)) {
+        TensorCopy(t, cpu, *dev_ctx_[t.place()], &tensors_[i]);
+      } else {
+        tensors_[i].ShareDataWith(t);
+        tensors_[i].set_lod(t.lod());
+      }
+    }
+  }
+
+  void Wait(platform::DeviceContext *waited_dev) override {
+    PADDLE_THROW("Nobody should wait FetchOp. Unexpected Error");
+  }
+
+ private:
+  void MergeTensors() const {
+    std::vector<const LoDTensor *> tensors_ptr;
+    for (auto &t : tensors_) {
+      tensors_ptr.emplace_back(&t);
+    }
+    data_->tensors_[offset_].MergeLoDTensor(tensors_ptr, platform::CPUPlace());
+  }
+};
+
 class ParallelExecutorPrivate {
  public:
   explicit ParallelExecutorPrivate(size_t num_threads = 12)
@@ -154,6 +210,7 @@ class ParallelExecutorPrivate {
   std::vector<platform::Place> places_;
 
   std::vector<Scope *> local_scopes_;
+  Scope *global_scope_;
 
 #ifdef PADDLE_WITH_CUDA
   struct NCCLContext {
@@ -297,7 +354,7 @@ ParallelExecutor::ParallelExecutor(
     const std::string &loss_var_name, Scope *scope)
     : member_(new ParallelExecutorPrivate()) {
   member_->places_ = places;
-
+  member_->global_scope_ = scope;
   // Step 1. RunStartupProgram and Bcast the params to devs.
   Executor exe(places[0]);
   exe.Run(startup_program, scope, 0);
@@ -308,9 +365,9 @@ ParallelExecutor::ParallelExecutor(
   member_->main_place_ = places[0];
 
   // Bcast Parameters to all GPUs
+  BuildNCCLCommunicator();
   if (platform::is_gpu_place(member_->main_place_) &&
       member_->local_scopes_.size() != 1) {  // Is CUDA
-    BuildNCCLCommunicator();
     BCastParamsToGPUs(startup_program);
   }
   // Startup Program has been run. All local scopes has correct parameters.
@@ -365,7 +422,7 @@ void ParallelExecutor::ConstructDependencyGraph(
     for (auto &each_var_name : var_names) {
       VarHandle *var = GetVarHandle(each_var_name, p);
       op_handle->inputs_.emplace_back(var);
-      var->pending_ops_.emplace_back(op_handle);
+      var->pending_ops_.emplace(op_handle);
     }
 
     var_names = op->OutputArgumentNames();
@@ -390,7 +447,6 @@ void ParallelExecutor::ConstructDependencyGraph(
           GenerateVar(op_handle, loss_var_name + "@GRAD", p);
           change_forward = true;
-          LOG(INFO) << "Scale Loss " << op_handle->DebugString();
         }
       }
     }
 
@@ -416,7 +472,7 @@ void ParallelExecutor::ConstructDependencyGraph(
         }
         auto *prev_grad = &vars[vars.size() - 1];
         op_handle->inputs_.emplace_back(prev_grad);
-        prev_grad->pending_ops_.emplace_back(op_handle);
+        prev_grad->pending_ops_.emplace(op_handle);
         auto &var = vars[vars.size()];
         var.place_ = p;
         var.generated_op_ = op_handle;
@@ -463,10 +519,6 @@ void ParallelExecutor::PolishGraphToSupportDataHarzaeds() const {
       continue;
     }
 
-    LOG(INFO) << "Link " << it_new->second.DebugString() << " From "
-              << it_old->second.version_ << " To "
-              << it_new->second.version_;
-
     for (auto *read_op : read_ops) {
       // Manually add a dependency var from read_op to write_op;
       if (read_op == write_op) {
@@ -479,7 +531,7 @@ void ParallelExecutor::PolishGraphToSupportDataHarzaeds() const {
 
       dep_var->generated_op_ = read_op;
       read_op->outputs_.emplace_back(dep_var);
-      dep_var->pending_ops_.emplace_back(write_op);
+      dep_var->pending_ops_.emplace(write_op);
       write_op->inputs_.emplace_back(dep_var);
       member_->dep_vars_.emplace(dep_var);
     }
@@ -572,8 +624,9 @@ void ParallelExecutor::BuildNCCLCommunicator() const {
 #endif
 }
 
-std::vector<LoDTensor> ParallelExecutor::Run(
-    const std::vector<std::string> &fetch_tensors) {
+void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
+                           const std::string &fetched_var_name) {
+  auto fetched_data = std::make_shared<FetchedData>(fetch_tensors.size());
   // Version --> VarHandle
   member_->exception_.reset();
   std::unordered_map pending_vars;
@@ -602,6 +655,38 @@ std::vector<LoDTensor> ParallelExecutor::Run(
     }
   }
 
+  std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
+
+  for (auto &fetch_var_name : fetch_tensors) {
+    for (auto &pair : member_->vars_) {
+      auto it = pair.second.find(fetch_var_name);
+      if (it != pair.second.end()) {
+        fetched_vars[fetch_var_name].push_back(&it->second.rbegin()->second);
+      }
+    }
+  }
+
+  std::vector<FetchOpHandle> fetch_ops;
+
+  for (size_t i = 0; i < fetch_tensors.size(); ++i) {
+    auto &var_name = fetch_tensors[i];
+    auto &vars = fetched_vars[var_name];
+    fetch_ops.emplace_back();
+    FetchOpHandle *op = &fetch_ops.back();
+    op->data_ = fetched_data;
+    op->offset_ = i;
+    op->local_scopes_ = &member_->local_scopes_;
+    for (auto &p : member_->places_) {
+      op->dev_ctx_[p] = this->member_->GetNCCLCtx(p).ctx_.get();
+    }
+
+    for (auto *var : vars) {
+      var->pending_ops_.emplace(op);
+      op->inputs_.emplace_back(var);
+    }
+    pending_ops.insert({op, op->inputs_.size()});
+  }
+
   for (auto *op : to_run) {
     RunOp(pending_vars, op);
   }
@@ -642,7 +727,9 @@ std::vector<LoDTensor> ParallelExecutor::Run(
       RunOp(pending_vars, op);
     }
   }
-  return std::vector<LoDTensor>();
+  fetch_ops.clear();
+  *member_->global_scope_->Var(fetched_var_name)->GetMutable<LoDTensorArray>() =
+      fetched_data->tensors_;
 }
 
 void ParallelExecutor::RunOp(
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index 30416563f..e4857f0ee 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -40,7 +40,8 @@ class ParallelExecutor {
                    const ProgramDesc& main_program,
                    const std::string& loss_var_name, Scope* scope);
 
-  std::vector<LoDTensor> Run(const std::vector<std::string>& fetch_tensors);
+  void Run(const std::vector<std::string>& fetch_tensors,
+           const std::string& fetched_var_name = "fetched_var");
 
  private:
  ParallelExecutorPrivate* member_;
diff --git a/paddle/fluid/operators/math/concat.h b/paddle/fluid/operators/math/concat.h
index 22147d79e..c0e983e4a 100644
--- a/paddle/fluid/operators/math/concat.h
+++ b/paddle/fluid/operators/math/concat.h
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
 limitations under the License. */
 
 #pragma once
+#include "paddle/fluid/framework/data_type.h"
 #include "paddle/fluid/framework/tensor.h"
 
 namespace paddle {
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index c2348d968..929c343f7 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -508,7 +508,7 @@ All parameter, weight, gradient are variables in Paddle.
                       new (&self) ParallelExecutor(places, params, startup_program,
                                                    main_program, loss_var_name, scope);
                     })
-      .def("run", [](ParallelExecutor &self) { self.Run({}); });
+      .def("run", &ParallelExecutor::Run);
 
   BindRecordIOWriter(m);
   return m.ptr();
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index 2a614700b..1cea14fb9 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -16,6 +16,7 @@ import unittest
 import paddle.fluid as fluid
 import paddle.v2 as paddle
 import paddle.v2.dataset.mnist as mnist
+import numpy
 
 
 class ParallelExecutor(unittest.TestCase):
@@ -66,4 +67,16 @@ class ParallelExecutor(unittest.TestCase):
             act_places,
             set([p.name for p in main.global_block().iter_parameters()]),
             startup.desc, main.desc, loss.name, fluid.global_scope())
-        exe.run()
+        exe.run([loss.name], 'fetched_var')
+
+        first_loss = numpy.array(fluid.global_scope().find_var('fetched_var')
+                                 .get_lod_tensor_array()[0])
+
+        for i in xrange(10):
+            exe.run([], 'fetched_var')
+        exe.run([loss.name], 'fetched_var')
+        last_loss = numpy.array(fluid.global_scope().find_var('fetched_var')
+                                .get_lod_tensor_array()[0])
+
+        print first_loss, last_loss
+        self.assertGreater(first_loss[0], last_loss[0])
-- 
GitLab
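
Note (not part of the patch): a minimal Python sketch of how the fetch API introduced above is driven, mirroring the updated unit test. It assumes `exe` is a `fluid.core.ParallelExecutor` already constructed as in the test and `loss` is a fetchable variable of the main program; names other than those appearing in the patch are illustrative.

    import numpy
    import paddle.fluid as fluid

    # Run one iteration and ask the executor to fetch the loss. The second
    # argument names the variable in the global scope that will receive the
    # fetched results (defaults to "fetched_var" in the C++ API).
    exe.run([loss.name], 'fetched_var')

    # Each fetched variable is merged across devices into a single LoDTensor
    # and stored, in fetch-list order, inside a LoDTensorArray in the scope.
    fetched = fluid.global_scope().find_var('fetched_var').get_lod_tensor_array()
    loss_value = numpy.array(fetched[0])  # index 0 == first name in the fetch list

Passing an empty fetch list (as the test's warm-up loop does) runs an iteration without paying the cost of copying and merging tensors back to the CPU.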