From f5840d89258b4d2dd8e3da77db8645d58bbf0d4d Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Fri, 11 May 2018 19:54:35 +0800
Subject: [PATCH] follow comments

---
 paddle/fluid/framework/parallel_executor.cc        |  2 +-
 paddle/fluid/operators/CMakeLists.txt              |  2 +-
 .../fluid/operators/detail/sendrecvop_utils.cc     |  4 ++--
 paddle/fluid/operators/gen_nccl_id_op.cc           |  9 ++++++---
 paddle/fluid/operators/test_send_nccl_id.cc        |  6 +++---
 paddle/fluid/platform/nccl_helper.h                | 16 ++++++++--------
 python/paddle/fluid/parallel_executor.py           |  8 ++++----
 7 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index b080e516dec..4d62edfef4b 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -80,7 +80,7 @@ ParallelExecutor::ParallelExecutor(
 
   // Bcast Parameters to all GPUs
 #ifdef PADDLE_WITH_CUDA
-  auto *nccl_id_var = scope->FindVar("NCCLID");
+  auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
   ncclUniqueId *nccl_id = nullptr;
   if (nccl_id_var != nullptr) {
     nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt
index 2b8df6c35fb..48d0af1a5b2 100644
--- a/paddle/fluid/operators/CMakeLists.txt
+++ b/paddle/fluid/operators/CMakeLists.txt
@@ -187,7 +187,7 @@ if(WITH_DISTRIBUTE)
     if(WITH_GPU)
         op_library(gen_nccl_id_op DEPS nccl_common)
     else()
-      set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op)
+        set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op)
     endif()
     set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf)
     set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc
index 247130f5719..f3ac1499475 100644
--- a/paddle/fluid/operators/detail/sendrecvop_utils.cc
+++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc
@@ -162,8 +162,8 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
   if (var->IsType<ncclUniqueId>()) {
     e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber,
                               NCCL_UNIQUE_ID_BYTES);
-    ncclUniqueId* uid = var->GetMutable<ncclUniqueId>();
-    e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES));
+    const ncclUniqueId& uid = var->Get<ncclUniqueId>();
+    e.WriteRawBytes(std::string(uid.internal, NCCL_UNIQUE_ID_BYTES));
     // for serialize NCCL_ID
     ::grpc::Slice slices(e.size());
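The Get/GetMutable switch in sendrecvop_utils.cc above is more than a style fix: GetMutable default-constructs the value when the variable is empty, which would silently serialize a garbage NCCL ID, while Get enforces that ncclGetUniqueId has already filled the variable. The ID itself travels as NCCL_UNIQUE_ID_BYTES (128) raw bytes. Below is a minimal standalone sketch of that byte-level round trip; FakeUniqueId and kUniqueIdBytes are illustrative stand-ins for NCCL's opaque ncclUniqueId and NCCL_UNIQUE_ID_BYTES, not Paddle or NCCL code:

    #include <cassert>
    #include <cstring>
    #include <string>

    // Stand-ins for NCCL's opaque id type; real NCCL defines
    // NCCL_UNIQUE_ID_BYTES as 128 and ncclUniqueId as a char-array wrapper.
    constexpr int kUniqueIdBytes = 128;
    struct FakeUniqueId {
      char internal[kUniqueIdBytes];
    };

    // Sender side: pack the opaque bytes into a string payload, mirroring
    // e.WriteRawBytes(std::string(uid.internal, NCCL_UNIQUE_ID_BYTES)).
    std::string PackUniqueId(const FakeUniqueId& uid) {
      return std::string(uid.internal, kUniqueIdBytes);
    }

    // Receiver side: copy the payload back into an id struct.
    FakeUniqueId UnpackUniqueId(const std::string& payload) {
      assert(payload.size() == static_cast<size_t>(kUniqueIdBytes));
      FakeUniqueId uid;
      std::memcpy(uid.internal, payload.data(), kUniqueIdBytes);
      return uid;
    }

    int main() {
      FakeUniqueId id = {};
      id.internal[0] = 42;  // pretend ncclGetUniqueId filled this in
      FakeUniqueId copy = UnpackUniqueId(PackUniqueId(id));
      assert(std::memcmp(id.internal, copy.internal, kUniqueIdBytes) == 0);
      return 0;
    }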
diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc
index cfdeaee00b9..1cddc998e4c 100644
--- a/paddle/fluid/operators/gen_nccl_id_op.cc
+++ b/paddle/fluid/operators/gen_nccl_id_op.cc
@@ -52,17 +52,17 @@ class GenNCCLIdOp : public framework::OperatorBase {
  private:
   void GenerateAndSend(framework::Scope* scope,
                        const platform::DeviceContext& dev_ctx) const {
-    auto var = scope->FindVar("NCCLID");
+    auto var = scope->FindVar(NCCL_ID_VARNAME);
     PADDLE_ENFORCE_NOT_NULL(var);
     auto id = var->GetMutable<ncclUniqueId>();
-    platform::dynload::ncclGetUniqueId(id);
+    PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(id));
 
     std::vector<std::string> endpoint_list =
         Attr<std::vector<std::string>>("endpoint_list");
     detail::RPCClient client;
     for (auto& ep : endpoint_list) {
       VLOG(3) << "sending nccl id to " << ep;
-      client.AsyncSendVariable(ep, dev_ctx, *scope, "NCCLID");
+      client.AsyncSendVariable(ep, dev_ctx, *scope, NCCL_ID_VARNAME);
     }
     client.Wait();
     VLOG(3) << "sending completed...";
@@ -71,6 +71,9 @@ class GenNCCLIdOp : public framework::OperatorBase {
   void GetIdByServer(framework::Scope* scope,
                      const platform::DeviceContext& dev_ctx) const {
     std::string endpoint = Attr<std::string>("endpoint");
+    // NOTE: Cannot use unique_ptr here because the default
+    // deleter will call GRPC Server's base class's dtor and
+    // that will cause a weird crash.
     rpc_service_ = new detail::AsyncGRPCServer(endpoint, true);
     framework::ProgramDesc empty_program;
     framework::Executor executor(dev_ctx.GetPlace());
diff --git a/paddle/fluid/operators/test_send_nccl_id.cc b/paddle/fluid/operators/test_send_nccl_id.cc
index 6781f85c4a3..2c3c5ea0a0d 100644
--- a/paddle/fluid/operators/test_send_nccl_id.cc
+++ b/paddle/fluid/operators/test_send_nccl_id.cc
@@ -39,7 +39,7 @@ std::unique_ptr<detail::AsyncGRPCServer> rpc_service;
 void StartServer() {
   f::Scope scope;
   p::CPUPlace place;
-  scope.Var("NCCLID");
+  scope.Var(NCCL_ID_VARNAME);
   p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
   auto& dev_ctx = *pool.Get(p::CPUPlace());
@@ -71,7 +71,7 @@ TEST(SendNcclId, Normal) {
   p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
   auto& dev_ctx = *pool.Get(p::CPUPlace());
 
-  auto var = scope.Var("NCCLID");
+  auto var = scope.Var(NCCL_ID_VARNAME);
   // var->SetType(f::proto::VarType_Type_RAW);
   auto id = var->GetMutable<ncclUniqueId>();
   p::dynload::ncclGetUniqueId(id);
@@ -80,7 +80,7 @@ TEST(SendNcclId, Normal) {
   std::string ep = string::Sprintf("127.0.0.1:%d", port);
   detail::RPCClient client;
 
-  client.AsyncSendVariable(ep, dev_ctx, scope, "NCCLID");
+  client.AsyncSendVariable(ep, dev_ctx, scope, NCCL_ID_VARNAME);
   client.Wait();
   server_thread.join();
   auto* ptr = rpc_service.release();
diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h
index 094c47007e8..408721be8b1 100644
--- a/paddle/fluid/platform/nccl_helper.h
+++ b/paddle/fluid/platform/nccl_helper.h
@@ -21,6 +21,8 @@
 #include "paddle/fluid/platform/dynload/nccl.h"
 #include "paddle/fluid/platform/enforce.h"
 
+#define NCCL_ID_VARNAME "NCCLID"
+
 namespace paddle {
 namespace platform {
 
@@ -76,7 +78,7 @@ struct NCCLContextMap {
 
   explicit NCCLContextMap(const std::vector<platform::Place> &places,
                           ncclUniqueId *nccl_id = nullptr,
-                          size_t node_count = 0, size_t trainer_id = 0) {
+                          size_t num_trainers = 0, size_t trainer_id = 0) {
     PADDLE_ENFORCE(!places.empty());
     order_.reserve(places.size());
     for (auto &p : places) {
@@ -94,16 +96,14 @@ struct NCCLContextMap {
     std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
     // if pass nccl_id here, can assume we are doing multi node training
     if (nccl_id == nullptr) {
-      {
-        std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
-        PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
-            comms.get(), static_cast<int>(order_.size()), order_.data()));
-      }
+      std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
+      PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
+          comms.get(), static_cast<int>(order_.size()), order_.data()));
     } else {
-      PADDLE_ENFORCE_GT(node_count, 0);
+      PADDLE_ENFORCE_GT(num_trainers, 0);
       // TODO(wuyi): need to ensure each node have same number of GPUs
       {
-        int nranks = node_count * order_.size();
+        int nranks = num_trainers * order_.size();
         NCCLGroupGuard gurad;
         for (auto &gpu_id : order_) {
           int rank = trainer_id * order_.size() + gpu_id;
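The node_count-to-num_trainers rename in nccl_helper.h pins down the rank arithmetic used above: with num_trainers nodes, each contributing order_.size() GPUs, the world size is nranks = num_trainers * order_.size(), and GPU gpu_id on trainer trainer_id gets global rank trainer_id * order_.size() + gpu_id. A standalone sketch of that mapping with illustrative numbers (not Paddle code):

    #include <cstdio>

    // Compute global NCCL ranks the way NCCLContextMap does:
    // world size = num_trainers * gpus_per_trainer,
    // rank       = trainer_id * gpus_per_trainer + local_gpu_id.
    int main() {
      const int num_trainers = 2;      // nodes in the job (assumed value)
      const int gpus_per_trainer = 4;  // must be identical on every node
      const int nranks = num_trainers * gpus_per_trainer;

      for (int trainer_id = 0; trainer_id < num_trainers; ++trainer_id) {
        for (int gpu_id = 0; gpu_id < gpus_per_trainer; ++gpu_id) {
          int rank = trainer_id * gpus_per_trainer + gpu_id;
          std::printf("trainer %d, gpu %d -> rank %d of %d\n", trainer_id,
                      gpu_id, rank, nranks);
        }
      }
      return 0;
    }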
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index bd92ac548da..4f6db7c2be2 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -31,7 +31,7 @@ class ParallelExecutor(object):
                  allow_op_delay=False,
                  share_vars_from=None,
                  use_default_grad_scale=True,
-                 num_nodes=0,
+                 num_trainers=0,
                  trainer_id=0):
         """
         ParallelExecutor can run program in parallel.
@@ -53,10 +53,10 @@ class ParallelExecutor(object):
                 gradients of each device and scaled gradients would be
                 aggregated. Otherwise, a customized scale value should be fed
                 to the network.
-            num_nodes(int, default 0): If greater than 0, NCCL will be
+            num_trainers(int, default 0): If greater than 0, NCCL will be
                 initialized with multiple ranks of nodes; each node should
                 have the same number of GPUs. Distributed training will be
                 enabled then.
-            trainer_id(int, default 0): Must use together with num_nodes.
+            trainer_id(int, default 0): Must be used together with num_trainers.
                 trainer_id is the "rank" of the current node, starting from 0.
 
         Returns:
@@ -137,7 +137,7 @@ class ParallelExecutor(object):
             local_scopes,
             allow_op_delay,
             use_default_grad_scale,
-            num_nodes,
+            num_trainers,
             trainer_id)
 
         self.scope = scope
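On the Python side, the renamed arguments surface as ParallelExecutor keyword parameters. Below is a hypothetical invocation for trainer 0 of a two-node job, sketched against the Fluid API of this era; it assumes a CUDA build where gen_nccl_id_op has already distributed the NCCL ID, and the network itself is a placeholder:

    import paddle.fluid as fluid

    # Placeholder network: a linear layer with a mean-square loss.
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    pred = fluid.layers.fc(input=x, size=1)
    loss = fluid.layers.mean(
        fluid.layers.square_error_cost(input=pred, label=y))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

    fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())

    # num_trainers is the total node count; trainer_id is this node's
    # rank, starting from 0. Both default to 0 (single-node training).
    pe = fluid.ParallelExecutor(
        use_cuda=True,
        loss_name=loss.name,
        num_trainers=2,
        trainer_id=0)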