diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index fd2ab54fdaf774e73e0d33cf5defa2733bb6acd7..c41788cc69f827ad7f217360e059a25a5ce9777d 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -132,14 +132,14 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( ProcessGraph(graphs_, local_scopes_[0]); } -void AsyncSSAGraphExecutor::StartOffPythonTrainLoop() { +void AsyncSSAGraphExecutor::StartOffPythonTrainLoop(bool return_merged) { VLOG(3) << "StartOffPythonTrainLoop size = " << places_.size(); for (size_t i = 1; i < places_.size(); ++i) { - auto call = [this, i]() -> void { + auto call = [this, i, return_merged]() -> void { VLOG(3) << "start off python thread " << i; try { while (true) { - executors_[i]->Run({}); + executors_[i]->Run({}, return_merged); } } catch (...) { exception_holder_.Catch(std::current_exception()); @@ -164,8 +164,12 @@ void AsyncSSAGraphExecutor::HandleException() { } } -FeedFetchList AsyncSSAGraphExecutor::Run( - const std::vector &fetch_tensors) { +FetchResultType AsyncSSAGraphExecutor::Run( + const std::vector &fetch_tensors, bool return_merged) { + PADDLE_ENFORCE_EQ(return_merged, true, + platform::errors::InvalidArgument( + "AsyncSSAGraphExecutor does not support unmerged " + "results to be fetched!")); // init once if (run_futures_.size() == 0 && places_.size() > 1) { if (strategy_.thread_barrier_) { @@ -175,18 +179,17 @@ FeedFetchList AsyncSSAGraphExecutor::Run( #endif } exception_holder_.Clear(); - StartOffPythonTrainLoop(); + StartOffPythonTrainLoop(return_merged); } if (places_.size() == 1) { exception_holder_.Clear(); } - FeedFetchList fetch_data; - fetch_data.reserve(fetch_tensors.size()); + FetchResultType fetch_data; try { - fetch_data = executors_[0]->Run(fetch_tensors); + fetch_data = executors_[0]->Run(fetch_tensors, return_merged); } catch (...) { exception_holder_.Catch(std::current_exception()); } @@ -194,9 +197,10 @@ FeedFetchList AsyncSSAGraphExecutor::Run( HandleException(); FeedFetchList ret; + auto &val = boost::get(fetch_data); for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) { std::vector lodtensor_ptrs; - lodtensor_ptrs.push_back(&fetch_data.at(fetch_idx)); + lodtensor_ptrs.push_back(&val.at(fetch_idx)); ret.emplace_back(); ret.back().MergeLoDTensor(lodtensor_ptrs, platform::CPUPlace()); } diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.h b/paddle/fluid/framework/details/async_ssa_graph_executor.h index 97472674fada8cc1c531b54be49816e76ebde3f8..ae7b81e6ada75184f150e96daea7d60a33a1a8d8 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.h @@ -42,10 +42,11 @@ class AsyncSSAGraphExecutor : public SSAGraphExecutor { ~AsyncSSAGraphExecutor() final = default; const ir::Graph &Graph() const override { return *graphs_[0]; } - FeedFetchList Run(const std::vector &fetch_tensors) override; + FetchResultType Run(const std::vector &fetch_tensors, + bool return_merged) override; private: - void StartOffPythonTrainLoop(); + void StartOffPythonTrainLoop(bool return_merged); void HandleException(); private: diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc index aba8e5c6c976e4765ceaed799e3d8f542d355e9b..78ea3654a4f4cfb7e24fd3170724a10936b6540d 100644 --- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc @@ -51,8 +51,8 @@ FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor( PrepareAtomicOpDeps(); } -FeedFetchList FastThreadedSSAGraphExecutor::Run( - const std::vector &fetch_tensors) { +FetchResultType FastThreadedSSAGraphExecutor::Run( + const std::vector &fetch_tensors, bool return_merged) { VLOG(3) << "enter FastThreadedSSAGraphExecutor Run"; std::unique_ptr event( new platform::RecordEvent("FastThreadedSSAGraphExecutorPrepare")); @@ -61,15 +61,19 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run( PrepareAtomicOpDeps(); size_t num_ops = op_deps->size(); - paddle::framework::FeedFetchList fetches; - fetches.resize(fetch_tensors.size()); + FetchResultType fetches; + if (return_merged) { + fetches = FeedFetchList(fetch_tensors.size()); + } else { + fetches = FetchUnmergedList(fetch_tensors.size()); + } std::unordered_map> fetched_vars; std::vector fetch_ops; std::vector ready_fetch_ops; exception_.Clear(); InsertFetchOps(fetch_tensors, &fetches, &fetched_vars, op_deps.get(), - &fetch_ops, &ready_fetch_ops); + &fetch_ops, &ready_fetch_ops, return_merged); event.reset(nullptr); if (strategy_.num_threads_ == 1 && traced_ops_.size() == num_ops) { // If the num_threads is 1, we can record the order of operator's @@ -120,11 +124,11 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run( } void FastThreadedSSAGraphExecutor::InsertFetchOps( - const std::vector &fetch_tensors, FeedFetchList *fetches, + const std::vector &fetch_tensors, FetchResultType *fetches, std::unordered_map> *fetched_vars, std::unordered_map> *op_deps, std::vector *fetch_ops, - std::vector *ready_fetch_ops) { + std::vector *ready_fetch_ops, bool return_merged) { std::unordered_set fetch_tensor_set(fetch_tensors.begin(), fetch_tensors.end()); for (auto &fetch_var_name : fetch_tensor_set) { @@ -154,7 +158,7 @@ void FastThreadedSSAGraphExecutor::InsertFetchOps( ir::Node *fetch_node = graph_->CreateEmptyNode("fetch", ir::Node::Type::kOperation); auto *op = new FetchOpHandle(fetch_node, fetches, i, &local_scopes_, - &local_exec_scopes_); + &local_exec_scopes_, return_merged); fetch_ops->emplace_back(op); for (auto &p : places_) { diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h index 0e904554d83c17cbd0b8f436fadaa85b8b8b68e9..72f7412602f786cd099e17a87747fb87990dc226 100644 --- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h @@ -36,7 +36,8 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { const std::vector &local_exec_scopes, const std::vector &places, ir::Graph *graph); - FeedFetchList Run(const std::vector &fetch_tensors) override; + FetchResultType Run(const std::vector &fetch_tensors, + bool return_merged) override; const ir::Graph &Graph() const override; private: @@ -83,12 +84,12 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { bool RunTracedOps(const std::vector &traced_ops); void InsertFetchOps( - const std::vector &fetch_tensors, FeedFetchList *fetches, + const std::vector &fetch_tensors, FetchResultType *fetches, std::unordered_map> *fetched_vars, std::unordered_map> *op_deps, std::vector *fetch_ops, - std::vector *ready_fetch_ops); + std::vector *ready_fetch_ops, bool return_merged); }; } // namespace details } // namespace framework diff --git a/paddle/fluid/framework/details/fetch_op_handle.cc b/paddle/fluid/framework/details/fetch_op_handle.cc index 221dec7235322f2a6fb6a1ce2e7563f8cdeeeba5..87b3cec8cff4cddacc7186cca1797f93e13f54db 100644 --- a/paddle/fluid/framework/details/fetch_op_handle.cc +++ b/paddle/fluid/framework/details/fetch_op_handle.cc @@ -14,6 +14,7 @@ #include "paddle/fluid/framework/details/fetch_op_handle.h" #include +#include #include #include "paddle/fluid/platform/profiler.h" @@ -21,14 +22,16 @@ namespace paddle { namespace framework { namespace details { -FetchOpHandle::FetchOpHandle(ir::Node *node, FeedFetchList *data, size_t offset, - std::vector *local_scopes, - std::vector *local_exec_scopes) +FetchOpHandle::FetchOpHandle(ir::Node *node, FetchResultType *data, + size_t offset, std::vector *local_scopes, + std::vector *local_exec_scopes, + bool return_merged) : OpHandleBase(node), data_(data), offset_(offset), local_scopes_(local_scopes), - local_exec_scopes_(local_exec_scopes) {} + local_exec_scopes_(local_exec_scopes), + return_merged_(return_merged) {} FetchOpHandle::~FetchOpHandle() {} @@ -37,12 +40,42 @@ void FetchOpHandle::RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) { } void FetchOpHandle::WaitAndMergeCPUTensors() const { - std::vector tensors_ptr; - tensors_ptr.reserve(tensors_.size()); - for (auto &t : tensors_) { - tensors_ptr.emplace_back(&t); + if (return_merged_) { + const auto &tensor_dims = tensors_[0].dims(); + for (size_t i = 1; i < tensors_.size(); i++) { + const auto &ele_dims = tensors_[i].dims(); + PADDLE_ENFORCE_EQ( + tensor_dims.size(), ele_dims.size(), + platform::errors::Fatal("The dimension sizes of fetched Tensors are " + "different from each other on different " + "devices. And the error is caused by the %zu " + "(th) fetched variable. Please set the " + "parameter `return_merged = False` when you " + "call the `Executor.run()` method.", + offset_)); + for (int j = 1; j < tensor_dims.size(); j++) { + PADDLE_ENFORCE_EQ( + tensor_dims[j], ele_dims[j], + platform::errors::Fatal("The dimensions of fetched Tensors are " + "different from each other on different " + "devices. And the error is caused by the " + "%zu (th) fetched variable. Please set the " + "parameter `return_merged = False` when " + "you call the `Executor.run()` method.", + offset_)); + } + } + std::vector tensors_ptr; + tensors_ptr.reserve(tensors_.size()); + for (auto &t : tensors_) { + tensors_ptr.emplace_back(&t); + } + auto &val = boost::get(*data_); + val.at(offset_).MergeLoDTensor(tensors_ptr, platform::CPUPlace()); + } else { + auto &val = boost::get(*data_); + val.at(offset_) = std::move(tensors_); } - data_->at(offset_).MergeLoDTensor(tensors_ptr, platform::CPUPlace()); } void FetchOpHandle::RunImpl() { diff --git a/paddle/fluid/framework/details/fetch_op_handle.h b/paddle/fluid/framework/details/fetch_op_handle.h index f3af4e61e2ba7664275eaed5f34c05940d0ec582..48753cb45d578bfb26d7b6572d24ebf4d23b6a97 100644 --- a/paddle/fluid/framework/details/fetch_op_handle.h +++ b/paddle/fluid/framework/details/fetch_op_handle.h @@ -28,9 +28,9 @@ namespace details { struct FetchOpHandle : public OpHandleBase { public: - FetchOpHandle(ir::Node *node, FeedFetchList *data, size_t offset, + FetchOpHandle(ir::Node *node, FetchResultType *data, size_t offset, std::vector *local_scopes, - std::vector *local_exec_scopes); + std::vector *local_exec_scopes, bool return_merged); ~FetchOpHandle(); @@ -50,11 +50,12 @@ struct FetchOpHandle : public OpHandleBase { void WaitInputVarGenerated(const platform::Place &place) override; private: - FeedFetchList *data_; + FetchResultType *data_; size_t offset_; std::vector *local_scopes_; std::vector *local_exec_scopes_; std::vector tensors_; + bool return_merged_; }; } // namespace details diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc index 1a3c753e7d2b075eba9af98f7b206e42b51b650c..8c1d5188a9cb4fff5816ac9ca45dce2335eee5d2 100644 --- a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc @@ -123,25 +123,33 @@ std::vector ParallelSSAGraphExecutor::Graphs() { return result; } -FeedFetchList ParallelSSAGraphExecutor::Run( - const std::vector &fetch_tensors) { - std::vector> run_futures; - - std::vector fetch_data; - FeedFetchList ret; +FetchResultType ParallelSSAGraphExecutor::Run( + const std::vector &fetch_tensors, bool return_merged) { + std::vector> run_futures; + std::vector fetch_data; + FetchResultType ret; fetch_data.reserve(places_.size()); - ret.reserve(fetch_tensors.size()); + if (return_merged) { + ret = FeedFetchList(); + } else { + ret = FetchUnmergedList(); + } + exception_holder_.Clear(); for (size_t i = 0; i < places_.size(); ++i) { - auto call = [this, i, &fetch_tensors]() -> FeedFetchList { + auto call = [this, i, return_merged, &fetch_tensors]() -> FetchResultType { try { - return executors_[i]->Run(fetch_tensors); + return executors_[i]->Run(fetch_tensors, return_merged); } catch (...) { exception_holder_.Catch(std::current_exception()); } - return FeedFetchList(); + if (return_merged) { + return FeedFetchList(); + } else { + return FetchUnmergedList(); + } }; if (pool_) { @@ -164,14 +172,33 @@ FeedFetchList ParallelSSAGraphExecutor::Run( exception_holder_.ReThrow(); } - for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) { - std::vector lodtensor_ptrs; - lodtensor_ptrs.reserve(local_scopes_.size()); - for (size_t scope_idx = 0; scope_idx < local_scopes_.size(); ++scope_idx) { - lodtensor_ptrs.push_back(&fetch_data.at(scope_idx).at(fetch_idx)); + if (return_merged) { + auto &ret_val = boost::get(ret); + for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) { + std::vector lodtensor_ptrs; + lodtensor_ptrs.reserve(local_scopes_.size()); + for (size_t scope_idx = 0; scope_idx < local_scopes_.size(); + ++scope_idx) { + auto &val = boost::get(fetch_data.at(scope_idx)); + lodtensor_ptrs.push_back(&val.at(fetch_idx)); + } + ret_val.emplace_back(); + ret_val.back().MergeLoDTensor(lodtensor_ptrs, platform::CPUPlace()); + } + } else { + auto &ret_val = boost::get(ret); + for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) { + ret_val.emplace_back(); + for (size_t scope_idx = 0; scope_idx < local_scopes_.size(); + ++scope_idx) { + auto &val = boost::get(fetch_data.at(scope_idx)); + PADDLE_ENFORCE_EQ( + val.at(fetch_idx).size(), 1, + platform::errors::Fatal( + "Each place must have only one fetched LoDTensor!")); + ret_val.back().emplace_back(val.at(fetch_idx)[0]); + } } - ret.emplace_back(); - ret.back().MergeLoDTensor(lodtensor_ptrs, platform::CPUPlace()); } return ret; } diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.h b/paddle/fluid/framework/details/parallel_ssa_graph_executor.h index 6889c54dd4c6906b179036386f8d38dad04f5c9f..5dcf01c8dced6e93a548ad25e96bf3e96cee4944 100644 --- a/paddle/fluid/framework/details/parallel_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.h @@ -39,7 +39,8 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor { std::vector Graphs(); - FeedFetchList Run(const std::vector &fetch_tensors) override; + FetchResultType Run(const std::vector &fetch_tensors, + bool return_merged) override; private: std::vector> SeparateMultiDevicesGraph( diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc index 3640e9f7dbfa5fac3c09b455ece6f98603a832b2..fe86d002ca8b33695839be3c5d2ff5fd20672952 100644 --- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc @@ -41,19 +41,19 @@ ScopeBufferedSSAGraphExecutor::ScopeBufferedSSAGraphExecutor( PrepareLocalExeScopes(); } -FeedFetchList ScopeBufferedSSAGraphExecutor::Run( - const std::vector &fetch_tensors) { +FetchResultType ScopeBufferedSSAGraphExecutor::Run( + const std::vector &fetch_tensors, bool return_merged) { if (drop_scope_counter_ == 0) { platform::RecordEvent e("InitLocalVars"); InitVariables(); } - std::vector fetch_data; + FetchResultType fetch_data; std::exception_ptr eptr = nullptr; auto exe_run_func = [&]() { try { - fetch_data = underlying_executor_->Run(fetch_tensors); + fetch_data = underlying_executor_->Run(fetch_tensors, return_merged); } catch (...) { eptr = std::current_exception(); } diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h index 17493a89a660588b0e0f8f8da42518961b008773..f5d0ffe109501e86ac8b604f714fe402349b678f 100644 --- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h @@ -50,7 +50,8 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor { return underlying_executor_->Graph(); } - FeedFetchList Run(const std::vector& fetch_tensors) override; + FetchResultType Run(const std::vector& fetch_tensors, + bool return_merged) override; void DropLocalExeScopes(); diff --git a/paddle/fluid/framework/details/ssa_graph_executor.h b/paddle/fluid/framework/details/ssa_graph_executor.h index 2454ec2b27d9d2060f28b8d6cea0ce49fe347433..0ac46bbc4da25a811d42dc2cdf400366f34cbc17 100644 --- a/paddle/fluid/framework/details/ssa_graph_executor.h +++ b/paddle/fluid/framework/details/ssa_graph_executor.h @@ -35,7 +35,8 @@ class SSAGraphExecutor { virtual const ir::Graph& Graph() const = 0; - virtual FeedFetchList Run(const std::vector& fetch_tensors) = 0; + virtual FetchResultType Run(const std::vector& fetch_tensors, + bool return_merged = true) = 0; }; void ClearFetchOp(ir::Graph* graph, std::vector* fetch_ops); diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index d84b0ffe7a62be6e07c584c48bf5fad7825f721a..63942f41967c90fca3f959c962832dcd866dc993 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -52,8 +52,8 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( CopyOpDeps(); } -inline FeedFetchList ThreadedSSAGraphExecutor::RunImpl( - const std::vector &fetch_tensors) { +inline FetchResultType ThreadedSSAGraphExecutor::RunImpl( + const std::vector &fetch_tensors, bool return_merged) { std::unique_ptr event( new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare")); std::unique_ptr op_deps = op_deps_futures_.get(); @@ -70,10 +70,15 @@ inline FeedFetchList ThreadedSSAGraphExecutor::RunImpl( // Step 2. Insert FetchOps std::vector fetch_ops; std::unordered_set fetch_dependencies; - FeedFetchList fetch_data(fetch_tensors.size()); + FetchResultType fetch_data; + if (return_merged) { + fetch_data = FeedFetchList(fetch_tensors.size()); + } else { + fetch_data = FetchUnmergedList(fetch_tensors.size()); + } InsertFetchOps(fetch_tensors, &fetch_ops, &fetch_dependencies, &ready_ops, - &pending_ops, &pending_vars, &fetch_data); + &pending_ops, &pending_vars, &fetch_data, return_merged); exception_holder_.Clear(); event.reset(nullptr); @@ -142,12 +147,12 @@ inline FeedFetchList ThreadedSSAGraphExecutor::RunImpl( return fetch_data; } -FeedFetchList ThreadedSSAGraphExecutor::Run( - const std::vector &fetch_tensors) { +FetchResultType ThreadedSSAGraphExecutor::Run( + const std::vector &fetch_tensors, bool return_merged) { for (size_t j = 0; j < strategy_.num_iteration_per_run_ - 1; ++j) { - RunImpl({}); + RunImpl({}, return_merged); } - return RunImpl(fetch_tensors); + return RunImpl(fetch_tensors, return_merged); } void ThreadedSSAGraphExecutor::InsertFetchOps( @@ -157,7 +162,7 @@ void ThreadedSSAGraphExecutor::InsertFetchOps( std::unordered_set *ready_ops, std::unordered_map *pending_ops, std::unordered_set *pending_vars, - FeedFetchList *fetch_data) { + FetchResultType *fetch_data, bool return_merged) { std::unordered_map> fetched_vars; std::unordered_set local_ready_vars; std::unordered_set fetch_tensor_set(fetch_tensors.begin(), @@ -189,7 +194,7 @@ void ThreadedSSAGraphExecutor::InsertFetchOps( ir::Node *fetch_node = graph_->CreateEmptyNode("fetch", ir::Node::Type::kOperation); auto *op = new FetchOpHandle(fetch_node, fetch_data, i, &local_scopes_, - &local_exec_scopes_); + &local_exec_scopes_, return_merged); fetch_ops->emplace_back(op); for (auto &p : places_) { diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index 8576e2e65a9256bfba1f45da2cc608301b8f79ad..b8b584f27200bd3f89efcc20be2c6a3435274a56 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -58,12 +58,14 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { const ir::Graph &Graph() const override { return *graph_; } // Run a SSAGraph by a thread pool // Use topological sort algorithm - FeedFetchList Run(const std::vector &fetch_tensors) override; + FetchResultType Run(const std::vector &fetch_tensors, + bool return_merged) override; ~ThreadedSSAGraphExecutor() final = default; private: - inline FeedFetchList RunImpl(const std::vector &fetch_tensors); + inline FetchResultType RunImpl(const std::vector &fetch_tensors, + bool return_merged); void RunOp(const std::shared_ptr> &ready_var_q, details::OpHandleBase *op); @@ -99,7 +101,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { std::unordered_set *ready_ops, std::unordered_map *pending_ops, std::unordered_set *pending_vars, - FeedFetchList *fetch_data); + FetchResultType *fetch_data, bool return_merged); void PrepareOpDeps(); diff --git a/paddle/fluid/framework/feed_fetch_type.h b/paddle/fluid/framework/feed_fetch_type.h index fae792ad9fa766f456ed706cc9adeb4e34d20123..210d549edf2640759a0713b9d23529a1100ae3c9 100644 --- a/paddle/fluid/framework/feed_fetch_type.h +++ b/paddle/fluid/framework/feed_fetch_type.h @@ -15,11 +15,14 @@ limitations under the License. */ #pragma once #include #include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/platform/variant.h" namespace paddle { namespace framework { using FeedFetchType = LoDTensor; using FeedFetchList = std::vector; +using FetchUnmergedList = std::vector>; +using FetchResultType = boost::variant; static const char kFeedOpType[] = "feed"; static const char kFetchOpType[] = "fetch"; diff --git a/paddle/fluid/framework/lod_tensor_array.h b/paddle/fluid/framework/lod_tensor_array.h index 36a5c3c5d601390beedaf37ceb98ee2c63ecf5a6..7b15289c1b5121a148f9b3d5d72cc40b026c9106 100644 --- a/paddle/fluid/framework/lod_tensor_array.h +++ b/paddle/fluid/framework/lod_tensor_array.h @@ -20,6 +20,7 @@ namespace paddle { namespace framework { using LoDTensorArray = std::vector; +using LoDTensor2DArray = std::vector>; } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index f43ca6ff6128368e4b2422d1692ebaa1af2b9540..d9a4c771201087e8fbcd36debbbc9c5e9d39c8c4 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -723,8 +723,8 @@ void ParallelExecutor::BCastParamsToDevices( } } -FeedFetchList ParallelExecutor::Run( - const std::vector &fetch_tensors) { +FetchResultType ParallelExecutor::Run( + const std::vector &fetch_tensors, bool return_merged) { VLOG(3) << "enter ParallelExecutor Run"; #ifdef WITH_GPERFTOOLS if (gProfileStarted) { @@ -738,7 +738,7 @@ FeedFetchList ParallelExecutor::Run( member_->HasGarbageCollectors()); VLOG(3) << "ParallelExecutor begin to run member_->executor_->Run"; - auto fetch_data = member_->executor_->Run(fetch_tensors); + auto fetch_data = member_->executor_->Run(fetch_tensors, return_merged); return fetch_data; } diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 56aeb21531cbb32dc634b832774b1c0aab0c74db..be36592cc1d03002a6d7b2c95d02f0b284be5eef 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -77,7 +77,8 @@ class ParallelExecutor { void FeedAndSplitTensorIntoLocalScopes( const std::unordered_map &tensors); - FeedFetchList Run(const std::vector &fetch_tensors); + FetchResultType Run(const std::vector &fetch_tensors, + bool return_merged = true); private: // broadcast the parameters from the 0th device. diff --git a/paddle/fluid/pybind/global_value_getter_setter.cc b/paddle/fluid/pybind/global_value_getter_setter.cc index 4a0e09bb2ae7239ea429b51e464c1451ec0ae27f..3faa8c83c7b53db770a0e32fe05356aed3039328 100644 --- a/paddle/fluid/pybind/global_value_getter_setter.cc +++ b/paddle/fluid/pybind/global_value_getter_setter.cc @@ -32,6 +32,7 @@ DECLARE_bool(use_ngraph); DECLARE_bool(use_system_allocator); DECLARE_bool(free_idle_chunk); DECLARE_bool(free_when_no_cache_hit); +DECLARE_bool(enable_parallel_graph); namespace paddle { namespace pybind { @@ -169,6 +170,7 @@ static void RegisterGlobalVarGetterSetter() { REGISTER_GLOBAL_VAR_GETTER_ONLY(FLAGS_use_ngraph); REGISTER_GLOBAL_VAR_GETTER_SETTER(FLAGS_eager_delete_tensor_gb); REGISTER_GLOBAL_VAR_GETTER_SETTER(FLAGS_use_system_allocator); + REGISTER_GLOBAL_VAR_GETTER_SETTER(FLAGS_enable_parallel_graph); REGISTER_GLOBAL_VAR_GETTER_ONLY(FLAGS_free_idle_chunk); REGISTER_GLOBAL_VAR_GETTER_ONLY(FLAGS_free_when_no_cache_hit); } diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index a0cf35ea29935636ed885de2f7f23d38bff1c683..6ff6e722bf04f440a42cd5af7ceab9e380951837 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -24,6 +24,7 @@ limitations under the License. */ #include #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/feed_fetch_method.h" +#include "paddle/fluid/framework/feed_fetch_type.h" #include "paddle/fluid/framework/framework.pb.h" #include "paddle/fluid/framework/garbage_collector.h" #include "paddle/fluid/framework/io/fs.h" @@ -103,6 +104,7 @@ DECLARE_bool(use_ngraph); // disable auto conversion to list in Python PYBIND11_MAKE_OPAQUE(paddle::framework::LoDTensorArray); +PYBIND11_MAKE_OPAQUE(paddle::framework::LoDTensor2DArray); namespace paddle { namespace pybind { @@ -1614,6 +1616,25 @@ All parameter, weight, gradient are variables in Paddle. }, py::return_value_policy::take_ownership); + py::class_(m, "LoDTensor2DArray", R"DOC( + LoDTensor2DArray is 2-D array of LoDTensor. + )DOC") + .def("_move_to_list", + [](LoDTensor2DArray &self) -> py::list { + py::list res(self.size()); + for (size_t i = 0; i < self.size(); ++i) { + py::list tmp(self[i].size()); + for (size_t j = 0; j < self[i].size(); ++j) { + tmp[j] = py::cast(std::move(self[i][j])); + } + res[i] = std::move(tmp); + self[i].clear(); + } + self.clear(); + return res; + }, + py::return_value_policy::take_ownership); + m.def("op_support_gpu", OpSupportGPU); #ifdef PADDLE_WITH_CUDA m.def("get_cuda_device_count", platform::GetCUDADeviceCount); @@ -2306,9 +2327,20 @@ All parameter, weight, gradient are variables in Paddle. &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes) .def("run", [](ParallelExecutor &self, - const std::vector &fetch_tensors) { - pybind11::gil_scoped_release release; - return self.Run(fetch_tensors); + const std::vector &fetch_tensors, + bool return_merged) -> py::object { + paddle::framework::FetchResultType ret; + { + pybind11::gil_scoped_release release; + ret = self.Run(fetch_tensors, return_merged); + } + if (return_merged) { + return py::cast(std::move( + boost::get(ret))); + } else { + return py::cast(std::move( + boost::get(ret))); + } }) .def("device_count", &ParallelExecutor::DeviceCount); diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index af5c7e1c0e8088cc358140a6039b93278f23869a..69ac324a382ee610ad9c663c1ea43a1583cedaba 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -620,7 +620,7 @@ class Executor(object): self._closed = True def _run_parallel(self, program, scope, feed, fetch_list, fetch_var_name, - return_numpy): + return_numpy, return_merged): exe = program._executor # TODO(zhenghuihuang): quantization uses Graph in CompiledProgram # instead of program. We will add support for checking Vars in Graph @@ -674,7 +674,7 @@ class Executor(object): exe.feed_tensors_into_local_scopes(res) fetch_var_names = list(map(_to_name_str, fetch_list)) - tensors = exe.run(fetch_var_names)._move_to_list() + tensors = exe.run(fetch_var_names, return_merged)._move_to_list() return as_numpy(tensors) if return_numpy else tensors def run(self, @@ -685,7 +685,8 @@ class Executor(object): fetch_var_name='fetch', scope=None, return_numpy=True, - use_program_cache=False): + use_program_cache=False, + return_merged=True): """ Run the specified :code:`Program` or :code:`CompiledProgram`. It should be noted that the executor will execute all the operators in :code:`Program` or :code:`CompiledProgram` without pruning some @@ -724,6 +725,17 @@ class Executor(object): the input program is :code:`fluid.Program`, and the parameters(program, feed variable name and fetch_list variable) of this interface remains unchanged during running. The default is False. + return_merged(bool): This parameter indicates whether fetched variables (the variables + specified in the fetch list) should be merged according to the execution device dimension. + If :code:`return_merged` is False, the type of the return value is a two-dimensional list + of :code:`Tensor` ( :code:`return_numpy` is False) or a two-dimensional list of + :code:`numpy.ndarray` ( :code:`return_numpy` is True). If :code:`return_merged` is True, + the type of the return value is an one-dimensional list of :code:`Tensor` ( :code:`return_numpy` + is False) or an one-dimensional list of :code:`numpy.ndarray` ( :code:`return_numpy` is True). + Please see Examples 2 for more details. If the lengths of fetched results are variant, please + set :code:`return_merged` as False, which denotes that the fetched results will not be merged. + The default is True, but it is just for the compatibility, and may use False as default value + in the future version. Returns: @@ -743,7 +755,7 @@ class Executor(object): results are spliced together in dimension 0 for the same variable values (variables in fetch_list) on different devices. - Examples: + Examples 1: .. code-block:: python import paddle.fluid as fluid @@ -765,6 +777,66 @@ class Executor(object): x = numpy.random.random(size=(10, 1)).astype('float32') outs = exe.run(feed={'X': x}, fetch_list=[loss.name]) + + Examples 2: + .. code-block:: python + + import paddle.fluid as fluid + import numpy as np + + # First create the Executor. + place = fluid.CUDAPlace(0) + exe = fluid.Executor(place) + + data = fluid.data(name='X', shape=[None, 1], dtype='float32') + class_dim = 2 + prediction = fluid.layers.fc(input=data, size=class_dim) + loss = fluid.layers.mean(prediction) + adam = fluid.optimizer.Adam() + adam.minimize(loss) + + # Run the startup program once and only once. + exe.run(fluid.default_startup_program()) + build_strategy = fluid.BuildStrategy() + binary = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel( + loss_name=loss.name, build_strategy=build_strategy) + batch_size = 6 + x = np.random.random(size=(batch_size, 1)).astype('float32') + + # Set return_merged as False to fetch unmerged results: + unmerged_prediction, = exe.run(binary, feed={'X': x}, + fetch_list=[prediction.name], + return_merged=False) + # If the user uses two GPU cards to run this python code, the printed result will be + # (2, 3, class_dim). The first dimension value of the printed result is the number of used + # GPU cards, and the second dimension value is the quotient of batch_size and the + # number of used GPU cards. + print("The unmerged prediction shape: {}".format(np.array(unmerged_prediction).shape)) + print(unmerged_prediction) + + # Set return_merged as True to fetch merged results: + merged_prediction, = exe.run(binary, feed={'X': x}, + fetch_list=[prediction.name], + return_merged=True) + # If the user uses two GPU cards to run this python code, the printed result will be + # (6, class_dim). The first dimension value of the printed result is the batch_size. + print("The merged prediction shape: {}".format(np.array(merged_prediction).shape)) + print(merged_prediction) + + # Out: + # The unmerged prediction shape: (2, 3, 2) + # [array([[-0.37620035, -0.19752218], + # [-0.3561043 , -0.18697084], + # [-0.24129935, -0.12669306]], dtype=float32), array([[-0.24489994, -0.12858354], + # [-0.49041364, -0.25748932], + # [-0.44331917, -0.23276259]], dtype=float32)] + # The merged prediction shape: (6, 2) + # [[-0.37789783 -0.19921964] + # [-0.3577645 -0.18863106] + # [-0.24274671 -0.12814042] + # [-0.24635398 -0.13003758] + # [-0.49232286 -0.25939852] + # [-0.44514108 -0.2345845 ]] """ try: return self._run_impl( @@ -775,7 +847,8 @@ class Executor(object): fetch_var_name=fetch_var_name, scope=scope, return_numpy=return_numpy, - use_program_cache=use_program_cache) + use_program_cache=use_program_cache, + return_merged=return_merged) except Exception as e: if not isinstance(e, core.EOFException): warnings.warn( @@ -783,7 +856,8 @@ class Executor(object): six.reraise(*sys.exc_info()) def _run_impl(self, program, feed, fetch_list, feed_var_name, - fetch_var_name, scope, return_numpy, use_program_cache): + fetch_var_name, scope, return_numpy, use_program_cache, + return_merged): if self._closed: raise RuntimeError("Attempted to use a closed Executor") @@ -840,7 +914,8 @@ class Executor(object): feed=feed, fetch_list=fetch_list, fetch_var_name=fetch_var_name, - return_numpy=return_numpy) + return_numpy=return_numpy, + return_merged=return_merged) def _run_program(self, program, feed, fetch_list, feed_var_name, fetch_var_name, scope, return_numpy, use_program_cache): diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 7bd2eabaf53cb8053061024dac10377e42a4688b..1185312886e250f49ffb0eacc93165bffe06fdcd 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -362,5 +362,5 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu test_parallel_executor_feed_persistable_var test_parallel_executor_crf_auto_growth test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass test_data_norm_op test_imperative_using_non_zero_gpu test_fuse_bn_act_pass - test_optimizer_in_control_flow + test_optimizer_in_control_flow test_fetch_unmerged test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST") diff --git a/python/paddle/fluid/tests/unittests/test_fetch_unmerged.py b/python/paddle/fluid/tests/unittests/test_fetch_unmerged.py new file mode 100644 index 0000000000000000000000000000000000000000..1181272bd98b00f65e6925b44da814662f96045f --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fetch_unmerged.py @@ -0,0 +1,121 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest +import random +import numpy as np +import paddle.fluid as fluid +import six +import paddle + +os.environ["CPU_NUM"] = "2" + + +class TestFetchUnmerged(unittest.TestCase): + def conv_net(self, img, label): + conv_pool_1 = fluid.nets.simple_img_conv_pool( + input=img, + filter_size=5, + num_filters=20, + pool_size=2, + pool_stride=2, + pool_type='max', + act="relu") + conv_pool_1 = fluid.layers.batch_norm(conv_pool_1) + conv_pool_2 = fluid.nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + pool_size=2, + pool_stride=2, + pool_type='avg', + act="relu") + hidden = fluid.layers.fc(input=conv_pool_2, size=100, act='relu') + prediction = fluid.layers.fc(input=hidden, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + avg_loss = fluid.layers.mean(loss) + return avg_loss, prediction + + def build_program(self, main, startup, is_test): + with fluid.unique_name.guard(): + with fluid.program_guard(main, startup): + img = fluid.layers.data( + name='image', shape=[1, 28, 28], dtype='float32') + label = fluid.layers.data( + name='label', shape=[1], dtype='int64') + loss, prediction = self.conv_net(img, label) + if not is_test: + opt = fluid.optimizer.Adam(learning_rate=0.001) + opt.minimize(loss) + return [img, label], loss, prediction + + def fetch_unmerged(self, use_cuda=True): + main_program = fluid.Program() + startup_program = fluid.Program() + feeds, loss, prediction = self.build_program(main_program, + startup_program, False) + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_program) + + build_strategy = fluid.BuildStrategy() + binary = fluid.CompiledProgram(main_program).with_data_parallel( + loss_name=loss.name, build_strategy=build_strategy) + + iters = 3 + batch_size = 64 + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.mnist.train(), buf_size=500), + batch_size=batch_size) + feeder = fluid.DataFeeder(feed_list=feeds, place=place) + + device_num = fluid.core.get_cuda_device_count() if use_cuda else 2 + for _ in range(iters): + data = next(train_reader()) + loss_v, prediction_v = exe.run(binary, + feed=feeder.feed(data), + fetch_list=[loss, prediction], + return_merged=False) + self.assertEqual(np.array(loss_v).shape, (device_num, 1)) + self.assertEqual( + np.array(prediction_v).shape, + (device_num, batch_size / device_num, 10)) + + for _ in range(iters): + data = next(train_reader()) + loss_v, prediction_v = exe.run(binary, + feed=feeder.feed(data), + fetch_list=[loss, prediction], + return_merged=True) + self.assertEqual(np.array(loss_v).shape, (device_num, )) + self.assertEqual(np.array(prediction_v).shape, (batch_size, 10)) + + def test_fetch_unmerged(self): + if fluid.core.is_compiled_with_cuda(): + self.fetch_unmerged(use_cuda=True) + self.fetch_unmerged(use_cuda=False) + + def test_fetch_unmerged_parallel_graph(self): + fluid.core.globals()['FLAGS_enable_parallel_graph'] = True + if fluid.core.is_compiled_with_cuda(): + self.fetch_unmerged(use_cuda=True) + self.fetch_unmerged(use_cuda=False) + fluid.core.globals()['FLAGS_enable_parallel_graph'] = False + + +if __name__ == '__main__': + unittest.main()