diff --git a/paddle/fluid/distributed/collective/ProcessGroup.cc b/paddle/fluid/distributed/collective/ProcessGroup.cc index 6fec3a41e1047edca6409f61a2286633097160be..e6d9975f75db67a7d54359835e1b6d7eaf7f207f 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.cc +++ b/paddle/fluid/distributed/collective/ProcessGroup.cc @@ -35,8 +35,9 @@ bool ProcessGroup::Task::Wait(std::chrono::milliseconds timeout) { void ProcessGroup::Task::Synchronize() {} -ProcessGroup::ProcessGroup(int rank, int size, int gid) - : rank_(rank), size_(size), gid_(gid) { +ProcessGroup::ProcessGroup(int rank, int size, const platform::Place& place, + int gid) + : rank_(rank), size_(size), place_(place), gid_(gid) { if (gid != IGNORE_ID) { auto map = ProcessGroupMapFromGid::getInstance(); map->insert(gid_, this); diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h index fbc9c1f4762026bcd32c4bc2d38a832927176bab..fca395c5f2bf71b8446ecf15c1290f7a2f44436c 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.h +++ b/paddle/fluid/distributed/collective/ProcessGroup.h @@ -69,7 +69,8 @@ class ProcessGroup { bool is_completed_ = false; }; - explicit ProcessGroup(int rank, int size, int gid); + explicit ProcessGroup(int rank, int size, const platform::Place& place, + int gid); virtual ~ProcessGroup() {} int GetRank() const { return rank_; } @@ -145,6 +146,7 @@ class ProcessGroup { protected: const int rank_; const int size_; + const platform::Place place_; const int gid_; }; diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc index 6ddea74d95db6bafebae4808878e6a5d9a59376d..824341c3cd97d5fbd21f809dcdd925aa2afee3e4 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc @@ -165,8 +165,9 @@ ProcessGroupGloo::GlooTask::GlooTask( ProcessGroupGloo::ProcessGroupGloo( const std::shared_ptr& store, int rank, int world_size, - int gid, const std::shared_ptr options) - : ProcessGroup(rank, world_size, gid), + const platform::Place& place, int gid, + const std::shared_ptr options) + : ProcessGroup(rank, world_size, place, gid), _tag(0), _store(new GlooStore(store)) { _context = std::make_shared(rank, world_size); diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.h b/paddle/fluid/distributed/collective/ProcessGroupGloo.h index 335ca1bd17f2c4ded5c46d12019b219f13e202f1..1eb8b47a0922344e1483472a9807bb7d243fb63a 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.h +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.h @@ -102,7 +102,8 @@ class ProcessGroupGloo : public ProcessGroup { explicit ProcessGroupGloo( const std::shared_ptr& store, int rank, - int world_size, int gid, std::shared_ptr options); + int world_size, const platform::Place& place, int gid, + std::shared_ptr options); ~ProcessGroupGloo() = default; diff --git a/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc index 55ecdaaf6bfb7ad887212be2c1a7870e87d62a8f..9ed6c2198df4c433ff7deceaf56d96217fac9647 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc @@ -17,6 +17,7 @@ #include "paddle/fluid/distributed/collective/HCCLTools.h" #include "paddle/fluid/memory/malloc.h" #include "paddle/fluid/platform/device/npu/hccl_helper.h" +#include "paddle/fluid/platform/device/npu/npu_info.h" #include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/place.h" #include "paddle/phi/api/include/api.h" @@ -97,8 +98,11 @@ bool ProcessGroupHCCL::HCCLTask::Wait(std::chrono::milliseconds timeout) { void ProcessGroupHCCL::HCCLTask::Synchronize() { Wait(kWaitTimeout); } ProcessGroupHCCL::ProcessGroupHCCL(const std::shared_ptr& store, - int rank, int size, int gid) - : ProcessGroup(rank, size, gid), store_(store) {} + int rank, int size, + const platform::Place& place, int gid) + : ProcessGroup(rank, size, place, gid), store_(store) { + platform::SetNPUDeviceId(place_.device); +} void ProcessGroupHCCL::BroadcastUniqueHCCLID( std::vector& hccl_ids) { // NOLINT diff --git a/paddle/fluid/distributed/collective/ProcessGroupHCCL.h b/paddle/fluid/distributed/collective/ProcessGroupHCCL.h index f3d3fa2f8a72a25ddda37dc61682b35e7972a282..2f0ff6b9565ea7bf5201e3b0aee21c73118382fe 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHCCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupHCCL.h @@ -71,7 +71,7 @@ class ProcessGroupHCCL : public ProcessGroup { }; ProcessGroupHCCL(const std::shared_ptr& store, int rank, int size, - int gid); + const platform::Place& place, int gid); const std::string GetBackendName() const override { return std::string(HCCL_BACKEND_NAME); diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc index 354a8e23ae41fa0130ea5c8ab19b33b615e58255..ef57bb5ba232cffc23643f9c199bc96d1d28ae61 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc @@ -44,13 +44,11 @@ bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) { return true; } -ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr& store, - int rank, int size, int gid, - int local_rank, int local_size, - int gloo_rank, int gloo_size, - bool with_switch, - std::string switch_endpoint) - : ProcessGroup(rank, size, gid), +ProcessGroupHeter::ProcessGroupHeter( + const std::shared_ptr& store, int rank, int size, + const platform::Place& place, int gid, int local_rank, int local_size, + int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint) + : ProcessGroup(rank, size, place, gid), store_(store), local_rank_(local_rank), local_size_(local_size), @@ -60,10 +58,10 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr& store, switch_endpoint_(switch_endpoint) { #if defined(PADDLE_WITH_NCCL) inner_pg_ = std::make_shared(store, local_rank, local_size, - IGNORE_ID); + place_, IGNORE_ID); #elif defined(PADDLE_WITH_ASCEND_CL) inner_pg_ = std::make_shared(store, local_rank, local_size, - IGNORE_ID); + place_, IGNORE_ID); #else PADDLE_THROW(platform::errors::Fatal( "ProcessGroupHeter only supports NCCL and HCCL now."); @@ -71,8 +69,8 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr& store, if (local_rank_ == 0 && !with_switch_) { auto opts = ProcessGroupGloo::GlooOptions::create(); opts->device = ProcessGroupGloo::createDefaultDevice(); - inter_pg_ = std::make_shared(store, gloo_rank_, - gloo_size_, IGNORE_ID, opts); + inter_pg_ = std::make_shared( + store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts); } } diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.h b/paddle/fluid/distributed/collective/ProcessGroupHeter.h index 05bacd93d7815aa12914df72cdf18354787249cc..640acdfb6a23ba3fd0b9a5cb755ca6429d598488 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHeter.h +++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.h @@ -81,9 +81,9 @@ class ProcessGroupHeter : public ProcessGroup { }; ProcessGroupHeter(const std::shared_ptr& store, int rank, int size, - int gid, int local_rank, int local_size, int gloo_rank, - int gloo_size, bool with_switch, - std::string switch_endpoints); + const platform::Place& place, int gid, int local_rank, + int local_size, int gloo_rank, int gloo_size, + bool with_switch, std::string switch_endpoints); const std::string GetBackendName() const override { return std::string(HETER_BACKEND_NAME); diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc index 30813b904df5303e63a85135028e6e077d6e9889..12de7d116e2b5d4ed2e669db60303a8dc77dae64 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc @@ -14,6 +14,7 @@ #include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h" #include "paddle/fluid/distributed/collective/Common.h" +#include "paddle/fluid/platform/device/gpu/gpu_info.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #include "paddle/fluid/platform/place.h" #include "paddle/phi/api/include/api.h" @@ -103,8 +104,11 @@ bool ProcessGroupNCCL::NCCLTask::Wait(std::chrono::milliseconds timeout) { void ProcessGroupNCCL::NCCLTask::Synchronize() { Wait(kWaitTimeout); } ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr& store, - int rank, int size, int gid) - : ProcessGroup(rank, size, gid), store_(store) {} + int rank, int size, + const platform::Place& place, int gid) + : ProcessGroup(rank, size, place, gid), store_(store) { + platform::SetDeviceId(place_.device); +} void ProcessGroupNCCL::BroadcastUniqueNCCLID( std::vector& nccl_ids) { // NOLINT diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h index cca84285ef4de1ad57efd38d910612757c834191..4b6c3f4031354d8d6530206d034595d07c361cda 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h @@ -77,7 +77,7 @@ class ProcessGroupNCCL : public ProcessGroup { }; ProcessGroupNCCL(const std::shared_ptr& store, int rank, int size, - int gid); + const platform::Place& place, int gid); const std::string GetBackendName() const override { return std::string(NCCL_BACKEND_NAME); diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc index 716cd35f0a614b09a3e093b61c0fc4a19a681ec7..ab8bf0529dcfc006a1d29b1c78accbea1f20d765 100644 --- a/paddle/fluid/pybind/distributed_py.cc +++ b/paddle/fluid/pybind/distributed_py.cc @@ -241,49 +241,42 @@ void BindDistributed(py::module *m) { std::shared_ptr>( *m, "ProcessGroupNCCL", ProcessGroup) .def(py::init &, int, int, - int>(), + const platform::CUDAPlace &, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::arg("group_id") = 0, py::call_guard()); + py::arg("place"), py::arg("group_id") = 0, + py::call_guard()); +#endif #if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \ (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL)) py::class_>( *m, "ProcessGroupHeter", ProcessGroup) - .def(py::init &, int, int, int, - int, int, int, int, bool, std::string>(), + .def(py::init &, int, int, +#if defined(PADDLE_WITH_ASCEND_CL) + const platform::NPUPlace &, +#else + const platform::CUDAPlace &, +#endif + int, int, int, int, int, bool, std::string>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::arg("gid") = 0, py::arg("local_rank") = 0, + py::arg("place"), py::arg("gid") = 0, py::arg("local_rank") = 0, py::arg("local_size") = 1, py::arg("gloo_rank") = 0, py::arg("gloo_size") = 1, py::arg("with_switch") = false, py::arg("switch_endpoint") = "", py::call_guard()); #endif -#endif #if defined(PADDLE_WITH_ASCEND_CL) py::class_>( *m, "ProcessGroupHCCL", ProcessGroup) .def(py::init &, int, int, - int>(), - py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::arg("group_id") = 0, py::call_guard()); - -#if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \ - (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL)) - py::class_>( - *m, "ProcessGroupHeter", ProcessGroup) - .def(py::init &, int, int, int, - int, int, int, int, bool, std::string>(), + const platform::NPUPlace &, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::arg("gid") = 0, py::arg("local_rank") = 0, - py::arg("local_size") = 1, py::arg("gloo_rank") = 0, - py::arg("gloo_rank") = 1, py::arg("with_switch") = false, - py::arg("switch_endpoint") = "", + py::arg("place"), py::arg("group_id") = 0, py::call_guard()); -#endif + #endif py::class_>( *m, "ProcessGroupGloo", ProcessGroup) .def(py::init &, int, - int, int, std::shared_ptr &>(), + int, const platform::CPUPlace &, int, + std::shared_ptr &>(), py::call_guard()) .def(py::init([](const std::shared_ptr &store, - int rank, int world_size, int gid) { + int rank, int world_size, + const platform::CPUPlace &place, int gid) { auto opts = GlooOptions::create(); char *ifname = getenv(GLOO_SOCKET_IFNAME_ENV.c_str()); if (ifname && strlen(ifname) > 1) { @@ -312,10 +307,11 @@ void BindDistributed(py::module *m) { opts->device = ProcessGroupGloo::createDefaultDevice(); } return std::make_shared(store, rank, world_size, - gid, opts); + place, gid, opts); }), py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::arg("group_id") = 0, py::call_guard()) + py::arg("place"), py::arg("group_id") = 0, + py::call_guard()) .def_static("create_default_device", &ProcessGroupGloo::createDefaultDevice); #endif diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 993b45b4eecf9ee826f9991d5daeeac150f7566a..35ab1193c2b0545ef08ca5ed63196139b1a76711 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -228,14 +228,23 @@ def _new_process_group_impl(backend, pg_options, group_id=0): pg = None + genv = _get_global_env() assert backend in _valid_backend_list, "Unsupported backend: %s." % backend if backend == "gloo": - pg = core.ProcessGroupGloo(store, rank, world_size, group_id) + place = core.CPUPlace() + pg = core.ProcessGroupGloo(store, rank, world_size, place, group_id) elif backend == "nccl": - pg = core.ProcessGroupNCCL(store, rank, world_size, group_id) + place = core.CUDAPlace(genv.device_id) + pg = core.ProcessGroupNCCL(store, rank, world_size, place, group_id) elif backend == "hccl": - pg = core.ProcessGroupHCCL(store, rank, world_size, group_id) + place = core.NPUPlace(genv.device_id) + pg = core.ProcessGroupHCCL(store, rank, world_size, place, group_id) elif backend == "heter": + place = None + if core.is_compiled_with_cuda(): + place = core.CUDAPlace(genv.device_id) + elif core.is_compiled_with_npu(): + place = core.NPUPlace(genv.device_id) cluster_id = int(os.getenv("CLUSTER_ID", "-1")) assert cluster_id >= 0, "please set the CLUSTER_ID variable." cluster_size = os.getenv("CLUSTER_SIZE", None) @@ -253,6 +262,7 @@ def _new_process_group_impl(backend, store, rank=global_rank, world_size=global_world_size, + place=place, gid=0, local_rank=rank, local_size=world_size, diff --git a/python/paddle/fluid/tests/unittests/process_group_gloo.py b/python/paddle/fluid/tests/unittests/process_group_gloo.py index 03886ab8a147f8d810ebd9b26230ba76ec695916..9be8a35f1ae1b28e5f9986b8c38a484fc6d0153f 100644 --- a/python/paddle/fluid/tests/unittests/process_group_gloo.py +++ b/python/paddle/fluid/tests/unittests/process_group_gloo.py @@ -47,7 +47,8 @@ class TestProcessGroupFp32(unittest.TestCase): is_master = True if rank == 0 else False store = paddle.fluid.core.TCPStore("127.0.0.1", 6272, is_master, nranks, datetime.timedelta(0)) - pg = paddle.fluid.core.ProcessGroupGloo(store, rank, nranks) + place = paddle.fluid.core.CPUPlace() + pg = paddle.fluid.core.ProcessGroupGloo(store, rank, nranks, place) # test allreduce sum # rank 0