提交 16658f7b 编写于 作者: Q qiaolongfei

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into refine-prefetch

......@@ -13,14 +13,14 @@ cc_library(ssa_graph_checker SRCS ssa_graph_checker.cc DEPS ssa_graph_builder)
cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows)
if(WITH_GPU)
nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
nv_library(all_reduce_op_handle SRCS all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
dynload_cuda variable_visitor)
set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim dynload_cuda)
nv_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor dynload_cuda)
else()
set(multi_devices_graph_builder_deps)
cc_library(all_reduce_op_handle SRCS all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
variable_visitor)
cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim)
cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
endif()
......@@ -29,7 +29,7 @@ cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope d
cc_library(fuse_vars_op_handle SRCS fuse_vars_op_handle.cc DEPS op_handle_base scope)
cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle
scale_loss_grad_op_handle rpc_op_handle ${multi_devices_graph_builder_deps} reduce_op_handle broadcast_op_handle)
scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle)
cc_library(ssa_graph_builder_factory SRCS ssa_graph_builder_factory.cc DEPS multi_devices_graph_builder ssa_graph_printer ssa_graph_checker)
......
......@@ -13,25 +13,33 @@
// limitations under the License.
#include <algorithm>
#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
namespace paddle {
namespace framework {
namespace details {
NCCLAllReduceOpHandle::NCCLAllReduceOpHandle(
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::NCCLContextMap &ctxs)
#ifdef PADDLE_WITH_CUDA
AllReduceOpHandle::AllReduceOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::NCCLContextMap *ctxs)
: local_scopes_(local_scopes), places_(places), nccl_ctxs_(ctxs) {
for (auto &p : places_) {
this->dev_ctxes_[p] = nccl_ctxs_.DevCtx(p);
if (nccl_ctxs_) {
for (auto &p : places_) {
this->dev_ctxes_[p] = nccl_ctxs_->DevCtx(p);
}
}
}
#else
AllReduceOpHandle::AllReduceOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}
#endif
void NCCLAllReduceOpHandle::RunImpl() {
void AllReduceOpHandle::RunImpl() {
if (NoDummyInputSize() == 1) {
return; // No need to all reduce when GPU count = 1;
} else {
......@@ -58,6 +66,8 @@ void NCCLAllReduceOpHandle::RunImpl() {
}
if (platform::is_gpu_place(lod_tensors[0]->place())) {
#ifdef PADDLE_WITH_CUDA
PADDLE_ENFORCE(nccl_ctxs_, "nccl_ctxs should not be nullptr.");
int dtype = -1;
size_t numel = 0;
std::vector<std::function<void()>> all_reduce_calls;
......@@ -75,7 +85,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
}
int dev_id = boost::get<platform::CUDAPlace>(p).device;
auto &nccl_ctx = nccl_ctxs_.at(dev_id);
auto &nccl_ctx = nccl_ctxs_->at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;
all_reduce_calls.emplace_back([=] {
......@@ -90,22 +100,25 @@ void NCCLAllReduceOpHandle::RunImpl() {
call();
}
});
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
} else { // Special handle CPU only Operator's gradient. Like CRF
auto &trg = *this->local_scopes_[0]
->FindVar(kLocalExecScopeName)
->Get<Scope *>()
->Var()
->FindVar(out_var_handles[0]->name_)
->GetMutable<framework::LoDTensor>();
// Reduce All Tensor to trg in CPU
ReduceLoDTensor func(lod_tensors, &trg);
VisitDataType(ToDataType(lod_tensors[0]->type()), func);
for (size_t i = 0; i < local_scopes_.size(); ++i) {
for (size_t i = 1; i < local_scopes_.size(); ++i) {
auto &scope =
*local_scopes_[i]->FindVar(kLocalExecScopeName)->Get<Scope *>();
auto &p = places_[i];
auto *var = scope.FindVar(in_var_handles[i]->name_);
auto *var = scope.FindVar(out_var_handles[i]->name_);
auto *dev_ctx = dev_ctxes_[p];
RunAndRecordEvent(p, [&trg, var, dev_ctx, p] {
......@@ -118,7 +131,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
}
}
std::string NCCLAllReduceOpHandle::Name() const { return "nccl_all_reduce"; }
std::string AllReduceOpHandle::Name() const { return "all_reduce"; }
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -20,17 +20,23 @@
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/nccl_helper.h"
#endif
namespace paddle {
namespace framework {
namespace details {
struct NCCLAllReduceOpHandle : public OpHandleBase {
NCCLAllReduceOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::NCCLContextMap &ctxs);
struct AllReduceOpHandle : public OpHandleBase {
#ifdef PADDLE_WITH_CUDA
AllReduceOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::NCCLContextMap *ctxs);
#else
AllReduceOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places);
#endif
std::string Name() const override;
// Delay and buffer nccl_all_reduce together can significantly increase
......@@ -43,7 +49,9 @@ struct NCCLAllReduceOpHandle : public OpHandleBase {
private:
std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_;
const platform::NCCLContextMap &nccl_ctxs_;
#ifdef PADDLE_WITH_CUDA
const platform::NCCLContextMap *nccl_ctxs_;
#endif
};
} // namespace details
......
......@@ -20,7 +20,7 @@ namespace details {
struct ExecutionStrategy {
size_t num_threads_{0};
bool use_event_{true};
bool use_cuda_{true};
bool allow_op_delay_{false};
size_t num_iteration_per_drop_scope_{100};
};
......
......@@ -17,6 +17,7 @@
#include <utility>
#include <vector>
#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/broadcast_op_handle.h"
#include "paddle/fluid/framework/details/computation_op_handle.h"
#include "paddle/fluid/framework/details/multi_devices_graph_builder.h"
......@@ -26,10 +27,6 @@
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/scope.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h"
#endif
namespace paddle {
namespace framework {
namespace details {
......@@ -243,7 +240,7 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
CreateReduceOp(&result, g_name, 0);
CreateBroadcastOp(&result, g_name, 0);
} else {
InsertNCCLAllReduceOp(&result, g_name);
InsertAllReduceOp(&result, g_name);
}
break;
}
......@@ -286,6 +283,19 @@ bool MultiDevSSAGraphBuilder::IsSparseGradient(
return false;
}
void MultiDevSSAGraphBuilder::SetCommunicationContext(
OpHandleBase *op_handle, const platform::Place &p) const {
#ifdef PADDLE_WITH_CUDA
if (nccl_ctxs_ == nullptr) {
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
}
#else
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
#endif
}
void MultiDevSSAGraphBuilder::CreateBroadcastOp(SSAGraph *result,
const std::string &p_name,
size_t src_dev_id) const {
......@@ -300,15 +310,12 @@ void MultiDevSSAGraphBuilder::CreateBroadcastOp(SSAGraph *result,
op_handle->AddInput(in);
for (size_t i = 0; i < places_.size(); ++i) {
auto &vars = result->vars_.at(i).at(p_name);
auto &p = places_[i];
SetCommunicationContext(op_handle, p);
auto &vars = result->vars_.at(i).at(p_name);
auto *out_var = new VarHandle(vars.size(), i, p_name, p);
vars.emplace_back(out_var);
op_handle->AddOutput(out_var);
#ifndef ADDLE_WITH_CUDA
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
#endif
}
}
......@@ -320,15 +327,19 @@ void MultiDevSSAGraphBuilder::CreateComputationalOp(SSAGraph *result,
CreateOpHandleIOs(result, op, dev_id);
}
void MultiDevSSAGraphBuilder::InsertNCCLAllReduceOp(
SSAGraph *result, const std::string &og) const {
void MultiDevSSAGraphBuilder::InsertAllReduceOp(SSAGraph *result,
const std::string &og) const {
#ifdef PADDLE_WITH_CUDA
result->ops_.emplace_back(
new NCCLAllReduceOpHandle(local_scopes_, places_, *nccl_ctxs_));
new AllReduceOpHandle(local_scopes_, places_, nccl_ctxs_));
#else
result->ops_.emplace_back(new AllReduceOpHandle(local_scopes_, places_));
#endif
auto *op_handle = result->ops_.back().get();
for (size_t i = 0; i < places_.size(); ++i) {
auto &p = places_[i];
SetCommunicationContext(op_handle, p);
auto &vars = result->vars_[i][og];
PADDLE_ENFORCE(!vars.empty());
auto &prev_grad = vars.back();
......@@ -338,9 +349,6 @@ void MultiDevSSAGraphBuilder::InsertNCCLAllReduceOp(
vars.emplace_back(var);
op_handle->AddOutput(var);
}
#else
PADDLE_ENFORCE("Not implemented");
#endif
}
bool MultiDevSSAGraphBuilder::IsParameterGradientOnce(
......@@ -379,7 +387,9 @@ void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(SSAGraph *result) const {
for (size_t i = 0; i < places_.size(); ++i) {
// Insert ScaleCost OpHandle
#ifdef PADDLE_WITH_CUDA
auto *communication_dev_ctx = nccl_ctxs_->DevCtx(places_[i]);
auto *communication_dev_ctx =
nccl_ctxs_ ? nccl_ctxs_->DevCtx(places_[i])
: platform::DeviceContextPool::Instance().Get(places_[i]);
#else
auto *communication_dev_ctx =
platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
......@@ -424,12 +434,9 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(SSAGraph *result,
auto *op_handle = result->ops_.back().get();
for (size_t i = 0; i < places_.size(); ++i) {
auto &vars = result->vars_[i][og];
#ifndef PADDLE_WITH_CUDA
auto &p = places_[i];
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
#endif
SetCommunicationContext(op_handle, p);
auto &vars = result->vars_[i][og];
PADDLE_ENFORCE(!vars.empty());
auto &prev_grad = vars.back();
op_handle->AddInput(prev_grad.get());
......
......@@ -100,7 +100,7 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
const OpDesc &op) const;
void InsertNCCLAllReduceOp(SSAGraph *result, const std::string &og) const;
void InsertAllReduceOp(SSAGraph *result, const std::string &og) const;
void CreateBroadcastOp(SSAGraph *result, const std::string &p_name,
size_t src_dev_id) const;
......@@ -111,6 +111,9 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
private:
BuildStrategy strategy_;
void SetCommunicationContext(OpHandleBase *op_handle,
const platform::Place &p) const;
};
} // namespace details
} // namespace framework
......
......@@ -39,9 +39,9 @@ OpHandleBase::~OpHandleBase() {
#endif
}
void OpHandleBase::Run(bool use_event) {
void OpHandleBase::Run(bool use_cuda) {
#ifdef PADDLE_WITH_CUDA
if (events_.empty() && use_event) {
if (events_.empty() && use_cuda) {
for (auto &p : dev_ctxes_) {
int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
PADDLE_ENFORCE(cudaSetDevice(dev_id));
......@@ -50,7 +50,7 @@ void OpHandleBase::Run(bool use_event) {
}
}
#else
PADDLE_ENFORCE(!use_event);
PADDLE_ENFORCE(!use_cuda);
#endif
RunImpl();
......
......@@ -36,7 +36,7 @@ class OpHandleBase {
virtual std::string Name() const = 0;
void Run(bool use_event);
void Run(bool use_cuda);
virtual void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx);
......
......@@ -37,7 +37,9 @@ struct ReduceLoDTensor {
PADDLE_ENFORCE_NE(t0.numel(), 0);
dst_tensor_.Resize(t0.dims());
T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
if (dst != t0.data<T>()) {
std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
}
for (size_t i = 1; i < src_tensors_.size(); ++i) {
auto &t = *src_tensors_[i];
......
......@@ -40,7 +40,11 @@ class SSAGraphBuilderFactory {
loss_var_name_(loss_var_name),
param_names_(param_names),
local_scopes_(local_scopes),
strategy_(strategy) {}
strategy_(strategy) {
#ifdef PADDLE_WITH_CUDA
nccl_ctxs_ = nullptr;
#endif
}
#ifdef PADDLE_WITH_CUDA
void SetNCCLContextMap(platform::NCCLContextMap* nccl_ctxs) {
......
......@@ -193,7 +193,7 @@ void ThreadedSSAGraphExecutor::RunOp(
if (VLOG_IS_ON(10)) {
VLOG(10) << op << " " << op->Name() << " : " << op->DebugString();
}
op->Run(strategy_.use_event_);
op->Run(strategy_.use_cuda_);
VLOG(10) << op << " " << op->Name() << " Done ";
running_ops_--;
ready_var_q->Extend(op->Outputs());
......
......@@ -43,7 +43,8 @@ class ParallelExecutorPrivate {
#ifdef PADDLE_WITH_CUDA
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
bool own_local_scope;
bool own_local_scope_;
bool use_cuda_;
};
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
......@@ -60,35 +61,40 @@ ParallelExecutor::ParallelExecutor(
size_t num_trainers, size_t trainer_id)
: member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope;
member_->use_cuda_ = exec_strategy.use_cuda_;
// Step 1. Bcast the params to devs.
// Create local scopes
if (local_scopes.empty()) {
member_->own_local_scope = true;
member_->own_local_scope_ = true;
member_->local_scopes_.emplace_back(member_->global_scope_);
for (size_t i = 1; i < member_->places_.size(); ++i) {
member_->local_scopes_.emplace_back(&scope->NewScope());
}
} else {
member_->own_local_scope = false;
member_->own_local_scope_ = false;
PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
for (size_t i = 0; i < member_->places_.size(); ++i) {
member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
}
}
if (member_->use_cuda_) {
// Bcast Parameters to all GPUs
#ifdef PADDLE_WITH_CUDA
auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
ncclUniqueId *nccl_id = nullptr;
if (nccl_id_var != nullptr) {
nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
}
member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
member_->places_, nccl_id, num_trainers, trainer_id));
auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
ncclUniqueId *nccl_id = nullptr;
if (nccl_id_var != nullptr) {
nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
}
member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
member_->places_, nccl_id, num_trainers, trainer_id));
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 &&
local_scopes.empty()) { // Is CUDA
}
if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
BCastParamsToGPUs(bcast_vars);
}
// Startup Program has been run. All local scopes has correct parameters.
......@@ -108,9 +114,13 @@ ParallelExecutor::ParallelExecutor(
details::SSAGraphBuilderFactory builder_factory(
member_->places_, loss_var_name, params, member_->local_scopes_,
build_strategy);
if (member_->use_cuda_) {
#ifdef PADDLE_WITH_CUDA
builder_factory.SetNCCLContextMap(member_->nccl_ctxs_.get());
builder_factory.SetNCCLContextMap(member_->nccl_ctxs_.get());
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
}
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, places,
......@@ -123,7 +133,6 @@ ParallelExecutor::ParallelExecutor(
void ParallelExecutor::BCastParamsToGPUs(
const std::unordered_set<std::string> &vars) const {
#ifdef PADDLE_WITH_CUDA
auto *main_scope = member_->local_scopes_[0];
for (auto &var : vars) {
......@@ -135,6 +144,7 @@ void ParallelExecutor::BCastParamsToGPUs(
auto &main_tensor = main_var->Get<LoDTensor>();
auto &dims = main_tensor.dims();
if (paddle::platform::is_gpu_place(main_tensor.place())) {
#ifdef PADDLE_WITH_CUDA
size_t numel = main_tensor.numel();
ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
platform::NCCLGroupGuard guard;
......@@ -153,6 +163,10 @@ void ParallelExecutor::BCastParamsToGPUs(
platform::dynload::ncclBcast(buffer, numel, data_type, 0,
nccl_ctx.comm_, nccl_ctx.stream());
}
member_->nccl_ctxs_->WaitAll();
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
} else {
platform::CPUPlace cpu;
for (size_t i = 1; i < member_->places_.size(); ++i) {
......@@ -163,11 +177,7 @@ void ParallelExecutor::BCastParamsToGPUs(
paddle::framework::TensorCopy(main_tensor, cpu, t);
}
}
member_->nccl_ctxs_->WaitAll();
}
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
}
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
......@@ -213,7 +223,7 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
}
ParallelExecutor::~ParallelExecutor() {
if (member_->own_local_scope) {
if (member_->own_local_scope_) {
for (size_t i = 1; i < member_->local_scopes_.size(); ++i) {
member_->global_scope_->DeleteScope(member_->local_scopes_[i]);
}
......
......@@ -41,11 +41,22 @@ class RequestBase {
virtual ~RequestBase() {}
virtual void Process() = 0;
CallStatus Status() { return status_; }
void SetStatus(CallStatus status) { status_ = status; }
CallStatus Status() const {
std::lock_guard<std::mutex> l(status_mu_);
return status_;
}
template <typename T>
void Finish(const T& reply, ServerAsyncResponseWriter<T>* responder) {
std::lock_guard<std::mutex> l(status_mu_);
status_ = FINISH;
responder->Finish(reply, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
}
virtual std::string GetReqName() = 0;
protected:
mutable std::mutex status_mu_;
::grpc::ServerContext ctx_;
GrpcService::AsyncService* service_;
::grpc::ServerCompletionQueue* cq_;
......@@ -80,9 +91,7 @@ class RequestSend final : public RequestBase {
framework::Variable* outvar = nullptr;
request_handler_->Handle(varname, scope, invar, &outvar);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
Finish(reply_, &responder_);
}
protected:
......@@ -122,9 +131,7 @@ class RequestGet final : public RequestBase {
SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
&reply_);
}
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
Finish(reply_, &responder_);
}
protected:
......@@ -157,8 +164,8 @@ class RequestPrefetch final : public RequestBase {
// prefetch process...
std::string in_var_name = request_->Varname();
std::string out_var_name = request_->OutVarname();
VLOG(3) << "in_var_name: " << in_var_name
<< " RequestPrefetch: " << out_var_name;
VLOG(3) << "RequestPrefetch, in_var_name: " << in_var_name
<< " out_var_name: " << out_var_name;
auto scope = request_->GetMutableLocalScope();
auto invar = scope->FindVar(in_var_name);
......@@ -168,9 +175,7 @@ class RequestPrefetch final : public RequestBase {
SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(),
&reply_);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
Finish(reply_, &responder_);
}
protected:
......
......@@ -53,6 +53,7 @@ class AsyncGRPCServer final : public RPCServer {
void StartServer() override;
private:
// HandleRequest needs to be thread-safe.
void HandleRequest(
::grpc::ServerCompletionQueue* cq, const std::string& rpc_name,
std::function<void(const std::string&, int)> TryToRegisterNewOne);
......
......@@ -509,10 +509,10 @@ All parameter, weight, gradient are variables in Paddle.
self.num_threads_ = num_threads;
})
.def_property(
"use_event",
[](const ExecutionStrategy &self) { return self.use_event_; },
[](ExecutionStrategy &self, bool use_event) {
self.use_event_ = use_event;
"use_cuda",
[](const ExecutionStrategy &self) { return self.use_cuda_; },
[](ExecutionStrategy &self, bool use_cuda) {
self.use_cuda_ = use_cuda;
})
.def_property(
"allow_op_delay",
......
......@@ -119,7 +119,8 @@ def reader_creator(data_file,
yield sample, int(label) - 1
if use_xmap:
return xmap_readers(mapper, reader, cpu_count(), buffered_size)
cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
return xmap_readers(mapper, reader, cpu_num, buffered_size)
else:
return map_readers(mapper, reader)
......
......@@ -15,6 +15,7 @@
from __future__ import print_function
import core
import numpy
import os
import six.moves as six
import multiprocessing
......@@ -150,7 +151,9 @@ class DataFeeder(object):
elif isinstance(self.place, core.CUDAPlace):
return core.get_cuda_device_count()
else:
return multiprocessing.cpu_count()
cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
return cpu_num
def decorate_reader(self,
reader,
......
......@@ -18,6 +18,7 @@ import framework
import executor
import warnings
import sys
import os
__all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy']
......@@ -101,7 +102,9 @@ class ParallelExecutor(object):
p.set_place(self._act_places[-1])
self._places.append(p)
else:
for i in xrange(multiprocessing.cpu_count()):
cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
for i in xrange(cpu_num):
p = core.Place()
self._act_places.append(core.CPUPlace())
p.set_place(self._act_places[-1])
......@@ -110,19 +113,17 @@ class ParallelExecutor(object):
if exec_strategy is None:
exec_strategy = ExecutionStrategy()
if use_cuda:
exec_strategy.use_event = True
else:
exec_strategy.use_event = False
exec_strategy.use_cuda = use_cuda
if exec_strategy.num_threads == 0:
if use_cuda:
# Experiments on se-resnext shows that too many threads hurt
# performance. Worth tunning for other models in the future.
exec_strategy.num_threads = len(self._places) * 2
exec_strategy.num_threads = len(self._places) * 4
else:
exec_strategy.num_threads = min(
len(self._places) * 2, multiprocessing.cpu_count())
cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
exec_strategy.num_threads = cpu_num
if build_strategy is None:
build_strategy = BuildStrategy()
......
......@@ -41,8 +41,8 @@ function(py_test_modules TARGET_NAME)
endfunction()
list(REMOVE_ITEM TEST_OPS test_warpctc_op)
list(REMOVE_ITEM TEST_OPS test_dist_train)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed)
#list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf)
#list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed)
# TODO(wuyi): this test hungs on CI, will add it back later
list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op)
foreach(TEST_OP ${TEST_OPS})
......
......@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import os
import unittest
import paddle.fluid as fluid
import time
......@@ -23,6 +25,7 @@ __all__ = ['TestParallelExecutorBase']
class TestParallelExecutorBase(unittest.TestCase):
def check_network_convergence(self,
method,
use_cuda=True,
memory_opt=True,
iter=50,
batch_size=None,
......@@ -53,7 +56,7 @@ class TestParallelExecutorBase(unittest.TestCase):
adam.minimize(loss)
if memory_opt:
fluid.memory_optimize(main)
place = fluid.CUDAPlace(0)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
startup_exe = fluid.Executor(place)
startup_exe.run(startup)
exec_strategy = fluid.ExecutionStrategy()
......@@ -64,7 +67,7 @@ class TestParallelExecutorBase(unittest.TestCase):
if use_parallel_executor:
exe = fluid.ParallelExecutor(
True,
use_cuda,
loss_name=loss.name,
exec_strategy=exec_strategy,
build_strategy=build_strategy)
......@@ -72,7 +75,9 @@ class TestParallelExecutorBase(unittest.TestCase):
exe = fluid.Executor(place=place)
if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count()
batch_size *= fluid.core.get_cuda_device_count(
) if use_cuda else int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
begin = time.time()
first_loss, = run_executor(
exe=exe, feed=feed_dict, fetch_list=[loss.name])
......
......@@ -17,6 +17,7 @@ import paddle.fluid as fluid
import unittest
import paddle
import numpy as np
import os
word_dict, verb_dict, label_dict = conll05.get_dict()
word_dict_len = len(word_dict)
......@@ -101,7 +102,11 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
class TestCRFModel(unittest.TestCase):
def check_network_convergence(self, is_sparse, build_strategy=None):
def check_network_convergence(self,
is_sparse,
build_strategy=None,
use_cuda=True):
os.environ['CPU_NUM'] = str(4)
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
......@@ -145,12 +150,12 @@ class TestCRFModel(unittest.TestCase):
paddle.dataset.conll05.test(), buf_size=8192),
batch_size=16)
place = fluid.CUDAPlace(0)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)
pe = fluid.ParallelExecutor(
use_cuda=True,
use_cuda=use_cuda,
loss_name=avg_cost.name,
build_strategy=build_strategy)
......@@ -172,25 +177,33 @@ class TestCRFModel(unittest.TestCase):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy)
is_sparse=True, build_strategy=build_strategy, use_cuda=True)
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy, use_cuda=False)
def test_update_dense_parameter_all_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy)
is_sparse=False, build_strategy=build_strategy, use_cuda=True)
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy, use_cuda=False)
def test_update_sparse_parameter_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy)
is_sparse=True, build_strategy=build_strategy, use_cuda=True)
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy, use_cuda=False)
def test_update_dense_parameter_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy)
is_sparse=False, build_strategy=build_strategy, use_cuda=True)
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy, use_cuda=False)
if __name__ == '__main__':
......
......@@ -18,6 +18,7 @@ import paddle.fluid as fluid
import unittest
import numpy as np
import paddle
import os
def Lenet(data, class_dim):
......@@ -35,7 +36,7 @@ def Lenet(data, class_dim):
class TestFetchOp(unittest.TestCase):
def parallel_exe(self, train_inputs, seed):
def parallel_exe(self, train_inputs, seed, use_cuda):
main = fluid.Program()
startup = fluid.Program()
startup.random_seed = seed
......@@ -59,13 +60,13 @@ class TestFetchOp(unittest.TestCase):
# conv2d_1.b_0@GRAD. Those variables should not be pruned.
# fluid.memory_optimize(main)
place = fluid.CUDAPlace(0)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)
feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
pe = fluid.ParallelExecutor(
use_cuda=True, loss_name=loss.name, main_program=main)
use_cuda=use_cuda, loss_name=loss.name, main_program=main)
fetch_list = []
all_vars = main.global_block().vars
......@@ -88,14 +89,16 @@ class TestFetchOp(unittest.TestCase):
for i in range(iters):
train_inputs.append(tst_reader_iter.next())
self.parallel_exe(train_inputs, seed=1)
os.environ['CPU_NUM'] = str(4)
self.parallel_exe(train_inputs, seed=1, use_cuda=True)
self.parallel_exe(train_inputs, seed=1, use_cuda=False)
class TestFeedParallel(unittest.TestCase):
def test_main(self):
def parallel_exe(self, use_cuda, seed):
main = fluid.Program()
startup = fluid.Program()
startup.random_seed = 1
startup.random_seed = seed
with fluid.scope_guard(fluid.core.Scope()):
with fluid.program_guard(main, startup):
data = fluid.layers.data(
......@@ -111,15 +114,18 @@ class TestFeedParallel(unittest.TestCase):
regularization=fluid.regularizer.L2Decay(1e-4))
opt.minimize(loss)
place = fluid.CUDAPlace(0)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
reader = feeder.decorate_reader(
paddle.batch(
flowers.train(), batch_size=16), multi_devices=True)
exe = fluid.Executor(place)
exe.run(startup)
pe = fluid.ParallelExecutor(
use_cuda=True, loss_name=loss.name, main_program=main)
use_cuda=use_cuda, loss_name=loss.name, main_program=main)
for batch_id, data in enumerate(reader()):
loss_np = np.array(pe.run(feed=data, fetch_list=[loss.name])[0])
......@@ -127,6 +133,11 @@ class TestFeedParallel(unittest.TestCase):
if batch_id == 2:
break
def test_feed_op(self):
os.environ['CPU_NUM'] = str(4)
self.parallel_exe(use_cuda=True, seed=1)
self.parallel_exe(use_cuda=False, seed=1)
if __name__ == '__main__':
unittest.main()
......@@ -18,6 +18,7 @@ import numpy as np
import paddle
import paddle.dataset.mnist as mnist
import unittest
import os
MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio"
......@@ -85,6 +86,7 @@ def fc_with_batchnorm(use_feed):
class TestMNIST(TestParallelExecutorBase):
@classmethod
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
# Convert mnist to recordio file
with fluid.program_guard(fluid.Program(), fluid.Program()):
reader = paddle.batch(mnist.train(), batch_size=4)
......@@ -99,9 +101,12 @@ class TestMNIST(TestParallelExecutorBase):
fluid.recordio_writer.convert_reader_to_recordio_file(
MNIST_RECORDIO_FILE, reader, feeder)
def check_simple_fc_convergence(self, balance_parameter_opt_between_cards):
self.check_network_convergence(simple_fc_net)
self.check_network_convergence(simple_fc_net, allow_op_delay=True)
def check_simple_fc_convergence(self,
balance_parameter_opt_between_cards,
use_cuda=True):
self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
self.check_network_convergence(
simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
img = np.zeros(shape=[32, 784], dtype='float32')
label = np.ones(shape=[32, 1], dtype='int64')
......@@ -109,17 +114,21 @@ class TestMNIST(TestParallelExecutorBase):
simple_fc_net,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
def test_simple_fc(self):
self.check_simple_fc_convergence(False)
self.check_simple_fc_convergence(False, use_cuda=True)
self.check_simple_fc_convergence(False, use_cuda=False)
def test_simple_fc_with_new_strategy(self):
self.check_simple_fc_convergence(True)
self.check_simple_fc_convergence(True, use_cuda=True)
self.check_simple_fc_convergence(True, use_cuda=False)
def check_simple_fc_parallel_accuracy(self,
balance_parameter_opt_between_cards):
balance_parameter_opt_between_cards,
use_cuda=True):
img = np.zeros(shape=[32, 784], dtype='float32')
label = np.ones(shape=[32, 1], dtype='int64')
single_first_loss, single_last_loss = self.check_network_convergence(
......@@ -127,12 +136,14 @@ class TestMNIST(TestParallelExecutorBase):
seed=1000,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_parallel_executor=False)
parallel_first_loss, parallel_last_loss = self.check_network_convergence(
method=simple_fc_net,
seed=1000,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_parallel_executor=True,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
......@@ -143,28 +154,33 @@ class TestMNIST(TestParallelExecutorBase):
self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
def test_simple_fc_parallel_accuracy(self):
self.check_simple_fc_parallel_accuracy(False)
self.check_simple_fc_parallel_accuracy(False, use_cuda=True)
self.check_simple_fc_parallel_accuracy(False, use_cuda=False)
def test_simple_fc_parallel_accuracy_with_new_strategy(self):
self.check_simple_fc_parallel_accuracy(True)
self.check_simple_fc_parallel_accuracy(True, use_cuda=True)
self.check_simple_fc_parallel_accuracy(True, use_cuda=False)
def check_batchnorm_fc_convergence(self,
balance_parameter_opt_between_cards):
self.check_network_convergence(fc_with_batchnorm)
def check_batchnorm_fc_convergence(
self, balance_parameter_opt_between_cards, use_cuda):
self.check_network_convergence(fc_with_batchnorm, use_cuda=use_cuda)
img = np.zeros(shape=[32, 784], dtype='float32')
label = np.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
fc_with_batchnorm,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
def test_batchnorm_fc(self):
self.check_batchnorm_fc_convergence(False)
self.check_batchnorm_fc_convergence(False, use_cuda=True)
self.check_batchnorm_fc_convergence(False, use_cuda=False)
def test_batchnorm_fc_with_new_strategy(self):
self.check_batchnorm_fc_convergence(True)
self.check_batchnorm_fc_convergence(True, use_cuda=True)
self.check_batchnorm_fc_convergence(True, use_cuda=False)
if __name__ == '__main__':
......
......@@ -15,6 +15,7 @@
import paddle.fluid as fluid
from parallel_executor_test_base import TestParallelExecutorBase
import unittest
import os
def squeeze_excitation(input, num_channels, reduction_ratio):
......@@ -130,22 +131,30 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False):
class TestResnet(TestParallelExecutorBase):
def check_resnet_convergence(self, balance_parameter_opt_between_cards):
def check_resnet_convergence(self,
balance_parameter_opt_between_cards,
use_cuda=True,
iter=20):
os.environ['CPU_NUM'] = str(4)
import functools
batch_size = 2
self.check_network_convergence(
functools.partial(
SE_ResNeXt50Small, batch_size=batch_size),
iter=20,
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
)
def test_resnet(self):
self.check_resnet_convergence(False)
self.check_resnet_convergence(False, use_cuda=True)
self.check_resnet_convergence(False, use_cuda=False, iter=5)
def test_resnet_with_new_strategy(self):
self.check_resnet_convergence(True)
self.check_resnet_convergence(True, use_cuda=True)
self.check_resnet_convergence(True, use_cuda=False, iter=5)
if __name__ == '__main__':
......
......@@ -15,6 +15,7 @@
import paddle.fluid as fluid
import numpy as np
import unittest
import os
def simple_fc_net():
......@@ -35,7 +36,8 @@ def simple_fc_net():
class ParallelExecutorTestingDuringTraining(unittest.TestCase):
def check_network_convergence(self, build_strategy=None):
def check_network_convergence(self, use_cuda, build_strategy=None):
os.environ['CPU_NUM'] = str(4)
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
......@@ -49,19 +51,19 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
image = np.random.normal(size=(batch_size, 784)).astype('float32')
label = np.random.randint(0, 10, (batch_size, 1), dtype="int64")
place = fluid.CUDAPlace(0)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)
feed_dict = {'image': image, 'label': label}
train_exe = fluid.ParallelExecutor(
use_cuda=True,
use_cuda=use_cuda,
loss_name=loss.name,
main_program=main,
build_strategy=build_strategy)
test_exe = fluid.ParallelExecutor(
use_cuda=True,
use_cuda=use_cuda,
main_program=test_program,
share_vars_from=train_exe,
build_strategy=build_strategy)
......@@ -81,12 +83,18 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
def test_parallel_testing(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
self.check_network_convergence(build_strategy)
self.check_network_convergence(
use_cuda=True, build_strategy=build_strategy)
self.check_network_convergence(
use_cuda=False, build_strategy=build_strategy)
def test_parallel_testing_with_new_strategy(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
self.check_network_convergence(build_strategy)
self.check_network_convergence(
use_cuda=True, build_strategy=build_strategy)
self.check_network_convergence(
use_cuda=False, build_strategy=build_strategy)
if __name__ == '__main__':
......
......@@ -19,6 +19,7 @@ from parallel_executor_test_base import TestParallelExecutorBase
import unittest
import paddle
import paddle.dataset.wmt16 as wmt16
import os
WMT16_RECORDIO_FILE = "./wmt16_test_pe.recordio"
......@@ -149,6 +150,7 @@ def transformer(use_feed):
class TestTransformer(TestParallelExecutorBase):
@classmethod
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
reader = paddle.batch(
wmt16.train(ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size),
......@@ -167,7 +169,8 @@ class TestTransformer(TestParallelExecutorBase):
@unittest.skip("transformer is buggy in multi gpu")
def test_main(self):
self.check_network_convergence(transformer)
self.check_network_convergence(transformer, use_cuda=True)
self.check_network_convergence(transformer, use_cuda=False)
if __name__ == '__main__':
......
......@@ -119,7 +119,8 @@ def reader_creator(data_file,
yield sample, int(label) - 1
if use_xmap:
return xmap_readers(mapper, reader, cpu_count(), buffered_size)
cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
return xmap_readers(mapper, reader, cpu_num, buffered_size)
else:
return map_readers(mapper, reader)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册