From 17009d0627a97274299249e3680f58d6e2270ff7 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 7 May 2018 17:10:53 +0800 Subject: [PATCH] workable version --- paddle/fluid/operators/CMakeLists.txt | 1 + paddle/fluid/operators/test_send_nccl_id.cc | 88 +++++++++++++++++++++ paddle/fluid/platform/nccl_helper.h | 7 +- python/paddle/fluid/parallel_executor.py | 5 ++ 4 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 paddle/fluid/operators/test_send_nccl_id.cc diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index ad0732131..2b8df6c35 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -205,6 +205,7 @@ if(WITH_DISTRIBUTE) set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor) + cc_test(test_send_nccl_id SRCS test_send_nccl_id.cc DEPS send_op listen_and_serv_op executor) else() set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op gen_nccl_id_op) endif() diff --git a/paddle/fluid/operators/test_send_nccl_id.cc b/paddle/fluid/operators/test_send_nccl_id.cc new file mode 100644 index 000000000..7a8b42566 --- /dev/null +++ b/paddle/fluid/operators/test_send_nccl_id.cc @@ -0,0 +1,88 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/
+
+#include <unistd.h>
+#include <string>
+#include <thread>  // NOLINT
+
+#include "gtest/gtest.h"
+#include "paddle/fluid/framework/op_registry.h"
+#include "paddle/fluid/framework/operator.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/operators/detail/grpc_client.h"
+#include "paddle/fluid/operators/listen_and_serv_op.h"
+#include "paddle/fluid/operators/math/math_function.h"
+#include "paddle/fluid/operators/math/selected_rows_functor.h"
+#include "paddle/fluid/string/printf.h"
+
+USE_NO_KERNEL_OP(listen_and_serv);
+
+namespace f = paddle::framework;
+namespace p = paddle::platform;
+namespace m = paddle::operators::math;
+namespace detail = paddle::operators::detail;
+namespace string = paddle::string;
+
+std::unique_ptr<detail::AsyncGRPCServer> rpc_service;
+
+void StartServer() {
+  f::Scope scope;
+  p::CPUPlace place;
+  scope.Var("NCCLID");
+  p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
+  auto& dev_ctx = *pool.Get(p::CPUPlace());
+
+  rpc_service.reset(new detail::AsyncGRPCServer("127.0.0.1:0", true));
+
+  f::ProgramDesc empty_program;
+  f::Executor executor(dev_ctx.GetPlace());
+  rpc_service->SetScope(&scope);
+  rpc_service->SetDevCtx(&dev_ctx);
+  rpc_service->SetProgram(&empty_program);
+  rpc_service->SetExecutor(&executor);
+
+  std::thread server_thread(
+      std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, rpc_service.get()));
+  rpc_service->SetCond(0);
+  auto recv = rpc_service->Get();
+  LOG(INFO) << "got nccl id and stop server...";
+  rpc_service->ShutDown();
+  server_thread.join();
+}
+
+TEST(SendNcclId, Normal) {
+  std::thread server_thread(StartServer);
+  // wait for the server to start
+  sleep(2);
+
+  f::Scope scope;
+  p::CPUPlace place;
+  p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
+  auto& dev_ctx = *pool.Get(p::CPUPlace());
+
+  auto var = scope.Var("NCCLID");
+  // var->SetType(f::proto::VarType_Type_RAW);
+  auto id = var->GetMutable<ncclUniqueId>();
+  p::dynload::ncclGetUniqueId(id);
+
+  int port = rpc_service->GetSelectedPort();
+  std::string ep = string::Sprintf("127.0.0.1:%d", port);
+  detail::RPCClient client;
+
+  client.AsyncSendVariable(ep, dev_ctx, scope, "NCCLID");
+  client.Wait();
+  server_thread.join();
+  auto* ptr = rpc_service.release();
+  delete ptr;
+}
diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h
index f3c4c92af..094c47007 100644
--- a/paddle/fluid/platform/nccl_helper.h
+++ b/paddle/fluid/platform/nccl_helper.h
@@ -14,6 +14,7 @@

 #pragma once

+#include <stdio.h>
 #include <thread>  // NOLINT
 #include <typeindex>
 #include <vector>
@@ -100,13 +101,13 @@ struct NCCLContextMap {
       }
     } else {
       PADDLE_ENFORCE_GT(node_count, 0);
-      PADDLE_ENFORCE_EQ(node_count % places.size(), 0,
-                        "must have same number of GPUs on each node");
+      // TODO(wuyi): need to ensure each node has the same number of GPUs
       {
-        std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
         int nranks = node_count * order_.size();
+        NCCLGroupGuard guard;
         for (auto &gpu_id : order_) {
           int rank = trainer_id * order_.size() + gpu_id;
+          VLOG(3) << "init nccl rank: " << rank << " nranks: " << nranks;
           PADDLE_ENFORCE(cudaSetDevice(gpu_id));
           PADDLE_ENFORCE(platform::dynload::ncclCommInitRank(
               comms.get() + gpu_id, nranks, *nccl_id, rank));
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index 50ec438be..bd92ac548 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -53,6 +53,11 @@ class ParallelExecutor(object):
                 gradients of each device and scaled gradients would be
                 aggregated.
                Otherwise, a customized scale value should be fed to the network.
+            num_nodes(int, default 0): If greater than 0, NCCL will be
+                initialized with multiple nodes; each node must have the
+                same number of GPUs. Distributed training is then enabled.
+            trainer_id(int, default 0): Must be used together with num_nodes.
+                trainer_id is the "rank" of the current node, starting from 0.

         Returns:
             A ParallelExecutor object.
--
GitLab
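
A minimal usage sketch of the new num_nodes/trainer_id arguments (illustrative
only, not part of this patch): the program construction and the "loss" variable
are assumed, and the NCCL unique id is assumed to have already been exchanged
between the nodes via the gen_nccl_id/send path that test_send_nccl_id.cc
exercises.

    import paddle.fluid as fluid

    # Build a program that produces a scalar loss variable (assumed here;
    # any fluid network works).
    # loss = ...

    # Node 0 of a two-node job; node 1 runs the same script with
    # trainer_id=1. Both nodes must have the same number of GPUs.
    exe = fluid.ParallelExecutor(
        use_cuda=True,
        loss_name=loss.name,
        num_nodes=2,   # total nodes participating in NCCL initialization
        trainer_id=0)  # "rank" of this node, starting from 0

Each GPU then receives NCCL rank trainer_id * num_gpus + gpu_id, matching the
rank computation passed to ncclCommInitRank in the nccl_helper.h hunk above.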