From d9320dcd944bba599135a711113d831551fa4814 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 4 May 2018 16:23:30 +0800 Subject: [PATCH] complete code --- paddle/fluid/framework/parallel_executor.cc | 10 ++++-- paddle/fluid/framework/parallel_executor.h | 3 +- paddle/fluid/operators/detail/send_recv.proto | 1 + .../operators/detail/sendrecvop_utils.cc | 21 +++++++++++- .../operators/detail/variable_response.cc | 11 ++++++- paddle/fluid/operators/gen_nccl_id_op.cc | 4 +-- paddle/fluid/platform/nccl_helper.h | 32 +++++++++++++++---- paddle/fluid/pybind/pybind.cc | 6 ++-- 8 files changed, 73 insertions(+), 15 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 4712efeff6..bd2a2cfba5 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -58,7 +58,7 @@ ParallelExecutor::ParallelExecutor( const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, bool allow_op_delay, - bool use_default_grad_scale) + bool use_default_grad_scale, size_t num_trainers, size_t trainer_id) : member_(new ParallelExecutorPrivate(places)) { member_->global_scope_ = scope; @@ -80,7 +80,13 @@ ParallelExecutor::ParallelExecutor( // Bcast Parameters to all GPUs #ifdef PADDLE_WITH_CUDA - member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_)); + auto *nccl_id_var = scope->FindVar("NCCLID"); + ncclUniqueId *nccl_id = nullptr; + if (nccl_id_var != nullptr) { + nccl_id = nccl_id_var->GetMutable(); + } + member_->nccl_ctxs_.reset(new platform::NCCLContextMap( + member_->places_, nccl_id, num_trainers, trainer_id)); #endif if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 && local_scopes.empty()) { // Is CUDA diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index ecd107d81f..306d2bdfaf 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -40,7 +40,8 @@ class ParallelExecutor { const ProgramDesc& main_program, const std::string& loss_var_name, Scope* scope, const std::vector& local_scopes, - bool allow_op_delay, bool use_default_grad_scale); + bool allow_op_delay, bool use_default_grad_scale, + size_t num_trainers = 0, size_t trainer_id = 0); ~ParallelExecutor(); diff --git a/paddle/fluid/operators/detail/send_recv.proto b/paddle/fluid/operators/detail/send_recv.proto index 02bb2b9ceb..3b343a7e56 100644 --- a/paddle/fluid/operators/detail/send_recv.proto +++ b/paddle/fluid/operators/detail/send_recv.proto @@ -32,6 +32,7 @@ service SendRecvService { enum VarType { LOD_TENSOR = 0; SELECTED_ROWS = 1; + NCCL_ID = 2; } // NOTICE(gongwb):don't modify this proto if you are not diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index 766bcf1ac5..800d6d8bd2 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -43,13 +43,16 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, void* buf = buffer.get(); void* payload = nullptr; - size_t payload_size; + size_t payload_size = 0; ProtoEncodeHelper e(static_cast(buf), 1024); e.WriteString(VarMsg::kVarnameFieldNumber, name); if (var->IsType()) { e.WriteUint64(VarMsg::kTypeFieldNumber, 0); } else if (var->IsType()) { e.WriteUint64(VarMsg::kTypeFieldNumber, 1); + } else if (var->IsType()) { + // NOTE: sendrecv only support RAW type for NCCL_ID + e.WriteUint64(VarMsg::kTypeFieldNumber, 2); } if (!out_name.empty()) { @@ -139,11 +142,27 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, payload_size = tensor->numel() * framework::SizeOfType(tensor->type()); e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); } break; + case framework::proto::VarType_Type_RAW: { + e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, + NCCL_UNIQUE_ID_BYTES); + ncclUniqueId* uid = var->GetMutable(); + e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES)); + } break; default: PADDLE_THROW("Serialize does not support type: %s", typeid(var->Type()).name()); break; } + + if (framework::ToVarType(var->Type()) == framework::proto::VarType_Type_RAW) { + // for serialize NCCL_ID + ::grpc::Slice slices(e.size()); + memcpy(const_cast(slices.begin()), e.data(), e.size()); + ::grpc::ByteBuffer tmp(&slices, 1); + msg->Swap(&tmp); + return; + } + // steal reference of tensor data ::grpc::Slice slices[4]; // metadata, tensor, rows meta, rows int num_slices = 2; // only SelectedRows have rows buffer diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc index fbef8d02a4..81d755f5fc 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/detail/variable_response.cc @@ -367,9 +367,18 @@ int VariableResponse::Parse(Source* source) { } case sendrecv::VariableMessage::kSerializedFieldNumber: { PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS || - meta_.type() == sendrecv::LOD_TENSOR) && + meta_.type() == sendrecv::LOD_TENSOR || + meta_.type() == sendrecv::NCCL_ID) && meta_.varname() != "", "meta info should be got first!"); + if (meta_.type() == sendrecv::NCCL_ID) { + auto* var = scope_->FindVar(meta_.varname()); + if (var != nullptr) { + ncclUniqueId* id = var->GetMutable(); + memcpy(id->internal, meta_.serialized().c_str(), + meta_.serialized().size()); + } + } int length = 0; if (wt != WIRETYPE_LENGTH_DELIMITED || diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index 235995aeb4..afb228fa6f 100644 --- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -54,7 +54,7 @@ class GenNCCLIdOp : public framework::OperatorBase { auto var = scope->FindVar("NCCLID"); PADDLE_ENFORCE_NOT_NULL(var); auto id = var->GetMutable(); - ncclGetUniqueId(id); + platform::dynload::ncclGetUniqueId(id); std::vector endpoint_list = Attr>("endpoint_list"); @@ -120,4 +120,4 @@ For trainer 1~n: start a gRPC server to get the UniqueId, once got, stop the ser namespace ops = paddle::operators; -REGISTER_OPERATOR(gen_nccl_id_op, ops::GenNCCLIdOp, ops::GenNCCLIdOpMaker); +REGISTER_OPERATOR(gen_nccl_id, ops::GenNCCLIdOp, ops::GenNCCLIdOpMaker); diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 0013597fd5..3b52587a28 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -73,7 +73,9 @@ struct NCCLContextMap { std::unordered_map contexts_; std::vector order_; - explicit NCCLContextMap(const std::vector &places) { + explicit NCCLContextMap(const std::vector &places, + ncclUniqueId *nccl_id = nullptr, + size_t node_count = 0, size_t trainer_id = 0) { PADDLE_ENFORCE(!places.empty()); order_.reserve(places.size()); for (auto &p : places) { @@ -85,18 +87,36 @@ struct NCCLContextMap { order_.size(), contexts_.size(), "NCCL Context Map does not support contain two or more same device"); - if (places.size() > 1) { - std::unique_ptr comms(new ncclComm_t[order_.size()]); + if (places.size() <= 1) { + return; + } + std::unique_ptr comms(new ncclComm_t[order_.size()]); + // if pass nccl_id here, can assume we are doing multi node training + if (nccl_id == nullptr) { { std::lock_guard guard(NCCLGroupGuard::NCCLMutex()); PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( comms.get(), static_cast(order_.size()), order_.data())); } - int i = 0; - for (auto &dev_id : order_) { - contexts_.at(dev_id).comm_ = comms[i++]; + } else { + PADDLE_ENFORCE_GT(node_count, 0); + PADDLE_ENFORCE_EQ(node_count % places.size(), 0, + "must have same number of GPUs on each node"); + { + std::lock_guard guard(NCCLGroupGuard::NCCLMutex()); + int nranks = node_count * order_.size(); + for (auto &gpu_id : order_) { + int rank = trainer_id * order_.size() + gpu_id; + PADDLE_ENFORCE(cudaSetDevice(gpu_id)); + PADDLE_ENFORCE( + ncclCommInitRank(comms.get() + gpu_id, nranks, *nccl_id, rank)); + } } } + int i = 0; + for (auto &dev_id : order_) { + contexts_.at(dev_id).comm_ = comms[i++]; + } } NCCLContextMap(const NCCLContextMap &other) = delete; diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index c925686f83..827c2701ba 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -502,11 +502,13 @@ All parameter, weight, gradient are variables in Paddle. const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, Scope *scope, std::vector &local_scopes, - bool allow_op_delay, bool use_default_grad_scale) { + bool allow_op_delay, bool use_default_grad_scale, + size_t num_trainers, size_t trainer_id) { new (&self) ParallelExecutor( num_threads, use_event, places, params, bcast_vars, main_program, loss_var_name, scope, local_scopes, - allow_op_delay, use_default_grad_scale); + allow_op_delay, use_default_grad_scale, num_trainers, + trainer_id); }) .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs) // NOTE: even we return a vec* to Python use reference policy. -- GitLab