diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index b080e516dec6afcf145abef6d6ad0f061deb985d..4d62edfef4b996235819d63147f2809dd50f2cfb 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -80,7 +80,7 @@ ParallelExecutor::ParallelExecutor(

   // Bcast Parameters to all GPUs
 #ifdef PADDLE_WITH_CUDA
-  auto *nccl_id_var = scope->FindVar("NCCLID");
+  auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
   ncclUniqueId *nccl_id = nullptr;
   if (nccl_id_var != nullptr) {
     nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt
index 2b8df6c35fb4b83310445ec21e9088286c891581..48d0af1a5b23b5e6e93edf9dca184ba7ac8b7629 100644
--- a/paddle/fluid/operators/CMakeLists.txt
+++ b/paddle/fluid/operators/CMakeLists.txt
@@ -187,7 +187,7 @@ if(WITH_DISTRIBUTE)
     if(WITH_GPU)
       op_library(gen_nccl_id_op DEPS nccl_common)
     else()
-      set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op)
+      set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op)
     endif()
     set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf)
     set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc
index 247130f571963cb1d1ea197ba605bba615d7cd23..f3ac1499475b07244afad919c23693c76e1efe8b 100644
--- a/paddle/fluid/operators/detail/sendrecvop_utils.cc
+++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc
@@ -162,8 +162,8 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
   if (var->IsType<ncclUniqueId>()) {
     e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber,
                               NCCL_UNIQUE_ID_BYTES);
-    ncclUniqueId* uid = var->GetMutable<ncclUniqueId>();
-    e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES));
+    ncclUniqueId& uid = var->Get<ncclUniqueId>();
+    e.WriteRawBytes(std::string(uid.internal, NCCL_UNIQUE_ID_BYTES));
     // for serialize NCCL_ID
     ::grpc::Slice slices(e.size());
diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc
index cfdeaee00b98343e7706fd3b7d5c54a2fb9dfffa..1cddc998e4cbabe42a7182c58c008074d38ddbf4 100644
--- a/paddle/fluid/operators/gen_nccl_id_op.cc
+++ b/paddle/fluid/operators/gen_nccl_id_op.cc
@@ -52,17 +52,17 @@ class GenNCCLIdOp : public framework::OperatorBase {
  private:
   void GenerateAndSend(framework::Scope* scope,
                        const platform::DeviceContext& dev_ctx) const {
-    auto var = scope->FindVar("NCCLID");
+    auto var = scope->FindVar(NCCL_ID_VARNAME);
     PADDLE_ENFORCE_NOT_NULL(var);
     auto id = var->GetMutable<ncclUniqueId>();
-    platform::dynload::ncclGetUniqueId(id);
+    PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(id));

     std::vector<std::string> endpoint_list =
         Attr<std::vector<std::string>>("endpoint_list");
     detail::RPCClient client;
     for (auto& ep : endpoint_list) {
       VLOG(3) << "sending nccl id to " << ep;
-      client.AsyncSendVariable(ep, dev_ctx, *scope, "NCCLID");
+      client.AsyncSendVariable(ep, dev_ctx, *scope, NCCL_ID_VARNAME);
     }
     client.Wait();
     VLOG(3) << "sending completed...";
@@ -71,6 +71,9 @@ class GenNCCLIdOp : public framework::OperatorBase {
   void GetIdByServer(framework::Scope* scope,
                      const platform::DeviceContext& dev_ctx) const {
     std::string endpoint = Attr<std::string>("endpoint");
+    // NOTE: cannot use unique_ptr here because the default
+    // deleter will call GRPC Server's base class's dtor and
+    // that will cause a weird crash.
     rpc_service_ = new detail::AsyncGRPCServer(endpoint, true);
     framework::ProgramDesc empty_program;
     framework::Executor executor(dev_ctx.GetPlace());
diff --git a/paddle/fluid/operators/test_send_nccl_id.cc b/paddle/fluid/operators/test_send_nccl_id.cc
index 6781f85c4a31ffd3c849f6a929fb151fbb2eb409..2c3c5ea0a0ddeeb2a0c5265cebad7fe014e6f6dd 100644
--- a/paddle/fluid/operators/test_send_nccl_id.cc
+++ b/paddle/fluid/operators/test_send_nccl_id.cc
@@ -39,7 +39,7 @@ std::unique_ptr<detail::AsyncGRPCServer> rpc_service;
 void StartServer() {
   f::Scope scope;
   p::CPUPlace place;
-  scope.Var("NCCLID");
+  scope.Var(NCCL_ID_VARNAME);
   p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
   auto& dev_ctx = *pool.Get(p::CPUPlace());
@@ -71,7 +71,7 @@ TEST(SendNcclId, Normal) {
   p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
   auto& dev_ctx = *pool.Get(p::CPUPlace());

-  auto var = scope.Var("NCCLID");
+  auto var = scope.Var(NCCL_ID_VARNAME);
   // var->SetType(f::proto::VarType_Type_RAW);
   auto id = var->GetMutable<ncclUniqueId>();
   p::dynload::ncclGetUniqueId(id);
@@ -80,7 +80,7 @@ TEST(SendNcclId, Normal) {
   std::string ep = string::Sprintf("127.0.0.1:%d", port);
   detail::RPCClient client;

-  client.AsyncSendVariable(ep, dev_ctx, scope, "NCCLID");
+  client.AsyncSendVariable(ep, dev_ctx, scope, NCCL_ID_VARNAME);
   client.Wait();
   server_thread.join();
   auto* ptr = rpc_service.release();
diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h
index 094c47007e836cca564c016b7bde5f8221c77566..408721be8b118f11a77287cbb3aca2af8bd81cec 100644
--- a/paddle/fluid/platform/nccl_helper.h
+++ b/paddle/fluid/platform/nccl_helper.h
@@ -21,6 +21,8 @@
 #include "paddle/fluid/platform/dynload/nccl.h"
 #include "paddle/fluid/platform/enforce.h"

+#define NCCL_ID_VARNAME "NCCLID"
+
 namespace paddle {
 namespace platform {
@@ -76,7 +78,7 @@ struct NCCLContextMap {

   explicit NCCLContextMap(const std::vector<platform::Place> &places,
                           ncclUniqueId *nccl_id = nullptr,
-                          size_t node_count = 0, size_t trainer_id = 0) {
+                          size_t num_trainers = 0, size_t trainer_id = 0) {
     PADDLE_ENFORCE(!places.empty());
     order_.reserve(places.size());
     for (auto &p : places) {
@@ -94,16 +96,14 @@ struct NCCLContextMap {
     std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
     // if pass nccl_id here, can assume we are doing multi node training
     if (nccl_id == nullptr) {
-      {
-        std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
-        PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
-            comms.get(), static_cast<int>(order_.size()), order_.data()));
-      }
+      std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
+      PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
+          comms.get(), static_cast<int>(order_.size()), order_.data()));
     } else {
-      PADDLE_ENFORCE_GT(node_count, 0);
+      PADDLE_ENFORCE_GT(num_trainers, 0);
       // TODO(wuyi): need to ensure each node have same number of GPUs
       {
-        int nranks = node_count * order_.size();
+        int nranks = num_trainers * order_.size();
         NCCLGroupGuard gurad;
         for (auto &gpu_id : order_) {
           int rank = trainer_id * order_.size() + gpu_id;
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index bd92ac548da2a8ca442873228e2cc1bbd9541b45..4f6db7c2be23686f949002722d52fcb69cbe3cea 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -31,7 +31,7 @@ class ParallelExecutor(object):
                  allow_op_delay=False,
                  share_vars_from=None,
                  use_default_grad_scale=True,
-                 num_nodes=0,
+                 num_trainers=0,
                  trainer_id=0):
         """
         ParallelExecutor can run program in parallel.
@@ -53,10 +53,10 @@ class ParallelExecutor(object):
                 gradients of each device and scaled gradients would be
                 aggregated. Otherwise, a customized scale value should be fed
                 to the network.
-            num_nodes(int, default 0): If greater than 0, NCCL will be
+            num_trainers(int, default 0): If greater than 0, NCCL will be
                 initialized with multiple node ranks, and each node should have
                 the same number of GPUs. Distributed training will be enabled then.
-            trainer_id(int, default 0): Must use together with num_nodes.
+            trainer_id(int, default 0): Must be used together with num_trainers.
                 trainer_id is the "rank" of the current node, starting from 0.

         Returns:
@@ -137,7 +137,7 @@ class ParallelExecutor(object):
             local_scopes,
             allow_op_delay,
             use_default_grad_scale,
-            num_nodes,
+            num_trainers,
             trainer_id)
         self.scope = scope
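
Taken together, the patch renames the multi-node arguments of fluid.ParallelExecutor to num_trainers and trainer_id. Below is a minimal usage sketch; the toy network and the other constructor arguments (use_cuda, loss_name) are assumptions for illustration, not part of this patch, and the NCCL id exchange performed by gen_nccl_id_op on the trainers is omitted here.

import paddle.fluid as fluid

# Assumed toy network; only num_trainers / trainer_id below come from this patch.
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
y_pred = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.mean(fluid.layers.square_error_cost(input=y_pred, label=y))
fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())

# Every trainer process runs the same script with its own rank.
# num_trainers > 0 switches NCCLContextMap into multi-node mode;
# num_trainers=0 (the default) keeps the old single-node behaviour.
pe = fluid.ParallelExecutor(
    use_cuda=True,
    loss_name=loss.name,
    num_trainers=2,  # total number of trainer nodes
    trainer_id=0)    # rank of this node, in [0, num_trainers)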
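
The rank layout these two arguments feed into is visible in the NCCLContextMap hunk above: nranks = num_trainers * order_.size() and rank = trainer_id * order_.size() + gpu_id, so each node owns a contiguous block of global NCCL ranks, one per local GPU (which is why the TODO requires every node to have the same GPU count). A small plain-Python illustration of that arithmetic, not code from the patch:

# Mirrors the C++ in nccl_helper.h:
#   nranks = num_trainers * order_.size();
#   rank   = trainer_id * order_.size() + gpu_id;
num_trainers = 2
local_gpus = [0, 1, 2, 3]                # order_ on every node, must match across nodes

nranks = num_trainers * len(local_gpus)  # 8 NCCL ranks in total
for trainer_id in range(num_trainers):
    ranks = [trainer_id * len(local_gpus) + gpu for gpu in local_gpus]
    print("trainer", trainer_id, "->", ranks)
# trainer 0 -> [0, 1, 2, 3]
# trainer 1 -> [4, 5, 6, 7]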