diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index 3ba9c1bba1812f2363aaca3d1b2f9eb1fa411c7a..3c73b6cc55c187c3f6e7edd1ce38cc58f4e8413d 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -13,14 +13,14 @@ cc_library(ssa_graph_checker SRCS ssa_graph_checker.cc DEPS ssa_graph_builder) cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows) if(WITH_GPU) - nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory + nv_library(all_reduce_op_handle SRCS all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory dynload_cuda variable_visitor) - set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle) nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim dynload_cuda) nv_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor dynload_cuda) else() - set(multi_devices_graph_builder_deps) + cc_library(all_reduce_op_handle SRCS all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory + variable_visitor) cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim) cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor) endif() @@ -29,7 +29,7 @@ cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope d cc_library(fuse_vars_op_handle SRCS fuse_vars_op_handle.cc DEPS op_handle_base scope) cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle - scale_loss_grad_op_handle rpc_op_handle ${multi_devices_graph_builder_deps} reduce_op_handle broadcast_op_handle) + scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle) cc_library(ssa_graph_builder_factory SRCS ssa_graph_builder_factory.cc DEPS multi_devices_graph_builder ssa_graph_printer ssa_graph_checker) diff --git a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc similarity index 78% rename from paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc rename to paddle/fluid/framework/details/all_reduce_op_handle.cc index 5bba089ade801a06e0364835efe5249105dcfcac..b335d3a0d364c916e19574de8d3ed89aaec7de41 100644 --- a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc @@ -13,25 +13,33 @@ // limitations under the License. 
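The CMake hunk above is the build-side crux of this change: the renamed all_reduce_op_handle is compiled as an nv_library (with dynload_cuda) under WITH_GPU and as a plain cc_library otherwise, so the graph builder no longer needs the conditional ${multi_devices_graph_builder_deps} list. A minimal sketch of how calling code can adapt to either build; it assumes the pre-existing fluid.core.is_compiled_with_cuda() helper, everything else follows this patch's own tests:

```python
import paddle.fluid as fluid

# Pick the device family the binary was actually built for; with this patch
# ParallelExecutor works on either place.
use_cuda = fluid.core.is_compiled_with_cuda()
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
```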
#include +#include "paddle/fluid/framework/details/all_reduce_op_handle.h" #include "paddle/fluid/framework/details/container_cast.h" -#include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h" #include "paddle/fluid/framework/details/reduce_and_gather.h" #include "paddle/fluid/framework/details/variable_visitor.h" namespace paddle { namespace framework { namespace details { -NCCLAllReduceOpHandle::NCCLAllReduceOpHandle( - const std::vector &local_scopes, - const std::vector &places, - const platform::NCCLContextMap &ctxs) + +#ifdef PADDLE_WITH_CUDA +AllReduceOpHandle::AllReduceOpHandle(const std::vector &local_scopes, + const std::vector &places, + const platform::NCCLContextMap *ctxs) : local_scopes_(local_scopes), places_(places), nccl_ctxs_(ctxs) { - for (auto &p : places_) { - this->dev_ctxes_[p] = nccl_ctxs_.DevCtx(p); + if (nccl_ctxs_) { + for (auto &p : places_) { + this->dev_ctxes_[p] = nccl_ctxs_->DevCtx(p); + } } } +#else +AllReduceOpHandle::AllReduceOpHandle(const std::vector &local_scopes, + const std::vector &places) + : local_scopes_(local_scopes), places_(places) {} +#endif -void NCCLAllReduceOpHandle::RunImpl() { +void AllReduceOpHandle::RunImpl() { if (NoDummyInputSize() == 1) { return; // No need to all reduce when GPU count = 1; } else { @@ -58,6 +66,8 @@ void NCCLAllReduceOpHandle::RunImpl() { } if (platform::is_gpu_place(lod_tensors[0]->place())) { +#ifdef PADDLE_WITH_CUDA + PADDLE_ENFORCE(nccl_ctxs_, "nccl_ctxs should not be nullptr."); int dtype = -1; size_t numel = 0; std::vector> all_reduce_calls; @@ -75,7 +85,7 @@ void NCCLAllReduceOpHandle::RunImpl() { } int dev_id = boost::get(p).device; - auto &nccl_ctx = nccl_ctxs_.at(dev_id); + auto &nccl_ctx = nccl_ctxs_->at(dev_id); auto stream = nccl_ctx.stream(); auto comm = nccl_ctx.comm_; all_reduce_calls.emplace_back([=] { @@ -90,22 +100,25 @@ void NCCLAllReduceOpHandle::RunImpl() { call(); } }); +#else + PADDLE_THROW("Not compiled with CUDA"); +#endif } else { // Special handle CPU only Operator's gradient. 
Like CRF auto &trg = *this->local_scopes_[0] ->FindVar(kLocalExecScopeName) ->Get() - ->Var() + ->FindVar(out_var_handles[0]->name_) ->GetMutable(); // Reduce All Tensor to trg in CPU ReduceLoDTensor func(lod_tensors, &trg); VisitDataType(ToDataType(lod_tensors[0]->type()), func); - for (size_t i = 0; i < local_scopes_.size(); ++i) { + for (size_t i = 1; i < local_scopes_.size(); ++i) { auto &scope = *local_scopes_[i]->FindVar(kLocalExecScopeName)->Get(); auto &p = places_[i]; - auto *var = scope.FindVar(in_var_handles[i]->name_); + auto *var = scope.FindVar(out_var_handles[i]->name_); auto *dev_ctx = dev_ctxes_[p]; RunAndRecordEvent(p, [&trg, var, dev_ctx, p] { @@ -118,7 +131,7 @@ void NCCLAllReduceOpHandle::RunImpl() { } } -std::string NCCLAllReduceOpHandle::Name() const { return "nccl_all_reduce"; } +std::string AllReduceOpHandle::Name() const { return "all_reduce"; } } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.h b/paddle/fluid/framework/details/all_reduce_op_handle.h similarity index 73% rename from paddle/fluid/framework/details/nccl_all_reduce_op_handle.h rename to paddle/fluid/framework/details/all_reduce_op_handle.h index 8e98d894b828b4162059b30f5c6a74cfc06f402e..fdd250b0d3eb166249271a95f7592b9fadee5265 100644 --- a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.h +++ b/paddle/fluid/framework/details/all_reduce_op_handle.h @@ -20,17 +20,23 @@ #include "paddle/fluid/framework/details/op_handle_base.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" +#ifdef PADDLE_WITH_CUDA #include "paddle/fluid/platform/nccl_helper.h" +#endif namespace paddle { namespace framework { namespace details { -struct NCCLAllReduceOpHandle : public OpHandleBase { - NCCLAllReduceOpHandle(const std::vector &local_scopes, - const std::vector &places, - const platform::NCCLContextMap &ctxs); - +struct AllReduceOpHandle : public OpHandleBase { +#ifdef PADDLE_WITH_CUDA + AllReduceOpHandle(const std::vector &local_scopes, + const std::vector &places, + const platform::NCCLContextMap *ctxs); +#else + AllReduceOpHandle(const std::vector &local_scopes, + const std::vector &places); +#endif std::string Name() const override; // Delay and buffer nccl_all_reduce together can significantly increase @@ -43,7 +49,9 @@ struct NCCLAllReduceOpHandle : public OpHandleBase { private: std::vector local_scopes_; std::vector places_; - const platform::NCCLContextMap &nccl_ctxs_; +#ifdef PADDLE_WITH_CUDA + const platform::NCCLContextMap *nccl_ctxs_; +#endif }; } // namespace details diff --git a/paddle/fluid/framework/details/execution_strategy.h b/paddle/fluid/framework/details/execution_strategy.h index e7aa74742f827efabff1189d3213edd748d9082d..716d674fa29bad9321fc20979775c06f26bf4679 100644 --- a/paddle/fluid/framework/details/execution_strategy.h +++ b/paddle/fluid/framework/details/execution_strategy.h @@ -20,7 +20,7 @@ namespace details { struct ExecutionStrategy { size_t num_threads_{0}; - bool use_event_{true}; + bool use_cuda_{true}; bool allow_op_delay_{false}; size_t num_iteration_per_drop_scope_{100}; }; diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index b568c1344b7861f404945c79694d2b9e1221b218..78356cb1be3bd089c26dde663275e2c8109df951 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ 
-17,6 +17,7 @@ #include #include +#include "paddle/fluid/framework/details/all_reduce_op_handle.h" #include "paddle/fluid/framework/details/broadcast_op_handle.h" #include "paddle/fluid/framework/details/computation_op_handle.h" #include "paddle/fluid/framework/details/multi_devices_graph_builder.h" @@ -26,10 +27,6 @@ #include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/scope.h" -#ifdef PADDLE_WITH_CUDA -#include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h" -#endif - namespace paddle { namespace framework { namespace details { @@ -243,7 +240,7 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( CreateReduceOp(&result, g_name, 0); CreateBroadcastOp(&result, g_name, 0); } else { - InsertNCCLAllReduceOp(&result, g_name); + InsertAllReduceOp(&result, g_name); } break; } @@ -286,6 +283,19 @@ bool MultiDevSSAGraphBuilder::IsSparseGradient( return false; } +void MultiDevSSAGraphBuilder::SetCommunicationContext( + OpHandleBase *op_handle, const platform::Place &p) const { +#ifdef PADDLE_WITH_CUDA + if (nccl_ctxs_ == nullptr) { + op_handle->SetDeviceContext(p, + platform::DeviceContextPool::Instance().Get(p)); + } +#else + op_handle->SetDeviceContext(p, + platform::DeviceContextPool::Instance().Get(p)); +#endif +} + void MultiDevSSAGraphBuilder::CreateBroadcastOp(SSAGraph *result, const std::string &p_name, size_t src_dev_id) const { @@ -300,15 +310,12 @@ void MultiDevSSAGraphBuilder::CreateBroadcastOp(SSAGraph *result, op_handle->AddInput(in); for (size_t i = 0; i < places_.size(); ++i) { - auto &vars = result->vars_.at(i).at(p_name); auto &p = places_[i]; + SetCommunicationContext(op_handle, p); + auto &vars = result->vars_.at(i).at(p_name); auto *out_var = new VarHandle(vars.size(), i, p_name, p); vars.emplace_back(out_var); op_handle->AddOutput(out_var); -#ifndef ADDLE_WITH_CUDA - op_handle->SetDeviceContext(p, - platform::DeviceContextPool::Instance().Get(p)); -#endif } } @@ -320,15 +327,19 @@ void MultiDevSSAGraphBuilder::CreateComputationalOp(SSAGraph *result, CreateOpHandleIOs(result, op, dev_id); } -void MultiDevSSAGraphBuilder::InsertNCCLAllReduceOp( - SSAGraph *result, const std::string &og) const { +void MultiDevSSAGraphBuilder::InsertAllReduceOp(SSAGraph *result, + const std::string &og) const { #ifdef PADDLE_WITH_CUDA result->ops_.emplace_back( - new NCCLAllReduceOpHandle(local_scopes_, places_, *nccl_ctxs_)); + new AllReduceOpHandle(local_scopes_, places_, nccl_ctxs_)); +#else + result->ops_.emplace_back(new AllReduceOpHandle(local_scopes_, places_)); +#endif auto *op_handle = result->ops_.back().get(); for (size_t i = 0; i < places_.size(); ++i) { auto &p = places_[i]; + SetCommunicationContext(op_handle, p); auto &vars = result->vars_[i][og]; PADDLE_ENFORCE(!vars.empty()); auto &prev_grad = vars.back(); @@ -338,9 +349,6 @@ void MultiDevSSAGraphBuilder::InsertNCCLAllReduceOp( vars.emplace_back(var); op_handle->AddOutput(var); } -#else - PADDLE_ENFORCE("Not implemented"); -#endif } bool MultiDevSSAGraphBuilder::IsParameterGradientOnce( @@ -379,7 +387,9 @@ void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(SSAGraph *result) const { for (size_t i = 0; i < places_.size(); ++i) { // Insert ScaleCost OpHandle #ifdef PADDLE_WITH_CUDA - auto *communication_dev_ctx = nccl_ctxs_->DevCtx(places_[i]); + auto *communication_dev_ctx = + nccl_ctxs_ ? 
nccl_ctxs_->DevCtx(places_[i]) + : platform::DeviceContextPool::Instance().Get(places_[i]); #else auto *communication_dev_ctx = platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); @@ -424,12 +434,9 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(SSAGraph *result, auto *op_handle = result->ops_.back().get(); for (size_t i = 0; i < places_.size(); ++i) { - auto &vars = result->vars_[i][og]; -#ifndef PADDLE_WITH_CUDA auto &p = places_[i]; - op_handle->SetDeviceContext(p, - platform::DeviceContextPool::Instance().Get(p)); -#endif + SetCommunicationContext(op_handle, p); + auto &vars = result->vars_[i][og]; PADDLE_ENFORCE(!vars.empty()); auto &prev_grad = vars.back(); op_handle->AddInput(prev_grad.get()); diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h index 544cbe585c7423b5f3eb98ee698ca5668376f1ca..78581755fe4890800636944d6cd89875a852cc19 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h @@ -100,7 +100,7 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::vector> &var_name_on_devices, const OpDesc &op) const; - void InsertNCCLAllReduceOp(SSAGraph *result, const std::string &og) const; + void InsertAllReduceOp(SSAGraph *result, const std::string &og) const; void CreateBroadcastOp(SSAGraph *result, const std::string &p_name, size_t src_dev_id) const; @@ -111,6 +111,9 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { private: BuildStrategy strategy_; + + void SetCommunicationContext(OpHandleBase *op_handle, + const platform::Place &p) const; }; } // namespace details } // namespace framework diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc index 3849cca59a3347137b769f97261cfbf97da8d6ff..f79565fe71c4aef140475c922cbbf5a1e0b7fe03 100644 --- a/paddle/fluid/framework/details/op_handle_base.cc +++ b/paddle/fluid/framework/details/op_handle_base.cc @@ -39,9 +39,9 @@ OpHandleBase::~OpHandleBase() { #endif } -void OpHandleBase::Run(bool use_event) { +void OpHandleBase::Run(bool use_cuda) { #ifdef PADDLE_WITH_CUDA - if (events_.empty() && use_event) { + if (events_.empty() && use_cuda) { for (auto &p : dev_ctxes_) { int dev_id = boost::get(p.first).device; PADDLE_ENFORCE(cudaSetDevice(dev_id)); @@ -50,7 +50,7 @@ void OpHandleBase::Run(bool use_event) { } } #else - PADDLE_ENFORCE(!use_event); + PADDLE_ENFORCE(!use_cuda); #endif RunImpl(); diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h index dc92b0fe9f760d95d4869fdd56c0400b6710437f..fbd90a3296bca92b097cab925b218b91e7f4752f 100644 --- a/paddle/fluid/framework/details/op_handle_base.h +++ b/paddle/fluid/framework/details/op_handle_base.h @@ -36,7 +36,7 @@ class OpHandleBase { virtual std::string Name() const = 0; - void Run(bool use_event); + void Run(bool use_cuda); virtual void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx); diff --git a/paddle/fluid/framework/details/reduce_and_gather.h b/paddle/fluid/framework/details/reduce_and_gather.h index 2b95a284990da8f9b7c16d6e4221eb1ed061f74b..a6ffb37313a88120bc9e8d5ce326f60aeebdff69 100644 --- a/paddle/fluid/framework/details/reduce_and_gather.h +++ b/paddle/fluid/framework/details/reduce_and_gather.h @@ -37,7 +37,9 @@ struct ReduceLoDTensor { PADDLE_ENFORCE_NE(t0.numel(), 0); dst_tensor_.Resize(t0.dims()); T *dst = 
dst_tensor_.mutable_data(platform::CPUPlace()); - std::copy(t0.data(), t0.data() + t0.numel(), dst); + if (dst != t0.data()) { + std::copy(t0.data(), t0.data() + t0.numel(), dst); + } for (size_t i = 1; i < src_tensors_.size(); ++i) { auto &t = *src_tensors_[i]; diff --git a/paddle/fluid/framework/details/ssa_graph_builder_factory.h b/paddle/fluid/framework/details/ssa_graph_builder_factory.h index 857ab12d684e19788597e144fc0c46571d06aafc..91a119de83ed3d1573803e48faf86c874eed98d6 100644 --- a/paddle/fluid/framework/details/ssa_graph_builder_factory.h +++ b/paddle/fluid/framework/details/ssa_graph_builder_factory.h @@ -40,7 +40,11 @@ class SSAGraphBuilderFactory { loss_var_name_(loss_var_name), param_names_(param_names), local_scopes_(local_scopes), - strategy_(strategy) {} + strategy_(strategy) { +#ifdef PADDLE_WITH_CUDA + nccl_ctxs_ = nullptr; +#endif + } #ifdef PADDLE_WITH_CUDA void SetNCCLContextMap(platform::NCCLContextMap* nccl_ctxs) { diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index bcbf5736267f0d760d14c96784f0994c6bd013ac..6c5098ce85b784a3edcf8f48d2cc828aabd8e161 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -193,7 +193,7 @@ void ThreadedSSAGraphExecutor::RunOp( if (VLOG_IS_ON(10)) { VLOG(10) << op << " " << op->Name() << " : " << op->DebugString(); } - op->Run(strategy_.use_event_); + op->Run(strategy_.use_cuda_); VLOG(10) << op << " " << op->Name() << " Done "; running_ops_--; ready_var_q->Extend(op->Outputs()); diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 5d95dc214ac39117a2ec4674da7f3bd50fa6d3d0..ac4d1f58a5b3b11f034af7618681ebd913d8afb9 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -43,7 +43,8 @@ class ParallelExecutorPrivate { #ifdef PADDLE_WITH_CUDA std::unique_ptr nccl_ctxs_; #endif - bool own_local_scope; + bool own_local_scope_; + bool use_cuda_; }; std::vector &ParallelExecutor::GetLocalScopes() { @@ -60,35 +61,40 @@ ParallelExecutor::ParallelExecutor( size_t num_trainers, size_t trainer_id) : member_(new ParallelExecutorPrivate(places)) { member_->global_scope_ = scope; + member_->use_cuda_ = exec_strategy.use_cuda_; // Step 1. Bcast the params to devs. 
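At this point in the constructor the C++ ParallelExecutor records use_cuda_ from the execution strategy and, further down, only builds the NCCL context map when that flag is set. A hedged end-to-end sketch of the resulting user-facing behavior, training entirely on CPU; the tiny network mirrors the simple_fc_net used in the unit tests, and all names and sizes are illustrative:

```python
import os

import numpy as np
import paddle.fluid as fluid

os.environ['CPU_NUM'] = '4'  # number of CPU "devices" the executor replicates over

main, startup = fluid.Program(), fluid.Program()
with fluid.program_guard(main, startup):
    img = fluid.layers.data(name='image', shape=[784], dtype='float32')
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    hidden = fluid.layers.fc(input=img, size=100, act='relu')
    prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
    loss = fluid.layers.mean(
        fluid.layers.cross_entropy(input=prediction, label=label))
    fluid.optimizer.SGD(learning_rate=0.001).minimize(loss)

place = fluid.CPUPlace()
fluid.Executor(place).run(startup)

exec_strategy = fluid.ExecutionStrategy()
exec_strategy.use_cuda = False  # replaces the removed use_event flag

pe = fluid.ParallelExecutor(use_cuda=False,
                            loss_name=loss.name,
                            main_program=main,
                            exec_strategy=exec_strategy)

feed = {
    'image': np.random.random((32, 784)).astype('float32'),
    'label': np.random.randint(0, 10, size=(32, 1)).astype('int64'),
}
loss_val, = pe.run(feed=feed, fetch_list=[loss.name])
loss_val = np.array(loss_val)  # returned item is a LoDTensor, as in the tests
```

With use_cuda=False the graph builder inserts the plain AllReduceOpHandle and per-place CPU device contexts via SetCommunicationContext, so no CUDA symbols are touched.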
// Create local scopes if (local_scopes.empty()) { - member_->own_local_scope = true; + member_->own_local_scope_ = true; member_->local_scopes_.emplace_back(member_->global_scope_); for (size_t i = 1; i < member_->places_.size(); ++i) { member_->local_scopes_.emplace_back(&scope->NewScope()); } } else { - member_->own_local_scope = false; + member_->own_local_scope_ = false; PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size()); for (size_t i = 0; i < member_->places_.size(); ++i) { member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope()); } } + if (member_->use_cuda_) { // Bcast Parameters to all GPUs #ifdef PADDLE_WITH_CUDA - auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME); - ncclUniqueId *nccl_id = nullptr; - if (nccl_id_var != nullptr) { - nccl_id = nccl_id_var->GetMutable(); - } - member_->nccl_ctxs_.reset(new platform::NCCLContextMap( - member_->places_, nccl_id, num_trainers, trainer_id)); + auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME); + ncclUniqueId *nccl_id = nullptr; + if (nccl_id_var != nullptr) { + nccl_id = nccl_id_var->GetMutable(); + } + member_->nccl_ctxs_.reset(new platform::NCCLContextMap( + member_->places_, nccl_id, num_trainers, trainer_id)); +#else + PADDLE_THROW("Not compiled with CUDA"); #endif - if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 && - local_scopes.empty()) { // Is CUDA + } + + if (member_->local_scopes_.size() != 1 && local_scopes.empty()) { BCastParamsToGPUs(bcast_vars); } // Startup Program has been run. All local scopes has correct parameters. @@ -108,9 +114,13 @@ ParallelExecutor::ParallelExecutor( details::SSAGraphBuilderFactory builder_factory( member_->places_, loss_var_name, params, member_->local_scopes_, build_strategy); + if (member_->use_cuda_) { #ifdef PADDLE_WITH_CUDA - builder_factory.SetNCCLContextMap(member_->nccl_ctxs_.get()); + builder_factory.SetNCCLContextMap(member_->nccl_ctxs_.get()); +#else + PADDLE_THROW("Not compiled with CUDA"); #endif + } member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, places, @@ -123,7 +133,6 @@ ParallelExecutor::ParallelExecutor( void ParallelExecutor::BCastParamsToGPUs( const std::unordered_set &vars) const { -#ifdef PADDLE_WITH_CUDA auto *main_scope = member_->local_scopes_[0]; for (auto &var : vars) { @@ -135,6 +144,7 @@ void ParallelExecutor::BCastParamsToGPUs( auto &main_tensor = main_var->Get(); auto &dims = main_tensor.dims(); if (paddle::platform::is_gpu_place(main_tensor.place())) { +#ifdef PADDLE_WITH_CUDA size_t numel = main_tensor.numel(); ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type()); platform::NCCLGroupGuard guard; @@ -153,6 +163,10 @@ void ParallelExecutor::BCastParamsToGPUs( platform::dynload::ncclBcast(buffer, numel, data_type, 0, nccl_ctx.comm_, nccl_ctx.stream()); } + member_->nccl_ctxs_->WaitAll(); +#else + PADDLE_THROW("Not compiled with CUDA"); +#endif } else { platform::CPUPlace cpu; for (size_t i = 1; i < member_->places_.size(); ++i) { @@ -163,11 +177,7 @@ void ParallelExecutor::BCastParamsToGPUs( paddle::framework::TensorCopy(main_tensor, cpu, t); } } - member_->nccl_ctxs_->WaitAll(); } -#else - PADDLE_THROW("Not compiled with CUDA"); -#endif } void ParallelExecutor::Run(const std::vector &fetch_tensors, @@ -213,7 +223,7 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes( } ParallelExecutor::~ParallelExecutor() { - if (member_->own_local_scope) { + if (member_->own_local_scope_) { for (size_t i = 1; i < member_->local_scopes_.size(); 
++i) { member_->global_scope_->DeleteScope(member_->local_scopes_[i]); } diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 16c30da0a7582849e756989f2336a38700d0215f..2d34f85838c34f1dfe43d2130e127d0258072fa7 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -41,11 +41,22 @@ class RequestBase { virtual ~RequestBase() {} virtual void Process() = 0; - CallStatus Status() { return status_; } - void SetStatus(CallStatus status) { status_ = status; } + CallStatus Status() const { + std::lock_guard l(status_mu_); + return status_; + } + + template + void Finish(const T& reply, ServerAsyncResponseWriter* responder) { + std::lock_guard l(status_mu_); + status_ = FINISH; + responder->Finish(reply, ::grpc::Status::OK, + reinterpret_cast(static_cast(req_id_))); + } virtual std::string GetReqName() = 0; protected: + mutable std::mutex status_mu_; ::grpc::ServerContext ctx_; GrpcService::AsyncService* service_; ::grpc::ServerCompletionQueue* cq_; @@ -80,9 +91,7 @@ class RequestSend final : public RequestBase { framework::Variable* outvar = nullptr; request_handler_->Handle(varname, scope, invar, &outvar); - status_ = FINISH; - responder_.Finish(reply_, ::grpc::Status::OK, - reinterpret_cast(static_cast(req_id_))); + Finish(reply_, &responder_); } protected: @@ -122,9 +131,7 @@ class RequestGet final : public RequestBase { SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(), &reply_); } - status_ = FINISH; - responder_.Finish(reply_, ::grpc::Status::OK, - reinterpret_cast(static_cast(req_id_))); + Finish(reply_, &responder_); } protected: @@ -157,8 +164,8 @@ class RequestPrefetch final : public RequestBase { // prefetch process... std::string in_var_name = request_->Varname(); std::string out_var_name = request_->OutVarname(); - VLOG(3) << "in_var_name: " << in_var_name - << " RequestPrefetch: " << out_var_name; + VLOG(3) << "RequestPrefetch, in_var_name: " << in_var_name + << " out_var_name: " << out_var_name; auto scope = request_->GetMutableLocalScope(); auto invar = scope->FindVar(in_var_name); @@ -168,9 +175,7 @@ class RequestPrefetch final : public RequestBase { SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(), &reply_); - status_ = FINISH; - responder_.Finish(reply_, ::grpc::Status::OK, - reinterpret_cast(static_cast(req_id_))); + Finish(reply_, &responder_); } protected: diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index e6ffc7066f24d5088a95801ed1c0670b24d5771f..f1db7590f6f14d5d44acc12453861a446e278cd2 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -53,6 +53,7 @@ class AsyncGRPCServer final : public RPCServer { void StartServer() override; private: + // HandleRequest needs to be thread-safe. void HandleRequest( ::grpc::ServerCompletionQueue* cq, const std::string& rpc_name, std::function TryToRegisterNewOne); diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 669d1bdaa3ec194be817cdc5e1f8484770c70c68..c88fbef63cf26c671246b15ea9872da0e7a92c1a 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -509,10 +509,10 @@ All parameter, weight, gradient are variables in Paddle. 
self.num_threads_ = num_threads; }) .def_property( - "use_event", - [](const ExecutionStrategy &self) { return self.use_event_; }, - [](ExecutionStrategy &self, bool use_event) { - self.use_event_ = use_event; + "use_cuda", + [](const ExecutionStrategy &self) { return self.use_cuda_; }, + [](ExecutionStrategy &self, bool use_cuda) { + self.use_cuda_ = use_cuda; }) .def_property( "allow_op_delay", diff --git a/python/paddle/dataset/flowers.py b/python/paddle/dataset/flowers.py index f082e33be3357fbe405ab1a1ef5e0e601108a363..527044b415533cc640e3cfc5837c08ab0f8b74b1 100644 --- a/python/paddle/dataset/flowers.py +++ b/python/paddle/dataset/flowers.py @@ -119,7 +119,8 @@ def reader_creator(data_file, yield sample, int(label) - 1 if use_xmap: - return xmap_readers(mapper, reader, cpu_count(), buffered_size) + cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) + return xmap_readers(mapper, reader, cpu_num, buffered_size) else: return map_readers(mapper, reader) diff --git a/python/paddle/fluid/data_feeder.py b/python/paddle/fluid/data_feeder.py index 7940dabcfb03cc9eb46f678365685a6e99bcceec..e2013137b14f73bb0fcfb57b4bdc35fcc043bdc0 100644 --- a/python/paddle/fluid/data_feeder.py +++ b/python/paddle/fluid/data_feeder.py @@ -15,6 +15,7 @@ from __future__ import print_function import core import numpy +import os import six.moves as six import multiprocessing @@ -150,7 +151,9 @@ class DataFeeder(object): elif isinstance(self.place, core.CUDAPlace): return core.get_cuda_device_count() else: - return multiprocessing.cpu_count() + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + return cpu_num def decorate_reader(self, reader, diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 3117dfe00c7a3df1035c439dc31b81e67781d0cc..0fdc9a035292b3390cece6c5821a60b1b281e54d 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -18,6 +18,7 @@ import framework import executor import warnings import sys +import os __all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] @@ -101,7 +102,9 @@ class ParallelExecutor(object): p.set_place(self._act_places[-1]) self._places.append(p) else: - for i in xrange(multiprocessing.cpu_count()): + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + for i in xrange(cpu_num): p = core.Place() self._act_places.append(core.CPUPlace()) p.set_place(self._act_places[-1]) @@ -110,19 +113,17 @@ class ParallelExecutor(object): if exec_strategy is None: exec_strategy = ExecutionStrategy() - if use_cuda: - exec_strategy.use_event = True - else: - exec_strategy.use_event = False + exec_strategy.use_cuda = use_cuda if exec_strategy.num_threads == 0: if use_cuda: # Experiments on se-resnext shows that too many threads hurt # performance. Worth tunning for other models in the future. 
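On the Python side, ExecutionStrategy.use_cuda replaces the old use_event flag, and the lines that follow derive the default num_threads from the place count on GPU or from the CPU_NUM environment variable on CPU. A short sketch of setting both explicitly instead of relying on those defaults; the values are illustrative:

```python
import paddle.fluid as fluid

exec_strategy = fluid.ExecutionStrategy()
exec_strategy.use_cuda = False   # old code set exec_strategy.use_event
exec_strategy.num_threads = 8    # default becomes 4 * #places on GPU, CPU_NUM on CPU
```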
- exec_strategy.num_threads = len(self._places) * 2 + exec_strategy.num_threads = len(self._places) * 4 else: - exec_strategy.num_threads = min( - len(self._places) * 2, multiprocessing.cpu_count()) + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + exec_strategy.num_threads = cpu_num if build_strategy is None: build_strategy = BuildStrategy() diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 673bd728718ca233b426fe2aaae307413d875174..ab683bc101728ba008e01f26ff4d3828b3b99787 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -41,8 +41,8 @@ function(py_test_modules TARGET_NAME) endfunction() list(REMOVE_ITEM TEST_OPS test_warpctc_op) list(REMOVE_ITEM TEST_OPS test_dist_train) -list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf) -list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed) +#list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf) +#list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed) # TODO(wuyi): this test hungs on CI, will add it back later list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op) foreach(TEST_OP ${TEST_OPS}) diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py index c9c3c648717814c28c39a401487925824e885946..829c5a1a5fd099543e9e98b9587d4f316a91b587 100644 --- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py +++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import multiprocessing +import os import unittest import paddle.fluid as fluid import time @@ -23,6 +25,7 @@ __all__ = ['TestParallelExecutorBase'] class TestParallelExecutorBase(unittest.TestCase): def check_network_convergence(self, method, + use_cuda=True, memory_opt=True, iter=50, batch_size=None, @@ -53,7 +56,7 @@ class TestParallelExecutorBase(unittest.TestCase): adam.minimize(loss) if memory_opt: fluid.memory_optimize(main) - place = fluid.CUDAPlace(0) + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() startup_exe = fluid.Executor(place) startup_exe.run(startup) exec_strategy = fluid.ExecutionStrategy() @@ -64,7 +67,7 @@ class TestParallelExecutorBase(unittest.TestCase): if use_parallel_executor: exe = fluid.ParallelExecutor( - True, + use_cuda, loss_name=loss.name, exec_strategy=exec_strategy, build_strategy=build_strategy) @@ -72,7 +75,9 @@ class TestParallelExecutorBase(unittest.TestCase): exe = fluid.Executor(place=place) if batch_size is not None: - batch_size *= fluid.core.get_cuda_device_count() + batch_size *= fluid.core.get_cuda_device_count( + ) if use_cuda else int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) begin = time.time() first_loss, = run_executor( exe=exe, feed=feed_dict, fetch_list=[loss.name]) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py index 66e138b03f3b170aca4fb2207438eb9af1783c33..163975555ec2cea5c169cc1da3c4324d91ba3616 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py @@ -17,6 +17,7 @@ import paddle.fluid as fluid import unittest import paddle import numpy as np +import os word_dict, verb_dict, label_dict = 
conll05.get_dict() word_dict_len = len(word_dict) @@ -101,7 +102,11 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, class TestCRFModel(unittest.TestCase): - def check_network_convergence(self, is_sparse, build_strategy=None): + def check_network_convergence(self, + is_sparse, + build_strategy=None, + use_cuda=True): + os.environ['CPU_NUM'] = str(4) main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -145,12 +150,12 @@ class TestCRFModel(unittest.TestCase): paddle.dataset.conll05.test(), buf_size=8192), batch_size=16) - place = fluid.CUDAPlace(0) + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup) pe = fluid.ParallelExecutor( - use_cuda=True, + use_cuda=use_cuda, loss_name=avg_cost.name, build_strategy=build_strategy) @@ -172,25 +177,33 @@ class TestCRFModel(unittest.TestCase): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce self.check_network_convergence( - is_sparse=True, build_strategy=build_strategy) + is_sparse=True, build_strategy=build_strategy, use_cuda=True) + self.check_network_convergence( + is_sparse=True, build_strategy=build_strategy, use_cuda=False) def test_update_dense_parameter_all_reduce(self): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce self.check_network_convergence( - is_sparse=False, build_strategy=build_strategy) + is_sparse=False, build_strategy=build_strategy, use_cuda=True) + self.check_network_convergence( + is_sparse=False, build_strategy=build_strategy, use_cuda=False) def test_update_sparse_parameter_reduce(self): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=True, build_strategy=build_strategy) + is_sparse=True, build_strategy=build_strategy, use_cuda=True) + self.check_network_convergence( + is_sparse=True, build_strategy=build_strategy, use_cuda=False) def test_update_dense_parameter_reduce(self): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=False, build_strategy=build_strategy) + is_sparse=False, build_strategy=build_strategy, use_cuda=True) + self.check_network_convergence( + is_sparse=False, build_strategy=build_strategy, use_cuda=False) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py index 24f8d28c0304a77a99213374b25d0db728eca265..79702475cca86ca22107d4b1824fda277dd83157 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py @@ -18,6 +18,7 @@ import paddle.fluid as fluid import unittest import numpy as np import paddle +import os def Lenet(data, class_dim): @@ -35,7 +36,7 @@ def Lenet(data, class_dim): class TestFetchOp(unittest.TestCase): - def parallel_exe(self, train_inputs, seed): + def parallel_exe(self, train_inputs, seed, use_cuda): main = fluid.Program() startup = fluid.Program() startup.random_seed = seed @@ -59,13 +60,13 @@ class TestFetchOp(unittest.TestCase): # conv2d_1.b_0@GRAD. Those variables should not be pruned. 
# fluid.memory_optimize(main) - place = fluid.CUDAPlace(0) + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup) feeder = fluid.DataFeeder(place=place, feed_list=[data, label]) pe = fluid.ParallelExecutor( - use_cuda=True, loss_name=loss.name, main_program=main) + use_cuda=use_cuda, loss_name=loss.name, main_program=main) fetch_list = [] all_vars = main.global_block().vars @@ -88,14 +89,16 @@ class TestFetchOp(unittest.TestCase): for i in range(iters): train_inputs.append(tst_reader_iter.next()) - self.parallel_exe(train_inputs, seed=1) + os.environ['CPU_NUM'] = str(4) + self.parallel_exe(train_inputs, seed=1, use_cuda=True) + self.parallel_exe(train_inputs, seed=1, use_cuda=False) class TestFeedParallel(unittest.TestCase): - def test_main(self): + def parallel_exe(self, use_cuda, seed): main = fluid.Program() startup = fluid.Program() - startup.random_seed = 1 + startup.random_seed = seed with fluid.scope_guard(fluid.core.Scope()): with fluid.program_guard(main, startup): data = fluid.layers.data( @@ -111,15 +114,18 @@ class TestFeedParallel(unittest.TestCase): regularization=fluid.regularizer.L2Decay(1e-4)) opt.minimize(loss) - place = fluid.CUDAPlace(0) + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() feeder = fluid.DataFeeder(place=place, feed_list=[data, label]) reader = feeder.decorate_reader( paddle.batch( flowers.train(), batch_size=16), multi_devices=True) + exe = fluid.Executor(place) exe.run(startup) + pe = fluid.ParallelExecutor( - use_cuda=True, loss_name=loss.name, main_program=main) + use_cuda=use_cuda, loss_name=loss.name, main_program=main) for batch_id, data in enumerate(reader()): loss_np = np.array(pe.run(feed=data, fetch_list=[loss.name])[0]) @@ -127,6 +133,11 @@ class TestFeedParallel(unittest.TestCase): if batch_id == 2: break + def test_feed_op(self): + os.environ['CPU_NUM'] = str(4) + self.parallel_exe(use_cuda=True, seed=1) + self.parallel_exe(use_cuda=False, seed=1) + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py index 015703c3e25f4e11e64ab6a7de99da12bee608f6..a801d99aa1ced35eb7f081fde63ad541f0eb2589 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py @@ -18,6 +18,7 @@ import numpy as np import paddle import paddle.dataset.mnist as mnist import unittest +import os MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio" @@ -85,6 +86,7 @@ def fc_with_batchnorm(use_feed): class TestMNIST(TestParallelExecutorBase): @classmethod def setUpClass(cls): + os.environ['CPU_NUM'] = str(4) # Convert mnist to recordio file with fluid.program_guard(fluid.Program(), fluid.Program()): reader = paddle.batch(mnist.train(), batch_size=4) @@ -99,9 +101,12 @@ class TestMNIST(TestParallelExecutorBase): fluid.recordio_writer.convert_reader_to_recordio_file( MNIST_RECORDIO_FILE, reader, feeder) - def check_simple_fc_convergence(self, balance_parameter_opt_between_cards): - self.check_network_convergence(simple_fc_net) - self.check_network_convergence(simple_fc_net, allow_op_delay=True) + def check_simple_fc_convergence(self, + balance_parameter_opt_between_cards, + use_cuda=True): + self.check_network_convergence(simple_fc_net, use_cuda=use_cuda) + self.check_network_convergence( + simple_fc_net, use_cuda=use_cuda, allow_op_delay=True) img = np.zeros(shape=[32, 784], 
dtype='float32') label = np.ones(shape=[32, 1], dtype='int64') @@ -109,17 +114,21 @@ class TestMNIST(TestParallelExecutorBase): simple_fc_net, feed_dict={"image": img, "label": label}, + use_cuda=use_cuda, balance_parameter_opt_between_cards=balance_parameter_opt_between_cards ) def test_simple_fc(self): - self.check_simple_fc_convergence(False) + self.check_simple_fc_convergence(False, use_cuda=True) + self.check_simple_fc_convergence(False, use_cuda=False) def test_simple_fc_with_new_strategy(self): - self.check_simple_fc_convergence(True) + self.check_simple_fc_convergence(True, use_cuda=True) + self.check_simple_fc_convergence(True, use_cuda=False) def check_simple_fc_parallel_accuracy(self, - balance_parameter_opt_between_cards): + balance_parameter_opt_between_cards, + use_cuda=True): img = np.zeros(shape=[32, 784], dtype='float32') label = np.ones(shape=[32, 1], dtype='int64') single_first_loss, single_last_loss = self.check_network_convergence( @@ -127,12 +136,14 @@ class TestMNIST(TestParallelExecutorBase): seed=1000, feed_dict={"image": img, "label": label}, + use_cuda=use_cuda, use_parallel_executor=False) parallel_first_loss, parallel_last_loss = self.check_network_convergence( method=simple_fc_net, seed=1000, feed_dict={"image": img, "label": label}, + use_cuda=use_cuda, use_parallel_executor=True, balance_parameter_opt_between_cards=balance_parameter_opt_between_cards ) @@ -143,28 +154,33 @@ class TestMNIST(TestParallelExecutorBase): self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6) def test_simple_fc_parallel_accuracy(self): - self.check_simple_fc_parallel_accuracy(False) + self.check_simple_fc_parallel_accuracy(False, use_cuda=True) + self.check_simple_fc_parallel_accuracy(False, use_cuda=False) def test_simple_fc_parallel_accuracy_with_new_strategy(self): - self.check_simple_fc_parallel_accuracy(True) + self.check_simple_fc_parallel_accuracy(True, use_cuda=True) + self.check_simple_fc_parallel_accuracy(True, use_cuda=False) - def check_batchnorm_fc_convergence(self, - balance_parameter_opt_between_cards): - self.check_network_convergence(fc_with_batchnorm) + def check_batchnorm_fc_convergence( + self, balance_parameter_opt_between_cards, use_cuda): + self.check_network_convergence(fc_with_batchnorm, use_cuda=use_cuda) img = np.zeros(shape=[32, 784], dtype='float32') label = np.ones(shape=[32, 1], dtype='int64') self.check_network_convergence( fc_with_batchnorm, feed_dict={"image": img, "label": label}, + use_cuda=use_cuda, balance_parameter_opt_between_cards=balance_parameter_opt_between_cards ) def test_batchnorm_fc(self): - self.check_batchnorm_fc_convergence(False) + self.check_batchnorm_fc_convergence(False, use_cuda=True) + self.check_batchnorm_fc_convergence(False, use_cuda=False) def test_batchnorm_fc_with_new_strategy(self): - self.check_batchnorm_fc_convergence(True) + self.check_batchnorm_fc_convergence(True, use_cuda=True) + self.check_batchnorm_fc_convergence(True, use_cuda=False) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py index a3fa140cbb7994a36d2cbee26d598165f1f771d2..066299e6c6f7f6c159cb0886e86d3404b027b698 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py @@ -15,6 +15,7 @@ import paddle.fluid as fluid from parallel_executor_test_base import TestParallelExecutorBase import unittest +import os 
def squeeze_excitation(input, num_channels, reduction_ratio): @@ -130,22 +131,30 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False): class TestResnet(TestParallelExecutorBase): - def check_resnet_convergence(self, balance_parameter_opt_between_cards): + def check_resnet_convergence(self, + balance_parameter_opt_between_cards, + use_cuda=True, + iter=20): + os.environ['CPU_NUM'] = str(4) + import functools batch_size = 2 self.check_network_convergence( functools.partial( SE_ResNeXt50Small, batch_size=batch_size), - iter=20, + iter=iter, batch_size=batch_size, + use_cuda=use_cuda, balance_parameter_opt_between_cards=balance_parameter_opt_between_cards ) def test_resnet(self): - self.check_resnet_convergence(False) + self.check_resnet_convergence(False, use_cuda=True) + self.check_resnet_convergence(False, use_cuda=False, iter=5) def test_resnet_with_new_strategy(self): - self.check_resnet_convergence(True) + self.check_resnet_convergence(True, use_cuda=True) + self.check_resnet_convergence(True, use_cuda=False, iter=5) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py index 93a5f767867d68110cf7b8f441cc740ecd843cf9..31ba8c1d6096c9c89e0695c8eca8e16a5e303a61 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py @@ -15,6 +15,7 @@ import paddle.fluid as fluid import numpy as np import unittest +import os def simple_fc_net(): @@ -35,7 +36,8 @@ def simple_fc_net(): class ParallelExecutorTestingDuringTraining(unittest.TestCase): - def check_network_convergence(self, build_strategy=None): + def check_network_convergence(self, use_cuda, build_strategy=None): + os.environ['CPU_NUM'] = str(4) main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -49,19 +51,19 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase): image = np.random.normal(size=(batch_size, 784)).astype('float32') label = np.random.randint(0, 10, (batch_size, 1), dtype="int64") - place = fluid.CUDAPlace(0) + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup) feed_dict = {'image': image, 'label': label} train_exe = fluid.ParallelExecutor( - use_cuda=True, + use_cuda=use_cuda, loss_name=loss.name, main_program=main, build_strategy=build_strategy) test_exe = fluid.ParallelExecutor( - use_cuda=True, + use_cuda=use_cuda, main_program=test_program, share_vars_from=train_exe, build_strategy=build_strategy) @@ -81,12 +83,18 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase): def test_parallel_testing(self): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce - self.check_network_convergence(build_strategy) + self.check_network_convergence( + use_cuda=True, build_strategy=build_strategy) + self.check_network_convergence( + use_cuda=False, build_strategy=build_strategy) def test_parallel_testing_with_new_strategy(self): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce - self.check_network_convergence(build_strategy) + self.check_network_convergence( + use_cuda=True, build_strategy=build_strategy) + self.check_network_convergence( + use_cuda=False, build_strategy=build_strategy) if __name__ == '__main__': diff --git 
a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py index c81df66d987f3d3856af0e19fc935df7de2edacc..b6215fddb11bb6b3a76b5a6395e7254d21971c13 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py @@ -19,6 +19,7 @@ from parallel_executor_test_base import TestParallelExecutorBase import unittest import paddle import paddle.dataset.wmt16 as wmt16 +import os WMT16_RECORDIO_FILE = "./wmt16_test_pe.recordio" @@ -149,6 +150,7 @@ def transformer(use_feed): class TestTransformer(TestParallelExecutorBase): @classmethod def setUpClass(cls): + os.environ['CPU_NUM'] = str(4) reader = paddle.batch( wmt16.train(ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size), @@ -167,7 +169,8 @@ class TestTransformer(TestParallelExecutorBase): @unittest.skip("transformer is buggy in multi gpu") def test_main(self): - self.check_network_convergence(transformer) + self.check_network_convergence(transformer, use_cuda=True) + self.check_network_convergence(transformer, use_cuda=False) if __name__ == '__main__': diff --git a/python/paddle/v2/dataset/flowers.py b/python/paddle/v2/dataset/flowers.py index 7bdddeaabec733ef26b3f766c6437f5c53d65044..357a4e9b000ea81afe291ff39dde2bed5c67e619 100644 --- a/python/paddle/v2/dataset/flowers.py +++ b/python/paddle/v2/dataset/flowers.py @@ -119,7 +119,8 @@ def reader_creator(data_file, yield sample, int(label) - 1 if use_xmap: - return xmap_readers(mapper, reader, cpu_count(), buffered_size) + cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) + return xmap_readers(mapper, reader, cpu_num, buffered_size) else: return map_readers(mapper, reader)
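Both copies of the flowers reader (paddle.dataset and paddle.v2.dataset) now size their xmap_readers worker pool from CPU_NUM rather than cpu_count(), matching what DataFeeder uses to report the number of CPU devices. A small sketch of the combined effect, following the pattern of test_parallel_executor_fetch_feed; the layer names are placeholders and the import assumes the paddle.dataset.flowers module patched above:

```python
import os
os.environ['CPU_NUM'] = '4'  # consulted when the reader and the feeder are constructed

import paddle
import paddle.dataset.flowers as flowers
import paddle.fluid as fluid

data = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

place = fluid.CPUPlace()
feeder = fluid.DataFeeder(place=place, feed_list=[data, label])

# flowers.train() now starts CPU_NUM xmap worker threads, and decorate_reader
# splits every batch across CPU_NUM CPU "devices" (iterating it triggers the
# flowers dataset download).
reader = feeder.decorate_reader(
    paddle.batch(flowers.train(), batch_size=16), multi_devices=True)
```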