diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index ce2eb9455ae77743bd6878c054753cda867ad2b7..4e00630bb124c5e10a3b4e0e346326a45642fa3e 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -196,7 +196,7 @@ endif() target_link_libraries(executor while_op_helper executor_gc_helper) cc_library(parallel_executor SRCS parallel_executor.cc DEPS - threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor + threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor async_ssa_graph_executor graph build_strategy fast_threaded_ssa_graph_executor variable_helper) diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index f1ce744a93b73aa5f00554f93796663c8a698e80..2c1f3ae638cf95c3ab49219909fe3b1f22137099 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -96,6 +96,12 @@ cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS cc_library(parallel_ssa_graph_executor SRCS parallel_ssa_graph_executor.cc DEPS threaded_ssa_graph_executor) +set(ASYNC_SSA_GRAPH_EXECUTOR_DEPS threaded_ssa_graph_executor) +if(WITH_DISTRIBUTE) + list(APPEND ASYNC_SSA_GRAPH_EXECUTOR_DEPS communicator) +endif() +cc_library(async_ssa_graph_executor SRCS async_ssa_graph_executor.cc DEPS ${ASYNC_SSA_GRAPH_EXECUTOR_DEPS}) + cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory device_context broadcast_op_handle) cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc new file mode 100644 index 0000000000000000000000000000000000000000..e9aad5d264d1745662848d1ba313b573d0974cb7 --- /dev/null +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -0,0 +1,203 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
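The CMake change above makes `communicator` an optional dependency of `async_ssa_graph_executor`, appended only when `WITH_DISTRIBUTE` is set; the C++ sources mirror that with `PADDLE_WITH_DISTRIBUTE` guards. A minimal, self-contained sketch of that guard pattern follows (names here are illustrative, not the Paddle API):

```cpp
// Sketch only: an optional link-time dependency (WITH_DISTRIBUTE in CMake)
// mirrored by a compile-time guard in the sources that use it.
#include <iostream>

#ifdef PADDLE_WITH_DISTRIBUTE
// Only compiled when the communicator library is actually linked in.
void StartCommunicator() { std::cout << "communicator started\n"; }
#endif

int main() {
#ifdef PADDLE_WITH_DISTRIBUTE
  StartCommunicator();
#else
  std::cout << "built without WITH_DISTRIBUTE; no communicator to start\n";
#endif
  return 0;
}
```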
+
+#include "paddle/fluid/framework/details/async_ssa_graph_executor.h"
+
+#include "paddle/fluid/framework/variable_helper.h"
+
+#ifdef PADDLE_WITH_DISTRIBUTE
+#include "paddle/fluid/operators/distributed/communicator.h"
+#endif
+
+namespace paddle {
+namespace framework {
+namespace details {
+
+inline void NewTempScopeAndInitVars(const std::vector<VarInfo> &var_infos,
+                                    Scope *scope) {
+  VLOG(3) << "NewTempScopeAndInitVars";
+  Scope &local_scope = scope->NewScope();
+  *scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>() =
+      &local_scope;
+
+  for (auto &info : var_infos) {
+    if (scope->FindVar(info.name_) != nullptr) {
+      continue;
+    }
+
+    if (info.persistable_) {  // Persistable
+      InitializeVariable(scope->Var(info.name_), info.type_);
+    } else {
+      InitializeVariable(local_scope.Var(info.name_), info.type_);
+    }
+  }
+}
+
+// get the RpcContext from the remote send and recv ops
+void ProcessGraph(std::vector<ir::Graph *> graphs, Scope *scope) {
+#ifdef PADDLE_WITH_DISTRIBUTE
+  using RpcCtxMap = operators::distributed::RpcCtxMap;
+  VLOG(3) << "ProcessGraph";
+  RpcCtxMap send_varname_to_ctx;
+  RpcCtxMap recv_varname_to_ctx;
+  for (auto i = 0; i < graphs.size(); ++i) {
+    std::vector<ir::Node *> nodes_to_delete;
+    for (auto &node : graphs[i]->Nodes()) {
+      VLOG(3) << "node name " << node->Name();
+      if (node && node->IsOp()) {
+        if (node->Name() == "send") {
+          auto send_var_name = node->Op()->Input("X")[0];
+          auto send_varnames = boost::get<std::vector<std::string>>(
+              node->Op()->GetNullableAttr("send_varnames"));
+          auto epmap = boost::get<std::vector<std::string>>(
+              node->Op()->GetNullableAttr("epmap"));
+          auto height_section = boost::get<std::vector<int64_t>>(
+              node->Op()->GetNullableAttr("sections"));
+          send_varname_to_ctx[send_var_name] =
+              operators::distributed::RpcContext(send_var_name, send_varnames,
+                                                 epmap, height_section);
+          VLOG(3) << "find and init a send op: "
+                  << send_varname_to_ctx[send_var_name];
+        } else if (node->Name() == "recv") {
+          auto recv_var_name = node->Op()->Output("Out")[0];
+          auto recv_varnames = boost::get<std::vector<std::string>>(
+              node->Op()->GetNullableAttr("recv_varnames"));
+          auto epmap = boost::get<std::vector<std::string>>(
+              node->Op()->GetNullableAttr("epmap"));
+          recv_varname_to_ctx[recv_var_name] =
+              operators::distributed::RpcContext(recv_var_name, recv_varnames,
+                                                 epmap, {});
+          nodes_to_delete.push_back(node);
+          VLOG(3) << "find and remove a recv op: "
+                  << recv_varname_to_ctx[recv_var_name];
+        }
+      }
+    }
+  }
+  // init communicator here
+  if (send_varname_to_ctx.size() > 0) {
+    VLOG(3) << "this is distributed mode, will use communicator";
+    operators::distributed::Communicator::Init(send_varname_to_ctx,
+                                               recv_varname_to_ctx, scope);
+    operators::distributed::Communicator::GetInstance()->Start();
+  }
+#endif
+}
+
+AsyncSSAGraphExecutor::AsyncSSAGraphExecutor(
+    const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
+    const std::vector<platform::Place> &places, std::vector<ir::Graph *> graphs)
+    : strategy_(std::move(strategy)),
+      local_scopes_(std::move(local_scopes)),
+      pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr),
+      places_(std::move(places)),
+      graphs_(std::move(graphs)) {
+  VLOG(3) << "build AsyncSSAGraphExecutor";
+  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
+
+  // set the correct size of thread pool to each device.
+  strategy_.num_threads_ = strategy_.num_threads_ < places_.size()
+                               ?
1UL + : strategy_.num_threads_ / places_.size(); + VLOG(1) << "set num_threads: " << strategy_.num_threads_ + << " to run the operators of the graph on each device."; + for (size_t i = 0; i < places.size(); ++i) { + executors_.emplace_back(new details::ThreadedSSAGraphExecutor( + strategy_, {local_scopes_[i]}, {places_[i]}, graphs_[i])); + } + + for (auto &node : graphs_[0]->Nodes()) { + if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { + var_infos_.emplace_back(); + var_infos_.back().name_ = node->Var()->Name(); + var_infos_.back().type_ = node->Var()->GetType(); + var_infos_.back().persistable_ = node->Var()->Persistable(); + } + } + for (auto *scope : local_scopes_) { + NewTempScopeAndInitVars(var_infos_, scope); + } + ProcessGraph(graphs_, local_scopes_[0]); +} + +void AsyncSSAGraphExecutor::StartOffPythonTrainLoop() { + VLOG(3) << "StartOffPythonTrainLoop size = " << places_.size(); + for (size_t i = 1; i < places_.size(); ++i) { + auto call = [this, i]() -> void { + VLOG(3) << "start off python thread " << i; + try { + while (true) { + executors_[i]->Run({}); + } + } catch (...) { + exception_holder_.Catch(std::current_exception()); + VLOG(3) << "get exception type = " << exception_holder_.Type(); + } + VLOG(3) << "thread " << i << " exited!"; + }; + run_futures_.emplace_back(pool_->enqueue(std::move(call))); + } +} + +void AsyncSSAGraphExecutor::HandleException() { + if (exception_holder_.IsCaught()) { + for (auto &f : run_futures_) { + VLOG(3) << "wait future"; + f.wait(); + } + VLOG(3) << "caught exception " << exception_holder_.Type() + << ", rethrow it"; + run_futures_.clear(); + exception_holder_.ReThrow(); + } +} + +FeedFetchList AsyncSSAGraphExecutor::Run( + const std::vector &fetch_tensors) { + // init once + if (run_futures_.size() == 0 && places_.size() > 1) { + exception_holder_.Clear(); + StartOffPythonTrainLoop(); + } + + if (places_.size() == 1) { + exception_holder_.Clear(); + } else { + HandleException(); + } + + FeedFetchList fetch_data; + fetch_data.reserve(fetch_tensors.size()); + + try { + fetch_data = executors_[0]->Run(fetch_tensors); + } catch (...) { + exception_holder_.Catch(std::current_exception()); + } + + HandleException(); + + FeedFetchList ret; + for (size_t fetch_idx = 0; fetch_idx < fetch_tensors.size(); ++fetch_idx) { + std::vector lodtensor_ptrs; + lodtensor_ptrs.push_back(&fetch_data.at(fetch_idx)); + ret.emplace_back(); + ret.back().MergeLoDTensor(lodtensor_ptrs, platform::CPUPlace()); + } + return ret; +} + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.h b/paddle/fluid/framework/details/async_ssa_graph_executor.h new file mode 100644 index 0000000000000000000000000000000000000000..6aaf8f9a165f2eae3a64874e60084e4d9bdbc182 --- /dev/null +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.h @@ -0,0 +1,65 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
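The executor above starts one background training loop per extra place on the first `Run` call, keeps driving place 0 from the calling thread, and parks any worker exception in the `ExceptionHolder` so it can be rethrown on a later `Run`. Below is a simplified, self-contained sketch of that control flow; the types are stand-ins, not the Paddle classes, and unlike the real `HandleException` it does not wait on the futures before rethrowing.

```cpp
// Simplified sketch of the AsyncSSAGraphExecutor::Run pattern: lazy start of
// off-main loops, exception parking, and a caller-driven step on "device 0".
#include <atomic>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <vector>

class ExceptionHolderSketch {
 public:
  void Catch(std::exception_ptr e) {
    std::lock_guard<std::mutex> g(mu_);
    if (!eptr_) eptr_ = e;
  }
  bool IsCaught() {
    std::lock_guard<std::mutex> g(mu_);
    return static_cast<bool>(eptr_);
  }
  void ReThrow() {
    std::unique_lock<std::mutex> lk(mu_);
    auto e = eptr_;
    eptr_ = nullptr;
    lk.unlock();
    if (e) std::rethrow_exception(e);
  }

 private:
  std::mutex mu_;
  std::exception_ptr eptr_;
};

class AsyncRunnerSketch {
 public:
  explicit AsyncRunnerSketch(std::vector<std::function<void()>> per_device_step)
      : steps_(std::move(per_device_step)) {}

  ~AsyncRunnerSketch() {
    stop_ = true;
    for (auto &f : futures_) f.wait();
  }

  // Mirrors Run(): start the off-main loops once, surface any held exception,
  // then run one step on device 0 in the calling thread (fetches would go here).
  void Run() {
    if (futures_.empty() && steps_.size() > 1) StartOffMainLoops();
    if (holder_.IsCaught()) holder_.ReThrow();
    steps_[0]();
  }

 private:
  void StartOffMainLoops() {
    for (size_t i = 1; i < steps_.size(); ++i) {
      futures_.push_back(std::async(std::launch::async, [this, i] {
        try {
          while (!stop_) steps_[i]();  // train with no fetch targets
        } catch (...) {
          holder_.Catch(std::current_exception());
        }
      }));
    }
  }

  std::vector<std::function<void()>> steps_;
  std::vector<std::future<void>> futures_;
  ExceptionHolderSketch holder_;
  std::atomic<bool> stop_{false};
};

int main() {
  AsyncRunnerSketch runner({[] { std::cout << "device 0 step\n"; },
                            [] { /* device 1 trains independently */ }});
  for (int i = 0; i < 3; ++i) runner.Run();
}
```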
+ +#pragma once + +#include +#include +#include +#include + +#include "ThreadPool.h" +#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" + +namespace paddle { +namespace framework { +namespace details { + +struct VarInfo { + std::string name_; + proto::VarType::Type type_; + bool persistable_; +}; + +class AsyncSSAGraphExecutor : public SSAGraphExecutor { + public: + AsyncSSAGraphExecutor(const ExecutionStrategy &strategy, + const std::vector &local_scopes, + const std::vector &places, + std::vector graphs); + ~AsyncSSAGraphExecutor() final = default; + const ir::Graph &Graph() const override { return *graphs_[0]; } + + FeedFetchList Run(const std::vector &fetch_tensors) override; + + private: + void StartOffPythonTrainLoop(); + void HandleException(); + + private: + ExecutionStrategy strategy_; + std::vector local_scopes_; + std::unique_ptr<::ThreadPool> pool_{nullptr}; + std::vector places_; + std::vector graphs_; + + std::vector> executors_; + ExceptionHolder exception_holder_; + std::vector> run_futures_; + std::vector var_infos_; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index df69b11ec6ae3bb08ba03b749c69eb718525de4d..027de4cda410178fbae11f1db9a580c2b7ad22a3 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -184,8 +184,12 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { // Convert graph to run on multi-devices. void AppendMultiDevPass(const BuildStrategy &strategy) { ir::Pass *multi_devices_pass = nullptr; - if (strategy.is_distribution_) { - VLOG(10) << "Add dist_multi_devices_pass"; + + if (strategy_.async_mode_) { + multi_devices_pass = AppendPass("async_multi_devices_pass").get(); + } else if (strategy_.is_distribution_) { + VLOG(10) + << "Add dist_multi_devices_pass, multi device parameter server mode"; multi_devices_pass = AppendPass("dist_multi_devices_pass").get(); } else { if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { @@ -234,10 +238,12 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph, #else const bool use_cuda) const { #endif + VLOG(3) << "apply all passes"; // Create a default one if not finalized by user. CreatePassesFromStrategy(false); for (std::shared_ptr &pass : pass_builder_->AllPasses()) { + VLOG(3) << "apply " << pass->Type(); if (IsMultiDevPass(pass->Type())) { pass->Erase(kPlaces); pass->SetNotOwned>(kPlaces, &places); @@ -293,6 +299,7 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph, graph = pass->Apply(graph); VLOG(3) << "Finish Apply Pass " << pass->Type(); } + VLOG(3) << "All Passes Applied"; return graph; } diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index 85f328b7c40568cc9246fd4ecab34e8e6778439b..5811693b7ce6d6f2aebc9a8896960226295bd3e5 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -97,6 +97,7 @@ struct BuildStrategy { // num_trainers is 1, so the current fields of build_strategy doesn't tell if // it's distributed model. 
   bool is_distribution_{false};
+  bool async_mode_{false};
   int num_trainers_{1};
   int trainer_id_{0};
   std::vector<std::string> trainers_endpoints_;
diff --git a/paddle/fluid/framework/details/exception_holder.h b/paddle/fluid/framework/details/exception_holder.h
index 1b1afce04ebbf803f543f839eadc26c522cc89ef..f8fd395bd9cc1e569bf7789e6a3adc63b00716ac 100644
--- a/paddle/fluid/framework/details/exception_holder.h
+++ b/paddle/fluid/framework/details/exception_holder.h
@@ -14,6 +14,9 @@
 
 #pragma once
 
+#include <memory>
+#include <string>
+
 #include "glog/logging.h"
 #include "paddle/fluid/platform/enforce.h"
 
@@ -64,6 +67,21 @@ class ExceptionHolder {
     ClearImpl();
   }
 
+  std::string Type() {
+    std::lock_guard<std::mutex> lock(mu_);
+    switch (type_) {
+      case kNone:
+        return "None";
+      case kEnforceNotMet: {
+        return "EnforceNotMet";
+      }
+      case kEOF: {
+        return "EOF";
+      }
+    }
+    return "unknown";
+  }
+
  private:
   void ClearImpl() {
     exception_.reset();
diff --git a/paddle/fluid/framework/details/execution_strategy.h b/paddle/fluid/framework/details/execution_strategy.h
index 318694a1d4b0599655f05bf01c907fb6c07a4193..6a8d99f900cf29d5e579a3c9dd5739d2122b7deb 100644
--- a/paddle/fluid/framework/details/execution_strategy.h
+++ b/paddle/fluid/framework/details/execution_strategy.h
@@ -31,6 +31,8 @@ struct ExecutionStrategy {
   size_t num_iteration_per_drop_scope_{1};
   ExecutorType type_{kDefault};
   bool dry_run_{false};
+  size_t num_iteration_per_run_{1};  // only used with async_ssa_graph_executor
+                                     // and pyreader with data queue
 };
 
 }  // namespace details
diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc
index f80a098bfa26f160d6008cdefbad1803a85f9161..e22bd3917895be8d84a83b3986c6919564b2ddab 100644
--- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc
+++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc
@@ -198,8 +198,22 @@ void MultiDevSSAGraphBuilderBase::ApplyImpl(ir::Graph *graph) const {
         static_cast<bool>(boost::get<int>(node->Op()->GetAttr(
                               OpProtoAndCheckerMaker::OpRoleAttrName())) &
                           static_cast<int>(OpRole::kBackward));
+    // optimize ops are already processed in DealWithSpecialOp,
+    // here we only consider backward ops
     if (!is_bk_op) continue;
 
+    /*
+     * the op that generates the gradient of a parameter carries one attr,
+     * op_role_var, which records the parameter together with its gradient,
+     * like:
+       attrs {
+         name: "op_role_var"
+         type: STRINGS
+         strings: "fc_1.b_0"
+         strings: "fc_1.b_0@GRAD"
+       }
+     */
+
     // Currently, we assume that once gradient is generated, it can be
     // broadcast, and each gradient is only broadcast once.
     auto backward_vars =
@@ -256,6 +270,8 @@ void MultiDevSSAGraphBuilderBase::InsertScaleLossGradOp(
       break;
   }
 
+  VLOG(3) << "loss_scale: " << loss_scale;
+
   if (loss_scale) {
     // TODO(paddle-dev): Why is there no input for this op_handle?
auto loss_grad_name = node->Op()->OutputArgumentNames()[0]; @@ -407,7 +423,7 @@ void MultiDevSSAGraphBuilderBase::CreateFusedBroadcastOp( void MultiDevSSAGraphBuilderBase::CreateComputationalOp(ir::Graph *result, ir::Node *node, - int dev_id) const { + size_t dev_id) const { result->Get(kGraphOps).emplace_back( new ComputationOpHandle(result->CreateOpNode(node->Op()), local_scopes_[dev_id], places_[dev_id], dev_id)); @@ -494,9 +510,8 @@ void MultiDevSSAGraphBuilderBase::CreateComputationalOps( } } -VarHandle *MultiDevSSAGraphBuilderBase::CreateReduceOp(ir::Graph *result, - const std::string &og, - int dst_dev_id) const { +VarHandle *MultiDevSSAGraphBuilderBase::CreateReduceOp( + ir::Graph *result, const std::string &og, size_t dst_dev_id) const { #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) result->Get(kGraphOps).emplace_back(new ReduceOpHandle( result->CreateEmptyNode("reduce", ir::Node::Type::kOperation), @@ -774,6 +789,8 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, } else if (OpHaveRole(*node, OpRole::kDist)) { int op_dev_id = CreateDistTrainOp(result, node); if (node->Op()->Type() == "concat") { + // the input(block of parameter) of concat is on different device, + // the output(parameter) will on one device. auto origin_param_name = node->Op()->OutputArgumentNames()[0]; bcast_var_name_set_[op_dev_id].emplace(origin_param_name); } @@ -781,6 +798,7 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, } else { int op_dev_id = GetOpDeviceID(node); if (op_dev_id != -1) { // This op only runs on one specific device. + // optimize op will be processed here. CreateComputationalOp(result, node, op_dev_id); for (ir::Node *n : node->outputs) { sharded_var_device_.emplace(n->Name(), op_dev_id); @@ -961,6 +979,7 @@ bool DistSSAGraphBuilder::IsEncoded(const std::string &p_name) const { void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, const std::string &p_name, const std::string &g_name) const { + // collective gradient to each device size_t cur_device_id = 0; switch (strategy_.reduce_) { case BuildStrategy::ReduceStrategy::kReduce: @@ -1049,3 +1068,5 @@ REGISTER_MULTI_DEVICES_PASS( paddle::framework::details::AllReduceSSAGraphBuilder); REGISTER_MULTI_DEVICES_PASS(dist_multi_devices_pass, paddle::framework::details::DistSSAGraphBuilder); +REGISTER_MULTI_DEVICES_PASS(async_multi_devices_pass, + paddle::framework::details::AsyncSSAGraphBuilder); diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index 611693fc7c241f0afed39ab86390df69b9cf4797..7cc68dd2d5a422cfa1ac3a4bfdd48545a6e5691d 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -56,8 +56,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass { bool UseGPU() const; - bool NeedCollectiveForGrad(const std::string &grad_name, - std::vector ops) const; + virtual bool NeedCollectiveForGrad(const std::string &grad_name, + std::vector ops) const; bool IsScaleLossOp(ir::Node *node) const; @@ -70,10 +70,10 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass { proto::VarType::Type dtype) const; VarHandle *CreateReduceOp(ir::Graph *result, const std::string &og, - int dst_dev_id) const; + size_t dst_dev_id) const; void CreateComputationalOp(ir::Graph *result, ir::Node *node, - int dev_id) const; + size_t dev_id) const; bool IsSparseGradient(const std::string &og) const; @@ -115,6 +115,35 @@ class AllReduceSSAGraphBuilder : public 
MultiDevSSAGraphBuilderBase { virtual void InsertPostprocessOps(ir::Graph *result) const {} }; +class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { + protected: + void InsertCollectiveOp(ir::Graph *result, const std::string &p_name, + const std::string &g_name) const override {} + + bool NeedCollectiveForGrad(const std::string &grad_name, + std::vector ops) const { + return false; + } + + bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const override { + if (node->Op()->Type() == "recv") { + VLOG(1) << "set recv op do_not_run to true"; + node->Op()->SetAttr("do_not_run", true); + node->Op()->Flush(); + } else if (node->Name() == "lookup_table" || node->Name() == "nce" || + node->Name() == "hierarchical_sigmoid") { + // in async_mode, we do not need remote prefetch, because communicator + // will do async parameter recv. + VLOG(1) << "set " << node->Name() << " op remote_prefetch to false"; + node->Op()->SetAttr("remote_prefetch", false); + node->Op()->Flush(); + } + return false; + } + + void InsertPostprocessOps(ir::Graph *result) const override {} +}; + class BalanceVarSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { protected: int GetVarDeviceID(const std::string &varname) const; diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index c00932a7bdb170e63b5fd4d43ccb2072f1a0a9c9..5dde0b76b816bc5309b455d58deb8942300c6af5 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -31,11 +31,23 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( prepare_pool_(1), pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) : nullptr) { + if (strategy_.num_iteration_per_run_ > 1) { + int read_op_num = 0; + for (auto *node : graph_->Nodes()) { + if (node->IsOp() && node->Name() == "read") { + read_op_num++; + } + } + if (read_op_num == 0) { + LOG(WARNING) << "when num_iteration_per_run_ is larger then 1, the model " + "should use pyreader to feed data!"; + } + } PrepareOpDeps(); CopyOpDeps(); } -FeedFetchList ThreadedSSAGraphExecutor::Run( +inline FeedFetchList ThreadedSSAGraphExecutor::RunImpl( const std::vector &fetch_tensors) { std::unique_ptr event( new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare")); @@ -84,6 +96,8 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( auto cur_ready_vars = ready_vars->PopAll(1, &timeout); if (timeout) { if (exception_holder_.IsCaught()) { + VLOG(3) << "caught exception " << exception_holder_.Type() + << ", rethrow it"; for (auto &run_op_future : run_op_futures_) { run_op_future.wait(); } @@ -114,6 +128,14 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( return fetch_data; } +FeedFetchList ThreadedSSAGraphExecutor::Run( + const std::vector &fetch_tensors) { + for (size_t j = 0; j < strategy_.num_iteration_per_run_ - 1; ++j) { + RunImpl({}); + } + return RunImpl(fetch_tensors); +} + void ThreadedSSAGraphExecutor::InsertFetchOps( const std::vector &fetch_tensors, std::vector *fetch_ops, diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index 1fa5196970512ccc4a3dee698f477711be1e7101..8c026057b480fbc40b7b8f12d8e6b8e54195a141 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -23,7 +23,9 @@ #include #include #include -#include "ThreadPool.h" // ThreadPool in thrird 
party + +#include // ThreadPool in thrird party + #include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/details/exception_holder.h" #include "paddle/fluid/framework/details/execution_strategy.h" @@ -59,6 +61,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ~ThreadedSSAGraphExecutor() final = default; private: + inline FeedFetchList RunImpl(const std::vector &fetch_tensors); void RunOp(const std::shared_ptr> &ready_var_q, details::OpHandleBase *op); diff --git a/paddle/fluid/framework/ir/pass.cc b/paddle/fluid/framework/ir/pass.cc index c0ed0519b1ff6aa5960c20e9af697fd1da74a8b5..4a29bde0917d3cce97d69ff3b896d09a2aae82ba 100644 --- a/paddle/fluid/framework/ir/pass.cc +++ b/paddle/fluid/framework/ir/pass.cc @@ -13,11 +13,16 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/ir/pass.h" + +#include +#include + #include "paddle/fluid/framework/ir/graph_helper.h" namespace paddle { namespace framework { namespace ir { + Graph* Pass::Apply(Graph* graph) const { PADDLE_ENFORCE(graph, "graph passed to Pass::Apply() cannot be empty."); for (const std::string& attr : required_pass_attrs_) { diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index ab0947c631fe9a409406b3b092972ae6512beae7..a2a8083da955c93175ab2f01a37737c145e6f1b8 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -24,6 +24,7 @@ limitations under the License. */ #include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/details/all_reduce_deps_pass.h" +#include "paddle/fluid/framework/details/async_ssa_graph_executor.h" #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h" @@ -218,6 +219,18 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, } } + std::vector graphs; + if (build_strategy.async_mode_) { + PADDLE_ENFORCE(!member_->use_cuda_, + "gpu mode does not support async_mode_ now!"); + graphs.push_back(graph); + for (int i = 1; i < places.size(); ++i) { + auto *tmp_graph = new ir::Graph(graph->OriginProgram()); + async_graphs_.emplace_back(tmp_graph); + graphs.push_back(tmp_graph); + } + } + // FIXME(Yancey1989): parallel graph mode get better performance // in GPU allreduce distributed training. Need an elegant way to // choice the execution strategy. @@ -294,19 +307,46 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, if (need_broadcast()) { BCastParamsToDevices(bcast_vars, build_strategy.trainer_id_); } + // Startup Program has been run. All local scopes has correct parameters. -// Startup Program has been run. All local scopes has correct parameters. - -// Step 2. Convert main_program to SSA form and dependency graph. Also, insert -// ncclOp + // Step 2. Convert main_program to SSA form and dependency graph. 
Also, insert + // ncclOp + std::vector async_graphs(places.size()); #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - graph = build_strategy.Apply(graph, member_->places_, loss_var_name, - member_->local_scopes_, member_->nranks_, + if (build_strategy.async_mode_) { + VLOG(3) << "use local async mode"; + graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name, + {member_->local_scopes_[0]}, 1, + member_->use_cuda_, member_->nccl_ctxs_.get()); + for (int i = 1; i < member_->places_.size(); ++i) { + graphs[i] = + build_strategy.Apply(graphs[i], {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, 1, member_->use_cuda_, member_->nccl_ctxs_.get()); + async_graphs[i] = graphs[i]; + } + } else { + graph = build_strategy.Apply(graph, member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, + member_->use_cuda_, member_->nccl_ctxs_.get()); + } #else - graph = build_strategy.Apply(graph, member_->places_, loss_var_name, - member_->local_scopes_, member_->nranks_, - member_->use_cuda_); + if (build_strategy.async_mode_) { + VLOG(3) << "use local async mode"; + graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name, + {member_->local_scopes_[0]}, 1, + member_->use_cuda_); + for (int i = 1; i < member_->places_.size(); ++i) { + graphs[i] = build_strategy.Apply( + graphs[i], {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, 1, member_->use_cuda_); + async_graphs[i] = graphs[i]; + } + } else { + graph = build_strategy.Apply(graph, member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, + member_->use_cuda_); + } #endif auto max_memory_size = GetEagerDeletionThreshold(); @@ -317,6 +357,8 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, static_cast(max_memory_size)); } + async_graphs[0] = graph; + // Step 3. Create vars in each scope. Passes may also create new vars. // skip control vars and empty vars std::vector var_infos; @@ -344,7 +386,12 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, } } - if (build_strategy.enable_parallel_graph_) { + if (build_strategy.async_mode_) { + VLOG(3) << "use AsyncSSAGraphExecutor"; + member_->executor_.reset(new details::AsyncSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->places_, async_graphs)); + } else if (build_strategy.enable_parallel_graph_) { + VLOG(3) << "use ParallelSSAGraphExecutor"; #ifdef PADDLE_WITH_CUDA // TODO(Yancey1989): Remove passing in the main_program when // allreduce_seq_pass doesn't need it as the attr. 
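In async mode the constructor above applies the build strategy once per place with `nranks = 1`, producing an independent single-device graph for each place (no collective ops), and then selects `AsyncSSAGraphExecutor`; the `ScopeBufferedSSAGraphExecutor` wrapper is skipped further below, presumably because the async executor already prepares each local scope in its own constructor via `NewTempScopeAndInitVars`. A rough stand-in for the two graph-building paths (illustrative types only, not the Paddle API):

```cpp
// Stand-in: async mode builds one single-rank graph per place; the default
// path builds one multi-device graph over all places at once.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct GraphSketch {
  std::string description;
};

GraphSketch ApplyStrategy(const std::vector<std::string> &places,
                          size_t nranks) {
  return {std::to_string(places.size()) + " place(s), nranks=" +
          std::to_string(nranks)};
}

std::vector<GraphSketch> BuildGraphs(const std::vector<std::string> &places,
                                     bool async_mode) {
  std::vector<GraphSketch> graphs;
  if (async_mode) {
    // each device trains on its own single-rank graph; no collective ops
    for (const auto &place : places) {
      graphs.push_back(ApplyStrategy({place}, /*nranks=*/1));
    }
  } else {
    // one graph spanning every device, built with the full rank count
    graphs.push_back(ApplyStrategy(places, places.size()));
  }
  return graphs;
}

int main() {
  for (const auto &g : BuildGraphs({"cpu:0", "cpu:1"}, /*async_mode=*/true)) {
    std::cout << g.description << "\n";
  }
}
```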
@@ -356,21 +403,27 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, #endif } else { if (exec_strategy.type_ == ExecutionStrategy::kDefault) { + VLOG(3) << "use ThreadedSSAGraphExecutor"; member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, graph)); } else { + VLOG(3) << "use FastThreadedSSAGraphExecutor"; member_->executor_.reset(new details::FastThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, graph)); } } - member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( - exec_strategy, member_->local_scopes_, std::move(var_infos), - member_->places_, std::move(member_->executor_))); + VLOG(3) << "use ScopeBufferedSSAGraphExecutor"; + if (!build_strategy.async_mode_) { + member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( + exec_strategy, member_->local_scopes_, std::move(var_infos), + member_->places_, std::move(member_->executor_))); + } } void ParallelExecutor::BCastParamsToDevices( const std::vector &vars, int trainer_id) const { + VLOG(3) << "BCastParamsToDevices"; // the initializing bcast, all vars would be bcast from device(0). for (auto &var : vars) { framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var); @@ -425,14 +478,22 @@ void ParallelExecutor::BCastParamsToDevices( auto local_scope = member_->local_scopes_[i]; auto *t = local_scope->Var(var)->GetMutable(); - // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix. - if (member_->use_all_reduce_ || member_->use_cuda_ || - var == "@LR_DECAY_COUNTER@") { + auto copy_memory = [&] { t->Resize(dims); t->mutable_data(cpu, main_tensor.type()); paddle::framework::TensorCopy(main_tensor, cpu, t); + }; + + auto share_memory = [&] { t->ShareDataWith(main_tensor); }; + + // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix. 
+ if (member_->build_strategy_.async_mode_) { + share_memory(); + } else if (member_->use_all_reduce_ || member_->use_cuda_ || + var == "@LR_DECAY_COUNTER@") { + copy_memory(); } else { - t->ShareDataWith(main_tensor); + share_memory(); } } } diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index d4658b9623fe8c23b6a8b2903e3c48d794ba1652..5756627fbd8583428014e24e5aa3f626c908ce1c 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -81,6 +81,7 @@ class ParallelExecutor { const BuildStrategy &build_strategy) const; ParallelExecutorPrivate *member_; + std::vector> async_graphs_; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) std::unique_ptr local_nccl_id_; #endif diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 40eafda9bf294f7e8ddd067e9014447f4de1cc0e..d3513fb7dbed0413e61796d8a843c38fbbcf93dc 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -69,6 +69,9 @@ void ReaderBase::Start() { ReaderBase::~ReaderBase() {} -DecoratedReader::~DecoratedReader() { reader_->Shutdown(); } +DecoratedReader::~DecoratedReader() { + VLOG(1) << "~DecoratedReader"; + reader_->Shutdown(); +} } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 82562bf883d88787858912f7039cf8fef003eccf..4b400e72a4cacd3848b57ac3ba2b3ef5f9a9a9c4 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -16,6 +16,7 @@ #include #include +#include #include #include "paddle/fluid/framework/ddim.h" @@ -77,7 +78,10 @@ class DecoratedReader : public ReaderBase, ~DecoratedReader(); protected: - void ShutdownImpl() override { reader_->Shutdown(); } + void ShutdownImpl() override { + VLOG(1) << "ShutdownImpl"; + reader_->Shutdown(); + } void StartImpl() override { reader_->Start(); } @@ -98,6 +102,8 @@ class ReaderHolder { reader_ = reader_base; } + ~ReaderHolder() { VLOG(1) << "~ReaderHolder"; } + const std::shared_ptr& Get() const { return reader_; } void ReadNext(std::vector* out) { @@ -106,6 +112,7 @@ class ReaderHolder { } void ResetAll() { + VLOG(1) << "ResetAll"; auto end_readers = reader_->GetEndPoints(); for (auto* reader : end_readers) { reader->Shutdown(); @@ -116,11 +123,13 @@ class ReaderHolder { } void Shutdown() { + VLOG(1) << "Shutdown"; PADDLE_ENFORCE_NOT_NULL(reader_); reader_->Shutdown(); } void Start() { + VLOG(1) << "start"; PADDLE_ENFORCE_NOT_NULL(reader_); reader_->Start(); } diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index a96baaf41f3fcd24817421a7b620343558cd78d1..49e22a5ad3093c2d61d0ef513974c9938e287729 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -59,6 +59,10 @@ Scope& Scope::NewScope() const { return *child; } +std::unique_ptr Scope::NewTmpScope() const { + return std::unique_ptr(new Scope(this)); +} + Variable* Scope::Var(const std::string& name) { SCOPE_VARS_WRITER_LOCK return VarInternal(name); diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h index 242cbae7163c48fa44dca9237f1cd35f9ec98442..5f3d106e091ace05cfbdbbde2d79d48fe01b4a38 100644 --- a/paddle/fluid/framework/scope.h +++ b/paddle/fluid/framework/scope.h @@ -52,6 +52,10 @@ class Scope { /// Mark it to const because that new kid scope cannot change parent scope. 
Scope& NewScope() const; + /// Create a sub-scope for current scope but do not record it in the kids to + /// avoid performance problems. + std::unique_ptr NewTmpScope() const; + /// Create a variable with given name if it doesn't exist. /// Caller doesn't own the returned Variable. Variable* Var(const std::string& name); diff --git a/paddle/fluid/framework/variable_helper.cc b/paddle/fluid/framework/variable_helper.cc index 470b596bf84b8b952fbc009c02b0717a1b8e321b..65c939af173a8a2a22d69c636de355293f95dec6 100644 --- a/paddle/fluid/framework/variable_helper.cc +++ b/paddle/fluid/framework/variable_helper.cc @@ -28,7 +28,7 @@ limitations under the License. */ namespace paddle { namespace framework { -void InitializeVariable(Variable* var, proto::VarType::Type var_type) { +void InitializeVariable(Variable *var, proto::VarType::Type var_type) { if (var_type == proto::VarType::LOD_TENSOR) { var->GetMutable(); } else if (var_type == proto::VarType::SELECTED_ROWS) { @@ -38,7 +38,7 @@ void InitializeVariable(Variable* var, proto::VarType::Type var_type) { } else if (var_type == proto::VarType::FETCH_LIST) { var->GetMutable(); } else if (var_type == proto::VarType::STEP_SCOPES) { - var->GetMutable>(); + var->GetMutable>(); } else if (var_type == proto::VarType::LOD_RANK_TABLE) { var->GetMutable(); } else if (var_type == proto::VarType::LOD_TENSOR_ARRAY) { @@ -57,5 +57,27 @@ void InitializeVariable(Variable* var, proto::VarType::Type var_type) { var_type); } } + +void CopyVariable(const Variable &src_var, Variable *dst_var) { + // only support cpu now + auto cpu_place = platform::CPUPlace(); + + if (src_var.IsType()) { + auto *tmp_grad_tensor = dst_var->GetMutable(); + auto &src_tensor = src_var.Get(); + tmp_grad_tensor->set_lod(src_tensor.lod()); + framework::TensorCopy(src_tensor, cpu_place, tmp_grad_tensor); + } else if (src_var.IsType()) { + auto &src_slr = src_var.Get(); + auto *tmp_grad_slr = dst_var->GetMutable(); + tmp_grad_slr->set_rows(src_slr.rows()); + tmp_grad_slr->set_height(src_slr.height()); + auto &src_t = src_slr.value(); + auto *dst_t = tmp_grad_slr->mutable_value(); + framework::TensorCopy(src_t, cpu_place, dst_t); + } else { + PADDLE_THROW("unknown var type to copy"); + } +} } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/variable_helper.h b/paddle/fluid/framework/variable_helper.h index 471869508b9fb07f35b89e08d13c6ca4a2727d6a..5a2c267b7388f6c2de89054dc480fd74b4544bed 100644 --- a/paddle/fluid/framework/variable_helper.h +++ b/paddle/fluid/framework/variable_helper.h @@ -17,7 +17,9 @@ limitations under the License. 
*/ #include "paddle/fluid/framework/variable.h" namespace paddle { namespace framework { -void InitializeVariable(Variable *var, proto::VarType::Type var_type); + +void InitializeVariable(Variable* var, proto::VarType::Type var_type); +void CopyVariable(const Variable& src_var, Variable* dst_var); } // end namespace framework } // end namespace paddle diff --git a/paddle/fluid/operators/distributed/CMakeLists.txt b/paddle/fluid/operators/distributed/CMakeLists.txt index fc28fe818dc0bd2a8607118c015b6b5fd168fb43..972b4f67a8388ce68952fa90aaa224cd45c6d226 100644 --- a/paddle/fluid/operators/distributed/CMakeLists.txt +++ b/paddle/fluid/operators/distributed/CMakeLists.txt @@ -30,7 +30,7 @@ if(WITH_GRPC) else() set(BRPC_SRCS brpc/brpc_client.cc brpc/brpc_server.cc brpc/brpc_sendrecvop_utils.cc brpc/brpc_variable_response.cc brpc/brpc_rdma_pool.cc) - set_source_files_properties(${BRPC_SRCS} parameter_prefetch.cc rpc_server_test.cc brpc/brpc_serde_test.cc collective_server.cc collective_server_test.cc collective_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + set_source_files_properties(${BRPC_SRCS} parameter_prefetch.cc parameter_send.cc parameter_recv.cc communicator.cc rpc_server_test.cc brpc/brpc_serde_test.cc collective_server.cc collective_server_test.cc collective_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set(BRPC_DEPS brpc ssl crypto protobuf leveldb snappystream snappy zlib) @@ -50,8 +50,12 @@ endif() cc_test(rpc_server_test SRCS rpc_server_test.cc DEPS ${RPC_DEPS} executor proto_desc lookup_sparse_table_op SERIAL) -cc_test(varhandle_test SRCS varhandle_test.cc DEPS profiler) +cc_test(varhandle_test SRCS varhandle_test.cc DEPS profiler scope) cc_library(parameter_prefetch SRCS parameter_prefetch.cc DEPS sendrecvop_rpc memory) +cc_library(parameter_send SRCS parameter_send.cc DEPS sendrecvop_rpc memory) +cc_library(parameter_recv SRCS parameter_recv.cc DEPS sendrecvop_rpc memory) +cc_library(communicator SRCS communicator.cc DEPS scope selected_rows tensor variable_helper selected_rows_functor simple_threadpool parameter_send parameter_recv) +cc_test(communicator_test SRCS communicator_test.cc DEPS communicator) if(WITH_GPU) cc_test(collective_server_test SRCS collective_server_test.cc DEPS sendrecvop_rpc executor ${RPC_DEPS} diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc new file mode 100644 index 0000000000000000000000000000000000000000..eba18c67771fa26eed855b0f19591e06101f424d --- /dev/null +++ b/paddle/fluid/operators/distributed/communicator.cc @@ -0,0 +1,213 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include "paddle/fluid/operators/distributed/communicator.h" + +#include +#include // NOLINT +#include // NOLINT + +#include "paddle/fluid/framework/eigen.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/tensor_util.h" +#include "paddle/fluid/framework/variable_helper.h" +#include "paddle/fluid/operators/distributed/parameter_recv.h" +#include "paddle/fluid/operators/distributed/parameter_send.h" + +DEFINE_bool(communicator_independent_recv_thread, true, + "use an independent to recv vars from parameter server"); +DEFINE_int32(communicator_send_queue_size, 20, + "queue size to recv gradient before send"); +DEFINE_int32(communicator_max_send_grad_num_before_recv, 20, + "max grad num to send before recv parameters"); +DEFINE_int32(communicator_thread_pool_size, 5, "thread num to do send or recv"); +DEFINE_int32(communicator_max_merge_var_num, 20, + "max var num to merge and send"); +DEFINE_bool(communicator_fake_rpc, false, + "fake mode does not really send any thing"); + +namespace paddle { +namespace operators { +namespace distributed { + +inline double GetCurrentUS() { + struct timeval time; + gettimeofday(&time, NULL); + return 1e+6 * time.tv_sec + time.tv_usec; +} + +std::unique_ptr Communicator::communicator_(nullptr); +std::once_flag Communicator::init_flag_; + +Communicator::Communicator(const RpcCtxMap &send_varname_to_ctx, + const RpcCtxMap &recv_varname_to_ctx, + Scope *recv_scope) + : send_varname_to_ctx_(send_varname_to_ctx), + recv_varname_to_ctx_(recv_varname_to_ctx), + recv_scope_(recv_scope) { + // get all send information from graph, build vars_to_send + VLOG(0) << "communicator_independent_recv_thread: " + << FLAGS_communicator_independent_recv_thread; + VLOG(0) << "communicator_send_queue_size: " + << FLAGS_communicator_send_queue_size; + VLOG(0) << "communicator_max_send_grad_num_before_recv: " + << FLAGS_communicator_max_send_grad_num_before_recv; + VLOG(0) << "communicator_thread_pool_size: " + << FLAGS_communicator_thread_pool_size; + VLOG(0) << "communicator_max_merge_var_num: " + << FLAGS_communicator_max_merge_var_num; + VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc; + send_scope_.reset(new Scope()); + for (auto &iter : send_varname_to_ctx_) { + send_varname_to_queue_[iter.first] = + std::make_shared>>( + FLAGS_communicator_send_queue_size); + } + send_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size)); + recv_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size)); +} + +Communicator::~Communicator() { + VLOG(3) << "~Communicator"; + running_ = false; + if (send_thread_) send_thread_->join(); + if (recv_thread_) recv_thread_->join(); + VLOG(3) << "~Communicator done"; +} + +void Communicator::SendThread() { + VLOG(3) << "SendThread start!"; + while (running_) { + std::vector> task_futures; + task_futures.reserve(send_varname_to_ctx_.size()); + VLOG(3) << "run send graph"; + auto before_run_send_graph = GetCurrentUS(); + for (auto &iter : send_varname_to_queue_) { + auto &var_name = iter.first; + auto &var_queue = iter.second; + if (var_queue->Size() > 0) { + auto send_task = [this, &var_name, &var_queue] { + VLOG(3) << var_name << " merge and send"; + std::vector> vars; + size_t merged_var_num = 0; + while (var_queue->Size() > 0 && + merged_var_num < FLAGS_communicator_max_merge_var_num) { + vars.push_back(var_queue->Pop()); + // only count the send number of the first var + if (var_name == send_varname_to_queue_.begin()->first) { + grad_num_.fetch_add(1, 
std::memory_order_relaxed); + } + merged_var_num++; + } + auto before_merge = GetCurrentUS(); + MergeVars(var_name, vars, send_scope_.get()); + auto after_merge = GetCurrentUS(); + VLOG(3) << "merge " << var_name << " use time " + << after_merge - before_merge; + auto send_functor = distributed::ParameterSend(); + auto &ctx = send_varname_to_ctx_.at(var_name); + if (!FLAGS_communicator_fake_rpc) { + send_functor(ctx, *send_scope_, true); + } + auto after_send = GetCurrentUS(); + VLOG(3) << "send " << var_name << " use time " + << after_send - after_merge; + }; + task_futures.emplace_back( + send_threadpool_->enqueue(std::move(send_task))); + } else { + VLOG(3) << var_name << " queue empty"; + } + } + for (auto &task_f : task_futures) { + task_f.wait(); + } + auto after_run_send_graph = GetCurrentUS(); + auto send_graph_use_time = after_run_send_graph - before_run_send_graph; + if (send_graph_use_time > 100) { + VLOG(1) << "run send graph use time " + << after_run_send_graph - before_run_send_graph; + } + if (!FLAGS_communicator_independent_recv_thread) { + RecvAll(); + } + } +} + +void Communicator::RecvAll() { + VLOG(3) << "parallel run recv graph"; + auto before_send = GetCurrentUS(); + std::vector> task_futures; + task_futures.reserve(recv_varname_to_ctx_.size()); + for (auto &iter : recv_varname_to_ctx_) { + auto recv_task = [this, &iter] { + auto &var_name = iter.first; + VLOG(3) << "recv var " << var_name; + auto recv_functor = distributed::ParameterRecv(); + if (!FLAGS_communicator_fake_rpc) { + recv_functor(iter.second, *recv_scope_); + } + }; + task_futures.emplace_back(recv_threadpool_->enqueue(std::move(recv_task))); + } + for (auto &task : task_futures) { + task.wait(); + } + auto after_recv = GetCurrentUS(); + VLOG(1) << "run recv graph use time " << after_recv - before_send; +} + +void Communicator::RecvThread() { + VLOG(3) << "RecvThread start!"; + while (running_) { + auto grad_num = grad_num_.load(); + if (grad_num > FLAGS_communicator_max_send_grad_num_before_recv) { + VLOG(1) << "current grad num " << grad_num; + RecvAll(); + grad_num_.store(0); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } +} + +void Communicator::Send(const std::string &var_name, + const framework::Scope &scope) { + VLOG(3) << "communicator send " << var_name; + // push var into send queue by var_name + auto *grad_var = scope.FindVar(var_name); + PADDLE_ENFORCE(grad_var->IsInitialized(), "grad var should be inited"); + auto tmp_grad_var = std::make_shared(); + framework::CopyVariable(*grad_var, tmp_grad_var.get()); + auto &queue = send_varname_to_queue_.at(var_name); + VLOG(3) << "send " << var_name << " queue size " << queue->Size(); + queue->Push(tmp_grad_var); +} + +Communicator *Communicator::GetInstance() { return communicator_.get(); } + +void Communicator::Start() { + running_ = true; + // start send and recv thread + send_thread_.reset( + new std::thread(std::bind(&Communicator::SendThread, this))); + if (FLAGS_communicator_independent_recv_thread) { + recv_thread_.reset( + new std::thread(std::bind(&Communicator::RecvThread, this))); + } +} + +} // namespace distributed +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h new file mode 100644 index 0000000000000000000000000000000000000000..41155bfc31bb31520fdcf5bd50b203f2e1f2c516 --- /dev/null +++ b/paddle/fluid/operators/distributed/communicator.h @@ -0,0 +1,219 @@ +/* Copyright (c) 2019 
PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/variable.h" +#include "paddle/fluid/operators/distributed/rpc_common.h" +#include "paddle/fluid/operators/math/math_function.h" +#include "paddle/fluid/operators/math/selected_rows_functor.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/enforce.h" +#include "paddle/fluid/platform/place.h" + +namespace paddle { +namespace operators { +namespace distributed { + +using Scope = framework::Scope; +using Variable = framework::Variable; + +template +class BlockingQueue { + public: + explicit BlockingQueue(size_t capacity) : capacity_(capacity) { + PADDLE_ENFORCE_GT(capacity_, 0, "The capacity must be greater than 0."); + } + + bool Push(const T& elem) { + { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return queue_.size() < capacity_; }); + PADDLE_ENFORCE_LT(queue_.size(), capacity_); + queue_.push_back(elem); + } + cv_.notify_one(); + return true; + } + + bool Push(T&& elem) { + { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return queue_.size() < capacity_; }); + PADDLE_ENFORCE_LT(queue_.size(), capacity_); + queue_.emplace_back(std::move(elem)); + } + cv_.notify_one(); + return true; + } + + T Pop() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [=] { return !queue_.empty(); }); + T rc(std::move(queue_.front())); + queue_.pop_front(); + cv_.notify_one(); + return rc; + } + + size_t Cap() const { + std::lock_guard lock(mutex_); + return capacity_; + } + + size_t Size() const { + std::lock_guard lock(mutex_); + return queue_.size(); + } + + private: + const size_t capacity_; + std::deque queue_; + + mutable std::mutex mutex_; + std::condition_variable cv_; +}; + +template +using EigenVector = framework::EigenVector; + +inline void MergeVars(const std::string& var_name, + const std::vector>& vars, + Scope* scope) { + PADDLE_ENFORCE(!vars.empty(), "should have value to merge!"); + auto cpu_place = platform::CPUPlace(); + auto& var0 = vars[0]; + auto* out_var = scope->Var(var_name); + if (var0->IsType()) { + auto dims = var0->Get().dims(); + VLOG(3) << "merge " << var_name << " LoDTensor " << dims; + + // init output tensor + auto* out_t = out_var->GetMutable(); + out_t->mutable_data(dims, cpu_place); + + // check the input dims + for (auto& var : vars) { + auto& var_t = var->Get(); + PADDLE_ENFORCE_EQ(var_t.dims(), dims, "should have the same dims"); + } + + // set output tensor to 0. 
+ auto cpu_ctx = paddle::platform::CPUDeviceContext(); + math::SetConstant + constant_functor; + constant_functor(cpu_ctx, out_t, static_cast(0)); + + // sum all vars to out + auto result = EigenVector::Flatten(*out_t); + for (auto& var : vars) { + auto& in_t = var->Get(); + auto in = EigenVector::Flatten(in_t); + result.device(*cpu_ctx.eigen_device()) = result + in; + } + } else if (var0->IsType()) { + auto& slr0 = var0->Get(); + auto* out_slr = out_var->GetMutable(); + out_slr->mutable_rows()->clear(); + out_slr->mutable_value()->mutable_data({{}}, cpu_place); + std::vector inputs; + inputs.reserve(vars.size()); + for (auto& var : vars) { + inputs.push_back(&var->Get()); + } + math::scatter::MergeAdd + merge_add; + auto dev_ctx = paddle::platform::CPUDeviceContext(); + merge_add(dev_ctx, inputs, out_slr, false); + VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height() + << " dims: " << slr0.value().dims(); + } else { + PADDLE_THROW("unsupported var type!"); + } +} + +using RpcCtxMap = std::unordered_map; + +class Communicator { + public: + Communicator(const RpcCtxMap& send_varname_to_ctx, + const RpcCtxMap& recv_varname_to_ctx, Scope* recv_scope); + + ~Communicator(); + + void Start(); + + // send grad + void Send(const std::string& var_name, const framework::Scope& scope); + + private: + // recv all parameter + void RecvAll(); + void SendThread(); + void RecvThread(); + + bool running_ = false; + std::unordered_map>>> + send_varname_to_queue_; + RpcCtxMap send_varname_to_ctx_; + RpcCtxMap recv_varname_to_ctx_; + std::unique_ptr send_thread_; + std::unique_ptr recv_thread_; + Scope* recv_scope_; // should be global scope + std::unique_ptr send_scope_; // an independent scope + std::unique_ptr<::ThreadPool> send_threadpool_{nullptr}; + std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr}; + std::atomic_uint grad_num_{0}; // the num of gradient sent since last recv + + // the following code is for initialize the commnunicator + public: + static void Init(const RpcCtxMap& send_varname_to_ctx, + const RpcCtxMap& recv_varname_to_ctx, Scope* recv_scope) { + InitImpl(send_varname_to_ctx, recv_varname_to_ctx, recv_scope); + } + + static Communicator* GetInstance(); + + private: + // Init is called by GetInstance. + static void InitImpl(const RpcCtxMap& send_varname_to_ctx, + const RpcCtxMap& recv_varname_to_ctx, + Scope* recv_scope) { + if (communicator_ == nullptr) { + communicator_.reset(new Communicator(send_varname_to_ctx, + recv_varname_to_ctx, recv_scope)); + } + } + + private: + static std::once_flag init_flag_; + static std::unique_ptr communicator_; +}; + +} // namespace distributed +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/distributed/communicator_test.cc b/paddle/fluid/operators/distributed/communicator_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..5294ac33d15611a003eeb7971891e8ca85ec6a73 --- /dev/null +++ b/paddle/fluid/operators/distributed/communicator_test.cc @@ -0,0 +1,110 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "paddle/fluid/operators/distributed/communicator.h" + +namespace paddle { +namespace operators { +namespace distributed { + +using LoDTensor = framework::LoDTensor; +using SelectedRows = framework::SelectedRows; + +TEST(communicator, merge_lod_tensors) { + auto cpu_place = platform::CPUPlace(); + auto dims = framework::make_ddim({2, 3}); + std::vector> in_vars; + float out_value = 0; + for (auto i = 0; i < 10; ++i) { + auto var = std::make_shared(); + in_vars.emplace_back(var); + auto *tensor = var->GetMutable(); + auto *data = tensor->mutable_data(dims, cpu_place); + for (auto j = 0; j < tensor->numel(); ++j) { + data[j] = static_cast(i); + } + out_value += static_cast(i); + } + const std::string out_name = "Out"; + std::unique_ptr scope; + scope.reset(new framework::Scope()); + scope->Var(out_name); + for (auto i = 0; i < 10; ++i) { + MergeVars(out_name, in_vars, scope.get()); + } + auto &out_tensor = scope->FindVar(out_name)->Get(); + auto *out_data = out_tensor.data(); + ASSERT_EQ(out_tensor.dims(), dims); + for (auto i = 0; i < out_tensor.numel(); ++i) { + ASSERT_EQ(out_data[i], out_value); + } +} + +TEST(communicator, merge_selected_rows) { + auto cpu_place = platform::CPUPlace(); + int64_t width = 10; + std::vector> in_vars; + const int64_t height = 100; + for (auto i = 0; i < 10; ++i) { + std::vector rows; + for (auto k = 0; k <= i; ++k) { + rows.push_back(k); + } + auto var = std::make_shared(); + in_vars.emplace_back(var); + auto *slr = var->GetMutable(); + slr->set_height(height); + slr->set_rows(rows); + auto dims = + framework::make_ddim({static_cast(rows.size()), width}); + auto *data = slr->mutable_value()->mutable_data(dims, cpu_place); + for (auto i = 0; i < rows.size(); ++i) { + for (auto j = 0; j < width; ++j) { + data[i * width + j] = static_cast(rows[i]); + } + } + } + const std::string out_name = "Out"; + std::unique_ptr scope; + scope.reset(new framework::Scope()); + scope->Var(out_name); + for (auto i = 0; i < 10; ++i) { + MergeVars(out_name, in_vars, scope.get()); + } + auto &out_slr = scope->FindVar(out_name)->Get(); + auto &out_t = out_slr.value(); + auto *out_data = out_t.data(); + ASSERT_EQ(out_t.dims(), framework::make_ddim({10, width})); + std::vector out_values; + out_values.reserve(10); + for (auto i = 0; i < 10; ++i) { + out_values.push_back(static_cast(i * (10 - i))); + } + for (auto i = 0; i < out_slr.rows().size(); ++i) { + ASSERT_EQ(out_slr.rows()[i], i); + for (auto j = 0; j < width; ++j) { + ASSERT_EQ(out_data[i * width + j], out_values[i]); + } + } +} + +} // namespace distributed +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/distributed/grpc/grpc_server.cc b/paddle/fluid/operators/distributed/grpc/grpc_server.cc index 4a9c158cb0ab7f2d6fecbba9f957ae6ef153074c..0eb313f75dfa64f8722faa365128f3111f72bd0b 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_server.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_server.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under 
the License. */ #include +#include #include #include "paddle/fluid/operators/distributed/grpc/grpc_serde.h" @@ -106,7 +107,6 @@ class RequestSend final : public RequestBase { auto invar = request_->GetVar(); int trainer_id = request_->GetTrainerId(); framework::Variable* outvar = nullptr; - request_handler_->Handle(varname, scope, invar, &outvar, trainer_id); Finish(reply_, &responder_); } diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index 65295c2c103ceca50d9de3ae314246256497d084..0e8d877e08cf6186cef79cd550035cb8699271d2 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -38,30 +39,9 @@ using LoDTensor = framework::LoDTensor; using SelectedRows = framework::SelectedRows; using DDim = framework::DDim; -static size_t GetSectionIndex(int64_t id, - const std::vector& abs_sections) { - for (size_t i = 1; i < abs_sections.size(); ++i) { - if (id < abs_sections[i]) { - return i - 1; - } - } - return abs_sections.size() - 1; -} - -static std::vector ToAbsoluteSection( - const std::vector& height_sections) { - std::vector abs_sections; - abs_sections.resize(height_sections.size()); - abs_sections[0] = 0; - for (size_t i = 1; i < height_sections.size(); ++i) { - abs_sections[i] = height_sections[i - 1] + abs_sections[i - 1]; - } - return abs_sections; -} - static std::vector> SplitIds( const std::vector& ids_vector, - const std::vector& height_section, framework::Scope* scope) { + const std::vector& height_section) { std::set all_ids; for (auto id : ids_vector) { all_ids.insert(id); @@ -79,7 +59,7 @@ static std::vector> SplitIds( static void SplitIdsIntoMultipleVarsBySection( const std::vector& in_var_names, - const std::vector& height_section, + const std::vector& height_section, const std::vector>& splited_ids, framework::Scope* scope) { PADDLE_ENFORCE_EQ(in_var_names.size(), height_section.size(), ""); @@ -101,7 +81,7 @@ static void SplitIdsIntoMultipleVarsBySection( static void MergeMultipleVarsIntoOneBySection( const std::string& id_name, const std::vector& ids_vector, const std::string& out_name, const std::vector& out_var_names, - const std::vector& height_section, + const std::vector& height_section, const std::vector>& splited_ids, const framework::ExecutionContext& context, framework::Scope* scope, platform::DeviceContext* actual_ctx) { @@ -178,10 +158,10 @@ static void MergeMultipleVarsIntoOneBySection( void prefetch(const std::string& id_name, const std::string& out_name, const std::vector& table_names, const std::vector& epmap, - const std::vector& height_sections, + const std::vector& height_sections, const framework::ExecutionContext& context, const framework::Scope& scope) { - auto& local_scope = scope.NewScope(); + std::unique_ptr local_scope = scope.NewTmpScope(); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& cpu_ctx = *pool.Get(platform::CPUPlace()); @@ -225,23 +205,23 @@ void prefetch(const std::string& id_name, const std::string& out_name, #endif } - auto splited_ids = SplitIds(ids_vector, height_sections, &local_scope); + auto splited_ids = SplitIds(ids_vector, height_sections); SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids, - &local_scope); + local_scope.get()); // create output var in 
local scope for (auto& name : out_var_names) { - local_scope.Var(name)->GetMutable(); + local_scope->Var(name)->GetMutable(); } std::vector rets; for (size_t i = 0; i < in_var_names.size(); i++) { - if (NeedSend(local_scope, in_var_names[i])) { + if (NeedSend(*local_scope.get(), in_var_names[i])) { VLOG(3) << "sending " << in_var_names[i] << " to " << epmap[i] << " to get " << out_var_names[i] << " back"; rets.push_back(rpc_client->AsyncPrefetchVar( - epmap[i], cpu_ctx, local_scope, in_var_names[i], out_var_names[i], - table_names[i])); + epmap[i], cpu_ctx, *local_scope.get(), in_var_names[i], + out_var_names[i], table_names[i])); } else { VLOG(3) << "don't send no-initialied variable: " << out_var_names[i]; } @@ -253,8 +233,7 @@ void prefetch(const std::string& id_name, const std::string& out_name, MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name, out_var_names, height_sections, splited_ids, - context, &local_scope, &actual_ctx); - scope.DeleteScope(&local_scope); + context, local_scope.get(), &actual_ctx); } }; // namespace distributed diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.h b/paddle/fluid/operators/distributed/parameter_prefetch.h index 2f850a0332256d458e79ed9da361c86eb8a2f780..0429ec4415dca19ff620cd7af5a8c0a935e17e2f 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.h +++ b/paddle/fluid/operators/distributed/parameter_prefetch.h @@ -26,7 +26,7 @@ namespace distributed { void prefetch(const std::string& id_name, const std::string& out_name, const std::vector& table_names, const std::vector& epmap, - const std::vector& height_sections, + const std::vector& height_sections, const framework::ExecutionContext& context, const framework::Scope& scope); @@ -35,7 +35,7 @@ void prefetch_with_reconstruct(const std::string& id_name, const std::string& out_name, const std::vector& table_names, const std::vector& epmap, - const std::vector& height_sections, + const std::vector& height_sections, const framework::ExecutionContext& context, const framework::Scope& scope, framework::LoDTensor* original) { diff --git a/paddle/fluid/operators/distributed/parameter_recv.cc b/paddle/fluid/operators/distributed/parameter_recv.cc new file mode 100644 index 0000000000000000000000000000000000000000..e7d4c262aa9fad10a23adc61b94ba0c38577c0e8 --- /dev/null +++ b/paddle/fluid/operators/distributed/parameter_recv.cc @@ -0,0 +1,104 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
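The new parameter_recv.cc that follows implements a ParameterRecv<T> functor: it pulls every split piece of a parameter from its pserver endpoint into a temporary scope and then concatenates the pieces back into the original variable. A minimal usage sketch, modeled on the recv op changes later in this patch; the parameter name, split names and endpoints are illustrative, and float is the only type this patch instantiates:

#include <string>
#include <vector>

#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/rpc_common.h"

namespace paddle {
namespace operators {
namespace distributed {

// Illustrative only: mirrors how recv_op.cc drives ParameterRecv when its
// "recv_varnames" attribute is non-empty. "w" must already exist in `scope`
// as a LoDTensor allocated with the full (concatenated) shape.
void RecvSplitParam(const framework::Scope &scope) {
  std::vector<std::string> recv_varnames = {"w.block0", "w.block1"};
  std::vector<std::string> epmap = {"127.0.0.1:6164", "127.0.0.1:6165"};

  // height sections are not needed on the recv side, so they stay empty
  RpcContext rpc_ctx("w", recv_varnames, epmap, {});
  ParameterRecv<float>()(rpc_ctx, scope);  // fetch the pieces, concat into "w"
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle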
+ +#include +#include +#include +#include + +#include "paddle/fluid/operators/distributed/parameter_recv.h" + +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/tensor.h" + +#include "paddle/fluid/operators/distributed/distributed.h" +#include "paddle/fluid/operators/distributed/rpc_client.h" +#include "paddle/fluid/operators/distributed/variable_response.h" +#include "paddle/fluid/operators/distributed_ops/send_recv_util.h" +#include "paddle/fluid/operators/strided_memcpy.h" + +namespace paddle { +namespace operators { +namespace distributed { + +using LoDTensor = framework::LoDTensor; +using LoDTensor = framework::LoDTensor; +using SelectedRows = framework::SelectedRows; +using DDim = framework::DDim; + +template +void ParameterRecv::operator()(const RpcContext &rpc_ctx, + const framework::Scope &scope) { + VLOG(3) << "ParameterRecv in"; + std::unique_ptr local_scope = scope.NewTmpScope(); + + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &cpu_ctx = *pool.Get(platform::CPUPlace()); + + distributed::RPCClient *rpc_client = + distributed::RPCClient::GetInstance(0); + + auto *recv_var = scope.FindVar(rpc_ctx.var_name); + + std::vector recved_tensors; + + // recv all vars to local scope + if (recv_var->IsType()) { + std::vector rets; + for (size_t i = 0; i < rpc_ctx.splited_var_names.size(); i++) { + auto &recv_var_name = rpc_ctx.splited_var_names[i]; + framework::Tensor *t = + local_scope->Var(recv_var_name)->GetMutable(); + recved_tensors.push_back(t); + VLOG(3) << "recv " << recv_var_name << " from " << rpc_ctx.epmap[i]; + rets.push_back(rpc_client->AsyncGetVar(rpc_ctx.epmap[i], cpu_ctx, + *local_scope.get(), recv_var_name, + recv_var_name)); + } + for (size_t i = 0; i < rets.size(); i++) { + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + } + } else { + PADDLE_THROW("unsupported var type to recv!"); + } + + // concat recved tensor into one var + { + size_t output_offset = 0; + framework::Tensor *recv_tensor = + recv_var->GetMutable(); + auto dev_ctx = paddle::platform::CPUDeviceContext(); + int64_t recv_numel = 0; + for (auto *in : recved_tensors) { + recv_numel += in->numel(); + auto in_stride = framework::stride_numel(in->dims()); + auto out_stride = framework::stride_numel(recv_tensor->dims()); + StridedNumelCopyWithAxis( + dev_ctx, 0, recv_tensor->data() + output_offset, out_stride, + in->data(), in_stride, in_stride[0]); + output_offset += in_stride[0]; + } + PADDLE_ENFORCE_EQ(recv_numel, recv_tensor->numel()); + } + + VLOG(3) << "ParameterRecv out"; +} + +template struct ParameterRecv; + +}; // namespace distributed +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/distributed/parameter_recv.h b/paddle/fluid/operators/distributed/parameter_recv.h new file mode 100644 index 0000000000000000000000000000000000000000..e955fca7250ecc88f3b1a08611f380da50df788d --- /dev/null +++ b/paddle/fluid/operators/distributed/parameter_recv.h @@ -0,0 +1,34 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/operators/distributed/rpc_common.h" + +namespace paddle { +namespace operators { +namespace distributed { + +template +struct ParameterRecv { + void operator()(const RpcContext &rpc_ctx, const framework::Scope &scope); +}; + +}; // namespace distributed +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/distributed/parameter_send.cc b/paddle/fluid/operators/distributed/parameter_send.cc new file mode 100644 index 0000000000000000000000000000000000000000..9ce424445229cde0a7e775c95f4af8839f4d4d68 --- /dev/null +++ b/paddle/fluid/operators/distributed/parameter_send.cc @@ -0,0 +1,175 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
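ParameterSend<T>, defined in the new parameter_send.cc below, is the sending counterpart: it slices the variable by the configured height sections (dense tensors by contiguous rows, SelectedRows by row id), stages the slices in a temporary scope, and ships each slice to its endpoint, optionally waiting for all RPCs. A hedged sketch of the intended call pattern, mirroring the send op changes later in this patch; the gradient name, shard names, endpoints and sections are illustrative, and the int64_t section type matches the rest of the patch:

#include <string>
#include <vector>

#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/operators/distributed/parameter_send.h"
#include "paddle/fluid/operators/distributed/rpc_common.h"

namespace paddle {
namespace operators {
namespace distributed {

// Illustrative only: mirrors how send_op.cc drives ParameterSend when its
// "send_varnames" attribute is non-empty and no Communicator has been set up.
void SendSplitGrad(const framework::Scope &scope) {
  std::vector<std::string> send_varnames = {"w@GRAD.block0", "w@GRAD.block1"};
  std::vector<std::string> epmap = {"127.0.0.1:6164", "127.0.0.1:6165"};
  std::vector<int64_t> height_sections = {10, 11};  // rows per pserver shard

  RpcContext rpc_ctx("w@GRAD", send_varnames, epmap, height_sections);
  ParameterSend<float>()(rpc_ctx, scope, /*sync=*/true);  // wait for all RPCs
}

}  // namespace distributed
}  // namespace operators
}  // namespace paddle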
+ +#include +#include +#include +#include + +#include "paddle/fluid/operators/distributed/parameter_send.h" + +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/tensor.h" + +#include "paddle/fluid/operators/distributed/distributed.h" +#include "paddle/fluid/operators/distributed/rpc_client.h" +#include "paddle/fluid/operators/distributed/variable_response.h" +#include "paddle/fluid/operators/distributed_ops/send_recv_util.h" + +namespace paddle { +namespace operators { +namespace distributed { + +using LoDTensor = framework::LoDTensor; +using LoDTensor = framework::LoDTensor; +using SelectedRows = framework::SelectedRows; +using DDim = framework::DDim; + +template +void ParameterSend::operator()(const RpcContext &rpc_ctx, + const framework::Scope &scope, bool sync) { + std::unique_ptr local_scope = scope.NewTmpScope(); + + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &cpu_ctx = *pool.Get(platform::CPUPlace()); + + distributed::RPCClient *rpc_client = + distributed::RPCClient::GetInstance(0); + + auto *send_var = scope.FindVar(rpc_ctx.var_name); + size_t out_num = rpc_ctx.splited_var_names.size(); + if (send_var->IsType()) { + if (out_num > 1) { + auto &send_tensor = send_var->Get(); + auto &send_tensor_dims = send_tensor.dims(); + std::vector outs_dims; + outs_dims.reserve(out_num); + + // infer output shape + PADDLE_ENFORCE_EQ(rpc_ctx.height_sections.size(), out_num, + "tensor split sections size" + "should be equal to output size."); + for (size_t i = 0; i < out_num; ++i) { + auto dim = send_tensor_dims; + dim[0] = rpc_ctx.height_sections[i]; + outs_dims.push_back(dim); + } + + // create output var in local scope + size_t row_offset = 0; + for (auto i = 0; i < out_num; ++i) { + framework::Tensor *out = local_scope->Var(rpc_ctx.splited_var_names[i]) + ->GetMutable(); + *out = send_tensor.Slice(row_offset, row_offset + outs_dims[i][0]); + row_offset += outs_dims[i][0]; + } + } + } else if (send_var->IsType()) { + auto &send_slr = send_var->Get(); + auto abs_sections = ToAbsoluteSection(rpc_ctx.height_sections); + + auto &send_rows = send_slr.rows(); + std::vector> outs_rows_idx; + std::vector> outs_dense_idx; + + outs_rows_idx.resize(out_num); + outs_dense_idx.resize(out_num); + + auto row_numel = send_slr.value().numel() / send_slr.value().dims()[0]; + auto *src = send_slr.value().data(); + + // create output var in local scope + std::vector outs; + for (auto &name : rpc_ctx.splited_var_names) { + auto *out = local_scope->Var(name)->GetMutable(); + outs.push_back(out); + } + + // split rows index into output sparse vars + for (size_t i = 0; i < send_rows.size(); ++i) { + size_t out_idx = GetSectionIndex(send_rows[i], abs_sections); + outs_rows_idx[out_idx].push_back(send_rows[i]); + outs_dense_idx[out_idx].push_back(i); + } + auto place = platform::CPUPlace(); + + for (size_t i = 0; i < outs_rows_idx.size(); ++i) { + auto rows_idx = outs_rows_idx[i]; + outs[i]->set_height(rpc_ctx.height_sections[i]); + auto dims = send_slr.GetCompleteDims(); + dims[0] = rows_idx.size(); + outs[i]->mutable_rows()->clear(); + outs[i]->mutable_value()->mutable_data(dims, send_slr.place()); + if (rows_idx.size() > 0) { + for (auto idx : rows_idx) { + outs[i]->mutable_rows()->push_back(idx - abs_sections[i]); + } + auto dst = outs[i]->mutable_value()->mutable_data(place); + for (size_t j = 0; j < rows_idx.size(); j++) { + if 
(platform::is_cpu_place(place)) { + memory::Copy( + platform::CPUPlace(), dst + j * row_numel, platform::CPUPlace(), + src + outs_dense_idx[i][j] * row_numel, sizeof(T) * row_numel); + } else { + PADDLE_THROW("do not support GPU now"); + /* + #ifdef PADDLE_WITH_CUDA + auto stream = ctx.cuda_device_context().stream(); + memory::Copy(platform::CUDAPlace(), dst + j * row_numel, + platform::CUDAPlace(), + src + outs_dense_idx[i][j] * row_numel, + sizeof(T) * row_numel, stream); + #else + PADDLE_THROW("Paddle is not compiled with GPU"); + #endif + */ + } + } + } + PADDLE_ENFORCE_EQ(rows_idx.size(), outs[i]->rows().size(), + "rows should has the same size with tensor dim 0"); + } + + } else { + PADDLE_THROW("unsupported var type to send!"); + } + + std::vector rets; + for (size_t i = 0; i < rpc_ctx.splited_var_names.size(); i++) { + auto &send_var_name = rpc_ctx.splited_var_names[i]; + auto &endpoint = rpc_ctx.epmap[i]; + if (NeedSend(*local_scope.get(), send_var_name)) { + VLOG(3) << "sending " << send_var_name << " to " << endpoint; + rets.push_back(rpc_client->AsyncSendVar( + endpoint, cpu_ctx, *local_scope.get(), send_var_name)); + } else { + VLOG(3) << "don't send non-initialized variable: " + << rpc_ctx.splited_var_names[i]; + } + } + + if (sync) { + for (auto &handle : rets) { + PADDLE_ENFORCE(handle->Wait(), "internal error in RPCClient"); + } + } +} + +template struct ParameterSend; + +}; // namespace distributed +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/distributed/parameter_send.h b/paddle/fluid/operators/distributed/parameter_send.h new file mode 100644 index 0000000000000000000000000000000000000000..9077f4a4fb9fd9d7152e8be72519f16b1999e93d --- /dev/null +++ b/paddle/fluid/operators/distributed/parameter_send.h @@ -0,0 +1,35 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
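Both the prefetch path and ParameterSend rely on the same section arithmetic, which this patch moves into send_recv_util.h (see the changes further down): ToAbsoluteSection turns per-shard heights into absolute starting row offsets, and GetSectionIndex finds the shard that owns a given row id. A small worked example, assuming the int64_t section type and the paddle::operators namespace used by that header:

#include <cassert>
#include <cstdint>
#include <vector>

#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"

// For height sections {10, 11} (a 21-row parameter split across two
// pservers), the absolute section starts are {0, 10}. Row id 12 therefore
// belongs to section 1 and maps to local row 12 - 10 = 2 on that shard.
int main() {
  std::vector<int64_t> height_sections = {10, 11};
  auto abs_sections = paddle::operators::ToAbsoluteSection(height_sections);
  assert(abs_sections[0] == 0 && abs_sections[1] == 10);

  size_t idx = paddle::operators::GetSectionIndex(12, abs_sections);
  assert(idx == 1);
  assert(12 - abs_sections[idx] == 2);  // local offset inside the shard
  return 0;
}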
+ +#pragma once + +#include +#include + +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/operators/distributed/rpc_common.h" + +namespace paddle { +namespace operators { +namespace distributed { + +template +struct ParameterSend { + void operator()(const RpcContext &rpc_ctx, const framework::Scope &scope, + bool sync); +}; + +}; // namespace distributed +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/distributed/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc index a1c5c0777402b808eed6306862fd6dd41b529dbd..e289ec929dbd6643a2518b92c1a25b7d63e790a9 100644 --- a/paddle/fluid/operators/distributed/request_handler_impl.cc +++ b/paddle/fluid/operators/distributed/request_handler_impl.cc @@ -59,13 +59,8 @@ bool RequestSendHandler::Handle(const std::string& varname, "async mode should not recv BATCH_BARRIER_MESSAGE or " "COMPLETE_MESSAGE"); } - try { - executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), - scope); - } catch (std::exception& e) { - LOG(ERROR) << "async: run sub program error " << e.what(); - return false; - } + executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), + scope); return true; } else { // sync rpc_server_->WaitCond(kRequestSend); diff --git a/paddle/fluid/operators/distributed/rpc_common.h b/paddle/fluid/operators/distributed/rpc_common.h new file mode 100644 index 0000000000000000000000000000000000000000..3de89c2ae89d29edc317ca123882d1c55038b6ca --- /dev/null +++ b/paddle/fluid/operators/distributed/rpc_common.h @@ -0,0 +1,76 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#pragma once + +#include +#include +#include + +namespace paddle { +namespace operators { +namespace distributed { + +struct RpcContext { + RpcContext() = default; + + RpcContext(const std::string &name, const std::vector &names, + const std::vector &emap, + const std::vector §ions) + : var_name(name), + splited_var_names(names), + epmap(emap), + height_sections(sections) {} + + RpcContext(const RpcContext &ctx) { + var_name = ctx.var_name; + splited_var_names = ctx.splited_var_names; + epmap = ctx.epmap; + height_sections = ctx.height_sections; + } + + std::string var_name; + std::vector splited_var_names; + std::vector epmap; + std::vector height_sections; +}; + +inline std::ostream &operator<<(std::ostream &os, const RpcContext &rpc_ctx) { + os << "{"; + os << "var_name: " << rpc_ctx.var_name << "\n"; + + os << "splited_var_names: ["; + for (auto &name : rpc_ctx.splited_var_names) { + os << name << ", "; + } + os << "]\n"; + + os << "epmap: ["; + for (auto &ep : rpc_ctx.epmap) { + os << ep << ", "; + } + os << "]\n"; + + os << "height_sections: ["; + for (auto §ion : rpc_ctx.height_sections) { + os << section << ", "; + } + os << "]\n"; + os << "}"; + return os; +} + +} // namespace distributed +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/distributed/variable_response.h b/paddle/fluid/operators/distributed/variable_response.h index 294cae5f44a4701c064c3669af7b4138f68659e6..3cabcd22cd52222aff2555a8449e558de2c287c0 100644 --- a/paddle/fluid/operators/distributed/variable_response.h +++ b/paddle/fluid/operators/distributed/variable_response.h @@ -60,13 +60,14 @@ class VariableResponse { bool create_scope = false) : scope_(scope), dev_ctx_(dev_ctx), create_scope_(create_scope) { if (create_scope) { - local_scope_ = &scope->NewScope(); + local_scope_ = scope->NewTmpScope().release(); } } virtual ~VariableResponse() { - if (create_scope_) { - scope_->DeleteScope(local_scope_); + if (local_scope_) { + delete local_scope_; + local_scope_ = nullptr; } } diff --git a/paddle/fluid/operators/distributed_ops/CMakeLists.txt b/paddle/fluid/operators/distributed_ops/CMakeLists.txt index a8bb597cbd59290df1347c164d37104c6ac431e9..a1ef1af39ff2ab1456706ebafbd3d7ce1acc0c07 100644 --- a/paddle/fluid/operators/distributed_ops/CMakeLists.txt +++ b/paddle/fluid/operators/distributed_ops/CMakeLists.txt @@ -2,9 +2,9 @@ include(operators) set(DISTRIBUTE_DEPS "") if(WITH_GRPC) - set(DISTRIBUTE_DEPS sendrecvop_rpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf node) + set(DISTRIBUTE_DEPS sendrecvop_rpc parameter_send parameter_recv communicator grpc++_unsecure grpc_unsecure gpr cares zlib protobuf node) else() - set(DISTRIBUTE_DEPS sendrecvop_rpc brpc leveldb snappystream snappy protobuf ssl crypto zlib node) + set(DISTRIBUTE_DEPS sendrecvop_rpc parameter_send parameter_recv communicator brpc leveldb snappystream snappy protobuf ssl crypto zlib node) if(WITH_BRPC_RDMA) find_library(IBVERBS_LIBRARY NAMES ibverbs) ADD_LIBRARY(ibverbs SHARED IMPORTED GLOBAL) diff --git a/paddle/fluid/operators/distributed_ops/recv_op.cc b/paddle/fluid/operators/distributed_ops/recv_op.cc index 120c65f29699bf2745b09ea312d1de069c8173c5..3fd0700a077321d931e87b1d94c3637d167c9eff 100644 --- a/paddle/fluid/operators/distributed_ops/recv_op.cc +++ b/paddle/fluid/operators/distributed_ops/recv_op.cc @@ -20,6 +20,8 @@ limitations under the License. 
*/ #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/operators/distributed/distributed.h" +#include "paddle/fluid/operators/distributed/parameter_recv.h" +#include "paddle/fluid/operators/distributed/rpc_common.h" #include "paddle/fluid/platform/profiler.h" namespace paddle { @@ -34,6 +36,11 @@ class RecvOp : public framework::OperatorBase { void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { + bool do_not_run = Attr("do_not_run"); + if (do_not_run) { + VLOG(3) << "recv do not run!"; + return; + } std::vector epmap = Attr>("epmap"); std::vector varnames = Attr>("varnames"); @@ -48,32 +55,41 @@ class RecvOp : public framework::OperatorBase { distributed::RPCClient::GetInstance( Attr("trainer_id")); - if (with_barrier) { - std::vector rets; - for (size_t i = 0; i < outs.size(); i++) { - std::string varname = varnames.size() == 0 ? outs[i] : varnames[i]; - VLOG(4) << "recv " << outs[i] << " from " << epmap[i] << " with " - << varname << " and with AsyncGetVar"; - rets.push_back( - rpc_client->AsyncGetVar(epmap[i], ctx, scope, varname, outs[i])); - } - if (sync_mode) { + std::vector recv_varnames = + Attr>("recv_varnames"); + + if (recv_varnames.size() > 0) { + auto recv_functor = distributed::ParameterRecv(); + auto rpc_ctx = distributed::RpcContext(outs[0], recv_varnames, epmap, {}); + recv_functor(rpc_ctx, scope); + } else { + if (with_barrier) { + std::vector rets; + for (size_t i = 0; i < outs.size(); i++) { + std::string varname = varnames.size() == 0 ? outs[i] : varnames[i]; + VLOG(4) << "recv " << outs[i] << " from " << epmap[i] << " with " + << varname << " and with AsyncGetVar"; + rets.push_back( + rpc_client->AsyncGetVar(epmap[i], ctx, scope, varname, outs[i])); + } + if (sync_mode) { + for (size_t i = 0; i < rets.size(); i++) { + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + } + } + } else { + std::vector rets; + for (size_t i = 0; i < outs.size(); i++) { + std::string varname = varnames.size() == 0 ? outs[i] : varnames[i]; + VLOG(4) << "recv " << outs[i] << " from " << epmap[i] << " with " + << varname << " and with AsyncGetVarNoBarrier"; + rets.push_back(rpc_client->AsyncGetVarNoBarrier(epmap[i], ctx, scope, + varname, outs[i])); + } for (size_t i = 0; i < rets.size(); i++) { PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); } } - } else { - std::vector rets; - for (size_t i = 0; i < outs.size(); i++) { - std::string varname = varnames.size() == 0 ? outs[i] : varnames[i]; - VLOG(4) << "recv " << outs[i] << " from " << epmap[i] << " with " - << varname << " and with AsyncGetVarNoBarrier"; - rets.push_back(rpc_client->AsyncGetVarNoBarrier(epmap[i], ctx, scope, - varname, outs[i])); - } - for (size_t i = 0; i < rets.size(); i++) { - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - } } } }; @@ -110,6 +126,12 @@ This operator can get variables from server side. "for example: we need var named 'moment_1@127.0.0.1:1001', " "and it real name on parameter server is 'moment_1'. 
") .SetDefault({}); + AddAttr>( + "recv_varnames", + "(vector) " + "the splited parameter varnames to be recved from pserver") + .SetDefault(std::vector{}); + AddAttr("do_not_run", "if recv need to really run").SetDefault(false); } }; diff --git a/paddle/fluid/operators/distributed_ops/send_op.cc b/paddle/fluid/operators/distributed_ops/send_op.cc index e2c2147ab5e9a76498a0fd9e1f18b75eed32e91e..b08cd0942f8c89b60d722c931d0cec2063b96578 100644 --- a/paddle/fluid/operators/distributed_ops/send_op.cc +++ b/paddle/fluid/operators/distributed_ops/send_op.cc @@ -19,7 +19,10 @@ limitations under the License. */ #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/operators/distributed/communicator.h" #include "paddle/fluid/operators/distributed/distributed.h" +#include "paddle/fluid/operators/distributed/parameter_send.h" +#include "paddle/fluid/operators/distributed/rpc_common.h" #include "paddle/fluid/operators/distributed_ops/send_recv_util.h" #include "paddle/fluid/platform/profiler.h" @@ -37,30 +40,47 @@ class SendOp : public framework::OperatorBase { const platform::Place& place) const override { auto ins = Inputs("X"); - std::vector epmap = Attr>("epmap"); + auto epmap = Attr>("epmap"); int sync_send = Attr("sync_mode"); - platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto& ctx = *pool.Get(place); + auto send_varnames = Attr>("send_varnames"); + auto height_sections = Attr>("sections"); - distributed::RPCClient* rpc_client = - distributed::RPCClient::GetInstance( - Attr("trainer_id")); - - std::vector rets; - for (size_t i = 0; i < ins.size(); i++) { - if (NeedSend(scope, ins[i])) { - VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; - rets.push_back(rpc_client->AsyncSendVar(epmap[i], ctx, scope, ins[i])); + if (send_varnames.size() > 0) { + PADDLE_ENFORCE_EQ(ins.size(), 1, ""); + if (distributed::Communicator::GetInstance() == nullptr) { + auto send_functor = distributed::ParameterSend(); + auto rpc_ctx = distributed::RpcContext(ins[0], send_varnames, epmap, + height_sections); + send_functor(rpc_ctx, scope, true); } else { - VLOG(3) << "don't send no-initialied variable: " << ins[i]; + distributed::Communicator::GetInstance()->Send(ins[0], scope); } - } - if (sync_send) { - for (size_t i = 0; i < rets.size(); i++) { - VLOG(7) << "before sync_send " << ins[i] << "from " << epmap[i]; - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); - VLOG(7) << "after sync_send " << ins[i] << "from " << epmap[i]; + } else { + platform::DeviceContextPool& pool = + platform::DeviceContextPool::Instance(); + auto& ctx = *pool.Get(place); + + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance( + Attr("trainer_id")); + + std::vector rets; + for (size_t i = 0; i < ins.size(); i++) { + if (NeedSend(scope, ins[i])) { + VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; + rets.push_back( + rpc_client->AsyncSendVar(epmap[i], ctx, scope, ins[i])); + } else { + VLOG(3) << "don't send no-initialied variable: " << ins[i]; + } + } + if (sync_send) { + for (size_t i = 0; i < rets.size(); i++) { + VLOG(7) << "before sync_send " << ins[i] << "from " << epmap[i]; + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + VLOG(7) << "after sync_send " << ins[i] << "from " << epmap[i]; + } } } } @@ -88,6 +108,21 @@ This operator will send variables to listen_and_serve op at the parameter server "Server endpoints in the order of 
input " "variables for mapping") .SetDefault({"127.0.0.1:6164"}); + AddAttr>("sections", + "(vector) " + "the length of each output along the " + "specified axis.") + .SetDefault(std::vector{}); + AddAttr>( + "send_varnames", + "(vector) " + "the splited output varnames to send to pserver") + .SetDefault(std::vector{}); + AddAttr("num", + "(int, default 0)" + "Number of sub-tensors. This must evenly divide " + "Input.dims()[axis]") + .SetDefault(0); } }; diff --git a/paddle/fluid/operators/distributed_ops/send_recv_util.h b/paddle/fluid/operators/distributed_ops/send_recv_util.h index dc26c53c64f06ce21856fb5af8f2a5eb3fc75bb7..c05a1ff1da8803c1ef3161d0e9d8604f9f1e5f3b 100644 --- a/paddle/fluid/operators/distributed_ops/send_recv_util.h +++ b/paddle/fluid/operators/distributed_ops/send_recv_util.h @@ -13,8 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once + #include +#include + #include "paddle/fluid/framework/ir/node.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/selected_rows.h" namespace paddle { namespace operators { @@ -42,5 +48,26 @@ inline bool NeedSend(const framework::Scope& scope, return false; } +inline std::vector ToAbsoluteSection( + const std::vector& height_sections) { + std::vector abs_sections; + abs_sections.resize(height_sections.size()); + abs_sections[0] = 0; + for (size_t i = 1; i < height_sections.size(); ++i) { + abs_sections[i] = height_sections[i - 1] + abs_sections[i - 1]; + } + return abs_sections; +} + +inline size_t GetSectionIndex(int64_t id, + const std::vector& abs_sections) { + for (size_t i = 1; i < abs_sections.size(); ++i) { + if (id < abs_sections[i]) { + return i - 1; + } + } + return abs_sections.size() - 1; +} + } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/hierarchical_sigmoid_op.cc b/paddle/fluid/operators/hierarchical_sigmoid_op.cc index d0e1057c4357e372d3ab396841de7b2d0577d365..479b839e473591ba57945b496b83b0e76f620534 100644 --- a/paddle/fluid/operators/hierarchical_sigmoid_op.cc +++ b/paddle/fluid/operators/hierarchical_sigmoid_op.cc @@ -134,9 +134,9 @@ class HierarchicalSigmoidOpMaker : public framework::OpProtoAndCheckerMaker { // for parameter prefetch AddAttr("remote_prefetch", "").SetDefault(false); AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); - AddAttr>("height_sections", - "Height for each output SelectedRows.") - .SetDefault(std::vector({})); + AddAttr>("height_sections", + "Height for each output SelectedRows.") + .SetDefault(std::vector({})); AddAttr>( "epmap", "(string vector, default 127.0.0.1:6164)" diff --git a/paddle/fluid/operators/hierarchical_sigmoid_op.h b/paddle/fluid/operators/hierarchical_sigmoid_op.h index 4d5a84bcafed1ab0739349e1dbc7b5a9f9ad64ec..82c8171ca52ffb128df103f27bafbdba1e72e52f 100644 --- a/paddle/fluid/operators/hierarchical_sigmoid_op.h +++ b/paddle/fluid/operators/hierarchical_sigmoid_op.h @@ -13,11 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ #pragma once + #include #include +#include #include #include #include + #include "paddle/fluid/framework/mixed_vector.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/operators/clip_op.h" @@ -65,12 +68,13 @@ class HierarchicalSigmoidOpKernel : public framework::OpKernel { size_t num_classes = static_cast(ctx.Attr("num_classes")); // for remote prefetch + auto remote_prefetch = ctx.Attr("remote_prefetch"); auto epmap = ctx.Attr>("epmap"); - if (!epmap.empty()) { + if (remote_prefetch && !epmap.empty()) { // if epmap is not empty, then the parameter will be fetched from remote // parameter // server - auto height_sections = ctx.Attr>("height_sections"); + auto height_sections = ctx.Attr>("height_sections"); auto table_names = ctx.Attr>("table_names"); std::vector real_rows = PathToRows(*path); framework::Scope& local_scope = ctx.scope().NewScope(); diff --git a/paddle/fluid/operators/lookup_table_op.cc b/paddle/fluid/operators/lookup_table_op.cc index de1baaf510005b634f20e6606048caab3a4b1073..04323eee02c8dbed6eeffef67ef75b18f351e46b 100644 --- a/paddle/fluid/operators/lookup_table_op.cc +++ b/paddle/fluid/operators/lookup_table_op.cc @@ -91,9 +91,9 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker { // for parameter prefetch AddAttr("remote_prefetch", "").SetDefault(false); AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); - AddAttr>("height_sections", - "Height for each output SelectedRows.") - .SetDefault(std::vector({})); + AddAttr>("height_sections", + "Height for each output SelectedRows.") + .SetDefault(std::vector({})); AddAttr>( "epmap", "(string vector, default 127.0.0.1:6164)" diff --git a/paddle/fluid/operators/lookup_table_op.cu b/paddle/fluid/operators/lookup_table_op.cu index 0af8b9e69cfe09890f28ef2028baa19319a5c379..a863af4af914095a9ee2a7fcc986cc878fd808ea 100644 --- a/paddle/fluid/operators/lookup_table_op.cu +++ b/paddle/fluid/operators/lookup_table_op.cu @@ -84,7 +84,8 @@ class LookupTableCUDAKernel : public framework::OpKernel { // for remote prefetch auto epmap = context.Attr>("epmap"); - auto height_sections = context.Attr>("height_sections"); + auto height_sections = + context.Attr>("height_sections"); auto table_names = context.Attr>("table_names"); if (!epmap.empty()) { diff --git a/paddle/fluid/operators/lookup_table_op.h b/paddle/fluid/operators/lookup_table_op.h index 56c6e37ae3c62e1f9af66ef6ed16111dc1e93d9d..62e298e066948c93a84a131a0dffc0a1d53f2a5b 100644 --- a/paddle/fluid/operators/lookup_table_op.h +++ b/paddle/fluid/operators/lookup_table_op.h @@ -50,10 +50,12 @@ class LookupTableKernel : public framework::OpKernel { // for remote prefetch auto epmap = context.Attr>("epmap"); - auto height_sections = context.Attr>("height_sections"); + auto remote_prefetch = context.Attr("remote_prefetch"); + auto height_sections = + context.Attr>("height_sections"); auto table_names = context.Attr>("table_names"); - if (!epmap.empty()) { + if (remote_prefetch && !epmap.empty()) { // if epmap is not empty, then the parameter will be fetched from remote // parameter // server diff --git a/paddle/fluid/operators/math/selected_rows_functor.h b/paddle/fluid/operators/math/selected_rows_functor.h index 222d761ef91d8aee4843d717dabba7edf131f8dc..db0ee9bc1695f7b1a55b4d111dc470b462210963 100644 --- a/paddle/fluid/operators/math/selected_rows_functor.h +++ b/paddle/fluid/operators/math/selected_rows_functor.h @@ -95,7 +95,7 @@ struct MergeAdd { enum class ScatterOps { ASSIGN, ADD, SUB, SUBBY, MUL, DIV, DIVBY }; -// 
out = seleted_rows_in / tensor +// out = selected_rows_in / tensor template struct UpdateToTensor { void operator()(const DeviceContext& context, const ScatterOps& op, diff --git a/paddle/fluid/operators/nce_op.cc b/paddle/fluid/operators/nce_op.cc index 3caa4e60656956051bdaf3db4a3aebe1da996b03..358e4f37b5b45c53b88f5477452ebf6448dcc461 100644 --- a/paddle/fluid/operators/nce_op.cc +++ b/paddle/fluid/operators/nce_op.cc @@ -156,9 +156,9 @@ class NCEOpMaker : public framework::OpProtoAndCheckerMaker { // for parameter prefetch AddAttr("remote_prefetch", "").SetDefault(false); AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); - AddAttr>("height_sections", - "Height for each output SelectedRows.") - .SetDefault(std::vector({})); + AddAttr>("height_sections", + "Height for each output SelectedRows.") + .SetDefault(std::vector({})); AddAttr>( "epmap", "(string vector, default 127.0.0.1:6164)" diff --git a/paddle/fluid/operators/nce_op.h b/paddle/fluid/operators/nce_op.h index 3e48b67a570d41482e358ae3941eb1e2b6ab91f8..12f3118ec775dfce13d1f7ff836d82e1d999c65b 100644 --- a/paddle/fluid/operators/nce_op.h +++ b/paddle/fluid/operators/nce_op.h @@ -156,9 +156,10 @@ class NCEKernel : public framework::OpKernel { auto input_mat = EigenMatrix::From(*(context.Input("Input"))); // for remote prefetch + auto remote_prefetch = context.Attr("remote_prefetch"); auto epmap = context.Attr>("epmap"); - if (!epmap.empty()) { + if (remote_prefetch && !epmap.empty()) { // if epmap is not empty, then the parameter will be fetched from remote // parameter // server @@ -172,7 +173,8 @@ class NCEKernel : public framework::OpKernel { framework::Scope &local_scope = context.scope().NewScope(); - auto height_sections = context.Attr>("height_sections"); + auto height_sections = + context.Attr>("height_sections"); auto table_names = context.Attr>("table_names"); auto *ids = local_scope.Var("Ids@Prefetch"); diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index 78d238aa6115265023d5d87c01048a87180448d0..b23105916bcef4759c5a212ef019e33e21f2a1b7 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -80,12 +80,14 @@ class BlockingQueue { return true; } else { PADDLE_ENFORCE(closed_); + VLOG(3) << "queue is closed! 
return nothing."; return false; } } void ReOpen() { std::lock_guard lock(mutex_); + VLOG(1) << "reopen queue"; closed_ = false; std::deque new_deque; queue_.swap(new_deque); @@ -95,6 +97,7 @@ class BlockingQueue { void Close() { std::lock_guard lock(mutex_); + VLOG(1) << "close queue"; closed_ = true; send_cv_.notify_all(); receive_cv_.notify_all(); diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index c24e9aedc4ebd8f4fa9e483b1c1cc71fe0bf0aa7..5d93d2e32ef65c7f52723e21e79c825340efc990 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -22,6 +22,7 @@ namespace paddle { namespace operators { namespace reader { BufferedReader::~BufferedReader() { + VLOG(1) << "~BufferedReader"; reader_->Shutdown(); while (!position_.empty()) { position_.front().wait(); @@ -45,6 +46,7 @@ BufferedReader::BufferedReader( thread_pool_(1), place_(place), buffer_size_(buffer_size) { + VLOG(1) << "BufferedReader"; #ifdef PADDLE_WITH_CUDA if (platform::is_gpu_place(place_)) { platform::SetDeviceId(boost::get(place_).device); @@ -131,6 +133,7 @@ void BufferedReader::ReadAsync(size_t i) { } void BufferedReader::ShutdownImpl() { + VLOG(1) << "ShutdownImpl"; reader_->Shutdown(); while (!position_.empty()) { position_.pop(); diff --git a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h index 5b53edff5d8ea79a03542231dbf34f5a6f254986..be044085f1435089b3fb736df684358136ea7c10 100644 --- a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h +++ b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include "paddle/fluid/framework/ddim.h" @@ -57,7 +58,10 @@ class LoDTensorBlockingQueue { inline void ReOpen() { queue_.ReOpen(); } - inline void Close() { queue_.Close(); } + inline void Close() { + VLOG(1) << "LoDTensorBlockingQueue close"; + queue_.Close(); + } inline bool IsClosed() const { return queue_.IsClosed(); } diff --git a/paddle/fluid/operators/split_selected_rows_op.h b/paddle/fluid/operators/split_selected_rows_op.h index 1fef2b3d378c96d087118d0136885e7e29aa237c..9ec459e2a68d85af526e741d7fd9ecd858383132 100644 --- a/paddle/fluid/operators/split_selected_rows_op.h +++ b/paddle/fluid/operators/split_selected_rows_op.h @@ -16,31 +16,12 @@ limitations under the License. 
*/ #include #include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/operators/distributed_ops/send_recv_util.h" #include "paddle/fluid/operators/math/selected_rows_functor.h" namespace paddle { namespace operators { -static int FindOutIdx(int row, const std::vector& abs_sections) { - for (size_t i = 1; i < abs_sections.size(); ++i) { - if (row < abs_sections[i]) { - return i - 1; - } - } - return abs_sections.size() - 1; -} - -static std::vector ToAbsoluteSection( - const std::vector& height_sections) { - std::vector abs_sections; - abs_sections.resize(height_sections.size()); - abs_sections[0] = 0; - for (size_t i = 1; i < height_sections.size(); ++i) { - abs_sections[i] = height_sections[i - 1] + abs_sections[i - 1]; - } - return abs_sections; -} - template class SplitSelectedRowsOpKernel : public framework::OpKernel { public: @@ -51,7 +32,8 @@ class SplitSelectedRowsOpKernel : public framework::OpKernel { auto abs_sections = ToAbsoluteSection(height_sections); - auto x_rows = x->rows(); + auto& x_rows = x->rows(); + auto height = x->height(); std::vector> outs_rows_idx; std::vector> outs_dense_idx; @@ -63,8 +45,10 @@ class SplitSelectedRowsOpKernel : public framework::OpKernel { // split rows index into output sparse vars for (size_t i = 0; i < x_rows.size(); ++i) { - int out_idx = FindOutIdx(x_rows[i], abs_sections); - outs_rows_idx[out_idx].push_back(x_rows[i]); + auto& id = x_rows[i]; + PADDLE_ENFORCE_LT(id, height); + int out_idx = GetSectionIndex(id, abs_sections); + outs_rows_idx[out_idx].push_back(id); outs_dense_idx[out_idx].push_back(i); } auto place = ctx.GetPlace(); @@ -78,7 +62,9 @@ class SplitSelectedRowsOpKernel : public framework::OpKernel { outs[i]->mutable_rows()->clear(); if (rows_idx.size() > 0) { for (auto idx : rows_idx) { - outs[i]->mutable_rows()->push_back(idx - abs_sections[i]); + auto id_offset = idx - abs_sections[i]; + PADDLE_ENFORCE_LT(id_offset, height_sections[i]); + outs[i]->mutable_rows()->push_back(id_offset); } auto dst = outs[i]->mutable_value()->mutable_data(ctx.GetPlace()); for (size_t j = 0; j < rows_idx.size(); j++) { diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 8b6731256bae27bf8e5af47fcd329141fd388846..5b79b759d555934bbc40a03da316d138f4e81a99 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -625,6 +625,7 @@ All parameter, weight, gradient are variables in Paddle. m.def("init_lod_tensor_blocking_queue", [](Variable &var, size_t capacity) -> std::shared_ptr { + VLOG(1) << "init_lod_tensor_blocking_queue"; auto *holder = var.GetMutable(); holder->InitOnce(capacity, FLAGS_reader_queue_speed_test_mode); return holder->GetQueue(); @@ -1144,6 +1145,17 @@ All parameter, weight, gradient are variables in Paddle. 2. In some NLP model, it may cause the GPU memory is insufficient, in this case, you should reduce `num_iteration_per_drop_scope`. )DOC") + .def_property( + "num_iteration_per_run", + [](const ExecutionStrategy &self) { + return self.num_iteration_per_run_; + }, + [](ExecutionStrategy &self, size_t num_iteration_per_run) { + self.num_iteration_per_run_ = num_iteration_per_run; + }, + R"DOC(This config that how many iteration the executor will run when + user call pe.run() in python + )DOC") .def_property("_dry_run", [](const ExecutionStrategy &self) { return self.dry_run_; }, [](ExecutionStrategy &self, bool dry_run) { @@ -1320,6 +1332,9 @@ All parameter, weight, gradient are variables in Paddle. 
"is_distribution", [](const BuildStrategy &self) { return self.is_distribution_; }, [](BuildStrategy &self, bool b) { self.is_distribution_ = b; }) + .def_property("async_mode", + [](const BuildStrategy &self) { return self.async_mode_; }, + [](BuildStrategy &self, bool b) { self.async_mode_ = b; }) .def_property( "enable_inplace", [](const BuildStrategy &self) { return self.enable_inplace_; }, diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index eb6895f2a69ade2f5e5c3fe7742fab6fc0a75491..0af883764e157db24e17a1a4ef1bff27f9b39b0f 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -157,6 +157,7 @@ def __bootstrap__(): read_env_flags.append('use_ngraph') if core.is_compiled_with_dist(): + #env for rpc read_env_flags.append('rpc_deadline') read_env_flags.append('rpc_server_profile_path') read_env_flags.append('enable_rpc_profiler') @@ -164,6 +165,14 @@ def __bootstrap__(): read_env_flags.append('rpc_get_thread_num') read_env_flags.append('rpc_prefetch_thread_num') read_env_flags.append('rpc_disable_reuse_port') + + # env for communicator + read_env_flags.append('communicator_independent_recv_thread') + read_env_flags.append('communicator_send_queue_size') + read_env_flags.append('communicator_max_send_grad_num_before_recv') + read_env_flags.append('communicator_thread_pool_size') + read_env_flags.append('communicator_max_merge_var_num') + read_env_flags.append('communicator_fake_rpc') if core.is_compiled_with_brpc(): read_env_flags.append('max_body_size') #set brpc max body size diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py new file mode 100644 index 0000000000000000000000000000000000000000..5e77ce9b811bc0474f1e0950e15dedf013dcb4ea --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -0,0 +1,186 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import unittest + +import numpy +import time +import paddle +import paddle.fluid as fluid + +BATCH_SIZE = 64 + + +def convolutional_neural_network(use_py_reader): + with fluid.unique_name.guard(): + img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + py_reader = None + if use_py_reader: + py_reader = fluid.layers.create_py_reader_by_data( + capacity=64, + feed_list=[img, label], + name='py_reader', + use_double_buffer=False) + img, label = fluid.layers.read_file(py_reader) + + conv_pool_1 = fluid.nets.simple_img_conv_pool( + input=img, + filter_size=5, + num_filters=20, + pool_size=2, + pool_stride=2, + act="relu") + conv_pool_1 = fluid.layers.batch_norm(conv_pool_1) + conv_pool_2 = fluid.nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + pool_size=2, + pool_stride=2, + act="relu") + + prediction = fluid.layers.fc(input=conv_pool_2, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + avg_loss = fluid.layers.mean(loss) + acc = fluid.layers.accuracy(input=prediction, label=label) + + return img, label, prediction, avg_loss, acc, py_reader + + +def test(): + place = fluid.CPUPlace() + exe = fluid.Executor(place) + + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) + + img, label, prediction, avg_loss, acc, py_reader = convolutional_neural_network( + use_py_reader=False) + feeder = fluid.DataFeeder(feed_list=[img, label], place=place) + + def train_test(train_test_program, train_test_feed, train_test_reader): + acc_set = [] + avg_loss_set = [] + for test_data in train_test_reader(): + acc_np, avg_loss_np = exe.run(program=train_test_program, + feed=train_test_feed.feed(test_data), + fetch_list=[acc, avg_loss]) + acc_set.append(float(acc_np)) + avg_loss_set.append(float(avg_loss_np)) + # get test acc and loss + acc_val_mean = numpy.array(acc_set).mean() + avg_loss_val_mean = numpy.array(avg_loss_set).mean() + return avg_loss_val_mean, acc_val_mean + + # test for epoch + avg_loss_val, acc_val = train_test( + train_test_program=fluid.default_main_program(), + train_test_reader=test_reader, + train_test_feed=feeder) + + print("Test: avg_cost: %s, acc: %s" % (avg_loss_val, acc_val)) + assert acc_val > 0.96 + + +def train(use_cuda, thread_num, cpu_num): + if use_cuda and not fluid.core.is_compiled_with_cuda(): + print("paddle is not compiled with cuda, exit!") + return + + img, label, prediction, avg_loss, acc, py_reader = convolutional_neural_network( + use_py_reader=True) + + optimizer = fluid.optimizer.Adam(learning_rate=0.001) + optimizer.minimize(avg_loss) + + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.mnist.train(), buf_size=500), + batch_size=BATCH_SIZE) + + place = fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(fluid.default_startup_program()) + + os.environ['CPU_NUM'] = str(cpu_num) + + print("cpu_num:" + str(cpu_num)) + print("thread_num:" + str(thread_num)) + + build_strategy = fluid.BuildStrategy() + build_strategy.async_mode = True + + exec_strategy = fluid.ExecutionStrategy() + exec_strategy.num_threads = thread_num + exec_strategy.num_iteration_per_run = 10 + + main_program = fluid.default_main_program() + pe = fluid.ParallelExecutor( + use_cuda=False, + loss_name=avg_loss.name, + main_program=main_program, + build_strategy=build_strategy, + exec_strategy=exec_strategy) + + 
py_reader.decorate_paddle_reader(train_reader) + + for pass_id in range(2): + step = 0 + py_reader.start() + try: + while True: + loss_val = pe.run(fetch_list=[avg_loss.name]) + loss_val = numpy.mean(loss_val) + if step % 10 == 0: + print("Pass %d, Batch %d, Cost %f, queue size %d" % + (pass_id, step, loss_val, py_reader.queue.size())) + step += 1 + except fluid.core.EOFException: + print("train end pass = " + str(pass_id)) + py_reader.reset() + + return step + + +class TestAsyncSSAGraphExecutor(unittest.TestCase): + def test_check_async_ssa_exe_train(self): + step_list = [] + for cpu_num in [1, 2, 4]: + print("run cpu_num -> " + str(cpu_num)) + with fluid.scope_guard(fluid.core.Scope()): + with fluid.program_guard( + main_program=fluid.Program(), + startup_program=fluid.Program()): + start_time = time.time() + step = train( + use_cuda=False, thread_num=cpu_num, cpu_num=cpu_num) + end_time = time.time() + step_list.append(step) + print("cpu_num -> " + str(cpu_num) + " step -> " + str(step) + + " time -> " + str(end_time - start_time)) + with fluid.program_guard( + main_program=fluid.Program(), + startup_program=fluid.Program()): + test() + assert abs(int(step_list[0] / 2) - int(step_list[1])) < 5 + assert abs(int(step_list[1] / 2) - int(step_list[2])) < 5 + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 9c0efe6d905929f87106f18ecf74a7915e39eba9..a5d8cd4660f7428176b82610b1f4e0ace824f1f2 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -52,6 +52,7 @@ class TestDistRunnerBase(object): # NOTE: import fluid until runtime, or else forking processes will cause error. 
config = fluid.DistributeTranspilerConfig() config.enable_dc_asgd = dc_asgd + # config.runtime_split_send_recv = True t = fluid.DistributeTranspiler(config=config) t.transpile( trainer_id=trainer_id, @@ -139,8 +140,7 @@ class TestDistRunnerBase(object): pass_builder = None if args.batch_merge_repeat > 1: pass_builder = build_stra._finalize_strategy_and_create_passes() - mypass = pass_builder.insert_pass( - len(pass_builder.all_passes()) - 3, "multi_batch_merge_pass") + mypass = pass_builder.insert_pass(0, "multi_batch_merge_pass") mypass.set("num_repeats", args.batch_merge_repeat) if args.update_method == "nccl2" or args.update_method == "nccl2_reduce_layer": diff --git a/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py b/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py index f8847e1570dc47d432777faa15f4004f1a7111a6..d8c57d964da706f12b8865195ea94329ca0f10e2 100644 --- a/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py +++ b/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py @@ -38,7 +38,7 @@ class TestSpliteSelectedRows(unittest.TestCase): def check_with_place(self, place): scope = core.Scope() rows = [0, 5, 7, 4, 20] - height = 20 + height = 21 row_numel = 2 # initialize input variable X diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index eb54068650e8b3f4e64317778e2ad7c7aa7fe1b2..41e5f47976c566306ad141f655a0f6516831d690 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -156,6 +156,8 @@ class DistributeTranspilerConfig(object): mode = "pserver" print_log = False wait_port = True + # split the send recv var in runtime + runtime_split_send_recv = False class DistributeTranspiler(object): @@ -398,8 +400,10 @@ class DistributeTranspiler(object): orig_var = program.global_block().vars[splited_grad_varname] index = find_op_by_output_arg( program.global_block(), splited_grad_varname, reverse=True) - self._insert_split_op(program, orig_var, index, splited_vars) - index += 1 + if not self.config.runtime_split_send_recv: + self._insert_split_op(program, orig_var, index, + splited_vars) + index += 1 else: AssertionError("Can not insert the send op by original " "variable name :", splited_grad_varname) @@ -408,6 +412,17 @@ class DistributeTranspiler(object): name=framework.generate_control_dev_var_name()) self.grad_name_to_send_dummy_out[grad_varname] = dummy_output + if self.config.runtime_split_send_recv: + send_input_vars = [ + program.global_block().vars[splited_grad_varname] + ] + sections = self._get_splited_var_sections(splited_vars) + send_varnames = [var.name for var in splited_vars] + else: + send_input_vars = splited_vars + sections = [] + send_varnames = [] + # get send op_role_var, if not splited, the grad should have .trainer suffix # if splited, grad should be the original grad var name (split_by_ref and send # will be on the same place). 
ParallelExecutor @@ -415,10 +430,12 @@ class DistributeTranspiler(object): program.global_block()._insert_op( index=index + 1, type="send", - inputs={"X": splited_vars}, + inputs={"X": send_input_vars}, outputs={"Out": dummy_output}, attrs={ "epmap": eplist, + "sections": sections, + "send_varnames": send_varnames, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, OP_ROLE_VAR_ATTR_NAME: [ self.grad_name_to_param_name[grad_varname], @@ -501,13 +518,20 @@ class DistributeTranspiler(object): self._update_remote_sparse_update_op( param_varname, height_sections, eps, table_names) else: + recv_varnames = [] + if self.config.runtime_split_send_recv: + orig_param = program.global_block().vars[param_varname] + recv_varnames = [var.name for var in splited_var] + splited_var = [orig_param] all_recv_outputs.extend(splited_var) + program.global_block().append_op( type="recv", inputs={"X": [recv_dep_in]}, outputs={"Out": splited_var}, attrs={ "epmap": eps, + "recv_varnames": recv_varnames, "trainer_id": self.trainer_id, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, OP_ROLE_VAR_ATTR_NAME: @@ -532,14 +556,15 @@ class DistributeTranspiler(object): continue orig_param = program.global_block().vars[param_varname] if param_varname not in self.sparse_param_to_height_sections: - program.global_block().append_op( - type="concat", - inputs={"X": splited_var}, - outputs={"Out": [orig_param]}, - attrs={ - "axis": 0, - RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE - }) + if not self.config.runtime_split_send_recv: + program.global_block().append_op( + type="concat", + inputs={"X": splited_var}, + outputs={"Out": [orig_param]}, + attrs={ + "axis": 0, + RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE + }) self._get_trainer_startup_program(recv_vars=recv_vars, eplist=eplist) @@ -1552,11 +1577,17 @@ class DistributeTranspiler(object): lod_level=var.lod_level, persistable=persistable) + @staticmethod + def _get_splited_var_sections(splited_vars): + height_sections = [] + for v in splited_vars: + height_sections.append(v.shape[0]) + return height_sections + def _insert_split_op(self, program, orig_var, index, splited_vars): + height_sections = self._get_splited_var_sections(splited_vars) + if orig_var.type == core.VarDesc.VarType.SELECTED_ROWS: - height_sections = [] - for v in splited_vars: - height_sections.append(v.shape[0]) sparse_param_name = self.grad_name_to_param_name[orig_var.name] if self._is_input_of_remote_sparse_update_op(sparse_param_name): self.sparse_param_to_height_sections[ @@ -1571,16 +1602,13 @@ class DistributeTranspiler(object): RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE }) elif orig_var.type == core.VarDesc.VarType.LOD_TENSOR: - sections = [] - for v in splited_vars: - sections.append(v.shape[0]) program.global_block()._insert_op( index=index + 1, type="split_byref", inputs={"X": orig_var}, outputs={"Out": splited_vars}, attrs={ - "sections": sections, + "sections": height_sections, RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE }) else: @@ -2052,7 +2080,7 @@ class DistributeTranspiler(object): Get optimizer operators, parameters and gradients from origin_program Returns: opt_ops (list): optimize operators. - params_grads (dict): paramter->gradient. + params_grads (dict): parameter->gradient. """ block = self.origin_program.global_block() opt_ops = []
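With runtime_split_send_recv enabled, the transpiler above stops inserting split_byref/concat ops and instead attaches the sections, send_varnames and recv_varnames attributes to the send and recv ops, which the runtime turns into an RpcContext (see the send_op.cc and recv_op.cc changes earlier in this patch). A rough illustration of what such a context holds for a 21-row parameter served by two pservers; the variable name, shard names and endpoints are made up:

#include <iostream>

#include "paddle/fluid/operators/distributed/rpc_common.h"

int main() {
  using paddle::operators::distributed::RpcContext;

  // The transpiler derives one shard name and one endpoint per pserver from
  // the split variables, plus the per-shard row counts ("sections").
  RpcContext ctx("w@GRAD",
                 {"w@GRAD.block0", "w@GRAD.block1"},    // send_varnames
                 {"127.0.0.1:6164", "127.0.0.1:6165"},  // epmap
                 {10, 11});                             // sections (row counts)

  std::cout << ctx << std::endl;  // operator<< for RpcContext is added above
  return 0;
}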