diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h
index 949dd62deb53d8ab848da56644461d1e7fdcbf79..50d2807202d356c100eb2887e8ae9bde1b748774 100644
--- a/paddle/fluid/distributed/collective/ProcessGroup.h
+++ b/paddle/fluid/distributed/collective/ProcessGroup.h
@@ -83,15 +83,14 @@ class ProcessGroup {
   };

 public:
+  explicit ProcessGroup(int rank, int size, int gid);
+  virtual ~ProcessGroup() = default;
+  // TODO(dev): This constructor will be removed later.
   explicit ProcessGroup(int rank,
                         int size,
                         const platform::Place& place,
                         int gid);

-  explicit ProcessGroup(int rank, int size, int gid);
-
-  virtual ~ProcessGroup() {}
-
   int GetRank() const { return rank_; }

   int GetSize() const { return size_; }
diff --git a/paddle/fluid/distributed/collective/ProcessGroupBKCL.cc b/paddle/fluid/distributed/collective/ProcessGroupBKCL.cc
index 40f2172b374ca2e882e2ea6028ff7cf4d29ef5ce..d9b6d490a55708363e0d5d2bf85659904fdba3a8 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupBKCL.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupBKCL.cc
@@ -20,6 +20,7 @@
 #include "paddle/fluid/platform/device/xpu/xpu_info.h"
 #include "paddle/fluid/platform/device_context.h"
 #include "paddle/fluid/platform/place.h"
+#include "paddle/phi/core/errors.h"

 namespace paddle {
 namespace distributed {
@@ -68,11 +69,8 @@ void ProcessGroupBKCL::BKCLTask::Synchronize() { Wait(kWaitTimeout); }

 ProcessGroupBKCL::ProcessGroupBKCL(const std::shared_ptr<Store>& store,
                                    int rank,
                                    int size,
-                                   const platform::Place& place,
                                    int gid)
-    : ProcessGroupStream(rank, size, place, gid), store_(store) {
-  platform::SetXPUDeviceId(place_.device);
-}
+    : ProcessGroupStream(rank, size, gid), store_(store) {}

 void ProcessGroupBKCL::GroupStart() {
   PADDLE_ENFORCE_XPU_SUCCESS(bkcl_group_start());
@@ -255,8 +253,13 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::AllGather(

 std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Barrier(
     const BarrierOptions& opts) {
+  PADDLE_ENFORCE_GE(opts.device_id,
+                    0,
+                    platform::errors::PreconditionNotMet(
+                        "The barrier device id must be greater than or equal to 0."));
+  platform::XPUPlace place(opts.device_id);
   auto allocator = std::unique_ptr<phi::Allocator>(
-      new paddle::experimental::DefaultAllocator(place_));
+      new paddle::experimental::DefaultAllocator(place));
   phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1});
   phi::DenseTensor barrier_tensor{allocator.get(), meta};
diff --git a/paddle/fluid/distributed/collective/ProcessGroupBKCL.h b/paddle/fluid/distributed/collective/ProcessGroupBKCL.h
index 0041d903de78af7ea02c3efa89e5b8c506cd81ff..6d457c88a8e770aec564b25eaeb271531f183d1e 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupBKCL.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupBKCL.h
@@ -71,7 +71,6 @@ class ProcessGroupBKCL : public ProcessGroupStream {
   ProcessGroupBKCL(const std::shared_ptr<Store>& store,
                    int rank,
                    int size,
-                   const platform::Place& place,
                    int gid);

   std::string GetBackendName() const override {
diff --git a/paddle/fluid/distributed/collective/ProcessGroupCustom.cc b/paddle/fluid/distributed/collective/ProcessGroupCustom.cc
index 87bd474477eb991f169b2067e870761154a07c34..d71a8b975e46e120b206aaae0e18e2bfc73ad2b6 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupCustom.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupCustom.cc
@@ -98,15 +98,11 @@ bool ProcessGroupCustom::CustomTask::Wait(std::chrono::milliseconds timeout) {

 void ProcessGroupCustom::CustomTask::Synchronize() { Wait(kWaitTimeout); }

 ProcessGroupCustom::ProcessGroupCustom(const std::shared_ptr<Store>& store,
+                                       const std::string& device_type,
                                        int rank,
                                        int size,
-                                       const platform::Place& place,
                                        int gid)
-    : ProcessGroup(rank, size, place, gid),
-      store_(store),
-      device_type_(place.GetDeviceType()) {
-  phi::DeviceManager::SetDevice(place_);
-}
+    : ProcessGroup(rank, size, gid), store_(store), device_type_(device_type) {}

 void ProcessGroupCustom::BroadcastUniqueCustomID(
     std::vector<phi::ccl::CCLRootId>& ccl_ids) {  // NOLINT
@@ -379,7 +375,12 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Broadcast(

 std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Barrier(
     const BarrierOptions& opts) {
   // Only support single card single process
-  std::vector<phi::CustomPlace> places = {place_};
+  PADDLE_ENFORCE_GE(opts.device_id,
+                    0,
+                    platform::errors::PreconditionNotMet(
+                        "The barrier device id must be greater than or equal to 0."));
+  platform::CustomPlace place(device_type_, opts.device_id);
+  std::vector<phi::CustomPlace> places = {place};
   std::vector<phi::DenseTensor> barrierTensors;
   barrierTensors.reserve(places.size());
diff --git a/paddle/fluid/distributed/collective/ProcessGroupCustom.h b/paddle/fluid/distributed/collective/ProcessGroupCustom.h
index 15d6193237d879c3616975a22d6436dc9cbcd430..b74d0c70de623d9dcbbaa6b958809225cc426db0 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupCustom.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupCustom.h
@@ -64,9 +64,9 @@ class ProcessGroupCustom : public ProcessGroup {
   };

   ProcessGroupCustom(const std::shared_ptr<Store>& store,
+                     const std::string& device_type,
                      int rank,
                      int size,
-                     const platform::Place& place,
                      int gid);

   std::string GetBackendName() const override { return "XCCL_" + device_type_; }
diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc
index 5cb4daf728b33d25096e6ea1a68baacae1130fde..2d6d4c88dd4d09f9499b83e9a3f0daf6ba48b3bf 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc
@@ -180,10 +180,9 @@ ProcessGroupGloo::ProcessGroupGloo(
     const std::shared_ptr<paddle::distributed::Store>& store,
     int rank,
     int world_size,
-    const platform::Place& place,
     int gid,
     const std::shared_ptr<GlooOptions> options)
-    : ProcessGroup(rank, world_size, place, gid),
+    : ProcessGroup(rank, world_size, gid),
       _tag(0),
       _store(new GlooStore(store)) {
   _context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.h b/paddle/fluid/distributed/collective/ProcessGroupGloo.h
index 9796f916639540c69562fcd21063209364432d30..4e2c0eff12c82cbc37331ce4f8be006865ee8fa9 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupGloo.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.h
@@ -102,7 +102,6 @@ class ProcessGroupGloo : public ProcessGroup {
       const std::shared_ptr<paddle::distributed::Store>& store,
       int rank,
       int world_size,
-      const platform::Place& place,
       int gid,
       std::shared_ptr<GlooOptions> options);
diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
index 3748f22ebe52d0484117783d5cf90ceb6cc9e7a2..a1f7754a5790085fc0a88bb318acf9723a9b4a7d 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -16,6 +16,7 @@
 #include "paddle/fluid/distributed/collective/Common.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
+#include "paddle/fluid/platform/place.h"
 #include "paddle/phi/api/lib/utils/allocator.h"

 DECLARE_bool(nccl_blocking_wait);
@@ -81,11 +82,8 @@ void ProcessGroupNCCL::NCCLTask::Synchronize() { Wait(kWaitTimeout); }

 ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr<Store>& store,
                                    int rank,
                                    int size,
-                                   const platform::Place& place,
                                    int gid)
-    : ProcessGroupStream(rank, size, place, gid), store_(store) {
-  platform::SetDeviceId(place_.device);
-}
+    : ProcessGroupStream(rank, size, gid), store_(store) {}

 void ProcessGroupNCCL::GroupStart() {
   PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupStart());
@@ -182,8 +180,13 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(

 std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
     const BarrierOptions& opts) {
+  PADDLE_ENFORCE_GE(opts.device_id,
+                    0,
+                    platform::errors::PreconditionNotMet(
+                        "The barrier device id must be greater than or equal to 0."));
+  platform::CUDAPlace place(opts.device_id);
   auto allocator = std::unique_ptr<phi::Allocator>(
-      new paddle::experimental::DefaultAllocator(place_));
+      new paddle::experimental::DefaultAllocator(place));
   phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1});
   phi::DenseTensor barrier_tensor{allocator.get(), meta};
diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
index 54ac390231734a51f905aa205153e665008d102b..7933636e3d17bc2ce79caf485a94c9f04b7c8a69 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
@@ -85,7 +85,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream {
   ProcessGroupNCCL(const std::shared_ptr<Store>& store,
                    int rank,
                    int size,
-                   const platform::Place& place,
                    int gid);

   std::string GetBackendName() const override { return "NCCL"; }
diff --git a/paddle/fluid/distributed/collective/ProcessGroupStream.cc b/paddle/fluid/distributed/collective/ProcessGroupStream.cc
index 0c191428502ddbe6e071904ce554830d191bc271..7fd01576fabe0ba4d1bf020a37dfd41f44fcb96e 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupStream.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupStream.cc
@@ -17,11 +17,8 @@
 namespace paddle {
 namespace distributed {

-ProcessGroupStream::ProcessGroupStream(int rank,
-                                       int size,
-                                       const platform::Place& place,
-                                       int gid)
-    : ProcessGroup(rank, size, place, gid) {}
+ProcessGroupStream::ProcessGroupStream(int rank, int size, int gid)
+    : ProcessGroup(rank, size, gid) {}

 const phi::DeviceContext& ProcessGroupStream::GetDeviceContext(
     const Place& place, bool use_calc_stream) const {
diff --git a/paddle/fluid/distributed/collective/ProcessGroupStream.h b/paddle/fluid/distributed/collective/ProcessGroupStream.h
index ec1a3391911feeeb4c8ca70cf281d8735c5300e0..fd68f6db5e360ed07877bbcad64e5a204a257c3e 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupStream.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupStream.h
@@ -55,7 +55,7 @@ class ProcessGroupStream : public ProcessGroup {
   };

 public:
-  ProcessGroupStream(int rank, int size, const platform::Place& place, int gid);
+  ProcessGroupStream(int rank, int size, int gid);
   virtual ~ProcessGroupStream() = default;

   virtual const phi::DeviceContext& GetDeviceContext(
diff --git a/paddle/fluid/distributed/collective/Types.h b/paddle/fluid/distributed/collective/Types.h
index 0ce92111f6a13579cf61dbce90565333affeb723..11628ea1f052aecc3003e9c7e5aa034099b07729 100644
--- a/paddle/fluid/distributed/collective/Types.h
+++ b/paddle/fluid/distributed/collective/Types.h
@@ -16,6 +16,7 @@
 #include <cstdint>
 #include <string>
 #include <vector>
+#include "paddle/phi/common/place.h"

 namespace paddle {
 namespace distributed {
@@ -33,7 +34,7 @@ struct BroadcastOptions {
 };

 struct BarrierOptions {
-  std::vector<int> place_ids;
+  int8_t device_id;
 };

 struct ReduceOptions {
diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc
index de415393caacf4c0efb60de61bae789a9da630a5..9c7a89c395fa91b6fc51d2d613f657f5d572a65a 100644
--- a/paddle/fluid/pybind/distributed_py.cc
+++ b/paddle/fluid/pybind/distributed_py.cc
@@ -110,7 +110,7 @@ void BindDistributed(py::module *m) {
   py::class_<distributed::BarrierOptions>(*m, "BarrierOptions")
       .def(py::init<>())
-      .def_readwrite("place_ids", &distributed::BarrierOptions::place_ids);
+      .def_readwrite("device_id", &distributed::BarrierOptions::device_id);

   py::class_<distributed::ReduceOptions>(*m, "ReduceOptions")
       .def(py::init<>())
@@ -513,12 +513,12 @@ void BindDistributed(py::module *m) {
       .def(
           "barrier",
-          [](distributed::ProcessGroup &self, std::vector<int> place_ids) {
+          [](distributed::ProcessGroup &self, int8_t device_id) {
             distributed::BarrierOptions opts;
-            opts.place_ids = place_ids;
+            opts.device_id = device_id;
             return self.Barrier(opts);
           },
-          py::arg("place_ids") = std::vector<int>{},
+          py::arg("device_id") = -1,
           py::call_guard<py::gil_scoped_release>())

       // TODO(liyurui): Interface below will be removed in the future.
@@ -1214,12 +1214,10 @@ void BindDistributed(py::module *m) {
          .def(py::init<const std::shared_ptr<distributed::Store> &,
                        int,
                        int,
-                       const platform::CUDAPlace &,
                        int>(),
               py::arg("store"),
               py::arg("rank"),
               py::arg("world_size"),
-              py::arg("place"),
               py::arg("group_id") = 0,
               py::call_guard<py::gil_scoped_release>());
@@ -1254,14 +1252,14 @@ void BindDistributed(py::module *m) {
                  std::shared_ptr<distributed::ProcessGroupCustom>>(
          *m, "ProcessGroupCustom", ProcessGroup)
          .def(py::init<const std::shared_ptr<distributed::Store> &,
+                       const std::string &,
                        int,
                        int,
-                       const platform::CustomPlace &,
                        int>(),
               py::arg("store"),
+              py::arg("device_type"),
               py::arg("rank"),
               py::arg("world_size"),
-              py::arg("place"),
               py::arg("group_id") = 0,
               py::call_guard<py::gil_scoped_release>());
@@ -1275,12 +1273,10 @@ void BindDistributed(py::module *m) {
          .def(py::init<const std::shared_ptr<distributed::Store> &,
                        int,
                        int,
-                       const platform::XPUPlace &,
                        int>(),
               py::arg("store"),
               py::arg("rank"),
               py::arg("world_size"),
-              py::arg("place"),
               py::arg("group_id") = 0,
               py::call_guard<py::gil_scoped_release>());
 #endif
@@ -1303,14 +1299,12 @@ void BindDistributed(py::module *m) {
          .def(py::init<const std::shared_ptr<paddle::distributed::Store> &,
                        int,
                        int,
-                       const platform::CPUPlace &,
                        int,
                        std::shared_ptr<GlooOptions> &>(),
               py::call_guard<py::gil_scoped_release>())
          .def(py::init([](const std::shared_ptr<paddle::distributed::Store> &store,
                           int rank,
                           int world_size,
-                          const platform::CPUPlace &place,
                           int gid) {
                 auto opts = GlooOptions::create();
                 char *ifname = getenv(GLOO_SOCKET_IFNAME_ENV.c_str());
@@ -1321,12 +1315,11 @@ void BindDistributed(py::module *m) {
                   opts->device = ProcessGroupGloo::createDefaultDevice();
                 }
                 return std::make_shared<ProcessGroupGloo>(
-                    store, rank, world_size, place, gid, opts);
+                    store, rank, world_size, gid, opts);
               }),
               py::arg("store"),
               py::arg("rank"),
               py::arg("world_size"),
-              py::arg("place"),
               py::arg("group_id") = 0,
               py::call_guard<py::gil_scoped_release>())
          .def_static("create_default_device",
diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index c4e09a620fc108a24da7c01c58ede02ae135421c..4bdc473f9a0acee9afb6f9f86f70ac103ec4c07b 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -152,17 +152,15 @@ def _new_process_group_impl(
     genv = _get_global_env()
     assert backend in _valid_backend_list, "Unsupported backend: %s." % backend
     if backend == "gloo":
-        place = core.CPUPlace()
-        pg = core.ProcessGroupGloo(store, rank, world_size, place, group_id)
+        pg = core.ProcessGroupGloo(store, rank, world_size, group_id)
     elif backend == "nccl":
-        place = core.CUDAPlace(genv.device_id)
-        pg = core.ProcessGroupNCCL(store, rank, world_size, place, group_id)
+        pg = core.ProcessGroupNCCL(store, rank, world_size, group_id)
     elif backend == "xccl":
-        place = core.CustomPlace(genv.device_type, genv.device_id)
-        pg = core.ProcessGroupCustom(store, rank, world_size, place, group_id)
+        pg = core.ProcessGroupCustom(
+            store, genv.device_type, rank, world_size, group_id
+        )
     elif backend == "bkcl":
-        place = core.XPUPlace(genv.device_id)
-        pg = core.ProcessGroupBKCL(store, rank, world_size, place, group_id)
+        pg = core.ProcessGroupBKCL(store, rank, world_size, group_id)

     return pg

@@ -192,7 +190,12 @@ def barrier(group=None):
     if in_dygraph_mode():
         group = _get_default_group() if group is None else group
-        task = group.process_group.barrier()
+        place = paddle.fluid.framework._current_expected_place()
+        if isinstance(place, paddle.fluid.core.CPUPlace):
+            task = group.process_group.barrier()
+        else:
+            device_id = place.get_device_id()
+            task = group.process_group.barrier(device_id)
         task.wait()
         return
diff --git a/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py
index 201b2d3df8657565f3edbee304dc848248aaaf52..1d3dfce9597a1ccb75e085f75e8bec05b8d1b2c6 100644
--- a/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py
+++ b/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py
@@ -30,9 +30,9 @@ def init_process_group(strategy=None):
     store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks)
     pg_group = core.ProcessGroupCustom(
         store,
+        ParallelEnv().device_type,
         rank,
         nranks,
-        paddle.CustomPlace(ParallelEnv().device_type, ParallelEnv().device_id),
     )
     return pg_group

@@ -51,9 +51,8 @@ class TestProcessGroupFp32(unittest.TestCase):
     def test_create_process_group_xccl(self):
         with _test_eager_guard():
-            paddle.set_device(
-                'custom_cpu:%d' % paddle.distributed.ParallelEnv().dev_id
-            )
+            device_id = paddle.distributed.ParallelEnv().dev_id
+            paddle.set_device('custom_cpu:%d' % device_id)

             pg = init_process_group()

@@ -119,11 +118,11 @@ class TestProcessGroupFp32(unittest.TestCase):
             # test barrier
             # rank 0
             if pg.rank() == 0:
-                task = pg.barrier()
+                task = pg.barrier(device_id)
                 task.wait()
             # rank 1
             else:
-                task = pg.barrier()
+                task = pg.barrier(device_id)
                 task.wait()

             print("test barrier api ok\n")
diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py
index 3ace517991322174397e3767101fd0e929425299..f93adb60910b5255ee1ff3406b4ea9260be10c63 100644
--- a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py
+++ b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py
@@ -42,8 +42,7 @@ class TestProcessGroupFp32(unittest.TestCase):
         store = paddle.fluid.core.TCPStore(
             "127.0.0.1", 6272, is_master, nranks, 30
         )
-        place = paddle.fluid.core.CPUPlace()
-        pg = paddle.fluid.core.ProcessGroupGloo(store, rank, nranks, place)
+        pg = paddle.fluid.core.ProcessGroupGloo(store, rank, nranks)

         # test allreduce sum
         # rank 0
diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py
index ff949d8f14cf477d235efb78afd552bdab617abb..0303f469b301c0acf72d8f3a4bf9f73c50db68c7 100644
--- a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py
+++ b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py
@@ -44,9 +44,8 @@ class TestProcessGroupFp32(unittest.TestCase):
     def test_create_process_group_nccl(self):
         with _test_eager_guard():
-            paddle.set_device(
-                'gpu:%d' % paddle.distributed.ParallelEnv().dev_id
-            )
+            device_id = paddle.distributed.ParallelEnv().dev_id
+            paddle.set_device('gpu:%d' % device_id)

             pg = init_process_group()
             print("rank:", pg.rank(), "size:", pg.size(), "name:", pg.name())
@@ -170,10 +169,10 @@ class TestProcessGroupFp32(unittest.TestCase):
             # test barrier
             # rank 0
             if pg.rank() == 0:
-                dist.barrier()
+                pg.barrier(device_id)
             # rank 1
             else:
-                task = pg.barrier()
+                task = pg.barrier(device_id)
                 task.wait()

             print("test barrier api ok\n")
diff --git a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
index bb2cf6e1db7e00733b1b899d406cff7172a759e6..5ea3845c0bd18262264a85e2fc02fe85a54a79e9 100644
--- a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
+++ b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
@@ -20,7 +20,6 @@ import sys
 import paddle
 from paddle.fluid.framework import _test_eager_guard
 from paddle.fluid.dygraph.parallel import ParallelEnv
-import paddle.distributed as dist


 def init_process_group(strategy=None):
@@ -45,9 +44,8 @@ class TestProcessGroupFp32(unittest.TestCase):
     def test_create_process_group_bkcl(self):
         with _test_eager_guard():
-            paddle.set_device(
-                'xpu:%d' % paddle.distributed.ParallelEnv().dev_id
-            )
+            device_id = paddle.distributed.ParallelEnv().dev_id
+            paddle.set_device('xpu:%d' % device_id)

             pg = init_process_group()
             sys.stdout.write(
@@ -108,10 +106,10 @@ class TestProcessGroupFp32(unittest.TestCase):
             # test barrier
             # rank 0
             if pg.rank() == 0:
-                dist.barrier()
+                pg.barrier(device_id)
             # rank 1
             else:
-                task = pg.barrier()
+                task = pg.barrier(device_id)
                 task.wait()

             sys.stdout.write("rank {}: test barrier api ok\n".format(pg.rank()))
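
---

For users of the public API, the behavior of `paddle.distributed.barrier()` is unchanged by this patch: it keeps its signature and now derives the device id from the current expected place instead of from the removed `place_ids` option. A minimal sketch of the new flow, assuming a standard `paddle.distributed.launch` setup (the script name and GPU ids are illustrative, not part of the patch):

```python
# demo_barrier.py -- run with:
#   python -m paddle.distributed.launch --gpus 0,1 demo_barrier.py
import paddle.distributed as dist

dist.init_parallel_env()

# With this patch, barrier() inspects _current_expected_place(): on
# CPUPlace it calls process_group.barrier() with no argument, otherwise
# it forwards place.get_device_id() as the new device_id argument.
dist.barrier()
print("rank {} passed the barrier".format(dist.get_rank()))
```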
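For code that constructs process groups through `paddle.fluid.core` directly, the sketch below mirrors the updated NCCL test: the `place` constructor argument is gone, and `barrier` now needs an explicit device id, since the pybind default of `-1` trips the new `PreconditionNotMet` check on device backends. The TCPStore port is an assumed free port, not taken from the patch:

```python
# Post-patch core API sketch (assumptions: two ranks, NCCL backend,
# port 6272 free; mirrors process_group_nccl.py above).
import paddle
from paddle.fluid import core
from paddle.fluid.dygraph.parallel import ParallelEnv

env = ParallelEnv()
device_id = env.dev_id
paddle.set_device('gpu:%d' % device_id)

store = core.TCPStore("127.0.0.1", 6272, env.local_rank == 0, env.nranks)

# The place argument is removed; the group binds to a device only when
# a collective (or barrier) supplies one.
pg = core.ProcessGroupNCCL(store, env.local_rank, env.nranks)

# device_id must be >= 0 here; pg.barrier() alone would use the -1
# default and fail the new PADDLE_ENFORCE_GE precondition.
task = pg.barrier(device_id)
task.wait()
```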