From 2d383b811d9429eba1bd5c8278eae67b8dcea08e Mon Sep 17 00:00:00 2001 From: LiYuRio <63526175+LiYuRio@users.noreply.github.com> Date: Mon, 14 Nov 2022 20:53:31 +0800 Subject: [PATCH] Remove place for process group (#47857) --- .../distributed/collective/ProcessGroup.h | 7 +++---- .../collective/ProcessGroupBKCL.cc | 13 +++++++----- .../distributed/collective/ProcessGroupBKCL.h | 1 - .../collective/ProcessGroupCustom.cc | 15 ++++++------- .../collective/ProcessGroupCustom.h | 2 +- .../collective/ProcessGroupGloo.cc | 3 +-- .../distributed/collective/ProcessGroupGloo.h | 1 - .../collective/ProcessGroupNCCL.cc | 13 +++++++----- .../distributed/collective/ProcessGroupNCCL.h | 1 - .../collective/ProcessGroupStream.cc | 7 ++----- .../collective/ProcessGroupStream.h | 2 +- paddle/fluid/distributed/collective/Types.h | 3 ++- paddle/fluid/pybind/distributed_py.cc | 21 +++++++------------ python/paddle/distributed/collective.py | 21 +++++++++++-------- .../custom_runtime/process_group_xccl.py | 11 +++++----- .../collective/process_group_gloo.py | 3 +-- .../collective/process_group_nccl.py | 9 ++++---- .../tests/unittests/xpu/process_group_bkcl.py | 10 ++++----- 18 files changed, 67 insertions(+), 76 deletions(-) diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h index 949dd62deb..50d2807202 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.h +++ b/paddle/fluid/distributed/collective/ProcessGroup.h @@ -83,15 +83,14 @@ class ProcessGroup { }; public: + explicit ProcessGroup(int rank, int size, int gid); + virtual ~ProcessGroup() = default; + // TODO(dev): This constructor will be removed later. explicit ProcessGroup(int rank, int size, const platform::Place& place, int gid); - explicit ProcessGroup(int rank, int size, int gid); - - virtual ~ProcessGroup() {} - int GetRank() const { return rank_; } int GetSize() const { return size_; } diff --git a/paddle/fluid/distributed/collective/ProcessGroupBKCL.cc b/paddle/fluid/distributed/collective/ProcessGroupBKCL.cc index 40f2172b37..d9b6d490a5 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupBKCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupBKCL.cc @@ -20,6 +20,7 @@ #include "paddle/fluid/platform/device/xpu/xpu_info.h" #include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/place.h" +#include "paddle/phi/core/errors.h" namespace paddle { namespace distributed { @@ -68,11 +69,8 @@ void ProcessGroupBKCL::BKCLTask::Synchronize() { Wait(kWaitTimeout); } ProcessGroupBKCL::ProcessGroupBKCL(const std::shared_ptr& store, int rank, int size, - const platform::Place& place, int gid) - : ProcessGroupStream(rank, size, place, gid), store_(store) { - platform::SetXPUDeviceId(place_.device); -} + : ProcessGroupStream(rank, size, gid), store_(store) {} void ProcessGroupBKCL::GroupStart() { PADDLE_ENFORCE_XPU_SUCCESS(bkcl_group_start()); @@ -255,8 +253,13 @@ std::shared_ptr ProcessGroupBKCL::AllGather( std::shared_ptr ProcessGroupBKCL::Barrier( const BarrierOptions& opts) { + PADDLE_ENFORCE_GE(opts.device_id, + 0, + platform::errors::PreconditionNotMet( + "The barrier device id must greater or equal than 0.")); + platform::XPUPlace place(opts.device_id); auto allocator = std::unique_ptr( - new paddle::experimental::DefaultAllocator(place_)); + new paddle::experimental::DefaultAllocator(place)); phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1}); phi::DenseTensor barrier_tensor{allocator.get(), meta}; diff --git 
a/paddle/fluid/distributed/collective/ProcessGroupBKCL.h b/paddle/fluid/distributed/collective/ProcessGroupBKCL.h index 0041d903de..6d457c88a8 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupBKCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupBKCL.h @@ -71,7 +71,6 @@ class ProcessGroupBKCL : public ProcessGroupStream { ProcessGroupBKCL(const std::shared_ptr& store, int rank, int size, - const platform::Place& place, int gid); std::string GetBackendName() const override { diff --git a/paddle/fluid/distributed/collective/ProcessGroupCustom.cc b/paddle/fluid/distributed/collective/ProcessGroupCustom.cc index 87bd474477..d71a8b975e 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupCustom.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupCustom.cc @@ -98,15 +98,11 @@ bool ProcessGroupCustom::CustomTask::Wait(std::chrono::milliseconds timeout) { void ProcessGroupCustom::CustomTask::Synchronize() { Wait(kWaitTimeout); } ProcessGroupCustom::ProcessGroupCustom(const std::shared_ptr& store, + const std::string& device_type, int rank, int size, - const platform::Place& place, int gid) - : ProcessGroup(rank, size, place, gid), - store_(store), - device_type_(place.GetDeviceType()) { - phi::DeviceManager::SetDevice(place_); -} + : ProcessGroup(rank, size, gid), store_(store), device_type_(device_type) {} void ProcessGroupCustom::BroadcastUniqueCustomID( std::vector& ccl_ids) { // NOLINT @@ -379,7 +375,12 @@ std::shared_ptr ProcessGroupCustom::Broadcast( std::shared_ptr ProcessGroupCustom::Barrier( const BarrierOptions& opts) { // Only support single card single process - std::vector places = {place_}; + PADDLE_ENFORCE_GE(opts.device_id, + 0, + platform::errors::PreconditionNotMet( + "The barrier device id must greater or equal than 0.")); + platform::CustomPlace place(device_type_, opts.device_id); + std::vector places = {place}; std::vector barrierTensors; barrierTensors.reserve(places.size()); diff --git a/paddle/fluid/distributed/collective/ProcessGroupCustom.h b/paddle/fluid/distributed/collective/ProcessGroupCustom.h index 15d6193237..b74d0c70de 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupCustom.h +++ b/paddle/fluid/distributed/collective/ProcessGroupCustom.h @@ -64,9 +64,9 @@ class ProcessGroupCustom : public ProcessGroup { }; ProcessGroupCustom(const std::shared_ptr& store, + const std::string& device_type, int rank, int size, - const platform::Place& place, int gid); std::string GetBackendName() const override { return "XCCL_" + device_type_; } diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc index 5cb4daf728..2d6d4c88dd 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc @@ -180,10 +180,9 @@ ProcessGroupGloo::ProcessGroupGloo( const std::shared_ptr& store, int rank, int world_size, - const platform::Place& place, int gid, const std::shared_ptr options) - : ProcessGroup(rank, world_size, place, gid), + : ProcessGroup(rank, world_size, gid), _tag(0), _store(new GlooStore(store)) { _context = std::make_shared(rank, world_size); diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.h b/paddle/fluid/distributed/collective/ProcessGroupGloo.h index 9796f91663..4e2c0eff12 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.h +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.h @@ -102,7 +102,6 @@ class ProcessGroupGloo : public ProcessGroup { const 
std::shared_ptr& store, int rank, int world_size, - const platform::Place& place, int gid, std::shared_ptr options); diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc index 3748f22ebe..a1f7754a57 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc @@ -16,6 +16,7 @@ #include "paddle/fluid/distributed/collective/Common.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" +#include "paddle/fluid/platform/place.h" #include "paddle/phi/api/lib/utils/allocator.h" DECLARE_bool(nccl_blocking_wait); @@ -81,11 +82,8 @@ void ProcessGroupNCCL::NCCLTask::Synchronize() { Wait(kWaitTimeout); } ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr& store, int rank, int size, - const platform::Place& place, int gid) - : ProcessGroupStream(rank, size, place, gid), store_(store) { - platform::SetDeviceId(place_.device); -} + : ProcessGroupStream(rank, size, gid), store_(store) {} void ProcessGroupNCCL::GroupStart() { PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupStart()); @@ -182,8 +180,13 @@ std::shared_ptr ProcessGroupNCCL::AllReduce( std::shared_ptr ProcessGroupNCCL::Barrier( const BarrierOptions& opts) { + PADDLE_ENFORCE_GE(opts.device_id, + 0, + platform::errors::PreconditionNotMet( + "The barrier device id must greater or equal than 0.")); + platform::CUDAPlace place(opts.device_id); auto allocator = std::unique_ptr( - new paddle::experimental::DefaultAllocator(place_)); + new paddle::experimental::DefaultAllocator(place)); phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1}); phi::DenseTensor barrier_tensor{allocator.get(), meta}; diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h index 54ac390231..7933636e3d 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h @@ -85,7 +85,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream { ProcessGroupNCCL(const std::shared_ptr& store, int rank, int size, - const platform::Place& place, int gid); std::string GetBackendName() const override { return "NCCL"; } diff --git a/paddle/fluid/distributed/collective/ProcessGroupStream.cc b/paddle/fluid/distributed/collective/ProcessGroupStream.cc index 0c19142850..7fd01576fa 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupStream.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupStream.cc @@ -17,11 +17,8 @@ namespace paddle { namespace distributed { -ProcessGroupStream::ProcessGroupStream(int rank, - int size, - const platform::Place& place, - int gid) - : ProcessGroup(rank, size, place, gid) {} +ProcessGroupStream::ProcessGroupStream(int rank, int size, int gid) + : ProcessGroup(rank, size, gid) {} const phi::DeviceContext& ProcessGroupStream::GetDeviceContext( const Place& place, bool use_calc_stream) const { diff --git a/paddle/fluid/distributed/collective/ProcessGroupStream.h b/paddle/fluid/distributed/collective/ProcessGroupStream.h index ec1a339191..fd68f6db5e 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupStream.h +++ b/paddle/fluid/distributed/collective/ProcessGroupStream.h @@ -55,7 +55,7 @@ class ProcessGroupStream : public ProcessGroup { }; public: - ProcessGroupStream(int rank, int size, const platform::Place& place, int gid); + ProcessGroupStream(int rank, int size, int gid); virtual ~ProcessGroupStream() = default; virtual const 
phi::DeviceContext& GetDeviceContext( diff --git a/paddle/fluid/distributed/collective/Types.h b/paddle/fluid/distributed/collective/Types.h index 0ce92111f6..11628ea1f0 100644 --- a/paddle/fluid/distributed/collective/Types.h +++ b/paddle/fluid/distributed/collective/Types.h @@ -16,6 +16,7 @@ #include #include #include +#include "paddle/phi/common/place.h" namespace paddle { namespace distributed { @@ -33,7 +34,7 @@ struct BroadcastOptions { }; struct BarrierOptions { - std::vector place_ids; + int8_t device_id; }; struct ReduceOptions { diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc index de415393ca..9c7a89c395 100644 --- a/paddle/fluid/pybind/distributed_py.cc +++ b/paddle/fluid/pybind/distributed_py.cc @@ -110,7 +110,7 @@ void BindDistributed(py::module *m) { py::class_(*m, "BarrierOptions") .def(py::init<>()) - .def_readwrite("place_ids", &distributed::BarrierOptions::place_ids); + .def_readwrite("device_id", &distributed::BarrierOptions::device_id); py::class_(*m, "ReduceOptions") .def(py::init<>()) @@ -513,12 +513,12 @@ void BindDistributed(py::module *m) { .def( "barrier", - [](distributed::ProcessGroup &self, std::vector place_ids) { + [](distributed::ProcessGroup &self, int8_t device_id) { distributed::BarrierOptions opts; - opts.place_ids = place_ids; + opts.device_id = device_id; return self.Barrier(opts); }, - py::arg("place_ids") = std::vector{}, + py::arg("device_id") = -1, py::call_guard()) // TODO(liyurui): Interface below will be removed in the future. @@ -1214,12 +1214,10 @@ void BindDistributed(py::module *m) { .def(py::init &, int, int, - const platform::CUDAPlace &, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::arg("place"), py::arg("group_id") = 0, py::call_guard()); @@ -1254,14 +1252,14 @@ void BindDistributed(py::module *m) { std::shared_ptr>( *m, "ProcessGroupCustom", ProcessGroup) .def(py::init &, + const std::string &, int, int, - const platform::CustomPlace &, int>(), py::arg("store"), + py::arg("device_type"), py::arg("rank"), py::arg("world_size"), - py::arg("place"), py::arg("group_id") = 0, py::call_guard()); @@ -1275,12 +1273,10 @@ void BindDistributed(py::module *m) { .def(py::init &, int, int, - const platform::XPUPlace &, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::arg("place"), py::arg("group_id") = 0, py::call_guard()); #endif @@ -1303,14 +1299,12 @@ void BindDistributed(py::module *m) { .def(py::init &, int, int, - const platform::CPUPlace &, int, std::shared_ptr &>(), py::call_guard()) .def(py::init([](const std::shared_ptr &store, int rank, int world_size, - const platform::CPUPlace &place, int gid) { auto opts = GlooOptions::create(); char *ifname = getenv(GLOO_SOCKET_IFNAME_ENV.c_str()); @@ -1321,12 +1315,11 @@ void BindDistributed(py::module *m) { opts->device = ProcessGroupGloo::createDefaultDevice(); } return std::make_shared( - store, rank, world_size, place, gid, opts); + store, rank, world_size, gid, opts); }), py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::arg("place"), py::arg("group_id") = 0, py::call_guard()) .def_static("create_default_device", diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index c4e09a620f..4bdc473f9a 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -152,17 +152,15 @@ def _new_process_group_impl( genv = _get_global_env() assert backend in _valid_backend_list, "Unsupported backend: %s." 
% backend if backend == "gloo": - place = core.CPUPlace() - pg = core.ProcessGroupGloo(store, rank, world_size, place, group_id) + pg = core.ProcessGroupGloo(store, rank, world_size, group_id) elif backend == "nccl": - place = core.CUDAPlace(genv.device_id) - pg = core.ProcessGroupNCCL(store, rank, world_size, place, group_id) + pg = core.ProcessGroupNCCL(store, rank, world_size, group_id) elif backend == "xccl": - place = core.CustomPlace(genv.device_type, genv.device_id) - pg = core.ProcessGroupCustom(store, rank, world_size, place, group_id) + pg = core.ProcessGroupCustom( + store, genv.device_type, rank, world_size, group_id + ) elif backend == "bkcl": - place = core.XPUPlace(genv.device_id) - pg = core.ProcessGroupBKCL(store, rank, world_size, place, group_id) + pg = core.ProcessGroupBKCL(store, rank, world_size, group_id) return pg @@ -192,7 +190,12 @@ def barrier(group=None): if in_dygraph_mode(): group = _get_default_group() if group is None else group - task = group.process_group.barrier() + place = paddle.fluid.framework._current_expected_place() + if isinstance(place, paddle.fluid.core.CPUPlace): + task = group.process_group.barrier() + else: + device_id = place.get_device_id() + task = group.process_group.barrier(device_id) task.wait() return diff --git a/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py index 201b2d3df8..1d3dfce959 100644 --- a/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py +++ b/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py @@ -30,9 +30,9 @@ def init_process_group(strategy=None): store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks) pg_group = core.ProcessGroupCustom( store, + ParallelEnv().device_type, rank, nranks, - paddle.CustomPlace(ParallelEnv().device_type, ParallelEnv().device_id), ) return pg_group @@ -51,9 +51,8 @@ class TestProcessGroupFp32(unittest.TestCase): def test_create_process_group_xccl(self): with _test_eager_guard(): - paddle.set_device( - 'custom_cpu:%d' % paddle.distributed.ParallelEnv().dev_id - ) + device_id = paddle.distributed.ParallelEnv().dev_id + paddle.set_device('custom_cpu:%d' % device_id) pg = init_process_group() @@ -119,11 +118,11 @@ class TestProcessGroupFp32(unittest.TestCase): # test barrier # rank 0 if pg.rank() == 0: - task = pg.barrier() + task = pg.barrier(device_id) task.wait() # rank 1 else: - task = pg.barrier() + task = pg.barrier(device_id) task.wait() print("test barrier api ok\n") diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py index 3ace517991..f93adb6091 100644 --- a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py +++ b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py @@ -42,8 +42,7 @@ class TestProcessGroupFp32(unittest.TestCase): store = paddle.fluid.core.TCPStore( "127.0.0.1", 6272, is_master, nranks, 30 ) - place = paddle.fluid.core.CPUPlace() - pg = paddle.fluid.core.ProcessGroupGloo(store, rank, nranks, place) + pg = paddle.fluid.core.ProcessGroupGloo(store, rank, nranks) # test allreduce sum # rank 0 diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py index ff949d8f14..0303f469b3 100644 --- a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py +++ 
b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py @@ -44,9 +44,8 @@ class TestProcessGroupFp32(unittest.TestCase): def test_create_process_group_nccl(self): with _test_eager_guard(): - paddle.set_device( - 'gpu:%d' % paddle.distributed.ParallelEnv().dev_id - ) + device_id = paddle.distributed.ParallelEnv().dev_id + paddle.set_device('gpu:%d' % device_id) pg = init_process_group() print("rank:", pg.rank(), "size:", pg.size(), "name:", pg.name()) @@ -170,10 +169,10 @@ class TestProcessGroupFp32(unittest.TestCase): # test barrier # rank 0 if pg.rank() == 0: - dist.barrier() + pg.barrier(device_id) # rank 1 else: - task = pg.barrier() + task = pg.barrier(device_id) task.wait() print("test barrier api ok\n") diff --git a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py index bb2cf6e1db..5ea3845c0b 100644 --- a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py +++ b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py @@ -20,7 +20,6 @@ import sys import paddle from paddle.fluid.framework import _test_eager_guard from paddle.fluid.dygraph.parallel import ParallelEnv -import paddle.distributed as dist def init_process_group(strategy=None): @@ -45,9 +44,8 @@ class TestProcessGroupFp32(unittest.TestCase): def test_create_process_group_bkcl(self): with _test_eager_guard(): - paddle.set_device( - 'xpu:%d' % paddle.distributed.ParallelEnv().dev_id - ) + device_id = paddle.distributed.ParallelEnv().dev_id + paddle.set_device('xpu:%d' % device_id) pg = init_process_group() sys.stdout.write( @@ -108,10 +106,10 @@ class TestProcessGroupFp32(unittest.TestCase): # test barrier # rank 0 if pg.rank() == 0: - dist.barrier() + pg.barrier(device_id) # rank 1 else: - task = pg.barrier() + task = pg.barrier(device_id) task.wait() sys.stdout.write("rank {}: test barrier api ok\n".format(pg.rank())) -- GitLab
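For illustration, a minimal sketch of the post-change Python usage, assembled from the updated process_group_nccl.py test and python/paddle/distributed/collective.py in this patch. It assumes a GPU build with one process launched per rank; the TCPStore host/port and group id are illustrative placeholders, not values taken from the patch.

    # Sketch only: mirrors the pattern used by the updated NCCL unit test.
    import paddle
    import paddle.fluid.core as core
    from paddle.fluid.dygraph.parallel import ParallelEnv

    nranks = ParallelEnv().nranks
    rank = ParallelEnv().local_rank
    store = core.TCPStore("127.0.0.1", 6173, rank == 0, nranks)  # illustrative host/port

    # The constructor no longer takes a Place; select the device beforehand.
    device_id = ParallelEnv().dev_id
    paddle.set_device('gpu:%d' % device_id)
    pg = core.ProcessGroupNCCL(store, rank, nranks, 0)  # group_id = 0

    # BarrierOptions now carries a device_id instead of place_ids.
    task = pg.barrier(device_id)
    task.wait()

ProcessGroupCustom follows the same pattern but now takes the device type string (e.g. ParallelEnv().device_type) in place of the former CustomPlace argument, as shown in the updated process_group_xccl.py test.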
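On CPU, paddle.distributed.barrier() takes the CPUPlace branch added to collective.py and calls the group barrier without a device id; a corresponding Gloo sketch, again with an illustrative port and one process per rank assumed:

    # Sketch only: CPU/Gloo variant; no device id is passed to the barrier.
    import paddle.fluid.core as core
    from paddle.fluid.dygraph.parallel import ParallelEnv

    nranks = ParallelEnv().nranks
    rank = ParallelEnv().local_rank
    store = core.TCPStore("127.0.0.1", 6272, rank == 0, nranks, 30)
    pg = core.ProcessGroupGloo(store, rank, nranks)  # no CPUPlace argument any more
    task = pg.barrier()
    task.wait()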