From b4c3a6aa0b87106c20fe2796c5220bb61a3fcc9a Mon Sep 17 00:00:00 2001
From: Yan Xu
Date: Thu, 4 Apr 2019 10:12:05 +0800
Subject: [PATCH] [Imperative] implement imperative NCCLParallelContext (#16477)

add NCCLParallelContext for parallel dygraph
---
 paddle/fluid/imperative/CMakeLists.txt       |   3 +
 paddle/fluid/imperative/nccl_context.cc      | 134 +++++++++++++++++++
 paddle/fluid/imperative/nccl_context.h       |  81 +++++++++++
 paddle/fluid/imperative/nccl_context_test.cc |  52 +++++++
 paddle/fluid/imperative/tracer.cc            |   3 +-
 paddle/fluid/pybind/CMakeLists.txt           |   2 +-
 paddle/fluid/pybind/imperative.cc            |  43 +++++++-
 paddle/fluid/pybind/imperative.h             |   3 +-
 paddle/fluid/pybind/pybind.cc                |   2 +-
 python/paddle/distributed/launch.py          |   1 +
 python/paddle/fluid/dygraph/__init__.py      |   4 +
 python/paddle/fluid/dygraph/parallel.py      |  60 +++++++++
 12 files changed, 383 insertions(+), 5 deletions(-)
 create mode 100644 paddle/fluid/imperative/nccl_context.cc
 create mode 100644 paddle/fluid/imperative/nccl_context.h
 create mode 100644 paddle/fluid/imperative/nccl_context_test.cc
 create mode 100644 python/paddle/fluid/dygraph/parallel.py

diff --git a/paddle/fluid/imperative/CMakeLists.txt b/paddle/fluid/imperative/CMakeLists.txt
index 0d116a64954..e52a0283f72 100644
--- a/paddle/fluid/imperative/CMakeLists.txt
+++ b/paddle/fluid/imperative/CMakeLists.txt
@@ -3,4 +3,7 @@ cc_library(layer SRCS layer.cc DEPS proto_desc operator device_context blas pybi
 cc_library(tracer SRCS tracer.cc DEPS proto_desc device_context pybind)
 cc_library(engine SRCS engine.cc)
 cc_library(imperative_profiler SRCS profiler.cc)
+cc_library(nccl_context SRCS nccl_context.cc DEPS device_context)
+
+cc_test(nccl_context_test SRCS nccl_context_test.cc DEPS nccl_context)
 endif()
diff --git a/paddle/fluid/imperative/nccl_context.cc b/paddle/fluid/imperative/nccl_context.cc
new file mode 100644
index 00000000000..04364e68342
--- /dev/null
+++ b/paddle/fluid/imperative/nccl_context.cc
@@ -0,0 +1,134 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/imperative/nccl_context.h"
+
+namespace paddle {
+namespace imperative {
+#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
+void NCCLParallelContext::RecvNCCLID(const std::string &ep,
+                                     ncclUniqueId *nccl_id) {
+  auto addr = paddle::string::Split(ep, ':');
+  PADDLE_ENFORCE_EQ(addr.size(), 2UL,
+                    "The endpoint should contain host and port: %s", ep);
+  std::string host = addr[0];
+  int port = std::stoi(addr[1]);
+
+  int server_fd, new_socket;
+  struct sockaddr_in address;
+  int addrlen = sizeof(address);
+  char buffer[1024] = {0};
+  int opt = 1;
+  // creating socket fd
+  if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+    PADDLE_THROW("create server fd failed");
+  if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt,
+                 sizeof(opt)))
+    PADDLE_THROW("set socket opt failed");
+
+  address.sin_family = AF_INET;
+  address.sin_addr.s_addr = INADDR_ANY;
+  address.sin_port = htons(port);
+
+  if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
+    PADDLE_THROW("binding failed on ep: %s", ep);
+  VLOG(3) << "listening on: " << ep;
+  if (listen(server_fd, 3) < 0) PADDLE_THROW("listen on server fd failed");
+
+  if ((new_socket =
+           accept(server_fd, reinterpret_cast<struct sockaddr *>(&address),
+                  reinterpret_cast<socklen_t *>(&addrlen))) < 0)
+    PADDLE_THROW("accept the new socket fd failed");
+
+  if (read(new_socket, buffer, 1024) < 0)
+    PADDLE_THROW("reading the ncclUniqueId from socket failed");
+  VLOG(3) << "received the ncclUniqueId";
+  memcpy(nccl_id, buffer, NCCL_UNIQUE_ID_BYTES);
+
+  VLOG(3) << "closing the socket server: " << ep;
+  close(server_fd);
+}
+
+void NCCLParallelContext::SendNCCLID(const std::string &ep,
+                                     ncclUniqueId *nccl_id) {
+  auto addr = paddle::string::Split(ep, ':');
+  PADDLE_ENFORCE_EQ(addr.size(), 2UL,
+                    "The endpoint should contain host and port: %s", ep);
+  std::string host = addr[0];
+  int port = std::stoi(addr[1]);
+  // struct sockaddr_in address;
+  int sock = 0;
+  struct sockaddr_in serv_addr;
+  char buffer[1024] = {0};
+
+  memcpy(buffer, nccl_id, NCCL_UNIQUE_ID_BYTES);
+  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+    PADDLE_THROW("create socket failed");
+
+  memset(&serv_addr, 0, sizeof(serv_addr));
+  serv_addr.sin_family = AF_INET;
+  serv_addr.sin_port = htons(port);
+
+  if (inet_pton(AF_INET, host.c_str(), &serv_addr.sin_addr) <= 0)
+    PADDLE_THROW("invalid address: %s", ep);
+
+  while (true) {
+    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
+      VLOG(0) << "worker: " << ep
+              << " is not ready, will retry after 3 seconds...";
+      std::this_thread::sleep_for(std::chrono::seconds(3));
+      continue;
+    }
+    VLOG(3) << "sending the ncclUniqueId to " << ep;
+    send(sock, buffer, NCCL_UNIQUE_ID_BYTES, 0);
+    break;
+  }
+}
+
+void NCCLParallelContext::BcastNCCLId(ncclUniqueId *nccl_id, int root) {
+  if (strategy_.local_rank_ == root) {
+    for (auto ep : strategy_.trainer_endpoints_) {
+      if (ep != strategy_.current_endpoint_) SendNCCLID(ep, nccl_id);
+    }
+  } else {
+    RecvNCCLID(strategy_.current_endpoint_, nccl_id);
+  }
+}
+
+void NCCLParallelContext::Init() {
+  ncclUniqueId nccl_id;
+  ncclComm_t comm;
+  if (strategy_.local_rank_ == 0) {
+    // generate the unique ncclid on the root worker
+    platform::dynload::ncclGetUniqueId(&nccl_id);
+    BcastNCCLId(&nccl_id, 0);
+  } else {
+    BcastNCCLId(&nccl_id, 0);
+  }
+  int gpu_id = boost::get<platform::CUDAPlace>(place_).device;
+  VLOG(0) << "init nccl context nranks: " << strategy_.nranks_
+          << " local rank: " << strategy_.local_rank_ << " gpu id: " << gpu_id;
+
+  PADDLE_ENFORCE(cudaSetDevice(gpu_id));
+  PADDLE_ENFORCE(platform::dynload::ncclCommInitRank(
+      &comm, strategy_.nranks_, nccl_id, strategy_.local_rank_));
+
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+  auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(pool.Get(place_));
+  dev_ctx->set_nccl_comm(comm);
+}
+#endif
+
+}  // namespace imperative
+}  // namespace paddle
diff --git a/paddle/fluid/imperative/nccl_context.h b/paddle/fluid/imperative/nccl_context.h
new file mode 100644
index 00000000000..51c86190439
--- /dev/null
+++ b/paddle/fluid/imperative/nccl_context.h
@@ -0,0 +1,81 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+// network header files
+#ifndef _WIN32
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#endif
+
+#include <string>
+#include <vector>
+
+#include "paddle/fluid/framework/variable.h"
+#include "paddle/fluid/platform/device_context.h"
+#ifdef PADDLE_WITH_CUDA
+#include "paddle/fluid/platform/dynload/nccl.h"
+#endif
+#include "paddle/fluid/platform/place.h"
+#include "paddle/fluid/string/split.h"
+
+namespace paddle {
+namespace imperative {
+
+struct ParallelStrategy {
+  int nranks_{1};
+  int local_rank_{0};
+  std::vector<std::string> trainer_endpoints_{};
+  std::string current_endpoint_{""};
+};
+
+class ParallelContext {
+ public:
+  explicit ParallelContext(const ParallelStrategy& strategy,
+                           const platform::Place& place)
+      : strategy_(strategy), place_(place) {}
+
+  virtual ~ParallelContext() {}
+
+  virtual void Init() = 0;
+
+ protected:
+  ParallelStrategy strategy_;
+  platform::Place place_;
+};
+
+#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
+class NCCLParallelContext : public ParallelContext {
+ public:
+  explicit NCCLParallelContext(const ParallelStrategy& strategy,
+                               const platform::Place& place)
+      : ParallelContext(strategy, place) {}
+
+  ~NCCLParallelContext() {}
+
+  void BcastNCCLId(ncclUniqueId* nccl_id, int root);
+
+  void Init() override;
+
+ protected:
+  void RecvNCCLID(const std::string& endpoint, ncclUniqueId* nccl_id);
+
+  void SendNCCLID(const std::string& endpoint, ncclUniqueId* nccl_id);
+};
+#endif
+
+}  // namespace imperative
+}  // namespace paddle
diff --git a/paddle/fluid/imperative/nccl_context_test.cc b/paddle/fluid/imperative/nccl_context_test.cc
new file mode 100644
index 00000000000..74a74ebe921
--- /dev/null
+++ b/paddle/fluid/imperative/nccl_context_test.cc
@@ -0,0 +1,52 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/imperative/nccl_context.h"
+#include "gtest/gtest.h"
+#include "paddle/fluid/platform/device_context.h"
+
+namespace imperative = paddle::imperative;
+namespace platform = paddle::platform;
+
+imperative::ParallelStrategy GetStrategy(int local_rank) {
+  std::vector<std::string> eps = {"127.0.0.1:9866", "127.0.0.1:9867"};
+  imperative::ParallelStrategy strategy;
+  strategy.trainer_endpoints_ = eps;
+  strategy.current_endpoint_ = eps[local_rank];
+  strategy.nranks_ = 2;
+  strategy.local_rank_ = local_rank;
+  return strategy;
+}
+
+#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
+void BcastNCCLId(int local_rank, ncclUniqueId *nccl_id) {
+  auto strategy = GetStrategy(local_rank);
+  platform::CUDAPlace gpu(local_rank);
+  imperative::NCCLParallelContext ctx(strategy, gpu);
+  ctx.BcastNCCLId(nccl_id, 0);
+}
+
+TEST(BcastNCCLId, Run) {
+  ncclUniqueId nccl_id;
+  platform::dynload::ncclGetUniqueId(&nccl_id);
+  std::thread t(BcastNCCLId, 0, &nccl_id);
+
+  ncclUniqueId recv_nccl_id;
+  BcastNCCLId(1, &recv_nccl_id);
+
+  t.join();
+  EXPECT_EQ(0, std::memcmp(nccl_id.internal, recv_nccl_id.internal,
+                           NCCL_UNIQUE_ID_BYTES));
+}
+#endif
diff --git a/paddle/fluid/imperative/tracer.cc b/paddle/fluid/imperative/tracer.cc
index 7c9d0af3ecd..7c495ddd682 100644
--- a/paddle/fluid/imperative/tracer.cc
+++ b/paddle/fluid/imperative/tracer.cc
@@ -177,7 +177,7 @@ std::set<std::string> Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
       current_vars_map[out->Name()] = out;
     }
 
-    VLOG(3) << "input var name: " << out->Name()
+    VLOG(3) << "output var name: " << out->Name()
             << " inited: " << out->var_->IsInitialized()
             << " stop_grad: " << out->IsStopGradient();
   }
@@ -215,6 +215,7 @@ std::set<std::string> Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
   framework::Scope scope;
   op->place_ = GetExpectedPlace(expected_place, inputs);
+
   PreparedOp prepared_op = PreparedOp::Prepare(ctx, *op_kernel, op->place_);
   prepared_op.op.RuntimeInferShape(scope, op->place_, ctx);
   prepared_op.func(
diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt
index c8a0aa58859..16365c1fd0b 100644
--- a/paddle/fluid/pybind/CMakeLists.txt
+++ b/paddle/fluid/pybind/CMakeLists.txt
@@ -1,6 +1,6 @@
 set(PYBIND_DEPS pybind python proto_desc memory executor async_executor fleet_wrapper prune
     feed_fetch_method pass_builder parallel_executor profiler layer scope_pool
-    tracer analysis_predictor imperative_profiler)
+    tracer analysis_predictor imperative_profiler nccl_context)
 
 if(WITH_PYTHON)
   list(APPEND PYBIND_DEPS py_func_op)
diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc
index e9ed4e16443..265707f1bcc 100644
--- a/paddle/fluid/pybind/imperative.cc
+++ b/paddle/fluid/pybind/imperative.cc
@@ -29,7 +29,7 @@ namespace paddle {
 namespace pybind {
 
 // Bind Methods
-void BindTracer(pybind11::module* m) {
+void BindImperative(pybind11::module* m) {
   pybind11::class_<imperative::Tracer>(*m, "Tracer", "")
       .def("__init__",
            [](imperative::Tracer& self, framework::BlockDesc* root_block) {
@@ -59,6 +59,47 @@ void BindImperative(pybind11::module* m) {
       })
       .def("py_trace", &imperative::Tracer::PyTrace,
            pybind11::return_value_policy::take_ownership);
+
+  // define parallel context
+  pybind11::class_<imperative::ParallelStrategy> parallel_strategy(
+      *m, "ParallelStrategy", "");
+  parallel_strategy.def(pybind11::init())
+      .def_property(
+          "nranks",
+          [](const imperative::ParallelStrategy& self) { return self.nranks_; },
+          [](imperative::ParallelStrategy& self,
+             int nranks) {
+            self.nranks_ = nranks;
+          })
+      .def_property("local_rank",
+                    [](const imperative::ParallelStrategy& self) {
+                      return self.local_rank_;
+                    },
+                    [](imperative::ParallelStrategy& self, int local_rank) {
+                      self.local_rank_ = local_rank;
+                    })
+      .def_property(
+          "trainer_endpoints",
+          [](const imperative::ParallelStrategy& self) {
+            return self.trainer_endpoints_;
+          },
+          [](imperative::ParallelStrategy& self, std::vector<std::string> eps) {
+            self.trainer_endpoints_ = eps;
+          })
+      .def_property("current_endpoint",
+                    [](const imperative::ParallelStrategy& self) {
+                      return self.current_endpoint_;
+                    },
+                    [](imperative::ParallelStrategy& self,
+                       const std::string& ep) { self.current_endpoint_ = ep; });
+#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
+  pybind11::class_<imperative::NCCLParallelContext> nccl_ctx(
+      *m, "NCCLParallelContext");
+
+  nccl_ctx
+      .def(pybind11::init<const imperative::ParallelStrategy&,
+                          const platform::CUDAPlace&>())
+      .def("init", [](imperative::NCCLParallelContext& self) { self.Init(); });
+#endif
 }
 
 }  // namespace pybind
diff --git a/paddle/fluid/pybind/imperative.h b/paddle/fluid/pybind/imperative.h
index 8496cbfcb18..f9d4a7c990e 100644
--- a/paddle/fluid/pybind/imperative.h
+++ b/paddle/fluid/pybind/imperative.h
@@ -17,6 +17,7 @@ limitations under the License. */
 #include <Python.h>
 #include <vector>
 #include "paddle/fluid/imperative/layer.h"
+#include "paddle/fluid/imperative/nccl_context.h"
 #include "pybind11/pybind11.h"
 #include "pybind11/stl.h"
 
@@ -46,7 +47,7 @@ class PyVarBase : public imperative::VarBase {
   using imperative::VarBase::VarBase;  // Inherit constructors
 };
 
-void BindTracer(pybind11::module* m);
+void BindImperative(pybind11::module* m);
 
 }  // namespace pybind
 }  // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 044677fb756..8c34e3efe2a 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -288,7 +288,7 @@ PYBIND11_MODULE(core, m) {
       })
       .def_static("num_funcs", &imperative::PyLayer::NumFuncs);
 
-  BindTracer(&m);
+  BindImperative(&m);
 
   py::class_<Tensor>(m, "Tensor", py::buffer_protocol())
       .def_buffer(
diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py
index 03c4078775d..d8153fa0026 100644
--- a/python/paddle/distributed/launch.py
+++ b/python/paddle/distributed/launch.py
@@ -32,6 +32,7 @@ default_envs = {
     "NCCL_SOCKET_IFNAME": "eth0",
     "NCCL_IB_GID_INDEX": "3",
     "NCCL_IB_RETRY_CNT": "0",
+    "PYTHONPATH": os.getenv("PYTHONPATH", ""),
 }
 
 GPUS = 8
diff --git a/python/paddle/fluid/dygraph/__init__.py b/python/paddle/fluid/dygraph/__init__.py
index 2d0c7b7ddaa..9bb72ede304 100644
--- a/python/paddle/fluid/dygraph/__init__.py
+++ b/python/paddle/fluid/dygraph/__init__.py
@@ -29,6 +29,9 @@ from .tracer import *
 from . import profiler
 from .profiler import *
 
+from . import parallel
+from .parallel import *
+
 from . import checkpoint
 from .checkpoint import *
 
@@ -41,5 +44,6 @@ __all__ += base.__all__
 __all__ += nn.__all__
 __all__ += tracer.__all__
 __all__ += profiler.__all__
+__all__ += parallel.__all__
 __all__ += checkpoint.__all__
 __all__ += learning_rate_scheduler.__all__
diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py
new file mode 100644
index 00000000000..f7decac963f
--- /dev/null
+++ b/python/paddle/fluid/dygraph/parallel.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+from .. import core
+
+__all__ = ["prepare_context"]
+
+ParallelStrategy = core.ParallelStrategy
+
+__parallel_ctx__clz__ = None
+
+
+def prepare_context(parallel_strategy, place):
+    global __parallel_ctx__clz__
+    assert __parallel_ctx__clz__ is None, "ParallelContext can only be initialized once."
+
+    if isinstance(place, core.CUDAPlace):
+        __parallel_ctx__clz__ = core.NCCLParallelContext(parallel_strategy,
+                                                         place)
+    else:
+        # TODO(Yancey1989): add Gloo Parallel Context to support CPU parallel computation
+        raise NotImplementedError("Only support CUDAPlace for now.")
+    __parallel_ctx__clz__.init()
+
+
+class Env(object):
+    def __init__(self):
+        self._nranks = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
+        self._local_rank = int(os.getenv("PADDLE_TRAINER_ID", "0"))
+        self._dev_id = int(os.getenv("FLAGS_selected_gpus", "0"))
+        self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS",
+                                            "").split(",")
+        self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "")
+
+    @property
+    def nranks(self):
+        return self._nranks
+
+    @property
+    def local_rank(self):
+        return self._local_rank
+
+    @property
+    def dev_id(self):
+        return self._dev_id
+
+    @property
+    def current_endpoint(self):
+        return self._current_endpoint
-- 
GitLab
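
For reference, a minimal usage sketch of the Python API this patch adds (Env, ParallelStrategy, prepare_context). It is illustrative only: the PADDLE_* environment variables are assumed to be set by a launcher such as python/paddle/distributed/launch.py, and model construction and gradient synchronization are outside the scope of this patch.

# Illustrative sketch, not part of the patch.
import paddle.fluid as fluid
from paddle.fluid.dygraph import parallel

# Env() reads PADDLE_TRAINERS_NUM, PADDLE_TRAINER_ID, FLAGS_selected_gpus,
# PADDLE_TRAINER_ENDPOINTS and PADDLE_CURRENT_ENDPOINT from the environment.
env = parallel.Env()

strategy = parallel.ParallelStrategy()
strategy.nranks = env.nranks
strategy.local_rank = env.local_rank
# Env exposes the endpoint list only as a private attribute in this patch.
strategy.trainer_endpoints = env._trainer_endpoints
strategy.current_endpoint = env.current_endpoint

place = fluid.CUDAPlace(env.dev_id)
if strategy.nranks > 1:
    # Broadcasts the ncclUniqueId over TCP between the trainer endpoints and
    # creates the NCCL communicator on the current device
    # (core.NCCLParallelContext.init()).
    parallel.prepare_context(strategy, place)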