From c9de6f1b05aa428d5e6ad9c16db5c2ca8c12cdc7 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 6 Dec 2018 21:16:10 +0800 Subject: [PATCH] init parallel graph mode --- paddle/fluid/framework/CMakeLists.txt | 2 +- paddle/fluid/framework/details/CMakeLists.txt | 2 + .../framework/details/all_reduce_op_handle.cc | 28 +++- .../fluid/framework/details/build_strategy.cc | 1 + .../details/computation_op_handle.cc | 12 +- .../framework/details/computation_op_handle.h | 1 + .../framework/details/execution_strategy.h | 2 +- .../details/multi_devices_graph_pass.cc | 8 +- .../fluid/framework/details/op_handle_base.cc | 3 +- .../fluid/framework/details/op_handle_base.h | 1 - .../details/parallel_ssa_graph_executor.cc | 66 ++++++++++ .../details/parallel_ssa_graph_executor.h | 51 +++++++ .../scope_buffered_ssa_graph_executor.cc | 41 +++--- .../scope_buffered_ssa_graph_executor.h | 5 +- .../details/threaded_ssa_graph_executor.h | 1 + paddle/fluid/framework/details/var_handle.cc | 2 +- paddle/fluid/framework/parallel_executor.cc | 124 +++++++++++++----- paddle/fluid/framework/parallel_executor.h | 2 + paddle/fluid/framework/scope.cc | 5 +- paddle/fluid/framework/threadpool.cc | 16 ++- paddle/fluid/framework/threadpool.h | 4 +- paddle/fluid/framework/threadpool_test.cc | 44 +++++++ .../fluid/operators/reader/blocking_queue.h | 3 + .../fluid/operators/reader/buffered_reader.cc | 5 + .../reader/create_double_buffer_reader_op.cc | 14 +- .../operators/reader/create_py_reader_op.cc | 2 + .../fluid/operators/reader/open_files_op.cc | 2 + paddle/fluid/platform/nccl_helper.h | 7 +- paddle/fluid/platform/profiler.cc | 12 +- paddle/fluid/pybind/pybind.cc | 24 ++-- 30 files changed, 399 insertions(+), 91 deletions(-) create mode 100644 paddle/fluid/framework/details/parallel_ssa_graph_executor.cc create mode 100644 paddle/fluid/framework/details/parallel_ssa_graph_executor.h diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index c701a2ad6..b419c8c29 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -177,7 +177,7 @@ else() endif() cc_library(parallel_executor SRCS parallel_executor.cc DEPS - threaded_ssa_graph_executor scope_buffered_ssa_graph_executor + threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor graph build_strategy fast_threaded_ssa_graph_executor variable_helper) diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index 93288936f..652475332 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -54,6 +54,8 @@ cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUT cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope simple_threadpool device_context) +cc_library(parallel_ssa_graph_executor SRCS parallel_ssa_graph_executor.cc DEPS threaded_ssa_graph_executor) + cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory device_context broadcast_op_handle) cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc index e8bf53e16..ae17ea8a1 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.cc +++ 
b/paddle/fluid/framework/details/all_reduce_op_handle.cc
@@ -46,20 +46,27 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
 #endif
 
 void AllReduceOpHandle::RunImpl() {
+  int64_t start_ts = GetTS();
+  int64_t func_ts = GetTS();
+  VLOG(5) << "all_reduce_op_handle::RunImpl start";
   platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
 
   // FIXME(typhoonzero): If scope0(global scope) have NCCL_ID_VAR,
   // this is a distributed or inter-process call, find a better way.
 #ifdef PADDLE_WITH_CUDA
   if (NoDummyInputSize() == 1 &&
-      local_scopes_[0]->FindLocalVar(NCCL_ID_VARNAME) == nullptr) {
+      local_scopes_[0]->FindVar(NCCL_ID_VARNAME) == nullptr) {
 #else
   if (NoDummyInputSize() == 1) {
 #endif
     return;  // No need to all reduce when GPU count = 1;
   } else {
     // Wait input done
+    start_ts = GetTS();
     WaitInputVarGenerated();
+    VLOG(5) << "all_reduce_op_handle wait input var spent: "
+            << GetTS() - start_ts << " (ns).";
+    start_ts = GetTS();
     auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
     auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
     PADDLE_ENFORCE_EQ(
@@ -100,6 +107,8 @@ void AllReduceOpHandle::RunImpl() {
       }
       int dev_id = boost::get<platform::CUDAPlace>(p).device;
+      VLOG(5) << "call allreduce: " << in_var_handles[i]->name_
+              << " on dev: " << dev_id;
       auto &nccl_ctx = nccl_ctxs_->at(dev_id);
       auto stream = nccl_ctx.stream();
       auto comm = nccl_ctx.comm_;
@@ -110,11 +119,20 @@ void AllReduceOpHandle::RunImpl() {
       });
     }
     this->RunAndRecordEvent([&] {
-      platform::NCCLGroupGuard guard;
-      for (auto &call : all_reduce_calls) {
-        call();
+      // TODO(Yancey1989): need an allreduce operator to avoid this flag
+      if (nccl_ctxs_->need_group_call_) {
+        platform::NCCLGroupGuard guard;
+        for (auto &call : all_reduce_calls) {
+          call();
+        }
+      } else {
+        // only used when executor_type == ParallelGraph: one thread, one GPU
+        // TODO(Yancey1989): use an allreduce operator to avoid this trick.
+        PADDLE_ENFORCE(all_reduce_calls.size() == 1UL);
+        all_reduce_calls[0]();
       }
     });
+
 #else
     PADDLE_THROW("Not compiled with CUDA");
 #endif
@@ -144,6 +162,8 @@ void AllReduceOpHandle::RunImpl() {
       }
     }
   }
+  VLOG(5) << "all_reduce_op_handle Impl spent: " << GetTS() - func_ts
+          << " (ns).";
 }
 
 std::string AllReduceOpHandle::Name() const { return "all_reduce"; }
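Note on the branch above: it separates NCCL's two launch patterns. When a single thread issues collectives for several GPUs, the calls must be wrapped in a group so NCCL can match them without deadlocking; in ParallelGraph mode each GPU is driven by its own thread, so every thread issues exactly one call and no grouping is needed. A minimal sketch of the two patterns using the public NCCL API (device counts, buffers, and streams are illustrative):

    // Pattern A: one thread drives N devices -> the calls must be grouped.
    ncclGroupStart();
    for (int i = 0; i < ndev; ++i) {
      ncclAllReduce(sendbuf[i], recvbuf[i], count, ncclFloat, ncclSum,
                    comms[i], streams[i]);
    }
    ncclGroupEnd();

    // Pattern B: one thread per device -> a single ungrouped call per thread,
    // which is exactly the need_group_call_ == false path above.
    ncclAllReduce(sendbuf[rank], recvbuf[rank], count, ncclFloat, ncclSum,
                  comms[rank], streams[rank]);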
diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc
index 1e1b945f6..04c106153 100644
--- a/paddle/fluid/framework/details/build_strategy.cc
+++ b/paddle/fluid/framework/details/build_strategy.cc
@@ -118,6 +118,7 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
   std::unique_ptr<ir::Graph> graph(new ir::Graph(main_program));
 
   for (std::shared_ptr<ir::Pass> &pass : pass_builder_->AllPasses()) {
+    VLOG(5) << "run pass: " << pass->Type();
     if (pass->Type() == "multi_devices_pass") {
       pass->Erase("places");
       pass->SetNotOwned<const std::vector<platform::Place>>("places", &places);
diff --git a/paddle/fluid/framework/details/computation_op_handle.cc b/paddle/fluid/framework/details/computation_op_handle.cc
index 7ad1e40c6..35ba99a87 100644
--- a/paddle/fluid/framework/details/computation_op_handle.cc
+++ b/paddle/fluid/framework/details/computation_op_handle.cc
@@ -33,10 +33,18 @@ void ComputationOpHandle::RunImpl() {
     op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
   };
 
-  if (is_lock_and_record_event_free_) {
+  if (Name() == "conv2d" || Name() == "conv2d_grad") {
+    int64_t start_ts = GetTS();
+    auto varname = DynamicCast<VarHandle>(this->Outputs())[0]->name_;
     run_func();
+    VLOG(5) << Name() << "_op_handle: " << varname
+            << " spent: " << GetTS() - start_ts << " (ns).";
   } else {
-    this->RunAndRecordEvent(run_func);
+    if (is_lock_and_record_event_free_) {
+      run_func();
+    } else {
+      this->RunAndRecordEvent(run_func);
+    }
   }
 }
diff --git a/paddle/fluid/framework/details/computation_op_handle.h b/paddle/fluid/framework/details/computation_op_handle.h
index 662a91d6b..5346b56dd 100644
--- a/paddle/fluid/framework/details/computation_op_handle.h
+++ b/paddle/fluid/framework/details/computation_op_handle.h
@@ -17,6 +17,7 @@
 #include <string>
 #include <vector>
 
+#include "paddle/fluid/framework/details/container_cast.h"
 #include "paddle/fluid/framework/details/op_handle_base.h"
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/operator.h"
diff --git a/paddle/fluid/framework/details/execution_strategy.h b/paddle/fluid/framework/details/execution_strategy.h
index 15c496130..d3d5b6bf5 100644
--- a/paddle/fluid/framework/details/execution_strategy.h
+++ b/paddle/fluid/framework/details/execution_strategy.h
@@ -20,7 +20,7 @@ namespace framework {
 namespace details {
 
 struct ExecutionStrategy {
-  enum ExecutorType { kDefault = 0, kExperimental = 1 };
+  enum ExecutorType { kDefault = 0, kExperimental = 1, kParallelGraph = 2 };
 
   size_t num_threads_{0};
   bool use_cuda_{true};
diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc
index cbae5321d..1bd238357 100644
--- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc
+++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc
@@ -300,7 +300,7 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
   auto nodes = graph->ReleaseNodes();
   ir::Graph &result = *graph;
 
-  int num_trainers = Get<int>(kNumTrainers);
+  // int num_trainers = Get<int>(kNumTrainers);
 
   for (auto &node : nodes) {
     if (node->IsVar() && node->Var()) {
@@ -329,6 +329,7 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
   std::unordered_map<std::string, int> sharded_var_device;
 
   for (ir::Node *node : sorted_ops) {
+    VLOG(5) << "op name: " << node->Op()->Type();
     if (boost::get<int>(
             node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())) ==
         static_cast<int>(OpRole::kRPC)) {
@@ -365,9 +366,11 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
       // is true only for the op that scale the final scalar loss.
       // It also assumes backward op will always follow the forward op in
       // the block.
+      VLOG(5) << "this is the loss-scale op";
       is_forwarding = false;
     } else {
       int op_dev_id = GetOpDeviceID(result, node, sharded_var_device);
+      VLOG(5) << "on device id: " << op_dev_id;
       if (op_dev_id != -1) {  // This op only runs on one specific device.
         CreateComputationalOp(&result, node, op_dev_id);
         for (ir::Node *n : node->outputs) {
@@ -386,7 +389,8 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
         CreateComputationalOps(&result, node, places_.size());
       }
 
-      if (!is_forwarding && (places_.size() > 1 || num_trainers > 1)) {
+      // if (!is_forwarding && (places_.size() > 1 || num_trainers > 1)) {
+      if (!is_forwarding && nccl_ctxs_->contexts_.size() > 1) {
         // Currently, we assume that once gradient is generated, it can be
         // broadcast, and each gradient is only broadcast once.
         if (static_cast<bool>(boost::get<int>(node->Op()->GetAttr(
diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc
index 4822627ac..d68d1ce71 100644
--- a/paddle/fluid/framework/details/op_handle_base.cc
+++ b/paddle/fluid/framework/details/op_handle_base.cc
@@ -41,6 +41,7 @@ OpHandleBase::~OpHandleBase() {
 
 void OpHandleBase::Run(bool use_cuda) {
 #ifdef PADDLE_WITH_CUDA
+  int64_t start_ts = 0;
   if (events_.empty() && use_cuda) {
     for (auto &p : dev_ctxes_) {
       int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
@@ -52,7 +53,6 @@ void OpHandleBase::Run(bool use_cuda) {
 #else
   PADDLE_ENFORCE(!use_cuda);
 #endif
-
   RunImpl();
 }
 
@@ -125,6 +125,7 @@ bool OpHandleBase::NeedWait(VarHandleBase *in_var) {
 void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) {
 #ifdef PADDLE_WITH_CUDA
   if (!events_.empty()) {  // Use event
+    VLOG(5) << "events not empty";
    std::function<void()> method = callback;
     for (auto &p : dev_ctxes_) {
       method = [method, p, this]() {
diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h
index ba12ca3c6..88c78e067 100644
--- a/paddle/fluid/framework/details/op_handle_base.h
+++ b/paddle/fluid/framework/details/op_handle_base.h
@@ -26,7 +26,6 @@ namespace framework {
 namespace details {
 
 constexpr char kLocalExecScopeName[] = "@LCOAL_SCOPE@";
-
 // Wraps ir::Node and provide helper utilities.
 // It's responsible for populating necessary fields of ir::Node.
 class OpHandleBase {
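The GetTS()/VLOG(5) pairs threaded through the handles above all follow the same start/stop pattern. A scoped timer would express it once; the following is a hypothetical sketch (ScopedTimer is not part of this patch), assuming the GetTS() helper visible to the op handles returns nanoseconds as the log strings claim:

    // Hypothetical helper: logs the lifetime of a scope at VLOG(5).
    class ScopedTimer {
     public:
      explicit ScopedTimer(const std::string &tag)
          : tag_(tag), start_(GetTS()) {}
      ~ScopedTimer() {
        VLOG(5) << tag_ << " spent: " << GetTS() - start_ << " (ns).";
      }

     private:
      std::string tag_;
      int64_t start_;
    };

    // Usage inside AllReduceOpHandle::RunImpl():
    //   ScopedTimer timer("all_reduce_op_handle Impl");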
diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc
new file mode 100644
index 000000000..72beb74aa
--- /dev/null
+++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc
@@ -0,0 +1,66 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
+
+namespace paddle {
+namespace framework {
+namespace details {
+
+ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
+    const ExecutionStrategy &strategy,
+    const std::vector<Scope *> &local_scopes,
+    const std::vector<platform::Place> &places,
+    std::vector<std::unique_ptr<ir::Graph>> graphs)
+    : strategy_(std::move(strategy)),
+      local_scopes_(std::move(local_scopes)),
+      places_(std::move(places)),
+      graphs_(std::move(graphs)),
+      pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr) {
+  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
+  for (size_t i = 0; i < places.size(); ++i) {
+    std::vector<Scope *> scopes = {local_scopes_[i]};
+    std::vector<platform::Place> places = {places_[i]};
+    executors_.emplace_back(new details::ThreadedSSAGraphExecutor(
+        strategy_, scopes, places, std::move(graphs_[i])));
+  }
+}
+
+FeedFetchList ParallelSSAGraphExecutor::Run(
+    const std::vector<std::string> &fetch_tensors) {
+  std::vector<std::future<void>> run_futures;
+  FeedFetchList fetch_data;
+
+  for (size_t i = 0; i < places_.size(); ++i) {
+    auto call = [this, i] {
+      // FIXME(Yancey1989): need to fix fetch data failed.
+      std::vector<std::string> empty;
+      executors_[i]->Run(empty);
+    };
+    if (pool_) {
+      run_futures.emplace_back(pool_->enqueue(std::move(call)));
+    } else {
+      call();
+    }
+  }
+  if (pool_) {
+    for (auto &f : run_futures) {
+      f.wait();
+    }
+  }
+  return fetch_data;
+}
+
+}  // namespace details
+}  // namespace framework
+}  // namespace paddle
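One caveat in Run() above: std::future::wait() only blocks; it does not rethrow an exception raised inside a worker. A sketch of a get()-based join loop that would surface worker failures (same types as above, assuming ThreadedSSAGraphExecutor::Run reports errors by throwing):

    // get() propagates a worker exception to the caller; wait() swallows it.
    if (pool_) {
      std::exception_ptr eptr = nullptr;
      for (auto &f : run_futures) {
        try {
          f.get();
        } catch (...) {
          if (!eptr) eptr = std::current_exception();
        }
      }
      if (eptr) std::rethrow_exception(eptr);
    }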
diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.h b/paddle/fluid/framework/details/parallel_ssa_graph_executor.h
new file mode 100644
index 000000000..c0ba1577f
--- /dev/null
+++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.h
@@ -0,0 +1,51 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "ThreadPool.h"
+#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
+#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
+
+namespace paddle {
+namespace framework {
+namespace details {
+
+class ParallelSSAGraphExecutor : public SSAGraphExecutor {
+ public:
+  ParallelSSAGraphExecutor(const ExecutionStrategy &strategy,
+                           const std::vector<Scope *> &local_scopes,
+                           const std::vector<platform::Place> &places,
+                           std::vector<std::unique_ptr<ir::Graph>> graphs);
+  ~ParallelSSAGraphExecutor() final = default;
+  // The per-device graphs are moved into the underlying executors, so ask
+  // the first executor for its graph instead of dereferencing graphs_[0].
+  const ir::Graph &Graph() const override { return executors_[0]->Graph(); }
+
+  FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
+
+ private:
+  ExecutionStrategy strategy_;
+  std::vector<Scope *> local_scopes_;
+  std::vector<platform::Place> places_;
+  std::vector<std::unique_ptr<ir::Graph>> graphs_;
+  std::unique_ptr<::ThreadPool> pool_;
+
+  std::vector<std::unique_ptr<details::ThreadedSSAGraphExecutor>> executors_;
+};
+
+}  // namespace details
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
index 499246a98..abc6b9f55 100644
--- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
@@ -27,39 +27,40 @@ namespace framework {
 namespace details {
 ScopeBufferedSSAGraphExecutor::ScopeBufferedSSAGraphExecutor(
     ExecutionStrategy strategy, std::vector<Scope *> local_scopes,
-    std::vector<VariableInfo> var_infos, std::vector<platform::Place> places,
+    std::vector<std::vector<VariableInfo>> var_infos_list,
+    std::vector<platform::Place> places,
     std::unique_ptr<SSAGraphExecutor> &&underlying_executor)
     : strategy_(std::move(strategy)),
       underlying_executor_(std::move(underlying_executor)),
       local_scopes_(std::move(local_scopes)),
-      var_infos_(std::move(var_infos)),
+      var_infos_list_(std::move(var_infos_list)),
       places_(std::move(places)) {}
 
 FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
     const std::vector<std::string> &fetch_tensors) {
   if (drop_scope_counter_ == 0) {
     // Create local scopes.
-    for (auto it = local_scopes_.rbegin(); it != local_scopes_.rend(); ++it) {
-      auto &scope = *it;
+    for (size_t i = 0; i < local_scopes_.size(); ++i) {
+      auto &scope = local_scopes_[i];
       Scope &local_scope = scope->NewScope();
       *scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>() =
           &local_scope;
-
-      for (auto &info : var_infos_) {
-        if (scope->FindVar(info.name_) != nullptr) {
-          continue;
-        }
-
-        if (info.persistable_) {  // Persistable
-          InitializeVariable(scope->Var(info.name_), info.type_);
-        } else {
-          InitializeVariable(local_scope.Var(info.name_), info.type_);
+      for (auto &var_infos : var_infos_list_) {
+        for (auto &info : var_infos) {
+          if (scope->FindVar(info.name_) != nullptr) {
+            continue;
+          }
+          if (info.persistable_) {  // Persistable
+            InitializeVariable(scope->Var(info.name_), info.type_);
+          } else {
+            InitializeVariable(local_scope.Var(info.name_), info.type_);
+          }
         }
       }
     }
   }
   std::vector<framework::LoDTensor> fetch_data;
-  std::exception_ptr eptr;
+  std::exception_ptr eptr = nullptr;
   try {
     fetch_data = underlying_executor_->Run(fetch_tensors);
   } catch (...) {
@@ -71,9 +72,13 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
 
 #ifdef PADDLE_WITH_CUDA
   const std::string gc_name = "garbage_collector";
-  DeviceGarbageCollectorMap *gc =
-      Graph().Has(gc_name) ? &(Graph().Get<DeviceGarbageCollectorMap>(gc_name))
-                           : nullptr;
+  DeviceGarbageCollectorMap *gc = nullptr;
+  // FIXME(Yancey1989): need to fix gc failure on parallel graph mode
+  if (strategy_.type_ != ExecutionStrategy::kParallelGraph) {
+    gc = Graph().Has(gc_name)
+             ? &(Graph().Get<DeviceGarbageCollectorMap>(gc_name))
+             : nullptr;
+  }
 #endif
 
   if (!fetch_tensors.empty() ||
diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h
index 5e87e0bf5..51230d4a4 100644
--- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h
@@ -38,7 +38,8 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
  public:
   ScopeBufferedSSAGraphExecutor(
       ExecutionStrategy strategy, std::vector<Scope*> local_scopes,
-      std::vector<VariableInfo> var_infos, std::vector<platform::Place> places,
+      std::vector<std::vector<VariableInfo>> var_infos_list,
+      std::vector<platform::Place> places,
       std::unique_ptr<SSAGraphExecutor>&& underlying_executor);
 
   const ir::Graph& Graph() const override {
@@ -53,7 +54,7 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
   ExecutionStrategy strategy_;
   std::unique_ptr<SSAGraphExecutor> underlying_executor_;
   std::vector<Scope*> local_scopes_;
-  std::vector<VariableInfo> var_infos_;
+  std::vector<std::vector<VariableInfo>> var_infos_list_;
   std::vector<platform::Place> places_;
 };
 }  // namespace details
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
index 24da56c09..b45afbc04 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
@@ -24,6 +24,7 @@
 #include <vector>
 #include "ThreadPool.h"  // ThreadPool in third party
 #include "paddle/fluid/framework/blocking_queue.h"
+#include "paddle/fluid/framework/details/computation_op_handle.h"
 #include "paddle/fluid/framework/details/exception_holder.h"
 #include "paddle/fluid/framework/details/execution_strategy.h"
 #include "paddle/fluid/framework/details/fetch_op_handle.h"
diff --git a/paddle/fluid/framework/details/var_handle.cc b/paddle/fluid/framework/details/var_handle.cc
index 30da029ca..7de6025a2 100644
--- a/paddle/fluid/framework/details/var_handle.cc
+++ b/paddle/fluid/framework/details/var_handle.cc
@@ -20,7 +20,7 @@ namespace details {
 
 VarHandleBase::~VarHandleBase() {}
 
-VarHandle::~VarHandle() { VLOG(4) << "deleting var handle " << DebugString(); }
+VarHandle::~VarHandle() { VLOG(5) << "deleting var handle " << DebugString(); }
 
 std::string VarHandle::DebugString() const {
   std::stringstream ss;
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index b98408ee7..ff3d76fb0 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -26,6 +26,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
 #include "paddle/fluid/framework/details/multi_devices_helper.h"
+#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
 #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
 #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
 #include "paddle/fluid/platform/profiler.h"
@@ -53,6 +54,7 @@ class ParallelExecutorPrivate {
   std::vector<Scope *> local_scopes_;
   Scope *global_scope_;  // not owned
   std::unique_ptr<details::SSAGraphExecutor> executor_;
+  std::vector<std::unique_ptr<details::SSAGraphExecutor>> executors_;
 
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
   std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
@@ -84,6 +86,9 @@ ParallelExecutor::ParallelExecutor(
       PADDLE_ENFORCE(places.size() > 1,
                      "If you set build_strategy.reduce with 'Reduce',"
                      "the number of places must be greater than 1.");
+      PADDLE_ENFORCE(exec_strategy.type_ != ExecutionStrategy::kParallelGraph,
+                     "You should set build_strategy.reduce with 'AllReduce' "
+                     "for the ParallelGraph executor type.");
     }
 
     // Step 1. Bcast the params to devs.
@@ -106,31 +111,55 @@ ParallelExecutor::ParallelExecutor(
     // Bcast Parameters to all GPUs
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
     auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
     ncclUniqueId *nccl_id = nullptr;
+    bool need_group_call = true;
     if (nccl_id_var != nullptr) {
       nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
+    } else if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
+      // Generate a fresh id and let the global scope own the storage, so
+      // nothing else shares ownership of the scope-held variable.
+      nccl_id = member_->global_scope_->Var(NCCL_ID_VARNAME)
+                    ->GetMutable<ncclUniqueId>();
+      PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(nccl_id));
+      need_group_call = false;
+    } else {
+      // init nccl_id in NCCLContextMap
     }
+
     member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
-        member_->places_, nccl_id, num_trainers, trainer_id));
+        member_->places_, nccl_id, num_trainers, trainer_id,
+        need_group_call));
 #else
     PADDLE_THROW("Not compiled with CUDA");
 #endif
   }
-
   if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
     BCastParamsToDevices(bcast_vars);
   }
-// Startup Program has been run. All local scopes has correct parameters.
+  // Startup Program has been run. All local scopes have correct parameters.
 
-// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
-// ncclOp
+  // Step 2. Convert main_program to SSA form and dependency graph.
+  // Also, insert ncclOp
+  std::vector<std::unique_ptr<ir::Graph>> graphs;
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
-  std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
-      main_program, member_->places_, loss_var_name, params,
-      member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
+  if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
+    for (size_t i = 0; i < member_->places_.size(); ++i) {
+      std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
+          main_program, {member_->places_[i]}, loss_var_name, params,
+          {member_->local_scopes_[i]}, member_->use_cuda_,
+          member_->nccl_ctxs_.get());
+      graphs.push_back(std::move(graph));
+    }
+  } else {
+    std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
+        main_program, member_->places_, loss_var_name, params,
+        member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
+    graphs.push_back(std::move(graph));
+  }
 
   auto max_memory_size = GetEagerDeletionThreshold();
-  if (max_memory_size >= 0) {
+  // FIXME(Yancey1989): need to fix on parallel graph mode
+  if (max_memory_size >= 0 &&
+      exec_strategy.type_ != ExecutionStrategy::kParallelGraph) {
     for (auto &place : member_->places_) {
       if (!platform::is_gpu_place(place)) continue;
       auto gpu_place = boost::get<platform::CUDAPlace>(place);
@@ -143,40 +172,48 @@ ParallelExecutor::ParallelExecutor(
       }
     }
     if (!gcs_.empty()) {
-      auto ref_cnt_pass =
-          ir::PassRegistry::Instance().Get("reference_count_pass");
-      ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, &ref_cnts_);
-      ref_cnt_pass->SetNotOwned(details::kCurReferenceCount, &cur_ref_cnts_);
-      ref_cnt_pass->SetNotOwned(details::kGarbageCollector, &gcs_);
-      graph = ref_cnt_pass->Apply(std::move(graph));
-      graph->SetNotOwned("garbage_collector", &gcs_);
+      for (size_t i = 0; i < graphs.size(); ++i) {
+        auto ref_cnt_pass =
+            ir::PassRegistry::Instance().Get("reference_count_pass");
+        ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, &ref_cnts_);
+        ref_cnt_pass->SetNotOwned(details::kCurReferenceCount, &cur_ref_cnts_);
+        ref_cnt_pass->SetNotOwned(details::kGarbageCollector, &gcs_);
+        graphs[i] = ref_cnt_pass->Apply(std::move(graphs[i]));
+        graphs[i]->SetNotOwned("garbage_collector", &gcs_);
+      }
     }
   }
 #else
   std::unique_ptr<ir::Graph> graph =
       build_strategy.Apply(main_program, member_->places_, loss_var_name,
                            params, member_->local_scopes_, member_->use_cuda_);
+  graphs.push_back(std::move(graph));
 #endif
 
   // Step 3. Create vars in each scope. Passes may also create new vars.
   //         skip control vars and empty vars
-  std::vector<details::VariableInfo> var_infos;
-  for (auto &node : graph->Nodes()) {
-    if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
-      var_infos.emplace_back();
-      var_infos.back().name_ = node->Var()->Name();
-      var_infos.back().type_ = node->Var()->GetType();
-      var_infos.back().persistable_ = node->Var()->Persistable();
+  std::vector<std::vector<details::VariableInfo>> var_infos_list;
+  for (size_t i = 0; i < graphs.size(); ++i) {
+    std::vector<details::VariableInfo> var_infos;
+    for (auto &node : graphs[i]->Nodes()) {
+      if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
+        var_infos.emplace_back();
+        var_infos.back().name_ = node->Var()->Name();
+        var_infos.back().type_ = node->Var()->GetType();
+        var_infos.back().persistable_ = node->Var()->Persistable();
+      }
     }
+    var_infos_list.emplace_back(std::move(var_infos));
   }
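The var-info gathering loop above now appears twice in this file (once live, once inside the commented-out block further down). A small helper would keep the two copies in sync; a hypothetical sketch, not part of this patch (VariableInfo comes from scope_buffered_ssa_graph_executor.h):

    // Hypothetical helper: collect variable metadata from one graph.
    static std::vector<details::VariableInfo> CollectVarInfos(
        const ir::Graph &graph) {
      std::vector<details::VariableInfo> var_infos;
      for (auto &node : graph.Nodes()) {
        if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
          var_infos.emplace_back();
          var_infos.back().name_ = node->Var()->Name();
          var_infos.back().type_ = node->Var()->GetType();
          var_infos.back().persistable_ = node->Var()->Persistable();
        }
      }
      return var_infos;
    }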
+  // If loss_var_name is given, the number of graphs should be exactly one.
   if (loss_var_name.size()) {
-    size_t graph_num = ir::GraphNum(*graph);
+    size_t graph_num = ir::GraphNum(*graphs[0]);
     if (graph_num > 1) {
       LOG(WARNING)
           << "The number of graph should be only one, "
              "but the current graph has "
-          << ir::GraphNum(*graph)
+          << ir::GraphNum(*graphs[0])
           << " sub_graphs. If you want to see the nodes of the "
              "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
              "to specify the output dir. NOTES: if you not do training, "
     }
   }
 
   if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
+    /**
+    for (size_t i = 0; i < member_->places_.size(); ++i) {
+      std::vector<details::VariableInfo> var_infos;
+      for (auto &node : graphs[i]->Nodes()) {
+        if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
+          var_infos.emplace_back();
+          var_infos.back().name_ = node->Var()->Name();
+          var_infos.back().type_ = node->Var()->GetType();
+          var_infos.back().persistable_ = node->Var()->Persistable();
+        }
+      }
+
+      std::vector<platform::Place> places = {member_->places_[i]};
+      std::vector<Scope *> scopes = {member_->local_scopes_[i]};
+      std::unique_ptr<details::SSAGraphExecutor> p(
+          new details::ThreadedSSAGraphExecutor(exec_strategy, scopes, places,
+                                                std::move(graphs[i])));
+
+      member_->executors_.push_back(std::move(p));
+
+      member_->executors_[i].reset(new details::ScopeBufferedSSAGraphExecutor(
+          exec_strategy, scopes, std::move(var_infos), places,
+          std::move(member_->executors_[i])));
+    }**/
     member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
-        exec_strategy, member_->local_scopes_, places, std::move(graph)));
+        exec_strategy, member_->local_scopes_, places, std::move(graphs[0])));
+  } else if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
+    member_->executor_.reset(new details::ParallelSSAGraphExecutor(
+        exec_strategy, member_->local_scopes_, places, std::move(graphs)));
   } else {
     member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
-        exec_strategy, member_->local_scopes_, places, std::move(graph)));
+        exec_strategy, member_->local_scopes_, places, std::move(graphs[0])));
   }
 
   member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
-      exec_strategy, member_->local_scopes_, std::move(var_infos),
+      exec_strategy, member_->local_scopes_, std::move(var_infos_list),
       member_->places_, std::move(member_->executor_)));
 }
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index ef09b98b2..319701f1e 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -20,6 +20,8 @@ limitations under the License. */
 #include <string>
 #include <vector>
+#include "ThreadPool.h"
+
 #include "paddle/fluid/framework/details/build_strategy.h"
 #include "paddle/fluid/framework/details/execution_strategy.h"
 #include "paddle/fluid/framework/executor.h"
diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc
index 0d261dd7c..873f68e42 100644
--- a/paddle/fluid/framework/scope.cc
+++ b/paddle/fluid/framework/scope.cc
@@ -58,7 +58,10 @@ int64_t GetEagerDeletionThreshold() {
           (static_cast<int64_t>(1) << 30));
 }
 
-Scope::~Scope() { DropKids(); }
+Scope::~Scope() {
+  VLOG(5) << "~Scope()";
+  DropKids();
+}
 
 Scope& Scope::NewScope() const {
   SCOPE_LOCK_GUARD
diff --git a/paddle/fluid/framework/threadpool.cc b/paddle/fluid/framework/threadpool.cc
index fcec95536..7dc7430c5 100644
--- a/paddle/fluid/framework/threadpool.cc
+++ b/paddle/fluid/framework/threadpool.cc
@@ -48,9 +48,18 @@ void ThreadPool::Init() {
 
 ThreadPool::ThreadPool(int num_threads) : running_(true) {
   threads_.resize(num_threads);
-  for (auto& thread : threads_) {
+  for (int i = 0; i < num_threads; ++i) {
+    // for (auto& thread : threads_) {
     // TODO(Yancey1989): bind the thread to a specific CPU core
-    thread.reset(new std::thread(std::bind(&ThreadPool::TaskLoop, this)));
+    threads_[i].reset(
+        new std::thread(std::bind(&ThreadPool::TaskLoop, this, i)));
+    /**
+    sched_param sch;
+    int policy;
+    pthread_getschedparam(threads_[i]->native_handle(), &policy, &sch);
+    if (pthread_setschedparam(threads_[i]->native_handle(), SCHED_FIFO, &sch)) {
+      VLOG(1) << "Failed to setschedparam: " << errno;
+    }**/
   }
 }
 
@@ -68,7 +77,7 @@ ThreadPool::~ThreadPool() {
   }
 }
 
-void ThreadPool::TaskLoop() {
+void ThreadPool::TaskLoop(int i) {
   while (true) {
     Task task;
 
@@ -89,7 +98,6 @@ void ThreadPool::TaskLoop() {
       task = std::move(tasks_.front());
       tasks_.pop();
     }
-
     // run the task
     task();
   }
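On the TODO above: on Linux the binding could be done with pthread_setaffinity_np rather than the scheduler-priority calls in the commented-out block. A hedged sketch, assuming a Linux-only build and the same threads_[i]/i as in the constructor above:

    // Pin worker i to one CPU core (Linux-specific; error handling minimal).
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(i % std::thread::hardware_concurrency(), &cpuset);
    int rc = pthread_setaffinity_np(threads_[i]->native_handle(),
                                    sizeof(cpu_set_t), &cpuset);
    if (rc != 0) {
      VLOG(1) << "Failed to set thread affinity: " << rc;
    }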
diff --git a/paddle/fluid/framework/threadpool.h b/paddle/fluid/framework/threadpool.h
index 7a51d18fb..bd8c3cdee 100644
--- a/paddle/fluid/framework/threadpool.h
+++ b/paddle/fluid/framework/threadpool.h
@@ -14,6 +14,7 @@ limitations under the License. */
 
 #pragma once
 
+#include <sched.h>
 #include <condition_variable>  // NOLINT
 #include <functional>
 #include <future>  // NOLINT
@@ -27,7 +28,6 @@ limitations under the License. */
 
 namespace paddle {
 namespace framework {
-
 struct ExceptionHandler {
   mutable std::future<std::unique_ptr<platform::EnforceNotMet>> future_;
   explicit ExceptionHandler(
@@ -99,7 +99,7 @@ class ThreadPool {
 
   // The constructor starts threads to run TaskLoop, which retrieves
   // and runs tasks from the queue.
-  void TaskLoop();
+  void TaskLoop(int i);
 
   // Init is called by GetInstance.
   static void Init();
diff --git a/paddle/fluid/framework/threadpool_test.cc b/paddle/fluid/framework/threadpool_test.cc
index 884d61e23..1257a76e3 100644
--- a/paddle/fluid/framework/threadpool_test.cc
+++ b/paddle/fluid/framework/threadpool_test.cc
@@ -59,3 +59,47 @@ TEST(ThreadPool, ConcurrentRun) {
   }
   EXPECT_EQ(sum, ((n + 1) * n) / 2);
 }
+
+static int64_t GetTS() {
+  struct timeval tp;
+  gettimeofday(&tp, NULL);
+  return tp.tv_sec * 1000000 + tp.tv_usec;
+}
+
+void multi_call(std::function<void()> call) {
+  for (int i = 0; i < 500; ++i) {
+    call();
+  }
+}
+
+TEST(ThreadPool, PERFORMANCE) {
+  auto sum = [] {
+    int a = 0;
+    for (int i = 0; i < 1000; ++i) {
+      a += i;
+    }
+  };
+  // framework::ThreadPool *pool = new framework::ThreadPool(2);
+  int64_t start = GetTS();
+  for (int i = 0; i < 1000; ++i) {
+    // int64_t s = GetTS();
+    framework::Async(std::move(sum));
+    // pool->Run(std::move(sum));
+    // VLOG(5) << "push to pool spent : " << GetTS() - s << " (us).";
+  }
+  VLOG(5) << "pool spent: " << GetTS() - start << " (us).";
+  start = GetTS();
+  for (int i = 0; i < 1000; ++i) {
+    sum();
+  }
+  VLOG(5) << "sequence call spent: " << GetTS() - start << " (us).";
+  std::vector<std::thread> threads;
+  start = GetTS();
+  for (int i = 0; i < 2; ++i) {
+    std::thread t(multi_call, std::ref(sum));
+    threads.push_back(std::move(t));
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  VLOG(5) << "two threads spent: " << GetTS() - start << " (us).";
+}
diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h
index 51b980acb..10de11bfa 100644
--- a/paddle/fluid/operators/reader/blocking_queue.h
+++ b/paddle/fluid/operators/reader/blocking_queue.h
@@ -67,9 +67,12 @@ class BlockingQueue {
   }
 
   bool Receive(T* elem) {
+    VLOG(1) << "blocking queue::Receive ...";
     std::unique_lock<std::mutex> lock(mutex_);
     receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
+    VLOG(1) << "queue_.empty()=" << queue_.empty();
     if (!queue_.empty()) {
+      if (elem == nullptr) VLOG(1) << "elem is nullptr";
       PADDLE_ENFORCE_NOT_NULL(elem);
       *elem = queue_.front();
       if (LIKELY(!speed_test_mode_)) {
diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc
index 26ff221df..2d66000f1 100644
--- a/paddle/fluid/operators/reader/buffered_reader.cc
+++ b/paddle/fluid/operators/reader/buffered_reader.cc
@@ -58,7 +58,9 @@ void BufferedReader::ReadAsync(size_t i) {
       TensorVec &gpu = gpu_buffer_[i];
       gpu.resize(cpu.size());
       for (size_t i = 0; i < cpu.size(); ++i) {
+        VLOG(1) << "launch tensor copy from cpu to gpu, idx: " << i;
         framework::TensorCopySync(cpu[i], place_, &gpu[i]);
+        VLOG(1) << "done " << i;
         gpu[i].set_lod(cpu[i].lod());
       }
     }
@@ -80,11 +82,13 @@ void BufferedReader::StartImpl() {
 }
 
 void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
+  VLOG(1) << "ReadNextImpl start on place: " << place_;
   if (position_.empty()) {
     out->clear();
     return;
   }
   size_t i = position_.front().get();
+  VLOG(1) << "position front: " << i;
   position_.pop();
 
   if (i == -1UL) {
@@ -101,6 +105,7 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
     ReadAsync(prev_pos_);
   }
   prev_pos_ = i;
+  VLOG(1) << "success ReadNextImpl";
 }
 
 }  // namespace reader
diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
index ed719f91d..924c92e0b 100644
--- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
@@ -25,9 +25,15 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
  private:
   void RunImpl(const framework::Scope& scope,
                const platform::Place& dev_place) const override {
-    auto* out = scope.FindVar(Output("Out"))
-                    ->template GetMutable<framework::ReaderHolder>();
+    VLOG(1) << "find var in scope: " << &scope;
+    auto* out_var = scope.FindVar(Output("Out"));
+    VLOG(1) << "var " << Output("Out") << " -> " << out_var;
+    auto* out = out_var->GetMutable<framework::ReaderHolder>();
+
+    // auto* out = scope.Var(Output("Out"))
+    //                 ->template GetMutable<framework::ReaderHolder>();
     if (out->Get() != nullptr) {
+      VLOG(1) << Output("Out") << " is not nullptr.";
       return;
     }
     const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
@@ -46,9 +52,11 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
       sin >> num;
       place = platform::CUDAPlace(static_cast<int>(num));
     }
-
+    VLOG(1) << "create buffered reader on " << place;
     out->Reset(framework::MakeDecoratedReader<BufferedReader>(
         underlying_reader, place, 2));
+    VLOG(1) << "Reset Buffered Reader in var: "
+            << scope.FindVar(Input("UnderlyingReader"));
   }
 };
diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc
index 901a92ab5..093b0e56b 100644
--- a/paddle/fluid/operators/reader/create_py_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_py_reader_op.cc
@@ -28,8 +28,10 @@ class PyReader : public framework::FileReader {
   }
 
   void ReadNext(std::vector<framework::LoDTensor>* out) override {
+    VLOG(1) << "come in PyReader::ReadNext function, out: " << out;
     bool success;
     *out = queue_->Pop(&success);
+    VLOG(1) << "call PyReader::ReadNext " << success;
     if (!success) out->clear();
   }
diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc
index 38223e069..ae37a1872 100644
--- a/paddle/fluid/operators/reader/open_files_op.cc
+++ b/paddle/fluid/operators/reader/open_files_op.cc
@@ -115,10 +115,12 @@ class PreemptiveReaderContainer : public IReaderContainer {
   }
 
   void ReadNext(std::vector<framework::LoDTensor>* out) override {
+    VLOG(1) << "flag";
     if (!pending_.empty()) {
       auto future_it = complete_queue_.Pop();
       FutureItem item = future_it->get();
       if (item.exception_) {
+        VLOG(1) << "item has exception!!!";
         for (auto it = futures_.begin(); it != futures_.end(); ++it) {
           if (it != future_it) {
             it->wait();  // Wait all other threads complete.
diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h
index 7c539d25f..53de53f43 100644
--- a/paddle/fluid/platform/nccl_helper.h
+++ b/paddle/fluid/platform/nccl_helper.h
@@ -82,12 +82,15 @@ struct NCCLContext {
 struct NCCLContextMap {
   std::unordered_map<int, NCCLContext> contexts_;
   std::vector<int> order_;
+  bool need_group_call_;
 
   explicit NCCLContextMap(const std::vector<platform::Place> &places,
                           ncclUniqueId *nccl_id = nullptr,
-                          size_t num_trainers = 1, size_t trainer_id = 0) {
+                          size_t num_trainers = 1, size_t trainer_id = 0,
+                          bool need_group_call = true) {
     PADDLE_ENFORCE(!places.empty());
     order_.reserve(places.size());
+    need_group_call_ = need_group_call;
     for (auto &p : places) {
       int dev_id = boost::get<platform::CUDAPlace>(p).device;
       order_.emplace_back(dev_id);
@@ -102,7 +105,7 @@ struct NCCLContextMap {
     }
     std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
     // if num_trainers == 1 and no nccl_id is given, create a new
    // nccl id for local comms.
-    if (num_trainers == 1) {
+    if (num_trainers == 1 && nccl_id == nullptr) {
       std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
       PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
           comms.get(), static_cast<int>(order_.size()), order_.data()));
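For reference, the per-rank initialization that a supplied ncclUniqueId enables looks like this in plain NCCL. A sketch for one process driving N local devices (ndev, dev_ids, num_trainers, and trainer_id are illustrative; error checks elided):

    // One communicator per local GPU, all joined through the shared id.
    ncclUniqueId id;
    ncclGetUniqueId(&id);  // rank 0 generates it; other trainers receive it
    std::vector<ncclComm_t> comms(ndev);
    ncclGroupStart();
    for (int i = 0; i < ndev; ++i) {
      cudaSetDevice(dev_ids[i]);
      ncclCommInitRank(&comms[i], ndev * num_trainers, id,
                       trainer_id * ndev + i);
    }
    ncclGroupEnd();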
diff --git a/paddle/fluid/platform/profiler.cc b/paddle/fluid/platform/profiler.cc
index 998242fb4..040a68f67 100644
--- a/paddle/fluid/platform/profiler.cc
+++ b/paddle/fluid/platform/profiler.cc
@@ -12,9 +12,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
-#include "paddle/fluid/platform/profiler.h"
-#include "paddle/fluid/platform/port.h"
-
 #include <algorithm>
 #include <iomanip>
 #include <limits>
@@ -25,9 +22,12 @@ limitations under the License. */
 #ifdef PADDLE_WITH_CUDA
 #include <cuda.h>
 #endif  // PADDLE_WITH_CUDA
+
 #include "glog/logging.h"
 #include "paddle/fluid/framework/block_desc.h"
 #include "paddle/fluid/platform/device_tracer.h"
+#include "paddle/fluid/platform/port.h"
+#include "paddle/fluid/platform/profiler.h"
 #include "paddle/fluid/string/printf.h"
 
 DEFINE_bool(enable_rpc_profiler, false, "Enable rpc profiler or not.");
@@ -173,8 +173,9 @@ void PopEvent(const std::string& name, const DeviceContext* dev_ctx) {
 
 RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
     : is_enabled_(false), start_ns_(PosixInNsec()) {
-  std::lock_guard<std::mutex> l(profiler_mu);
   if (g_state == ProfilerState::kDisabled) return;
+  std::lock_guard<std::mutex> l(profiler_mu);
+
   is_enabled_ = true;
   dev_ctx_ = dev_ctx;
   name_ = name;
@@ -184,8 +185,9 @@ RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
 }
 
 RecordEvent::~RecordEvent() {
-  std::lock_guard<std::mutex> l(profiler_mu);
   if (g_state == ProfilerState::kDisabled || !is_enabled_) return;
+  VLOG(5) << "call ~RecordEvent";
+  std::lock_guard<std::mutex> l(profiler_mu);
   DeviceTracer* tracer = GetDeviceTracer();
   if (tracer) {
     tracer->AddCPURecords(CurAnnotation(), start_ns_, PosixInNsec(),
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index fc7991d29..c313ed2a8 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -720,6 +720,11 @@ All parameter, weight, gradient are variables in Paddle.
 
     )DOC");
 
+  py::enum_<ExecutionStrategy::ExecutorType>(exec_strategy, "ExecutorType")
+      .value("Default", ExecutionStrategy::ExecutorType::kDefault)
+      .value("Experimental", ExecutionStrategy::ExecutorType::kExperimental)
+      .value("ParallelGraph", ExecutionStrategy::ExecutorType::kParallelGraph);
+
   exec_strategy.def(py::init())
       .def_property(
           "num_threads",
@@ -777,17 +782,14 @@ All parameter, weight, gradient are variables in Paddle.
           [](const ExecutionStrategy &self) { return self.dry_run_; },
           [](ExecutionStrategy &self, bool dry_run) {
             self.dry_run_ = dry_run;
-          });
-
-  exec_strategy.def_property(
-      "use_experimental_executor",
-      [](const ExecutionStrategy &self) {
-        return self.type_ == ExecutionStrategy::kExperimental;
-      },
-      [](ExecutionStrategy &self, bool experimental) {
-        self.type_ = experimental ? ExecutionStrategy::kExperimental
-                                  : ExecutionStrategy::kDefault;
-      });
+          })
+      .def_property(
+          "executor_type",
+          [](const ExecutionStrategy &self) { return self.type_; },
+          [](ExecutionStrategy &self, ExecutionStrategy::ExecutorType type) {
+            self.type_ = type;
+          },
+          R"DOC()DOC");
 
 py::class_<BuildStrategy> build_strategy(pe, "BuildStrategy", R"DOC(
 BuildStrategy allows the user to more precisely control how to
-- GitLab
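Taken together, the new mode is selected through the strategy objects bound above (the Python-facing property is exec_strategy.executor_type). A hedged C++ configuration sketch with the field names from this patch; the pairing with AllReduce is the constraint the ParallelExecutor constructor enforces:

    details::ExecutionStrategy exec_strategy;
    exec_strategy.type_ = details::ExecutionStrategy::kParallelGraph;

    // ParallelGraph builds one single-place graph per device, so gradients
    // can only be synchronized via the per-thread all-reduce path; Reduce
    // mode is rejected for this executor type.
    details::BuildStrategy build_strategy;
    build_strategy.reduce_ = details::BuildStrategy::ReduceStrategy::kAllReduce;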