diff --git a/paddle/fluid/distributed/collective/CMakeLists.txt b/paddle/fluid/distributed/collective/CMakeLists.txt index 49ba9479d49e93143665b8314d04ee8e0efcbf51..6fb805a72e4de1a1d319748fe76d8bca5409e5e9 100644 --- a/paddle/fluid/distributed/collective/CMakeLists.txt +++ b/paddle/fluid/distributed/collective/CMakeLists.txt @@ -6,8 +6,15 @@ if (WITH_DISTRIBUTE) endif() if(WITH_NCCL) - cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api) + cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api) + if (WITH_DISTRIBUTE) + cc_library(processgroup_heter SRCS ProcessGroupHeter.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api) + endif() endif() + if(WITH_ASCEND_CL) - cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api) + cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api) + if (WITH_DISTRIBUTE) + cc_library(processgroup_heter SRCS ProcessGroupHeter.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api) + endif() endif() diff --git a/paddle/fluid/distributed/collective/Common.cc b/paddle/fluid/distributed/collective/Common.cc new file mode 100644 index 0000000000000000000000000000000000000000..02eab58478ccce3664bcbc3ab99173ceb3f01f66 --- /dev/null +++ b/paddle/fluid/distributed/collective/Common.cc @@ -0,0 +1,54 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/collective/Common.h" + +namespace paddle { +namespace distributed { + +std::vector GetPlaceList(const std::vector& tensors) { + std::vector places; + places.reserve(tensors.size()); + for (auto& tensor : tensors) { + places.push_back(tensor.inner_place()); + } + return places; +} + +std::string GetKeyFromPlaces(const std::vector& places) { + std::string placeList; + for (auto& place : places) { + std::stringstream tmp; + tmp << place; + if (placeList.empty()) { + placeList += tmp.str(); + } else { + placeList += "," + tmp.str(); + } + } + return placeList; +} + +static bool CheckTensorsInPlace(const std::vector& tensors, + const PlaceType type) { + return std::all_of(tensors.cbegin(), tensors.cend(), + [&](const Tensor& t) { return t.place() == type; }); +} + +bool CheckTensorsInCudaPlace(const std::vector& tensors) { + return CheckTensorsInPlace(tensors, PlaceType::kGPU); +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/Common.h b/paddle/fluid/distributed/collective/Common.h new file mode 100644 index 0000000000000000000000000000000000000000..9569f4c61acef6dc31a0be57f8e09c7858db2e50 --- /dev/null +++ b/paddle/fluid/distributed/collective/Common.h @@ -0,0 +1,33 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "paddle/fluid/platform/place.h" +#include "paddle/phi/api/include/api.h" +namespace paddle { +namespace distributed { + +using Tensor = paddle::experimental::Tensor; + +using Place = paddle::platform::Place; +// Get the list of devices from list of tensors +std::vector GetPlaceList(const std::vector& tensors); +// Get the deviceList String from the list of devices +std::string GetKeyFromPlaces(const std::vector& places); + +bool CheckTensorsInCudaPlace(const std::vector& tensors); + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/HCCLTools.cc b/paddle/fluid/distributed/collective/HCCLTools.cc new file mode 100644 index 0000000000000000000000000000000000000000..526a683e057c04466063a34b4d37777446d7ba18 --- /dev/null +++ b/paddle/fluid/distributed/collective/HCCLTools.cc @@ -0,0 +1,46 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/collective/HCCLTools.h" +#include "paddle/fluid/distributed/collective/Types.h" + +namespace paddle { +namespace distributed { + +HcclReduceOp ToHCCLRedType(ReduceOp reduction) { + static const std::map red_type = { + {ReduceOp::MIN, HCCL_REDUCE_MIN}, + {ReduceOp::MAX, HCCL_REDUCE_MAX}, + {ReduceOp::SUM, HCCL_REDUCE_SUM}, + {ReduceOp::PRODUCT, HCCL_REDUCE_PROD}, + }; + auto it = red_type.find(reduction); + PADDLE_ENFORCE_EQ( + it != red_type.end(), true, + platform::errors::InvalidArgument("Invalid hccl reduction. " + "Must be Min | Max | Prod | Sum")); + return it->second; +} + +std::string SerializeHCCLUniqueId(const HcclRootInfo& hcclID) { + const uint8_t* bytes = reinterpret_cast(&hcclID); + std::ostringstream oss; + for (size_t i = 0; i < sizeof(hcclID); ++i) { + oss << std::hex << static_cast(bytes[i]); + } + return oss.str(); +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/HCCLTools.h b/paddle/fluid/distributed/collective/HCCLTools.h index 09789bd4d378630f548f931bcac00fda89ef33be..a1dcf7cd9b62694cdf4ac68bf261c23e96b1a12b 100644 --- a/paddle/fluid/distributed/collective/HCCLTools.h +++ b/paddle/fluid/distributed/collective/HCCLTools.h @@ -18,6 +18,7 @@ #include #include "boost/variant.hpp" +#include "paddle/fluid/distributed/collective/Types.h" #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/variable.h" #include "paddle/fluid/platform/collective_helper.h" @@ -170,5 +171,8 @@ class HCCLCommManager { mutable std::mutex mutex_; }; +HcclReduceOp ToHCCLRedType(ReduceOp reduction); +std::string SerializeHCCLUniqueId(const HcclRootInfo& hcclID); + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/collective/NCCLTools.cc b/paddle/fluid/distributed/collective/NCCLTools.cc new file mode 100644 index 0000000000000000000000000000000000000000..7e842ebf92166a4d0f2cf34584af6cab3ec80e3e --- /dev/null +++ b/paddle/fluid/distributed/collective/NCCLTools.cc @@ -0,0 +1,46 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/collective/NCCLTools.h" +#include "paddle/fluid/distributed/collective/Types.h" + +namespace paddle { +namespace distributed { + +ncclRedOp_t ToNCCLRedType(ReduceOp reduction) { + static const std::map red_type = { + {ReduceOp::MIN, ncclMin}, + {ReduceOp::MAX, ncclMax}, + {ReduceOp::SUM, ncclSum}, + {ReduceOp::PRODUCT, ncclProd}, + }; + auto it = red_type.find(reduction); + PADDLE_ENFORCE_EQ(it != red_type.end(), true, + platform::errors::InvalidArgument( + "Invalid nccl reduction. Must be ncclMin | ncclMax | " + "ncclProd | ncclSum")); + return it->second; +} + +std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID) { + const uint8_t* bytes = reinterpret_cast(&ncclID); + std::ostringstream oss; + for (auto i = 0; i < NCCL_UNIQUE_ID_BYTES; ++i) { + oss << std::hex << static_cast(bytes[i]); + } + return oss.str(); +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/NCCLTools.h b/paddle/fluid/distributed/collective/NCCLTools.h index f30b96e72d4536b0773c9b69b6cb90b2c8c2dc87..0454518b1836c8b232320ea4858874e67f587099 100644 --- a/paddle/fluid/distributed/collective/NCCLTools.h +++ b/paddle/fluid/distributed/collective/NCCLTools.h @@ -26,6 +26,8 @@ #include "paddle/fluid/platform/dynload/nccl.h" #include "paddle/fluid/platform/enforce.h" +#include "paddle/fluid/distributed/collective/Types.h" + namespace paddle { namespace distributed { @@ -194,5 +196,8 @@ class NCCLCommManager { mutable std::mutex mutex_; }; +ncclRedOp_t ToNCCLRedType(ReduceOp reduction); +std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID); + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroup.cc b/paddle/fluid/distributed/collective/ProcessGroup.cc index 42ca3bd5f5be49e72662d563ba6e20f3097840ef..ab118dadd5d8870f1da98124e2fe1d0dc95ae18d 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.cc +++ b/paddle/fluid/distributed/collective/ProcessGroup.cc @@ -34,7 +34,13 @@ bool ProcessGroup::Task::Wait(std::chrono::milliseconds timeout) { void ProcessGroup::Task::Synchronize() {} -ProcessGroup::ProcessGroup(int rank, int size) : rank_(rank), size_(size) {} +ProcessGroup::ProcessGroup(int rank, int size, int gid) + : rank_(rank), size_(size) { + if (gid != IGNORE_ID) { + auto map = ProcessGroupMapFromGid::getInstance(); + map->insert(gid, this); + } +} } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h index e43d0e8c183c7005f31b66c4c29dfc95361485e4..36a00a7d31758e668635b5948985d7d26dab1324 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.h +++ b/paddle/fluid/distributed/collective/ProcessGroup.h @@ -31,6 +31,7 @@ constexpr auto kWaitTimeout = std::chrono::milliseconds(0); namespace paddle { namespace distributed { +constexpr int IGNORE_ID = -1; using Tensor = paddle::experimental::Tensor; enum class CommType : std::uint8_t { @@ -49,14 +50,6 @@ enum class CommType : std::uint8_t { UNKNOWN = 100, }; -struct ProcessGroupStrategy { - int nranks_{1}; - int local_rank_{0}; - std::vector trainer_endpoints_{}; - std::string current_endpoint_{""}; - int nrings_{1}; -}; - class ProcessGroup { public: class Task { @@ -76,7 +69,7 @@ class ProcessGroup { bool is_completed_ = false; }; - explicit ProcessGroup(int rank, int size); + explicit ProcessGroup(int rank, int size, int gid); virtual ~ProcessGroup() {} int GetRank() const { return rank_; } @@ -99,6 +92,12 @@ class ProcessGroup { "ProcessGroup%s does not support broadcast", GetBackendName())); } + virtual void Broadcast(const phi::DenseTensor* in, phi::DenseTensor* out) { + PADDLE_THROW(platform::errors::InvalidArgument( + "ProcessGroup%s does not support broadcast for static", + GetBackendName())); + } + virtual std::shared_ptr Barrier( const BarrierOptions& = BarrierOptions()) { PADDLE_THROW(platform::errors::InvalidArgument( @@ -151,5 +150,38 @@ class ProcessGroup { const int size_; }; +class ProcessGroupMapFromGid { + public: + bool has(int gid) { + auto it = map_.find(gid); + return it != map_.end(); + } + + void insert(int gid, ProcessGroup* pg) { + PADDLE_ENFORCE_EQ(has(gid), false, + platform::errors::PreconditionNotMet( + "The process group with id %d doesnot exist.", gid)); + map_[gid] = pg; + } + + ProcessGroup* get(int gid) { + PADDLE_ENFORCE_EQ(has(gid), false, + platform::errors::PreconditionNotMet( + "The process group with id %d doesnot exist.", gid)); + return map_.find(gid)->second; + } + + static std::shared_ptr getInstance() { + static auto s_instance = std::make_shared(); + return s_instance; + } + + ProcessGroupMapFromGid() = default; + ~ProcessGroupMapFromGid() = default; + + private: + std::unordered_map map_; +}; + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc index cb82677a281e990d9837f081b0d4d2f3b0a34a26..91c3bf93849e0f6f960554558896059f4ffd7a08 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc @@ -173,8 +173,10 @@ ProcessGroupGloo::GlooTask::GlooTask(int rank, ProcessGroupGloo::ProcessGroupGloo( const std::shared_ptr& store, int rank, - int world_size, const std::shared_ptr options) - : ProcessGroup(rank, world_size), _tag(0), _store(new GlooStore(store)) { + int world_size, int gid, const std::shared_ptr options) + : ProcessGroup(rank, world_size, gid), + _tag(0), + _store(new GlooStore(store)) { _context = std::make_shared(rank, world_size); auto prefix_store = ::gloo::rendezvous::PrefixStore(std::to_string(0), *_store); diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.h b/paddle/fluid/distributed/collective/ProcessGroupGloo.h index 71e0a40f8a76181d9f4db13ddd57b31de676910b..f0bf872cfc9e42a4faee31a64e6d4a1465e803b9 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.h +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.h @@ -101,7 +101,7 @@ class ProcessGroupGloo : public ProcessGroup { explicit ProcessGroupGloo( const std::shared_ptr& store, int rank, - int world_size, std::shared_ptr options); + int world_size, int gid, std::shared_ptr options); ~ProcessGroupGloo() = default; diff --git a/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc index 2deeb7ca03003d0b6c8fa0948afa0a3394639f8b..b21155e09d06eec8869751ba00d30215d1bca01e 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc @@ -13,6 +13,8 @@ // limitations under the License. #include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h" +#include "paddle/fluid/distributed/collective/Common.h" +#include "paddle/fluid/distributed/collective/HCCLTools.h" #include "paddle/fluid/memory/malloc.h" #include "paddle/fluid/platform/device/npu/hccl_helper.h" #include "paddle/fluid/platform/device_context.h" @@ -28,55 +30,6 @@ constexpr int64_t kWaitBlockTImeout = 10; namespace paddle { namespace distributed { -static HcclReduceOp ToHCCLRedType(ReduceOp reduction) { - static const std::map red_type = { - {ReduceOp::MIN, HCCL_REDUCE_MIN}, - {ReduceOp::MAX, HCCL_REDUCE_MAX}, - {ReduceOp::SUM, HCCL_REDUCE_SUM}, - {ReduceOp::PRODUCT, HCCL_REDUCE_PROD}, - }; - auto it = red_type.find(reduction); - PADDLE_ENFORCE_EQ( - it != red_type.end(), true, - platform::errors::InvalidArgument("Invalid hccl reduction. " - "Must be Min | Max | Prod | Sum")); - return it->second; -} - -std::string SerializeHCCLUniqueId(const HcclRootInfo& hcclID) { - const uint8_t* bytes = reinterpret_cast(&hcclID); - std::ostringstream oss; - for (size_t i = 0; i < sizeof(hcclID); ++i) { - oss << std::hex << static_cast(bytes[i]); - } - return oss.str(); -} - -// Get the list of devices from list of tensors -std::vector GetPlaceList(const std::vector& tensors) { - std::vector places; - places.reserve(tensors.size()); - for (auto& tensor : tensors) { - places.push_back(tensor.inner_place()); - } - return places; -} - -// Get the deviceList String from the list of devices -std::string GetKeyFromPlaces(const std::vector& places) { - std::string placeList; - for (auto& place : places) { - std::stringstream tmp; - tmp << place; - if (placeList.empty()) { - placeList += tmp.str(); - } else { - placeList += "," + tmp.str(); - } - } - return placeList; -} - // bool CheckTensorsInNPUPlace(const std::vector& tensors) { // return std::all_of(tensors.cbegin(), tensors.cend(), [&](const Tensor& t) { // return t.place() == platform::DeviceType::NPU; @@ -150,8 +103,8 @@ bool ProcessGroupHCCL::HCCLTask::Wait(std::chrono::milliseconds timeout) { void ProcessGroupHCCL::HCCLTask::Synchronize() { Wait(kWaitTimeout); } ProcessGroupHCCL::ProcessGroupHCCL(const std::shared_ptr& store, - int rank, int size) - : ProcessGroup(rank, size), store_(store) {} + int rank, int size, int gid) + : ProcessGroup(rank, size, gid), store_(store) {} void ProcessGroupHCCL::BroadcastUniqueHCCLID( std::vector& hccl_ids) { // NOLINT diff --git a/paddle/fluid/distributed/collective/ProcessGroupHCCL.h b/paddle/fluid/distributed/collective/ProcessGroupHCCL.h index 83d509be2cdd7b79faf4e2a2f510c34361b94157..932ae75fc6b9d6578260f74f994706dae84d2a95 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHCCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupHCCL.h @@ -70,7 +70,8 @@ class ProcessGroupHCCL : public ProcessGroup { private: }; - ProcessGroupHCCL(const std::shared_ptr& store, int rank, int size); + ProcessGroupHCCL(const std::shared_ptr& store, int rank, int size, + int gid); const std::string GetBackendName() const override { return std::string(HCCL_BACKEND_NAME); diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc new file mode 100644 index 0000000000000000000000000000000000000000..ffd653042494ddac067ab86f1e857efea4b375a3 --- /dev/null +++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc @@ -0,0 +1,209 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h" +#include "paddle/fluid/platform/device/gpu/nccl_helper.h" +#include "paddle/fluid/platform/place.h" +#include "paddle/phi/api/include/api.h" +#include "paddle/phi/common/place.h" + +constexpr int64_t kWaitBlockTImeout = 10; + +namespace paddle { +namespace distributed { + +using Place = paddle::platform::Place; + +std::shared_ptr ProcessGroupHeter::CreateTask( + int rank, CommType comm_type, const std::vector& inputs) { + return std::make_shared(rank, comm_type, + inputs); +} + +ProcessGroupHeter::HeterTask::HeterTask(int rank, CommType CommType, + const std::vector& inputs) + : Task(rank, inputs, CommType) {} + +ProcessGroupHeter::HeterTask::~HeterTask() {} + +bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; } + +// TODO(sheniang03): Add timeout for wait, now timeout unused +bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) { + return true; +} + +ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr& store, + int rank, int size, int gid, + int local_rank, int local_size, + int gloo_rank, int gloo_size, + bool with_switch, + std::string switch_endpoint) + : ProcessGroup(rank, size, gid), + store_(store), + local_rank_(local_rank), + local_size_(local_size), + gloo_rank_(gloo_rank), + gloo_size_(gloo_size), + with_switch_(with_switch) { +#if defined(PADDLE_WITH_NCCL) + inner_pg_ = std::make_shared(store, local_rank, local_size, + IGNORE_ID); +#elif defined(PADDLE_WITH_ASCEND_CL) + inner_pg_ = std::make_shared(store, local_rank, local_size, + IGNORE_ID); +#else + PADDLE_THROW(platform::errors::InvalidArgument( + "ProcessGroupHeter only supports NCCL and HCCL now."); +#endif + if (with_switch_) { + // TODO(sandyhouse) starts a client to connect the cloud switch module + // std::shared_ptr client_ = + // HeterClient::GetInstance({switch_endpoint}, {}, 0); + } else if (local_rank_ == 0) { + auto opts = ProcessGroupGloo::GlooOptions::create(); + opts->device = ProcessGroupGloo::createDefaultDevice(); + inter_pg_ = std::make_shared(store, gloo_rank_, + gloo_size_, IGNORE_ID, opts); + } +} + +std::shared_ptr ProcessGroupHeter::AllReduce( + std::vector& tensors, const AllreduceOptions& opts) { +#if defined(PADDLE_WITH_NCCL) + PADDLE_ENFORCE_EQ( + CheckTensorsInCudaPlace(tensors), true, + platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); +#endif + + // Step1: do allreduce in inner cluster + auto task = inner_pg_->AllReduce(tensors, opts); + task->Wait(); + + // Step2: copy tensors to CPU + if (local_rank_ == 0) { + std::vector cpu_tensors(tensors.size()); + for (size_t i = 0; i < tensors.size(); i++) { + auto dense_gpu_tensor = + std::dynamic_pointer_cast(tensors[i].impl()); + auto dense_cpu_tensor = + std::dynamic_pointer_cast(cpu_tensors[i].impl()); + dense_cpu_tensor->Resize(tensors[i].dims()); + framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(), + dense_cpu_tensor.get()); + } + // Step3: do inter cluster allreduce + if (with_switch_) { + // TODO(sandyhouse) send to and recv from switch, and do add + } else { + auto gloo_task = inter_pg_->AllReduce(cpu_tensors, opts); + gloo_task->Wait(); + } + // Step4: copy cpu tensors to gpu + // TODO(sandyhouse) + // copy cpu tensors to gpu + for (size_t i = 0; i < tensors.size(); i++) { + auto dense_gpu_tensor = + std::dynamic_pointer_cast(tensors[i].impl()); + auto dense_cpu_tensor = + std::dynamic_pointer_cast(cpu_tensors[i].impl()); + // framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(), + // dense_gpu_tensor.get()); + framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(), + dense_gpu_tensor.get()); + } + } + + // Step5: broadcast among inner cluster + auto b_opts = BroadcastOptions(); + b_opts.source_root = 0; + auto broadcast_task = inner_pg_->Broadcast(tensors, b_opts); + broadcast_task->Wait(); + return CreateTask(rank_, CommType::ALLREDUCE, tensors); +} + +std::shared_ptr ProcessGroupHeter::Broadcast( + std::vector& tensors, const BroadcastOptions& opts) { +#if defined(PADDLE_WITH_NCCL) + PADDLE_ENFORCE_EQ( + CheckTensorsInCudaPlace(tensors), true, + platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); +#endif + + // Step1: do broadcast in inner cluster + auto b_opts = BroadcastOptions(); + b_opts.source_root = 0; + inner_pg_->Broadcast(tensors, b_opts); + + if (local_rank_ == 0) { + std::vector cpu_tensors(tensors.size()); + for (size_t i = 0; i < tensors.size(); i++) { + auto dense_gpu_tensor = + std::dynamic_pointer_cast(tensors[i].impl()); + auto dense_cpu_tensor = + std::dynamic_pointer_cast(cpu_tensors[i].impl()); + dense_cpu_tensor->Resize(tensors[i].dims()); + framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(), + dense_cpu_tensor.get()); + } + if (with_switch_) { + // TODO(sandyhouse) send to and recv + } else { + auto gloo_task = inter_pg_->Broadcast(cpu_tensors, opts); + gloo_task->Wait(); + } + for (size_t i = 0; i < tensors.size(); i++) { + auto dense_gpu_tensor = + std::dynamic_pointer_cast(tensors[i].impl()); + auto dense_cpu_tensor = + std::dynamic_pointer_cast(cpu_tensors[i].impl()); + // framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(), + // dense_gpu_tensor.get()); + framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(), + dense_gpu_tensor.get()); + } + } + auto broadcast_task = inner_pg_->Broadcast(tensors, b_opts); + broadcast_task->Wait(); + return CreateTask(rank_, CommType::BROADCAST, tensors); +} + +void ProcessGroupHeter::Broadcast(const phi::DenseTensor* in, + phi::DenseTensor* out) { + // Step1: do broadcast in inner cluster + inner_pg_->Broadcast(in, out); + + if (local_rank_ == 0) { + Tensor cpu_tensor; + auto dense_cpu_tensor = + std::dynamic_pointer_cast(cpu_tensor.impl()); + dense_cpu_tensor->Resize(in->dims()); + framework::TensorCopySync(*in, platform::CPUPlace(), + dense_cpu_tensor.get()); + if (with_switch_) { + // TODO(sandyhouse) send to and recv + } else { + std::vector cpu_tensors = {cpu_tensor}; + // auto gloo_task = inter_pg_->Broadcast(cpu_tensors); + // gloo_task->Wait(); + inter_pg_->Broadcast(cpu_tensors); + } + framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(), + out); + } + inner_pg_->Broadcast(out, out); +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.h b/paddle/fluid/distributed/collective/ProcessGroupHeter.h new file mode 100644 index 0000000000000000000000000000000000000000..8a26adbea4d78dc428e0241cc50f47ef06756234 --- /dev/null +++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.h @@ -0,0 +1,114 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h" +// #include "paddle/fluid/distributed/ps/service/heter_client.h" +#include "paddle/fluid/platform/device_context.h" + +#ifdef PADDLE_WITH_GLOO +#include "paddle/fluid/framework/fleet/gloo_wrapper.h" +#endif + +#include "paddle/fluid/distributed/store/store.h" +#include "paddle/fluid/platform/enforce.h" +#include "paddle/fluid/platform/gen_comm_id_helper.h" +#include "paddle/fluid/platform/place.h" +#include "paddle/fluid/platform/stream/cuda_stream.h" + +#if defined(PADDLE_WITH_NCCL) +#include "paddle/fluid/distributed/collective/NCCLTools.h" +#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h" +#include "paddle/fluid/platform/cuda_device_guard.h" +#include "paddle/fluid/platform/dynload/nccl.h" +#endif + +#if defined(PADDLE_WITH_ASCEND_CL) +#include "paddle/fluid/distributed/collective/HCCLTools.h" +#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h" +#endif + +#include "paddle/fluid/distributed/collective/Common.h" + +constexpr const char* HETER_BACKEND_NAME = "HETER_BACKEND"; + +namespace paddle { +namespace distributed { + +using Place = paddle::platform::Place; + +class ProcessGroupHeter : public ProcessGroup { + public: + class HeterTask : public ProcessGroup::Task, + public std::enable_shared_from_this { + public: + HeterTask(int rank, CommType CommType, const std::vector& inputs); + + bool IsCompleted(); + + void SynchronizeStreams() {} + + bool Wait(std::chrono::milliseconds timeout = kWaitTimeout); + + void Synchronize() {} + + virtual ~HeterTask(); + }; + + ProcessGroupHeter(const std::shared_ptr& store, int rank, int size, + int gid, int local_rank, int local_size, int gloo_rank, + int gloo_size, bool with_switch, + std::string switch_endpoints); + + const std::string GetBackendName() const override { + return std::string(HETER_BACKEND_NAME); + } + + std::shared_ptr AllReduce( + std::vector& tensors, + const AllreduceOptions& = AllreduceOptions()) override; + + std::shared_ptr Broadcast( + std::vector& tensors, + const BroadcastOptions& = BroadcastOptions()) override; + + void Broadcast(const phi::DenseTensor* in, phi::DenseTensor* out) override; + + protected: + virtual std::shared_ptr CreateTask( + int rank, CommType opType, const std::vector& inputs); + + private: + std::shared_ptr store_; + std::shared_ptr inner_pg_; + std::shared_ptr inter_pg_; + + int local_rank_; + int local_size_; + int gloo_rank_; + int gloo_size_; + bool with_switch_; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc index 2af407f711ec10097e5692d7e189cf847bcfeaf1..7c0752b5f367c376d1170c5f51d1e8a57483fa54 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h" +#include "paddle/fluid/distributed/collective/Common.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #include "paddle/fluid/platform/place.h" #include "paddle/phi/api/include/api.h" @@ -26,61 +27,6 @@ constexpr int64_t kWaitBlockTImeout = 10; namespace paddle { namespace distributed { -static ncclRedOp_t ToNCCLRedType(ReduceOp reduction) { - static const std::map red_type = { - {ReduceOp::MIN, ncclMin}, - {ReduceOp::MAX, ncclMax}, - {ReduceOp::SUM, ncclSum}, - {ReduceOp::PRODUCT, ncclProd}, - }; - auto it = red_type.find(reduction); - PADDLE_ENFORCE_EQ(it != red_type.end(), true, - platform::errors::InvalidArgument( - "Invalid nccl reduction. Must be ncclMin | ncclMax | " - "ncclProd | ncclSum")); - return it->second; -} - -std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID) { - const uint8_t* bytes = reinterpret_cast(&ncclID); - std::ostringstream oss; - for (auto i = 0; i < NCCL_UNIQUE_ID_BYTES; ++i) { - oss << std::hex << static_cast(bytes[i]); - } - return oss.str(); -} - -// Get the list of devices from list of tensors -std::vector GetPlaceList(const std::vector& tensors) { - std::vector places; - places.reserve(tensors.size()); - for (auto& tensor : tensors) { - places.push_back(tensor.inner_place()); - } - return places; -} - -// Get the deviceList String from the list of devices -std::string GetKeyFromPlaces(const std::vector& places) { - std::string placeList; - for (auto& place : places) { - std::stringstream tmp; - tmp << place; - if (placeList.empty()) { - placeList += tmp.str(); - } else { - placeList += "," + tmp.str(); - } - } - return placeList; -} - -bool CheckTensorsInCudaPlace(const std::vector& tensors) { - return std::all_of(tensors.cbegin(), tensors.cend(), [&](const Tensor& t) { - return t.place() == PlaceType::kGPU; - }); -} - void SyncDefaultStream( const std::vector& places, std::vector& ncclEvents, // NOLINT @@ -157,8 +103,8 @@ bool ProcessGroupNCCL::NCCLTask::Wait(std::chrono::milliseconds timeout) { void ProcessGroupNCCL::NCCLTask::Synchronize() { Wait(kWaitTimeout); } ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr& store, - int rank, int size) - : ProcessGroup(rank, size), store_(store) {} + int rank, int size, int gid) + : ProcessGroup(rank, size, gid), store_(store) {} void ProcessGroupNCCL::BroadcastUniqueNCCLID( std::vector& nccl_ids) { // NOLINT diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h index aa2a2b8fa2088cd30729ba5e6184ef7a9c507bf3..4ab5374dacaf4a94e028f4bdabd749cbd311992a 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h @@ -76,7 +76,8 @@ class ProcessGroupNCCL : public ProcessGroup { private: }; - ProcessGroupNCCL(const std::shared_ptr& store, int rank, int size); + ProcessGroupNCCL(const std::shared_ptr& store, int rank, int size, + int gid); const std::string GetBackendName() const override { return std::string(NCCL_BACKEND_NAME); diff --git a/paddle/fluid/operators/collective/c_broadcast_op.cu.cc b/paddle/fluid/operators/collective/c_broadcast_op.cu.cc index b16f256ee6cf3ddf76b2e01e9af78df81144502d..0ad61bb16b51ecacdef8e7da387ca36f65d36942 100644 --- a/paddle/fluid/operators/collective/c_broadcast_op.cu.cc +++ b/paddle/fluid/operators/collective/c_broadcast_op.cu.cc @@ -18,6 +18,8 @@ limitations under the License. */ #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #endif +#include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/phi/api/include/tensor.h" namespace paddle { namespace operators { @@ -36,6 +38,13 @@ class CBroadcastOpCUDAKernel : public framework::OpKernel { int rid = ctx.Attr("ring_id"); auto place = ctx.GetPlace(); auto comm = platform::NCCLCommContext::Instance().Get(rid, place); + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(rid)) { + // Use ProcessGroup + distributed::ProcessGroup* pg = map->get(rid); + pg->Broadcast(x, out); + return; + } gpuStream_t stream = nullptr; if (ctx.Attr("use_calc_stream")) { diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc index e89d8d96342e723724bb867a14bc4262c6ab7b16..6c74ea2eef4d0de46bcee1c0aa1c50f8ed2fc8ef 100644 --- a/paddle/fluid/pybind/distributed_py.cc +++ b/paddle/fluid/pybind/distributed_py.cc @@ -213,16 +213,20 @@ void BindDistributed(py::module *m) { py::class_>( *m, "ProcessGroupNCCL", ProcessGroup) - .def(py::init &, int, int>(), - py::call_guard()); + .def(py::init &, int, int, + int>(), + py::arg("store"), py::arg("rank"), py::arg("world_size"), + py::arg("group_id") = 0, py::call_guard()); #endif #if defined(PADDLE_WITH_ASCEND_CL) py::class_>( *m, "ProcessGroupHCCL", ProcessGroup) - .def(py::init &, int, int>(), - py::call_guard()); + .def(py::init &, int, int, + int>(), + py::arg("store"), py::arg("rank"), py::arg("world_size"), + py::arg("group_id") = 0, py::call_guard()); #endif py::class_>( *m, "ProcessGroupGloo", ProcessGroup) .def(py::init &, int, - int, std::shared_ptr &>(), + int, int, std::shared_ptr &>(), py::call_guard()) .def(py::init([](const std::shared_ptr &store, - int rank, int world_size) { + int rank, int world_size, int gid) { auto opts = GlooOptions::create(); char *ifname = getenv(GLOO_SOCKET_IFNAME_ENV.c_str()); if (ifname && strlen(ifname) > 1) { @@ -251,10 +255,10 @@ void BindDistributed(py::module *m) { opts->device = ProcessGroupGloo::createDefaultDevice(); } return std::make_shared(store, rank, world_size, - opts); + gid, opts); }), py::arg("store"), py::arg("rank"), py::arg("world_size"), - py::call_guard()) + py::arg("group_id") = 0, py::call_guard()) .def_static("create_default_device", &ProcessGroupGloo::createDefaultDevice); #endif