From 80ca78a212c38cfa6763cd5f70d3fb0d16d44e1b Mon Sep 17 00:00:00 2001 From: ronnywang Date: Wed, 3 Aug 2022 18:52:16 +0800 Subject: [PATCH] [CustomDevice] add custom ccl 2/2 (#44650) * [CustomDevice] add custom ccl 2/2 * update * update * update launch --- .../distributed/collective/CMakeLists.txt | 13 + .../distributed/collective/CustomCCLTools.cc | 47 +++ .../distributed/collective/CustomCCLTools.h | 198 ++++++++++++ .../collective/ProcessGroupCustom.cc | 289 ++++++++++++++++++ .../collective/ProcessGroupCustom.h | 129 ++++++++ .../collective/ProcessGroupHeter.cc | 14 +- .../collective/ProcessGroupHeter.h | 5 + .../fluid/distributed/collective/reducer.cc | 157 +++++++--- paddle/fluid/pybind/CMakeLists.txt | 3 + paddle/fluid/pybind/distributed_py.cc | 22 ++ python/paddle/distributed/collective.py | 5 +- .../paddle/distributed/fleet/launch_utils.py | 13 +- .../distributed/launch/context/device.py | 38 ++- .../launch/controllers/collective.py | 3 + python/paddle/distributed/parallel.py | 42 +-- python/paddle/fluid/dygraph/parallel.py | 42 ++- .../fluid/tests/custom_runtime/CMakeLists.txt | 10 +- .../test_fleet_launch_custom_device.sh | 6 +- 18 files changed, 956 insertions(+), 80 deletions(-) create mode 100644 paddle/fluid/distributed/collective/CustomCCLTools.cc create mode 100644 paddle/fluid/distributed/collective/CustomCCLTools.h create mode 100644 paddle/fluid/distributed/collective/ProcessGroupCustom.cc create mode 100644 paddle/fluid/distributed/collective/ProcessGroupCustom.h diff --git a/paddle/fluid/distributed/collective/CMakeLists.txt b/paddle/fluid/distributed/collective/CMakeLists.txt index 8644a2bfa43..116b3db40d6 100644 --- a/paddle/fluid/distributed/collective/CMakeLists.txt +++ b/paddle/fluid/distributed/collective/CMakeLists.txt @@ -51,3 +51,16 @@ if(WITH_ASCEND_CL) eager_api) endif() endif() + +if(WITH_CUSTOM_DEVICE) + cc_library( + processgroup_custom + SRCS ProcessGroupCustom.cc CustomCCLTools.cc Common.cc + DEPS phi_backends + place + enforce + collective_helper + device_context + phi_api + eager_api) +endif() diff --git a/paddle/fluid/distributed/collective/CustomCCLTools.cc b/paddle/fluid/distributed/collective/CustomCCLTools.cc new file mode 100644 index 00000000000..40c7c11529d --- /dev/null +++ b/paddle/fluid/distributed/collective/CustomCCLTools.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include "paddle/fluid/distributed/collective/CustomCCLTools.h"
+#include "paddle/fluid/distributed/collective/Types.h"
+
+namespace paddle {
+namespace distributed {
+
+phi::ccl::CCLReduceOp ToCustomCCLRedType(ReduceOp reduction) {
+  static const std::map<ReduceOp, phi::ccl::CCLReduceOp> red_type = {
+      {ReduceOp::MIN, phi::ccl::CCLReduceOp::MIN},
+      {ReduceOp::MAX, phi::ccl::CCLReduceOp::MAX},
+      {ReduceOp::SUM, phi::ccl::CCLReduceOp::SUM},
+      {ReduceOp::PRODUCT, phi::ccl::CCLReduceOp::PRODUCT},
+  };
+  auto it = red_type.find(reduction);
+  PADDLE_ENFORCE_EQ(
+      it != red_type.end(),
+      true,
+      platform::errors::InvalidArgument("Invalid hccl reduction. "
+                                        "Must be Min | Max | Prod | Sum"));
+  return it->second;
+}
+
+std::string SerializeCustomCCLUniqueId(const phi::ccl::CCLRootId& ccl_id) {
+  const uint8_t* bytes = ccl_id.data();
+  std::ostringstream oss;
+  for (size_t i = 0; i < ccl_id.size(); ++i) {
+    oss << std::hex << static_cast<int>(bytes[i]);
+  }
+  return oss.str();
+}
+
+} // namespace distributed
+} // namespace paddle
diff --git a/paddle/fluid/distributed/collective/CustomCCLTools.h b/paddle/fluid/distributed/collective/CustomCCLTools.h
new file mode 100644
index 00000000000..cc8faba5224
--- /dev/null
+++ b/paddle/fluid/distributed/collective/CustomCCLTools.h
@@ -0,0 +1,198 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +#pragma once + +#include +#include + +#include "paddle/fluid/distributed/collective/Types.h" +#include "paddle/fluid/framework/data_type.h" +#include "paddle/fluid/framework/variable.h" +#include "paddle/fluid/platform/collective_helper.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/enforce.h" +#include "paddle/phi/backends/device_guard.h" +#include "paddle/phi/backends/device_manager.h" + +namespace paddle { +namespace distributed { + +class CustomEventManager { + public: + CustomEventManager() = default; + + ~CustomEventManager() { + if (is_created_) { + event_->Destroy(); + } + } + + CustomEventManager(const CustomEventManager&) = delete; + CustomEventManager& operator=(const CustomEventManager&) = delete; + + CustomEventManager(CustomEventManager&& other) { + std::swap(is_created_, other.is_created_); + std::swap(device_index_, other.device_index_); + std::swap(device_type_, other.device_type_); + std::swap(event_, other.event_); + } + + CustomEventManager& operator=(CustomEventManager&& other) { + std::swap(is_created_, other.is_created_); + std::swap(device_index_, other.device_index_); + std::swap(device_type_, other.device_type_); + std::swap(event_, other.event_); + return *this; + } + + bool IsCreated() const { return is_created_; } + int8_t DeviceId() const { return device_index_; } + std::string DeviceType() const { return device_type_; } + phi::event::event_t GetRawCustomEvent() const { return event_->raw_event(); } + phi::event::Event* GetCustomEvent() const { return event_.get(); } + + void Record(const paddle::platform::CustomDeviceContext& ctx) { + auto place = ctx.GetPlace(); + auto device_type = place.GetDeviceType(); + auto device_index = place.GetDeviceId(); + if (!is_created_) { + CreateEvent(place); + } + PADDLE_ENFORCE_EQ(device_index, + device_index_, + platform::errors::PreconditionNotMet( + "CustomDeviceContext's device %d does not match" + "Event's device %d", + device_index, + device_index_)); + PADDLE_ENFORCE_EQ(device_type, + device_type_, + platform::errors::PreconditionNotMet( + "CustomDeviceContext's device %d does not match" + "Event's device type %d", + device_type, + device_type_)); + + phi::DeviceGuard guard(place); + phi::stream::Stream stream(place, ctx.stream()); + event_->Record(&stream); + } + + bool Query() const { return event_->Query(); } + + void Block(const paddle::platform::CustomDeviceContext& ctx) const { + if (is_created_) { + auto place = ctx.GetPlace(); + auto device_type = place.GetDeviceType(); + auto device_index = place.GetDeviceId(); + PADDLE_ENFORCE_EQ(device_index, + device_index_, + platform::errors::PreconditionNotMet( + "CustomDeviceContext's device %d does not match" + "Event's device %d", + device_index, + device_index_)); + PADDLE_ENFORCE_EQ(device_type, + device_type_, + platform::errors::PreconditionNotMet( + "CustomDeviceContext's device %d does not match" + "Event's device type %d", + device_type, + device_type_)); + phi::DeviceGuard guard(place); + phi::stream::Stream stream(place, ctx.stream()); + stream.WaitEvent(event_.get()); + } + } + + private: + bool is_created_{false}; + std::shared_ptr event_{nullptr}; + int8_t device_index_{0}; + std::string device_type_; + + private: + void CreateEvent(const platform::Place& place) { + device_index_ = place.GetDeviceId(); + device_type_ = place.GetDeviceType(); + event_.reset(new phi::event::Event); + event_->Init(place); + is_created_ = true; + } +}; + +class CustomCCLCommManager { + public: + CustomCCLCommManager(const 
std::string& device_type, + phi::ccl::CCLComm ccl_comm) + : device_type_(device_type), ccl_comm_(ccl_comm) {} + + CustomCCLCommManager() : CustomCCLCommManager("", nullptr) {} + + ~CustomCCLCommManager() noexcept { + std::unique_lock lock(mutex_); + if (ccl_comm_) { + phi::DeviceManager::CCLDestroyComm(device_type_, ccl_comm_); + } + } + + static std::shared_ptr Create( + const std::string& device_type, + int num_ranks, + int rank, + phi::ccl::CCLRootId* comm_id, + phi::ccl::CCLComm* ccl_comm) { + auto custom_ccl_manager = std::make_shared(); + phi::DeviceManager::CCLCommInitRank( + device_type, num_ranks, comm_id, rank, ccl_comm); + custom_ccl_manager->device_type_ = device_type; + custom_ccl_manager->ccl_id_ = comm_id; + custom_ccl_manager->rank_ = rank; + custom_ccl_manager->ccl_comm_ = *ccl_comm; + return custom_ccl_manager; + } + + phi::ccl::CCLRootId* GetCustomCCLId() const { + std::unique_lock lock(mutex_); + return ccl_id_; + } + + phi::ccl::CCLComm GetCustomCCLComm() const { + std::unique_lock lock(mutex_); + return ccl_comm_; + } + + CustomCCLCommManager(const CustomCCLCommManager&) = delete; + CustomCCLCommManager& operator=(const CustomCCLCommManager&) = delete; + CustomCCLCommManager& operator=(CustomCCLCommManager&& other) = delete; + + CustomCCLCommManager(CustomCCLCommManager&& other) { + std::unique_lock lock(other.mutex_); + std::swap(ccl_comm_, other.ccl_comm_); + } + + protected: + std::string device_type_; + phi::ccl::CCLComm ccl_comm_; + phi::ccl::CCLRootId* ccl_id_; + int rank_; + mutable std::mutex mutex_; +}; + +phi::ccl::CCLReduceOp ToCustomCCLRedType(ReduceOp reduction); +std::string SerializeCustomCCLUniqueId(const phi::ccl::CCLRootId& ccl_id); + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupCustom.cc b/paddle/fluid/distributed/collective/ProcessGroupCustom.cc new file mode 100644 index 00000000000..73a0b631eef --- /dev/null +++ b/paddle/fluid/distributed/collective/ProcessGroupCustom.cc @@ -0,0 +1,289 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h" + +#include "paddle/fluid/distributed/collective/Common.h" +#include "paddle/fluid/distributed/collective/CustomCCLTools.h" +#include "paddle/fluid/memory/malloc.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/place.h" +#include "paddle/phi/api/include/api.h" +#include "paddle/phi/common/place.h" + +DECLARE_bool(xccl_blocking_wait); + +constexpr int64_t kWaitBlockTImeout = 10; + +namespace paddle { +namespace distributed { + +void SyncDefaultStream( + const std::vector& places, + std::vector& cclEvents, // NOLINT + std::vector>& dev_ctx) { // NOLINT + for (size_t i = 0; i < places.size(); ++i) { + auto* default_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(places[i])); + cclEvents[i].Record(*dev_ctx[i]); + cclEvents[i].Block(*default_ctx); + } +} + +std::shared_ptr ProcessGroupCustom::CreateTask( + std::vector places, + int rank, + CommType comm_type, + const std::vector& inputs) { + return std::make_shared( + places, rank, comm_type, inputs); +} + +ProcessGroupCustom::CustomTask::CustomTask( + const std::vector& places, + int rank, + CommType CommType, + const std::vector& inputs) + : Task(rank, inputs, CommType), places_(places) { + control_events_.resize(places.size()); + cclComms_.resize(places.size()); +} + +ProcessGroupCustom::CustomTask::~CustomTask() {} + +void ProcessGroupCustom::CustomTask::SetOutputs( + std::vector& outputs) { // NOLINT + outputs_ = std::make_shared>(outputs); +} + +void ProcessGroupCustom::CustomTask::SynchronizeStreams() { + for (size_t i = 0; i < places_.size(); ++i) { + auto* default_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(places_[i])); + phi::DeviceGuard guard(default_ctx->GetPlace()); + phi::stream::Stream stream(default_ctx->GetPlace(), default_ctx->stream()); + stream.WaitEvent(control_events_[i].GetCustomEvent()); + } +} + +bool ProcessGroupCustom::CustomTask::IsCompleted() { + for (size_t i = 0; i < places_.size(); ++i) { + if (!control_events_[i].Query()) { + return false; + } + } + + return true; +} + +bool ProcessGroupCustom::CustomTask::Wait(std::chrono::milliseconds timeout) { + SynchronizeStreams(); + while (!IsCompleted()) { + std::this_thread::sleep_for(std::chrono::milliseconds(kWaitBlockTImeout)); + } + return true; +} + +// Same as Wait +void ProcessGroupCustom::CustomTask::Synchronize() { Wait(kWaitTimeout); } + +ProcessGroupCustom::ProcessGroupCustom(const std::shared_ptr& store, + int rank, + int size, + const platform::Place& place, + int gid) + : ProcessGroup(rank, size, place, gid), + store_(store), + device_type_(place.GetDeviceType()) { + phi::DeviceManager::SetDevice(place_); +} + +void ProcessGroupCustom::BroadcastUniqueCustomID( + std::vector& ccl_ids) { // NOLINT + if (rank_ == 0) { + for (size_t i = 0; i < ccl_ids.size(); i++) { + auto key = "ProcessGroupCustom/ccl_ids/" + std::to_string(i); + store_->set(key, ccl_ids[i]); + } + } else { + for (size_t i = 0; i < ccl_ids.size(); i++) { + auto key = "ProcessGroupCustom/ccl_ids/" + std::to_string(i); + ccl_ids[i] = store_->get(key); + } + } +} + +// create CustomCCLManager cache for places_key +void ProcessGroupCustom::CreateCustomManagerCache( + const std::string& places_key, const std::vector& places) { + PADDLE_ENFORCE_EQ(places_key.empty(), + false, + platform::errors::PreconditionNotMet( + "Not able to create/get the HCCL Communicator since " + "the NPU place are not known")); + const std::string device_type = 
places.back().GetDeviceType(); + + std::vector> ccl_comms; + ccl_comms.resize(places.size()); + + // using vector just for broadcast + std::vector ccl_ids; + ccl_ids.resize(1); + auto& ccl_id = ccl_ids.front(); + + if (rank_ == 0) { + phi::DeviceManager::CCLGetUniqueId(device_type, &ccl_id); + } + BroadcastUniqueCustomID(ccl_ids); + + VLOG(3) << "init custom ccl rank: " << rank_ << ", nranks: " << size_ + << ", place: " << places_key + << ", custom ccl uniqueid: " << SerializeCustomCCLUniqueId(ccl_id); + + std::vector> dev_ctx; + dev_ctx.resize(places.size()); + + std::unique_ptr comms( + new phi::ccl::CCLComm[places.size()]); + for (size_t i = 0; i < places.size(); ++i) { + phi::DeviceGuard guard(places[i]); + ccl_comms[i] = CustomCCLCommManager::Create( + device_type, GetSize(), GetRank(), &ccl_id, comms.get() + i); + dev_ctx[i].reset(new CustomDeviceContext(places[i])); + } + + std::vector events; + events.resize(places.size()); + + // These caches will be useful to process sync/wait/communicate + places_to_events_.emplace(places_key, std::move(events)); + places_to_customcomm_.emplace(places_key, std::move(ccl_comms)); + places_to_ctx_.emplace(places_key, std::move(dev_ctx)); +} + +template +std::shared_ptr ProcessGroupCustom::Collective( + std::vector& inputs, + std::vector& outputs, + Fn fn, + CommType op_type) { + const auto places = GetPlaceList(inputs); + const auto key = GetKeyFromPlaces(places); + + { + std::lock_guard lock(mutex_); + if (places_to_customcomm_.find(key) == places_to_customcomm_.end()) { + CreateCustomManagerCache(key, places); + } + } + + auto& ccl_comms = places_to_customcomm_[key]; + SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]); + auto task = CreateTask(places, rank_, op_type, inputs); + task->SetOutputs(outputs); + + for (size_t i = 0; i < inputs.size(); ++i) { + phi::DeviceGuard guard(places[i]); + const auto& ccl_stream = places_to_ctx_[key][i]->stream(); + phi::stream::Stream stream(places[i], ccl_stream); + fn(inputs[i], outputs[i], ccl_comms[i]->GetCustomCCLComm(), stream); + } + + for (size_t i = 0; i < inputs.size(); ++i) { + phi::DeviceGuard guard(places[i]); + task->control_events_[i].Record(*places_to_ctx_[key][i]); + } + return task; +} + +std::shared_ptr ProcessGroupCustom::AllReduce( + std::vector& in_tensors, // NOLINT + std::vector& out_tensors, // NOLINT + const AllreduceOptions& opts) { + return Collective( + in_tensors, + out_tensors, + [&](phi::DenseTensor& input, + phi::DenseTensor& output, + phi::ccl::CCLComm comm, + const phi::stream::Stream& stream) { + return phi::DeviceManager::CCLAllReduce( + device_type_, + input.data(), + output.data(), + input.numel(), + phi::ccl::ToCCLDataType(input.dtype()), + ToCustomCCLRedType(opts.reduce_op), + comm, + stream); + }, + CommType::ALLREDUCE); +} + +std::shared_ptr ProcessGroupCustom::Broadcast( + std::vector& in_tensors, // NOLINT + std::vector& out_tensors, // NOLINT + const BroadcastOptions& opts) { + return Collective( + in_tensors, + out_tensors, + [&](phi::DenseTensor& input, + phi::DenseTensor& output, + phi::ccl::CCLComm comm, + const phi::stream::Stream& stream) { + int root = opts.source_rank * in_tensors.size() + opts.source_root; + if (rank_ == root) { + return phi::DeviceManager::CCLBroadcast( + device_type_, + input.data(), + input.numel(), + phi::ccl::ToCCLDataType(input.dtype()), + root, + comm, + stream); + } else { + return phi::DeviceManager::CCLBroadcast( + device_type_, + output.data(), + output.numel(), + phi::ccl::ToCCLDataType(output.dtype()), + 
root, + comm, + stream); + } + }, + CommType::BROADCAST); +} + +std::shared_ptr ProcessGroupCustom::Barrier( + const BarrierOptions& opts) { + // Only support single card single process + std::vector places = {place_}; + std::vector barrierTensors; + barrierTensors.reserve(places.size()); + + for (auto& place : places) { + phi::DeviceGuard guard(place); + auto dt = full({1}, 0, phi::DataType::FLOAT32, place); + barrierTensors.push_back( + *std::dynamic_pointer_cast(dt.impl())); + } + auto task = ProcessGroupCustom::AllReduce(barrierTensors, barrierTensors); + auto xccl_task = dynamic_cast(task.get()); + xccl_task->barrierTensors_ = std::move(barrierTensors); + return task; +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupCustom.h b/paddle/fluid/distributed/collective/ProcessGroupCustom.h new file mode 100644 index 00000000000..fca6f127c38 --- /dev/null +++ b/paddle/fluid/distributed/collective/ProcessGroupCustom.h @@ -0,0 +1,129 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paddle/fluid/distributed/collective/CustomCCLTools.h" +#include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/fluid/distributed/store/store.h" +#include "paddle/fluid/platform/device/npu/npu_stream.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/enforce.h" +#include "paddle/fluid/platform/gen_comm_id_helper.h" +#include "paddle/fluid/platform/place.h" + +namespace paddle { +namespace distributed { +using Place = paddle::platform::Place; +using CustomDeviceContext = paddle::platform::CustomDeviceContext; +class ProcessGroupCustom : public ProcessGroup { + public: + class CustomTask : public ProcessGroup::Task, + public std::enable_shared_from_this { + public: + CustomTask(const std::vector& places, + int rank, + CommType CommType, + const std::vector& inputs); + + bool IsCompleted(); + void SynchronizeStreams(); + bool Wait(std::chrono::milliseconds timeout = kWaitTimeout); + void Synchronize(); + void SetOutputs(std::vector& outputs); // NOLINT + virtual ~CustomTask(); + + std::vector control_events_; + std::vector barrierTensors_; + + protected: + std::vector places_; + std::vector> cclComms_; + std::shared_ptr> outputs_; + + private: + const std::string device_type_; + }; + + ProcessGroupCustom(const std::shared_ptr& store, + int rank, + int size, + const platform::Place& place, + int gid); + + const std::string GetBackendName() const override { + return "XCCL_" + device_type_; + } + + std::shared_ptr AllReduce( + std::vector& in_tensors, + std::vector& out_tensors, + const AllreduceOptions& = AllreduceOptions()) override; + + std::shared_ptr Broadcast( + std::vector& in_tensors, + std::vector& out_tensors, + const BroadcastOptions& = BroadcastOptions()) override; + + std::shared_ptr Barrier( + const BarrierOptions& = 
BarrierOptions()) override; + + protected: + virtual std::shared_ptr CreateTask( + std::vector places, + int rank, + CommType opType, + const std::vector& inputs); + + std::shared_ptr store_; + std::shared_ptr custom_comm_; + std::mutex mutex_; + std::unordered_map>> + places_to_customcomm_; + std::unordered_map> + places_to_events_; + std::unordered_map>> + places_to_ctx_; + std::set used_place_ids_; + + private: + void BcastCustomId(std::vector& ccl_ids, + int root, // NOLINT + int server_fd); + + void BroadcastUniqueCustomID( + std::vector& custom_ccl_ids); // NOLINT + + template + std::shared_ptr Collective( + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT + Fn fn, + CommType op_type); + + void CreateCustomManagerCache(const std::string& places_key, + const std::vector& places); + const std::string device_type_; +}; +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc index 2963d65f261..aba40d4687b 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc @@ -73,9 +73,15 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr& store, src_rank_(src_rank), dst_rank_(dst_rank) { return; +#ifdef PADDLE_WITH_CUSTOM + if (paddle::platform::is_custom_place(place_)) { + inner_pg_ = std::make_shared( + store, local_rank, local_size, place_, IGNORE_ID); + } else { +#endif #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) - inner_pg_ = std::make_shared( - store, local_rank, local_size, place_, IGNORE_ID); + inner_pg_ = std::make_shared( + store, local_rank, local_size, place_, IGNORE_ID); #elif defined(PADDLE_WITH_ASCEND_CL) inner_pg_ = std::make_shared( store, local_rank, local_size, place_, IGNORE_ID); @@ -83,6 +89,10 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr& store, PADDLE_THROW(platform::errors::Unavailable( "ProcessGroupHeter only supports NCCL, RCCL and HCCL now.")); #endif +#ifdef PADDLE_WITH_CUSTOM + } +#endif + if (local_rank_ == 0 && !with_switch_) { auto opts = ProcessGroupGloo::GlooOptions::create(); opts->device = ProcessGroupGloo::createDefaultDevice(); diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.h b/paddle/fluid/distributed/collective/ProcessGroupHeter.h index 275968696b3..12a568f5ac5 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHeter.h +++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.h @@ -45,6 +45,11 @@ #include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h" #endif +#if defined(PADDLE_WITH_CUSTOM_DEVICE) +#include "paddle/fluid/distributed/collective/CustomCCLTools.h" +#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h" +#endif + #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ defined(PADDLE_WITH_ASCEND_CL)) diff --git a/paddle/fluid/distributed/collective/reducer.cc b/paddle/fluid/distributed/collective/reducer.cc index 8f4466f7baa..a6cd3050402 100644 --- a/paddle/fluid/distributed/collective/reducer.cc +++ b/paddle/fluid/distributed/collective/reducer.cc @@ -13,6 +13,8 @@ // limitations under the License. 
#include "paddle/fluid/distributed/collective/reducer.h" +#include "paddle/phi/backends/device_guard.h" +#include "paddle/phi/backends/device_manager.h" namespace paddle { namespace distributed { @@ -147,41 +149,90 @@ std::vector> Eager_AssignGroupBySize( } template -static void ConcatTensorsForAllReduce( - const DeviceContext &context, - const std::vector &dense_tensors_, - Tensor *p_dense_contents) { - operators::math::ConcatFunctor concat_functor_; - concat_functor_( - context, - dense_tensors_, - 0, - std::dynamic_pointer_cast(p_dense_contents->impl()) - .get()); -} +struct ConcatTensorsForAllReduce { + void operator()(const DeviceContext &context, + const std::vector &dense_tensors_, + Tensor *p_dense_contents) { + operators::math::ConcatFunctor concat_functor_; + concat_functor_( + context, + dense_tensors_, + 0, + std::dynamic_pointer_cast(p_dense_contents->impl()) + .get()); + } +}; template -static void SplitTensorsForAllReduce( - const DeviceContext &context, - Tensor *p_dense_contents, - std::vector *p_dense_tensors) { - auto *in = - std::dynamic_pointer_cast(p_dense_contents->impl()) - .get(); - std::vector outs; - std::vector shape_refer; +struct SplitTensorsForAllReduce { + void operator()(const DeviceContext &context, + Tensor *p_dense_contents, + std::vector *p_dense_tensors) { + auto *in = + std::dynamic_pointer_cast(p_dense_contents->impl()) + .get(); + std::vector outs; + std::vector shape_refer; - outs.reserve(p_dense_tensors->size()); - shape_refer.reserve(p_dense_tensors->size()); + outs.reserve(p_dense_tensors->size()); + shape_refer.reserve(p_dense_tensors->size()); - for (auto &tensor : *p_dense_tensors) { - outs.emplace_back(&tensor); - shape_refer.emplace_back(&tensor); - } + for (auto &tensor : *p_dense_tensors) { + outs.emplace_back(&tensor); + shape_refer.emplace_back(&tensor); + } - operators::math::SplitFunctor split_functor_; - split_functor_(context, *in, shape_refer, 0, &outs); -} + operators::math::SplitFunctor split_functor_; + split_functor_(context, *in, shape_refer, 0, &outs); + } +}; + +#ifdef PADDLE_WITH_CUSTOM_DEVICE +// note(wangran16): A temporary solution for all backends. 
+template +struct ConcatTensorsForAllReduce { + void operator()(const platform::CustomDeviceContext &context, + const std::vector &dense_tensors_, + Tensor *p_dense_contents) { + phi::DeviceGuard guard(context.GetPlace()); + auto *out = + std::dynamic_pointer_cast(p_dense_contents->impl()) + .get(); + uint8_t *out_data = reinterpret_cast(out->data()); + auto *device = phi::DeviceManager::GetDeviceWithPlace(context.GetPlace()); + + size_t offset = 0; + for (const auto &tensor : dense_tensors_) { + const uint8_t *in_data = + reinterpret_cast(tensor.data()); + auto sz = tensor.numel() * sizeof(T); + device->MemoryCopyD2D(out_data + offset, in_data, sz, nullptr); + offset += sz; + } + } +}; + +template +struct SplitTensorsForAllReduce { + void operator()(const platform::CustomDeviceContext &context, + Tensor *p_dense_contents, + std::vector *p_dense_tensors) { + auto *in = + std::dynamic_pointer_cast(p_dense_contents->impl()) + .get(); + uint8_t *in_data = reinterpret_cast(in->data()); + auto *device = phi::DeviceManager::GetDeviceWithPlace(context.GetPlace()); + + size_t offset = 0; + for (auto &tensor : *p_dense_tensors) { + uint8_t *out_data = reinterpret_cast(tensor.data()); + auto sz = tensor.numel() * sizeof(T); + device->MemoryCopyD2D(out_data, in_data + offset, sz, nullptr); + offset += sz; + } + } +}; +#endif // context is used to select the stream for concat template @@ -192,15 +243,15 @@ static void ConcatTensorsWithType( phi::DataType type) { switch (type) { case phi::DataType::FLOAT16: - ConcatTensorsForAllReduce( + ConcatTensorsForAllReduce()( context, dense_tensors_, p_dense_contents); break; case phi::DataType::FLOAT32: - ConcatTensorsForAllReduce( + ConcatTensorsForAllReduce()( context, dense_tensors_, p_dense_contents); break; case phi::DataType::FLOAT64: - ConcatTensorsForAllReduce( + ConcatTensorsForAllReduce()( context, dense_tensors_, p_dense_contents); break; default: @@ -219,15 +270,15 @@ static void SplitTensorsWithType(const DeviceContext &context, phi::DataType type) { switch (type) { case phi::DataType::FLOAT16: - SplitTensorsForAllReduce( + SplitTensorsForAllReduce()( context, p_dense_contents, p_dense_tensors); break; case phi::DataType::FLOAT32: - SplitTensorsForAllReduce( + SplitTensorsForAllReduce()( context, p_dense_contents, p_dense_tensors); break; case phi::DataType::FLOAT64: - SplitTensorsForAllReduce( + SplitTensorsForAllReduce()( context, p_dense_contents, p_dense_tensors); break; default: @@ -249,6 +300,18 @@ void EagerGroup::ConcatTensors(const platform::Place &place) { PADDLE_THROW(platform::errors::PermissionDenied( "Paddle can't concat grad tensors since it's not compiled with NCCL," "Please recompile or reinstall Paddle with NCCL support.")); +#endif + } else if (platform::is_custom_place(place)) { +#ifdef PADDLE_WITH_CUSTOM_DEVICE + auto *default_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(place)); + ConcatTensorsWithType( + *default_ctx, dense_tensors_, &dense_contents_, dtype_); +#else + PADDLE_THROW(platform::errors::PermissionDenied( + "Paddle can't concat grad tensors since it's not compiled with " + "CUSTOM_DEVICE," + "Please recompile or reinstall Paddle with CUSTOM_DEVICE support.")); #endif } else if (platform::is_cpu_place(place)) { auto *default_ctx = static_cast( @@ -272,6 +335,18 @@ void EagerGroup::SplitTensors(const platform::Place &place) { PADDLE_THROW(platform::errors::PermissionDenied( "Paddle can't split grad tensor since it's not compiled with NCCL," "Please recompile or reinstall Paddle with NCCL 
support.")); +#endif + } else if (platform::is_custom_place(place)) { +#ifdef PADDLE_WITH_CUSTOM_DEVICE + auto *default_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(place)); + SplitTensorsWithType( + *default_ctx, &dense_contents_, &dense_tensors_, dtype_); +#else + PADDLE_THROW(platform::errors::PermissionDenied( + "Paddle can't split grad tensor since it's not compiled with " + "CUSTOM_DEVICE," + "Please recompile or reinstall Paddle with CUSTOM_DEVICE support.")); #endif } else if (platform::is_cpu_place(place)) { auto *default_ctx = static_cast( @@ -889,6 +964,16 @@ void EagerReducer::AllReduceSparse(EagerGroup *group, PADDLE_THROW(platform::errors::PermissionDenied( "Paddle can't concat grad tensors since it's not compiled with NCCL," "Please recompile or reinstall Paddle with NCCL support.")); +#endif + } else if (platform::is_custom_place(inner_place_)) { +#ifdef PADDLE_WITH_CUSTOM_DEVICE + dev_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(inner_place_)); +#else + PADDLE_THROW(platform::errors::PermissionDenied( + "Paddle can't concat grad tensors since it's not compiled with " + "CUSTOM_DEVICE," + "Please recompile or reinstall Paddle with CUSTOM_DEVICE support.")); #endif } else if (platform::is_cpu_place(inner_place_)) { dev_ctx = static_cast( diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 0803e268d05..e198287abce 100755 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -155,6 +155,9 @@ if(NOT ON_INFER) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_heter) endif() endif() + if(WITH_CUSTOM_DEVICE) + set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_custom) + endif() set(PYBIND_SRCS ${PYBIND_SRCS} distributed_py.cc) endif() diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc index 1146f650d8d..1b325dcf378 100644 --- a/paddle/fluid/pybind/distributed_py.cc +++ b/paddle/fluid/pybind/distributed_py.cc @@ -39,6 +39,10 @@ limitations under the License. 
*/ #include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h" #endif +#if defined(PADDLE_WITH_CUSTOM_DEVICE) +#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h" +#endif + #if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \ (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL)) #include "paddle/fluid/distributed/collective/ProcessGroupHeter.h" @@ -458,6 +462,24 @@ void BindDistributed(py::module *m) { py::arg("group_id") = 0, py::call_guard()); +#endif + +#if defined(PADDLE_WITH_CUSTOM_DEVICE) + py::class_>( + *m, "ProcessGroupCustom", ProcessGroup) + .def(py::init &, + int, + int, + const platform::CustomPlace &, + int>(), + py::arg("store"), + py::arg("rank"), + py::arg("world_size"), + py::arg("place"), + py::arg("group_id") = 0, + py::call_guard()); + #endif py::class_ 0: + if self.ctx.node.device.dtype == DeviceType.CUSTOM_DEVICE: + e.update(self.ctx.node.device.get_custom_device_envs()) if self.pod.replicas == 1: e.update({selected_dev_key: ",".join(selected_dev_list)}) else: diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index e95b771fe6f..cb8baa220c7 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -72,10 +72,10 @@ def _start_kv_server(port, http_server_d, size): def _is_cpuonly(backend): check_backend(backend) - if backend in [ - 'auto', 'nccl', 'bkcl', 'hccl', 'heter', 'cncl' - ] and (core.is_compiled_with_cuda() or core.is_compiled_with_xpu() - or core.is_compiled_with_npu() or core.is_compiled_with_mlu()): + if (backend in ['auto', 'nccl', 'bkcl', 'hccl', 'heter', 'cncl'] and + (core.is_compiled_with_cuda() or core.is_compiled_with_xpu() + or core.is_compiled_with_npu() + or core.is_compiled_with_mlu())) or backend is 'xccl': # passes 'auto' and can use cuda or xpu, use the default logics. 
so return False return False @@ -172,18 +172,23 @@ def init_parallel_env(): raise NotImplementedError( "If you want to use CPU-only version, please use 'gloo' as backend") - if not is_cpu_only and core.is_compiled_with_cuda(): - _check_var_exists("FLAGS_selected_gpus") - backend = "nccl" if backend == "auto" else backend - elif not is_cpu_only and core.is_compiled_with_xpu(): - _check_var_exists('FLAGS_selected_xpus') - backend = "bkcl" if backend == "auto" else backend - elif not is_cpu_only and core.is_compiled_with_npu(): - _check_var_exists('FLAGS_selected_npus') - backend = "hccl" if backend == "auto" else backend - elif not is_cpu_only and core.is_compiled_with_mlu(): - _check_var_exists('FLAGS_selected_mlus') - backend = "cncl" if backend == "auto" else backend + if backend == "xccl": + FLAGS_selected_custom_devices = 'FLAGS_selected_{}s'.format( + parallel_env.device_type) + _check_var_exists(FLAGS_selected_custom_devices) + else: + if not is_cpu_only and core.is_compiled_with_cuda(): + _check_var_exists("FLAGS_selected_gpus") + backend = "nccl" if backend == "auto" else backend + elif not is_cpu_only and core.is_compiled_with_xpu(): + _check_var_exists('FLAGS_selected_xpus') + backend = "bkcl" if backend == "auto" else backend + elif not is_cpu_only and core.is_compiled_with_npu(): + _check_var_exists('FLAGS_selected_npus') + backend = "hccl" if backend == "auto" else backend + elif not is_cpu_only and core.is_compiled_with_mlu(): + _check_var_exists('FLAGS_selected_mlus') + backend = "cncl" if backend == "auto" else backend _check_var_exists("PADDLE_TRAINER_ID") _check_var_exists("PADDLE_CURRENT_ENDPOINT") @@ -196,7 +201,10 @@ def init_parallel_env(): # directly, if they want to switch default place, # they need to call a function to change default place, # here just set correctly place to users - if is_cpu_only: + if backend == "xccl": + place = core.CustomPlace(parallel_env.device_type, + parallel_env.device_id) + elif is_cpu_only: place = core.CPUPlace() elif core.is_compiled_with_cuda(): place = core.CUDAPlace(parallel_env.device_id) diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index 09036ed942d..52b25debaca 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -118,20 +118,28 @@ class ParallelEnv(object): def __init__(self): self._rank = int(os.getenv("PADDLE_TRAINER_ID", "0")) self._world_size = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + self._device_type = str(os.getenv("PADDLE_XCCL_BACKEND", "")) # imperative only support one gpu or xpu - if core.is_compiled_with_cuda(): - selected_gpus = os.getenv("FLAGS_selected_gpus", "0").split(",") - self._device_id = int(selected_gpus[0]) - elif core.is_compiled_with_xpu(): - selected_xpus = os.getenv("FLAGS_selected_xpus", "0").split(",") - self._device_id = int(selected_xpus[0]) - elif core.is_compiled_with_npu(): - selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",") - self._device_id = int(selected_npus[0]) - elif core.is_compiled_with_mlu(): - selected_mlus = os.getenv("FLAGS_selected_mlus", "0").split(",") - self._device_id = int(selected_mlus[0]) + if self._device_type != "": + FLAGS_selected_custom_devices = 'FLAGS_selected_{}s'.format( + self._device_type) + selected_custom_devices = os.getenv(FLAGS_selected_custom_devices, + "0").split(",") + self._device_id = int(selected_custom_devices[0]) + else: + if core.is_compiled_with_cuda(): + selected_gpus = os.getenv("FLAGS_selected_gpus", "0").split(",") + 
self._device_id = int(selected_gpus[0]) + elif core.is_compiled_with_xpu(): + selected_xpus = os.getenv("FLAGS_selected_xpus", "0").split(",") + self._device_id = int(selected_xpus[0]) + elif core.is_compiled_with_npu(): + selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",") + self._device_id = int(selected_npus[0]) + elif core.is_compiled_with_mlu(): + selected_mlus = os.getenv("FLAGS_selected_mlus", "0").split(",") + self._device_id = int(selected_mlus[0]) self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(",") @@ -199,6 +207,16 @@ class ParallelEnv(object): """ return self._device_id + @property + def device_type(self): + """ + The type of custom device for parallel training. + + Its value is equal to the value of the environment variable ``PADDLE_XCCL_BACKEND`` . The default value is None. + + """ + return self._device_type + @property def current_endpoint(self): """ diff --git a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt index 04f01714b37..8056f468958 100644 --- a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt +++ b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt @@ -5,8 +5,16 @@ if(WITH_CUSTOM_DEVICE) "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") - list(REMOVE_ITEM TEST_OPS "test_collective_process_group_xccl") + list(REMOVE_ITEM TEST_OPS test_collective_process_group_xccl) foreach(TEST_OP ${TEST_OPS}) py_test(${TEST_OP} SRCS ${TEST_OP}.py) endforeach() + + bash_test_modules( + test_fleet_launch_custom_device START_BASH + test_fleet_launch_custom_device.sh ENVS + PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + + set_tests_properties(test_custom_cpu_plugin PROPERTIES TIMEOUT 120) + set_tests_properties(test_fleet_launch_custom_device PROPERTIES TIMEOUT 120) endif() diff --git a/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh b/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh index 3afb1979905..68023430172 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh +++ b/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh @@ -16,13 +16,13 @@ set -e - - rm -rf PaddleCustomDevice && git clone https://github.com/PaddlePaddle/PaddleCustomDevice.git && pushd PaddleCustomDevice/backends/custom_cpu && mkdir build && pushd build && cmake .. && make -j8 && popd && popd echo "begin test use custom_cpu" export FLAGS_selected_custom_cpus=0,1 +export CUSTOM_CPU_VISIBLE_DEVICES=0,1 +export CUSTOM_DEVICE_ROOT=PaddleCustomDevice/backends/custom_cpu/build -distributed_args="--ips=127.0.0.1 --backend=xccl --custom_device_type=custom_cpu --custom_devices=0,1 --run_mode=collective --log_dir=testlog" +distributed_args="--devices=0,1" python -m paddle.distributed.fleet.launch ${distributed_args} custom_device_multi_process_collective.py fleetlaunch_custom_cpu -- GitLab
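
For readers who want to exercise the new backend end to end: after this patch, the launcher exports the custom-device environment for each rank (the shell test above sets CUSTOM_DEVICE_ROOT, CUSTOM_CPU_VISIBLE_DEVICES and FLAGS_selected_custom_cpus), ParallelEnv reads PADDLE_XCCL_BACKEND to discover the device type, and init_parallel_env then creates a CustomPlace and a ProcessGroupCustom behind the "xccl" backend. The script below is a minimal sketch under those assumptions; the file name, model, and shapes are illustrative and not taken from the patch.

# hypothetical_custom_cpu_dp.py -- illustrative sketch only. Assumes the
# custom_cpu plugin from PaddleCustomDevice is built, CUSTOM_DEVICE_ROOT
# points at its build directory, and the launcher exported
# PADDLE_XCCL_BACKEND=custom_cpu plus FLAGS_selected_custom_cpus per rank.
import paddle
import paddle.distributed as dist


def main():
    # With PADDLE_XCCL_BACKEND set, ParallelEnv picks up the device type and
    # init_parallel_env selects a CustomPlace and the XCCL process group
    # (ProcessGroupCustom) added by this patch.
    dist.init_parallel_env()

    model = paddle.DataParallel(paddle.nn.Linear(8, 8))

    x = paddle.randn([4, 8], dtype="float32")
    loss = model(x).mean()
    loss.backward()  # gradient all-reduce goes through the custom CCL backend
    print("rank", dist.get_rank(), "loss", float(loss))


if __name__ == "__main__":
    main()

Launched roughly as in the shell test above, e.g. python -m paddle.distributed.launch --devices=0,1 hypothetical_custom_cpu_dp.py.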