From 9191e7439fa3075c944cc11a39103522e74ca2a5 Mon Sep 17 00:00:00 2001 From: LiYuRio <63526175+LiYuRio@users.noreply.github.com> Date: Mon, 14 Nov 2022 15:59:52 +0800 Subject: [PATCH] remove heter and hccl (#47918) --- .../distributed/collective/CMakeLists.txt | 44 -- .../collective/ProcessGroupHCCL.cc | 276 ------------ .../distributed/collective/ProcessGroupHCCL.h | 131 ------ .../collective/ProcessGroupHeter.cc | 393 ------------------ .../collective/ProcessGroupHeter.h | 140 ------- paddle/fluid/pybind/CMakeLists.txt | 9 - paddle/fluid/pybind/distributed_py.cc | 65 --- python/paddle/distributed/collective.py | 53 +-- .../tests/unittests/npu/process_group_hccl.py | 253 ----------- .../npu/test_collective_process_group_hccl.py | 28 -- 10 files changed, 1 insertion(+), 1391 deletions(-) delete mode 100644 paddle/fluid/distributed/collective/ProcessGroupHCCL.cc delete mode 100644 paddle/fluid/distributed/collective/ProcessGroupHCCL.h delete mode 100644 paddle/fluid/distributed/collective/ProcessGroupHeter.cc delete mode 100644 paddle/fluid/distributed/collective/ProcessGroupHeter.h delete mode 100644 python/paddle/fluid/tests/unittests/npu/process_group_hccl.py delete mode 100644 python/paddle/fluid/tests/unittests/npu/test_collective_process_group_hccl.py diff --git a/paddle/fluid/distributed/collective/CMakeLists.txt b/paddle/fluid/distributed/collective/CMakeLists.txt index b57808d32a5..e5dc51c63f0 100644 --- a/paddle/fluid/distributed/collective/CMakeLists.txt +++ b/paddle/fluid/distributed/collective/CMakeLists.txt @@ -30,18 +30,6 @@ if(WITH_NCCL OR WITH_RCCL) device_context ${DEVICE_EVENT_LIBS} dense_tensor) - if(WITH_DISTRIBUTE AND WITH_PSCORE) - if(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) - set(DISTRIBUTE_COMPILE_FLAGS "${DISTRIBUTE_COMPILE_FLAGS} -faligned-new") - set_source_files_properties( - ProcessGroupHeter.cc PROPERTIES COMPILE_FLAGS - ${DISTRIBUTE_COMPILE_FLAGS}) - endif() - cc_library( - processgroup_heter - SRCS ProcessGroupHeter.cc NCCLTools.cc Common.cc - DEPS place enforce collective_helper device_context phi_api eager_api) - endif() endif() if(WITH_XPU_BKCL) @@ -59,38 +47,6 @@ if(WITH_MPI) DEPS collective_helper device_context) endif() -if(WITH_ASCEND_CL) - cc_library( - processgroup_hccl - SRCS ProcessGroupHCCL.cc HCCLTools.cc Common.cc - DEPS place - npu_stream - enforce - collective_helper - device_context - phi_api - eager_api) - if(WITH_DISTRIBUTE AND WITH_PSCORE) - if(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) - set(DISTRIBUTE_COMPILE_FLAGS "${DISTRIBUTE_COMPILE_FLAGS} -faligned-new") - set_source_files_properties( - ProcessGroupHeter.cc PROPERTIES COMPILE_FLAGS - ${DISTRIBUTE_COMPILE_FLAGS}) - endif() - - cc_library( - processgroup_heter - SRCS ProcessGroupHeter.cc HCCLTools.cc Common.cc - DEPS place - npu_stream - enforce - collective_helper - device_context - phi_api - eager_api) - endif() -endif() - if(WITH_CUSTOM_DEVICE) cc_library( processgroup_custom diff --git a/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc deleted file mode 100644 index 718b33903af..00000000000 --- a/paddle/fluid/distributed/collective/ProcessGroupHCCL.cc +++ /dev/null @@ -1,276 +0,0 @@ -// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h" - -#include "paddle/fluid/distributed/collective/Common.h" -#include "paddle/fluid/distributed/collective/HCCLTools.h" -#include "paddle/fluid/memory/malloc.h" -#include "paddle/fluid/platform/device/npu/hccl_helper.h" -#include "paddle/fluid/platform/device/npu/npu_info.h" -#include "paddle/fluid/platform/device_context.h" -#include "paddle/fluid/platform/place.h" -#include "paddle/phi/api/include/api.h" -#include "paddle/phi/common/place.h" - -DECLARE_bool(hccl_blocking_wait); -// DECLARE_bool(use_stream_safe_npu_allocator); - -constexpr int64_t kWaitBlockTImeout = 10; - -namespace paddle { -namespace distributed { - -void SyncDefaultStream( - const std::vector& places, - std::vector& hcclEvents, // NOLINT - std::vector>& dev_ctx) { // NOLINT - for (size_t i = 0; i < places.size(); ++i) { - auto* default_ctx = static_cast( - platform::DeviceContextPool::Instance().Get(places[i])); - hcclEvents[i].Record(*dev_ctx[i]); - hcclEvents[i].Block(*default_ctx); - } -} - -std::shared_ptr ProcessGroupHCCL::CreateTask( - std::vector places, - int rank, - CommType comm_type, - const std::vector& inputs) { - return std::make_shared( - places, rank, comm_type, inputs); -} - -ProcessGroupHCCL::HCCLTask::HCCLTask( - const std::vector& places, - int rank, - CommType CommType, - const std::vector& inputs) - : Task(rank, inputs, CommType), places_(places) { - control_events_.resize(places.size()); - hcclComms_.resize(places.size()); -} - -ProcessGroupHCCL::HCCLTask::~HCCLTask() {} - -void ProcessGroupHCCL::HCCLTask::SetOutputs( - std::vector& outputs) { // NOLINT - outputs_ = std::make_shared>(outputs); -} - -void ProcessGroupHCCL::HCCLTask::SynchronizeStreams() { - for (size_t i = 0; i < places_.size(); ++i) { - auto* default_ctx = static_cast( - platform::DeviceContextPool::Instance().Get(places_[i])); - platform::NPUStreamWaitEvent(default_ctx->stream(), - control_events_[i].GetRawNPUEvent()); - } -} - -bool ProcessGroupHCCL::HCCLTask::IsCompleted() { - for (size_t i = 0; i < places_.size(); ++i) { - if (!control_events_[i].Query()) { - return false; - } - } - - return true; -} - -// TODO(sandyhouse): Add timeout for wait, now timeout unused -bool ProcessGroupHCCL::HCCLTask::Wait(std::chrono::milliseconds timeout) { - SynchronizeStreams(); - // NOTE(sandyhouse): It will block host for sync - while (!IsCompleted()) { - std::this_thread::sleep_for(std::chrono::milliseconds(kWaitBlockTImeout)); - } - return true; -} - -// Same as Wait -void ProcessGroupHCCL::HCCLTask::Synchronize() { Wait(kWaitTimeout); } - -ProcessGroupHCCL::ProcessGroupHCCL(const std::shared_ptr& store, - int rank, - int size, - const platform::Place& place, - int gid) - : ProcessGroup(rank, size, place, gid), store_(store) { - platform::SetNPUDeviceId(place_.device); -} - -void ProcessGroupHCCL::BroadcastUniqueHCCLID( - std::vector& hccl_ids) { // NOLINT - if (rank_ == 0) { - for (size_t i = 0; i < hccl_ids.size(); i++) { - auto key = "ProcessGroupHCCL/hccl_ids/" + std::to_string(i); - auto hccl_id = std::vector( - reinterpret_cast(&hccl_ids[i]), - reinterpret_cast(&hccl_ids[i]) + sizeof(HcclRootInfo)); - store_->set(key, hccl_id); - } - } else { - for (size_t i = 0; i < hccl_ids.size(); i++) { - auto key = "ProcessGroupHCCL/hccl_ids/" + std::to_string(i); - auto ret = store_->get(key); - std::memcpy(&hccl_ids[i], ret.data(), ret.size()); - } - } -} - -// create HCCLManager cache for places_key -void ProcessGroupHCCL::CreateHCCLManagerCache( - const std::string& places_key, const std::vector& places) { - PADDLE_ENFORCE_EQ(places_key.empty(), - false, - platform::errors::PreconditionNotMet( - "Not able to create/get the HCCL Communicator since " - "the NPU place are not known")); - - std::vector> hccl_comms; - hccl_comms.resize(places.size()); - - // using vector just for broadcast - std::vector hccl_ids; - hccl_ids.resize(1); - auto& hccl_id = hccl_ids.front(); - - if (rank_ == 0) { - PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::HcclGetRootInfo(&hccl_id)); - } - BroadcastUniqueHCCLID(hccl_ids); - - VLOG(3) << "init hccl rank: " << rank_ << ", nranks: " << size_ - << ", place: " << places_key - << ", hccl uniqueid: " << SerializeHCCLUniqueId(hccl_id); - - std::vector> dev_ctx; - dev_ctx.resize(places.size()); - - std::unique_ptr comms(new HcclComm[places.size()]); - for (size_t i = 0; i < places.size(); ++i) { - platform::NPUDeviceGuard guard(places[i].GetDeviceId()); - hccl_comms[i] = HCCLCommManager::Create( - GetSize(), GetRank(), &hccl_id, comms.get() + i); - dev_ctx[i].reset(new NPUDeviceContext(places[i])); - } - - std::vector events; - events.resize(places.size()); - - // These caches will be useful to process sync/wait/communicate - places_to_events_.emplace(places_key, std::move(events)); - places_to_hcclcomm_.emplace(places_key, std::move(hccl_comms)); - places_to_ctx_.emplace(places_key, std::move(dev_ctx)); -} - -template -std::shared_ptr ProcessGroupHCCL::Collective( - std::vector& inputs, - std::vector& outputs, - Fn fn, - CommType op_type) { - const auto places = GetPlaceList(inputs); - const auto key = GetKeyFromPlaces(places); - - { - std::lock_guard lock(mutex_); - if (places_to_hcclcomm_.find(key) == places_to_hcclcomm_.end()) { - CreateHCCLManagerCache(key, places); - } - } - - auto& hccl_comms = places_to_hcclcomm_[key]; - - SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]); - - auto task = CreateTask(places, rank_, op_type, inputs); - - for (size_t i = 0; i < inputs.size(); ++i) { - platform::NPUDeviceGuard guard(places[i].GetDeviceId()); - const auto& hccl_stream = places_to_ctx_[key][i]->stream(); - fn(inputs[i], outputs[i], hccl_comms[i]->GetHcclComm(), hccl_stream); - } - - for (size_t i = 0; i < inputs.size(); ++i) { - platform::NPUDeviceGuard guard(places[i].GetDeviceId()); - task->control_events_[i].Record(*places_to_ctx_[key][i]); - } - return task; -} - -std::shared_ptr ProcessGroupHCCL::AllReduce( - std::vector& in_tensors, // NOLINT - std::vector& out_tensors, // NOLINT - const AllreduceOptions& opts) { - return Collective( - in_tensors, - out_tensors, - [&](phi::DenseTensor& input, - phi::DenseTensor& output, - HcclComm comm, - const aclrtStream& stream) { - return platform::dynload::HcclAllReduce( - input.data(), - output.data(), - input.numel(), - platform::ToHCCLDataType(input.dtype()), - ToHCCLRedType(opts.reduce_op), - comm, - stream); - }, - CommType::ALLREDUCE); -} - -std::shared_ptr ProcessGroupHCCL::Broadcast( - std::vector& in_tensors, // NOLINT - std::vector& out_tensors, // NOLINT - const BroadcastOptions& opts) { - // PADDLE_ENFORCE_EQ( - // CheckTensorsInNPUPlace(tensors), true, - // platform::errors::InvalidArgument("All inputs should be in - // CudaPlace.")); - - return Collective( - in_tensors, - out_tensors, - [&](phi::DenseTensor& input, - phi::DenseTensor& output, - HcclComm comm, - const aclrtStream& stream) { - int root = opts.source_rank * in_tensors.size() + opts.source_root; - if (rank_ == root) { - return platform::dynload::HcclBroadcast( - input.data(), - input.numel(), - platform::ToHCCLDataType(input.dtype()), - root, - comm, - stream); - } else { - return platform::dynload::HcclBroadcast( - output.data(), - output.numel(), - platform::ToHCCLDataType(output.dtype()), - root, - comm, - stream); - } - }, - CommType::BROADCAST); -} - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupHCCL.h b/paddle/fluid/distributed/collective/ProcessGroupHCCL.h deleted file mode 100644 index 06ef3089b10..00000000000 --- a/paddle/fluid/distributed/collective/ProcessGroupHCCL.h +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include -#include - -#include "paddle/fluid/distributed/collective/HCCLTools.h" -#include "paddle/fluid/distributed/collective/ProcessGroup.h" -#include "paddle/fluid/distributed/store/store.h" -#include "paddle/fluid/platform/device/npu/npu_stream.h" -#include "paddle/fluid/platform/device_context.h" -#include "paddle/fluid/platform/enforce.h" -#include "paddle/fluid/platform/gen_comm_id_helper.h" -#include "paddle/fluid/platform/place.h" - -namespace paddle { -namespace distributed { - -using Place = paddle::platform::Place; -using NPUStream = platform::stream::NPUStream; -using NPUDeviceContext = paddle::platform::NPUDeviceContext; - -class ProcessGroupHCCL : public ProcessGroup { - public: - class HCCLTask : public ProcessGroup::Task, - public std::enable_shared_from_this { - public: - HCCLTask(const std::vector& places, - int rank, - CommType CommType, - const std::vector& inputs); - - bool IsCompleted(); - - void SynchronizeStreams(); - - bool Wait(std::chrono::milliseconds timeout = kWaitTimeout); - - void Synchronize(); - - void SetOutputs(std::vector& outputs); // NOLINT - - virtual ~HCCLTask(); - - std::vector control_events_; - - protected: - std::vector places_; - std::vector> hcclComms_; - std::shared_ptr> outputs_; - - private: - }; - - ProcessGroupHCCL(const std::shared_ptr& store, - int rank, - int size, - const platform::Place& place, - int gid); - - std::string GetBackendName() const override { return "HCCL"; } - - std::shared_ptr AllReduce( - std::vector& in_tensors, - std::vector& out_tensors, - const AllreduceOptions& = AllreduceOptions()) override; - - std::shared_ptr Broadcast( - std::vector& in_tensors, - std::vector& out_tensors, - const BroadcastOptions& = BroadcastOptions()) override; - - protected: - virtual std::shared_ptr CreateTask( - std::vector places, - int rank, - CommType opType, - const std::vector& inputs); - - std::shared_ptr store_; - std::shared_ptr hccl_comm_; - std::mutex mutex_; - std::unordered_map>> - places_to_hcclcomm_; - - std::unordered_map> - places_to_events_; - - std::unordered_map>> - places_to_ctx_; - - std::set used_place_ids_; - - private: - void BcastHCCLId(std::vector& hccl_ids, // NOLINT - int root, // NOLINT - int server_fd); - - void BroadcastUniqueHCCLID(std::vector& hccl_ids); // NOLINT - - template - std::shared_ptr Collective( - std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT - Fn fn, - CommType op_type); - - void CreateHCCLManagerCache(const std::string& places_key, - const std::vector& places); -}; - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc deleted file mode 100644 index aba40d4687b..00000000000 --- a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc +++ /dev/null @@ -1,393 +0,0 @@ -// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h" - -#include - -#include "paddle/fluid/platform/device/gpu/nccl_helper.h" -#include "paddle/fluid/platform/place.h" -#include "paddle/phi/api/include/api.h" -#include "paddle/phi/common/place.h" - -constexpr int64_t kWaitBlockTImeout = 10; - -namespace paddle { -namespace distributed { - -using Place = paddle::platform::Place; -int ProcessGroupHeter::send_count = 0; -int ProcessGroupHeter::recv_count = 0; - -std::shared_ptr ProcessGroupHeter::CreateTask( - int rank, CommType comm_type, const std::vector& inputs) { - return std::make_shared( - rank, comm_type, inputs); -} - -ProcessGroupHeter::HeterTask::HeterTask( - int rank, CommType CommType, const std::vector& inputs) - : Task(rank, inputs, CommType) {} - -ProcessGroupHeter::HeterTask::~HeterTask() {} - -bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; } - -// TODO(sheniang03): Add timeout for wait, now timeout unused -bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) { - return true; -} - -ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr& store, - int rank, - int size, - const platform::Place& place, - int gid, - int local_rank, - int local_size, - int gloo_rank, - int gloo_size, - bool with_switch, - std::string switch_endpoint, - int src_rank, - int dst_rank) - : ProcessGroup(rank, size, place, gid), - store_(store), - local_rank_(local_rank), - local_size_(local_size), - gloo_rank_(gloo_rank), - gloo_size_(gloo_size), - with_switch_(with_switch), - switch_endpoint_(switch_endpoint), - src_rank_(src_rank), - dst_rank_(dst_rank) { - return; -#ifdef PADDLE_WITH_CUSTOM - if (paddle::platform::is_custom_place(place_)) { - inner_pg_ = std::make_shared( - store, local_rank, local_size, place_, IGNORE_ID); - } else { -#endif -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) - inner_pg_ = std::make_shared( - store, local_rank, local_size, place_, IGNORE_ID); -#elif defined(PADDLE_WITH_ASCEND_CL) - inner_pg_ = std::make_shared( - store, local_rank, local_size, place_, IGNORE_ID); -#else - PADDLE_THROW(platform::errors::Unavailable( - "ProcessGroupHeter only supports NCCL, RCCL and HCCL now.")); -#endif -#ifdef PADDLE_WITH_CUSTOM - } -#endif - - if (local_rank_ == 0 && !with_switch_) { - auto opts = ProcessGroupGloo::GlooOptions::create(); - opts->device = ProcessGroupGloo::createDefaultDevice(); - inter_pg_ = std::make_shared( - store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts); - } -} - -template -static void _do_add(T* dst, T* src, size_t size) { - for (size_t i = 0; i < size; i++) { - *dst += *src; - dst++; - src++; - } -} - -std::shared_ptr ProcessGroupHeter::AllReduce( - std::vector& in_tensors, - std::vector& out_tensors, - const AllreduceOptions& opts) { -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) - PADDLE_ENFORCE_EQ( - CheckTensorsInCudaPlace(in_tensors), - true, - platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); - PADDLE_ENFORCE_EQ( - CheckTensorsInCudaPlace(out_tensors), - true, - platform::errors::InvalidArgument("All outputs should be in CudaPlace.")); -#endif - - // Step1: do allreduce in inner cluster - auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts); - task->Wait(); - - // Step2: copy tensors to CPU - if (local_rank_ == 0) { - std::vector cpu_tensors; - cpu_tensors.reserve(in_tensors.size()); - phi::DenseTensor cpu_tensor; - for (size_t i = 0; i < in_tensors.size(); i++) { - auto gpu_tensor = in_tensors[i]; - cpu_tensor.Resize(gpu_tensor.dims()); - framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor); - cpu_tensors.push_back(cpu_tensor); - } - // Step3: do inter cluster allreduce - if (with_switch_) { - if (local_rank_ == 0) { - HeterClient* client_ = - HeterClient::GetInstance({switch_endpoint_}, {}, 0).get(); - auto dense_cpu_tensor = cpu_tensors[0]; - std::vector send_size; - send_size.push_back(dense_cpu_tensor.numel()); - int ret = client_->Send( - gid_, - {dense_cpu_tensor.name()}, - send_size, - dense_cpu_tensor.data(), - dense_cpu_tensor.numel() * - framework::DataTypeSize(dense_cpu_tensor.dtype())); - PADDLE_ENFORCE_EQ(ret, - 0, - platform::errors::PreconditionNotMet( - "Send to the switch module error.")); - phi::DenseTensor cpu_tensor2; - cpu_tensor2.AllocateFrom( - std::make_unique( - paddle::platform::CPUPlace()) - .get(), - dense_cpu_tensor.dtype(), - dense_cpu_tensor.numel()); - ret = client_->Recv( - gid_, - {dense_cpu_tensor.name()}, - cpu_tensor2.data(), - cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype())); - PADDLE_ENFORCE_EQ(ret, - 0, - platform::errors::PreconditionNotMet( - "Recv from the switch module error.")); - - switch (dense_cpu_tensor.dtype()) { - case DataType::FLOAT32: - _do_add(reinterpret_cast(dense_cpu_tensor.data()), - reinterpret_cast(cpu_tensor2.data()), - dense_cpu_tensor.numel()); - break; - case DataType::FLOAT64: - _do_add(reinterpret_cast(dense_cpu_tensor.data()), - reinterpret_cast(cpu_tensor2.data()), - dense_cpu_tensor.numel()); - break; - case DataType::INT32: - _do_add(reinterpret_cast(dense_cpu_tensor.data()), - reinterpret_cast(cpu_tensor2.data()), - dense_cpu_tensor.numel()); - break; - default: - PADDLE_THROW(platform::errors::PreconditionNotMet( - "Unsupported data type (%s) to do add.", - framework::DataType2String(dense_cpu_tensor.dtype()))); - } - } - } else { - auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts); - gloo_task->Wait(); - } - // Step4: copy cpu tensors to gpu - // copy cpu tensors to gpu - for (size_t i = 0; i < in_tensors.size(); i++) { - auto gpu_tensor = out_tensors[i]; - auto cpu_tensor = cpu_tensors[i]; - framework::TensorCopySync(cpu_tensor, cpu_tensor.place(), &gpu_tensor); - } - } - - // Step5: broadcast among inner cluster - auto b_opts = BroadcastOptions(); - b_opts.source_rank = 0; - auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts); - broadcast_task->Wait(); - return CreateTask(rank_, CommType::ALLREDUCE, in_tensors); -} - -std::shared_ptr ProcessGroupHeter::Broadcast( - std::vector& in_tensors, - std::vector& out_tensors, - const BroadcastOptions& opts) { -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) - PADDLE_ENFORCE_EQ( - CheckTensorsInCudaPlace(in_tensors), - true, - platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); - PADDLE_ENFORCE_EQ( - CheckTensorsInCudaPlace(out_tensors), - true, - platform::errors::InvalidArgument("All outputs should be in CudaPlace.")); -#endif - - // Step1: do broadcast in inner cluster - auto b_opts = BroadcastOptions(); - b_opts.source_rank = 0; - inner_pg_->Broadcast(in_tensors, out_tensors, b_opts); - - if (local_rank_ == 0) { - std::vector cpu_tensors; - cpu_tensors.reserve(in_tensors.size()); - for (size_t i = 0; i < in_tensors.size(); i++) { - auto gpu_tensor = in_tensors[i]; - phi::DenseTensor cpu_tensor; - cpu_tensor.Resize(gpu_tensor.dims()); - framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor); - cpu_tensors.push_back(cpu_tensor); - } - if (with_switch_) { - if (local_rank_ == 0) { - HeterClient* client_ = - HeterClient::GetInstance({switch_endpoint_}, {}, 0).get(); - auto dense_cpu_tensor = cpu_tensors[0]; - if (gloo_rank_ == 0) { - std::vector send_size; - send_size.push_back(dense_cpu_tensor.numel()); - int ret = client_->Send( - gid_, - {dense_cpu_tensor.name()}, - send_size, - dense_cpu_tensor.data(), - dense_cpu_tensor.numel() * - framework::DataTypeSize(dense_cpu_tensor.dtype())); - PADDLE_ENFORCE_EQ(ret, - 0, - platform::errors::PreconditionNotMet( - "Send to the switch module error.")); - } else { - int ret = client_->Recv( - gid_, - {dense_cpu_tensor.name()}, - dense_cpu_tensor.data(), - dense_cpu_tensor.numel() * - framework::DataTypeSize(dense_cpu_tensor.dtype())); - PADDLE_ENFORCE_EQ(ret, - 0, - platform::errors::PreconditionNotMet( - "Receive from the switch module error.")); - } - } - } else { - auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts); - gloo_task->Wait(); - } - for (size_t i = 0; i < in_tensors.size(); i++) { - auto gpu_tensor = out_tensors[i]; - auto cpu_tensor = cpu_tensors[i]; - framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor); - } - } - auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts); - broadcast_task->Wait(); - return CreateTask(rank_, CommType::BROADCAST, in_tensors); -} - -std::shared_ptr ProcessGroupHeter::Send( - std::vector& in_tensors, int peer) { - PADDLE_ENFORCE_EQ( - in_tensors.size(), - 1, - platform::errors::PreconditionNotMet( - "For each send operation, there can only be one tensor to send.")); - // Copy Tensor to cpu - auto start = std::chrono::high_resolution_clock::now(); - phi::DenseTensor cpu_tensor; - auto& gpu_tensor = in_tensors[0]; - framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor); - PADDLE_ENFORCE_EQ(with_switch_, - true, - platform::errors::PreconditionNotMet( - "Gloo does not support the send operation.")); - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration diff = end - start; - VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims() - << ") from gpu to cpu for send " << std::setw(9) - << " is: " << diff.count() << " s" << std::endl; - - // Send to switch - HeterClient* client_ = - HeterClient::GetInstance({switch_endpoint_}, {}, 0).get(); - int64_t tensor_size = - cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()); - std::vector send_size; - send_size.push_back(tensor_size); - auto id = src_rank_ * 10000 + dst_rank_; - std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) + - std::string("_") + std::to_string(send_count++); - VLOG(2) << "tensor_name:" << tensor_name; - int ret = client_->Send( - gid_, {tensor_name}, send_size, cpu_tensor.data(), tensor_size); - PADDLE_ENFORCE_EQ( - ret, - 0, - platform::errors::PreconditionNotMet("Send to the switch module error.")); - return CreateTask(rank_, CommType::SEND, in_tensors); -} - -std::shared_ptr ProcessGroupHeter::Recv( - std::vector& out_tensors, int peer) { - PADDLE_ENFORCE_EQ( - out_tensors.size(), - 1, - platform::errors::PreconditionNotMet( - "For each rece operation, there can only be one tensor to receive.")); - - // Copy Tensor to cpu - phi::DenseTensor cpu_tensor; - auto& gpu_tensor = out_tensors[0]; - cpu_tensor.Resize(gpu_tensor.dims()); - cpu_tensor.set_layout(gpu_tensor.layout()); - cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype()); - - PADDLE_ENFORCE_EQ(with_switch_, - true, - platform::errors::PreconditionNotMet( - "Gloo does not support the send operation.")); - // recv from switch - HeterClient* client_ = - HeterClient::GetInstance({switch_endpoint_}, {}, 0).get(); - auto id = src_rank_ * 10000 + dst_rank_; - std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) + - std::string("_") + std::to_string(recv_count++); - VLOG(2) << "tensor_name: " << tensor_name; - auto start = std::chrono::high_resolution_clock::now(); - int ret = client_->Recv( - gid_, - {tensor_name}, - cpu_tensor.data(), - cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype())); - PADDLE_ENFORCE_EQ(ret, - 0, - platform::errors::PreconditionNotMet( - "receive to the switch module error.")); - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration diff = end - start; - double goodput = cpu_tensor.numel() * - framework::DataTypeSize(cpu_tensor.dtype()) / diff.count(); - VLOG(2) << "Goodput: " << goodput << "B/s" << std::endl; - start = std::chrono::high_resolution_clock::now(); - framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor); - end = std::chrono::high_resolution_clock::now(); - diff = end - start; - VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims() - << ") from cpu to gpu for recv " << std::setw(9) - << " is: " << diff.count() << " s" << std::endl; - return CreateTask(rank_, CommType::RECV, out_tensors); -} - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.h b/paddle/fluid/distributed/collective/ProcessGroupHeter.h deleted file mode 100644 index c9104667877..00000000000 --- a/paddle/fluid/distributed/collective/ProcessGroupHeter.h +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include -#include - -#include "paddle/fluid/distributed/collective/ProcessGroup.h" -#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h" -#include "paddle/fluid/platform/device_context.h" - -#ifdef PADDLE_WITH_GLOO -#include "paddle/fluid/framework/fleet/gloo_wrapper.h" -#endif - -#include "paddle/fluid/distributed/store/store.h" -#include "paddle/fluid/platform/enforce.h" -#include "paddle/fluid/platform/gen_comm_id_helper.h" -#include "paddle/fluid/platform/place.h" - -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) -#include "paddle/fluid/distributed/collective/NCCLTools.h" -#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h" -#include "paddle/fluid/platform/cuda_device_guard.h" -#endif - -#if defined(PADDLE_WITH_ASCEND_CL) -#include "paddle/fluid/distributed/collective/HCCLTools.h" -#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h" -#endif - -#if defined(PADDLE_WITH_CUSTOM_DEVICE) -#include "paddle/fluid/distributed/collective/CustomCCLTools.h" -#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h" -#endif - -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ - defined(PADDLE_WITH_ASCEND_CL)) -#include "paddle/fluid/distributed/ps/service/heter_client.h" -#endif - -#include "paddle/fluid/distributed/collective/Common.h" - -namespace paddle { -namespace distributed { - -using Place = paddle::platform::Place; - -class ProcessGroupHeter : public ProcessGroup { - public: - class HeterTask : public ProcessGroup::Task, - public std::enable_shared_from_this { - public: - HeterTask(int rank, - CommType CommType, - const std::vector&); - - bool IsCompleted(); - - void SynchronizeStreams() {} - - bool Wait(std::chrono::milliseconds timeout = kWaitTimeout); - - void Synchronize() {} - - virtual ~HeterTask(); - }; - - ProcessGroupHeter(const std::shared_ptr& store, - int rank, - int size, - const platform::Place& place, - int gid, - int local_rank, - int local_size, - int gloo_rank, - int gloo_size, - bool with_switch, - std::string switch_endpoints, - int src_rank, - int dst_rank); - - std::string GetBackendName() const override { return "HETER_BACKEND"; } - - std::shared_ptr AllReduce( - std::vector&, - std::vector&, - const AllreduceOptions& = AllreduceOptions()) override; - - std::shared_ptr Broadcast( - std::vector&, - std::vector&, - const BroadcastOptions& = BroadcastOptions()) override; - - std::shared_ptr Send( - std::vector& in_tensors, int peer) override; - - std::shared_ptr Recv( - std::vector& out_tensors, int peer) override; - - protected: - virtual std::shared_ptr CreateTask( - int rank, CommType opType, const std::vector& inputs); - - private: - std::shared_ptr store_; - std::shared_ptr inner_pg_; - std::shared_ptr inter_pg_; - - int local_rank_; - int local_size_; - int gloo_rank_; - int gloo_size_; - bool with_switch_; - std::string switch_endpoint_; - int src_rank_; - int dst_rank_; - static int send_count; - static int recv_count; -}; - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 8cc5f4f4d2e..1fc0cad949f 100755 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -158,9 +158,6 @@ if(WITH_PYTHON) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup eager_reducer) if(WITH_NCCL OR WITH_RCCL) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_nccl) - if(WITH_PSCORE) - set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_heter) - endif() endif() if(WITH_XPU_BKCL) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_bkcl) @@ -171,12 +168,6 @@ if(WITH_PYTHON) if(WITH_MPI) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_mpi) endif() - if(WITH_ASCEND_CL) - set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_hccl) - if(WITH_PSCORE) - set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_heter) - endif() - endif() if(WITH_CUSTOM_DEVICE) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_custom) endif() diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc index 4cc1a0607e0..de415393caa 100644 --- a/paddle/fluid/pybind/distributed_py.cc +++ b/paddle/fluid/pybind/distributed_py.cc @@ -41,19 +41,10 @@ limitations under the License. */ #include "paddle/fluid/distributed/collective/ProcessGroupMPI.h" #endif -#if defined(PADDLE_WITH_ASCEND_CL) -#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h" -#endif - #if defined(PADDLE_WITH_CUSTOM_DEVICE) #include "paddle/fluid/distributed/collective/ProcessGroupCustom.h" #endif -#if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \ - (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL)) -#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h" -#endif - #if defined(PADDLE_WITH_GLOO) #include "paddle/fluid/distributed/collective/ProcessGroupGloo.h" #include "paddle/fluid/distributed/store/tcp_store.h" @@ -1258,62 +1249,6 @@ void BindDistributed(py::module *m) { py::call_guard()); #endif -#if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \ - (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL)) - py::class_>( - *m, "ProcessGroupHeter", ProcessGroup) - .def(py::init &, - int, - int, -#if defined(PADDLE_WITH_ASCEND_CL) - const platform::NPUPlace &, -#else - const platform::CUDAPlace &, -#endif - int, - int, - int, - int, - int, - bool, - std::string, - int, - int>(), - py::arg("store"), - py::arg("rank"), - py::arg("world_size"), - py::arg("place"), - py::arg("gid") = 0, - py::arg("local_rank") = 0, - py::arg("local_size") = 1, - py::arg("gloo_rank") = 0, - py::arg("gloo_size") = 1, - py::arg("with_switch") = false, - py::arg("switch_endpoint") = "", - py::arg("src_rank") = "", - py::arg("dst_rank") = "", - py::call_guard()); -#endif - -#if defined(PADDLE_WITH_ASCEND_CL) - py::class_>( - *m, "ProcessGroupHCCL", ProcessGroup) - .def(py::init &, - int, - int, - const platform::NPUPlace &, - int>(), - py::arg("store"), - py::arg("rank"), - py::arg("world_size"), - py::arg("place"), - py::arg("group_id") = 0, - py::call_guard()); - -#endif - #if defined(PADDLE_WITH_CUSTOM_DEVICE) py::class_>( diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 91de7de4d45..c4e09a620fc 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -13,7 +13,6 @@ # limitations under the License. import numpy as np -import os import pickle import io import datetime @@ -148,15 +147,9 @@ def _new_process_group_impl( group_name, pg_options, group_id=0, - src_rank=None, - dst_rank=None, ): pg = None genv = _get_global_env() - if backend != 'heter': - assert src_rank is None and dst_rank is None, ( - "src_rank and dst_rank " "can only be set for heter backend." - ) assert backend in _valid_backend_list, "Unsupported backend: %s." % backend if backend == "gloo": place = core.CPUPlace() @@ -164,52 +157,12 @@ def _new_process_group_impl( elif backend == "nccl": place = core.CUDAPlace(genv.device_id) pg = core.ProcessGroupNCCL(store, rank, world_size, place, group_id) - elif backend == "hccl": - place = core.NPUPlace(genv.device_id) - pg = core.ProcessGroupHCCL(store, rank, world_size, place, group_id) elif backend == "xccl": place = core.CustomPlace(genv.device_type, genv.device_id) pg = core.ProcessGroupCustom(store, rank, world_size, place, group_id) elif backend == "bkcl": place = core.XPUPlace(genv.device_id) pg = core.ProcessGroupBKCL(store, rank, world_size, place, group_id) - elif backend == "heter": - place = None - if core.is_compiled_with_cuda(): - place = core.CUDAPlace(genv.device_id) - elif core.is_compiled_with_npu(): - place = core.NPUPlace(genv.device_id) - cluster_id = int(os.getenv("CLUSTER_ID", "-1")) - assert cluster_id >= 0, "please set the CLUSTER_ID variable." - cluster_size = os.getenv("CLUSTER_SIZE", None) - assert cluster_size, "please set the CLUSTER_SIZE variable." - cluster_size = cluster_size.split(",") - cluster_size = [int(s) for s in cluster_size] - switch_ep = os.getenv("CLUSTER_SWITCH", None) - assert switch_ep, "please set the CLUSTER_SWITCH variable." - cluster_size_cumsum = np.cumsum(cluster_size) - cluster_offset = ( - 0 if cluster_id == 0 else cluster_size_cumsum[cluster_id - 1] - ) - global_rank = cluster_offset + rank - global_world_size = cluster_size_cumsum[-1] - global_rank, global_world_size = _get_global_config(backend, rank) - pg = core.ProcessGroupHeter( - store, - rank=global_rank, - world_size=global_world_size, - place=place, - gid=group_id, - local_rank=rank, - local_size=world_size, - gloo_rank=cluster_id, - gloo_size=len(cluster_size), - with_switch=True, - switch_endpoint=switch_ep, - src_rank=src_rank, - dst_rank=dst_rank, - ) - return pg @@ -316,10 +269,8 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): ) size = len(ranks) ranks = sorted(ranks) - if backend == 'heter' or (size > 1 and global_rank in ranks): + if size > 1 and global_rank in ranks: rank = 0 if backend == 'heter' else ranks.index(global_rank) - src_rank = ranks[0] if backend == 'heter' else None - dst_rank = ranks[1] if backend == 'heter' else None pg = _new_process_group_impl( backend, _default_store, @@ -328,8 +279,6 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): group_name, pg_options=None, group_id=gid, - src_rank=src_rank, - dst_rank=dst_rank, ) else: rank = -1 diff --git a/python/paddle/fluid/tests/unittests/npu/process_group_hccl.py b/python/paddle/fluid/tests/unittests/npu/process_group_hccl.py deleted file mode 100644 index 474d4104b04..00000000000 --- a/python/paddle/fluid/tests/unittests/npu/process_group_hccl.py +++ /dev/null @@ -1,253 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import random -import numpy as np -import os -import shutil - -import paddle -from paddle.fluid import core -from datetime import timedelta -import paddle.fluid.core as core -from paddle.fluid.framework import _test_eager_guard -from paddle.fluid.dygraph.parallel import ParallelEnv - - -def init_process_group(strategy=None): - nranks = ParallelEnv().nranks - rank = ParallelEnv().local_rank - is_master = True if rank == 0 else False - store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks) - pg_group = core.ProcessGroupHCCL(store, rank, nranks) - - return pg_group - - -class TestProcessGroupFp32(unittest.TestCase): - def setUp(self): - paddle.seed(2022) - random.seed(2022) - np.random.seed(2022) - self.config() - - def config(self): - self.dtype = "float32" - self.shape = (2, 10, 5) - - def test_create_process_group_nccl(self): - with _test_eager_guard(): - paddle.set_device( - 'npu:%d' % paddle.distributed.ParallelEnv().dev_id - ) - - pg = init_process_group() - - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - sum_result = tensor_x + tensor_y - if pg.rank() == 0: - task = pg.allreduce(tensor_x) - task.wait() - assert np.array_equal(tensor_x, sum_result) - else: - task = pg.allreduce(tensor_y) - task.wait() - assert np.array_equal(tensor_y, sum_result) - - print("test allreduce sum api ok") - - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - max_result = paddle.maximum(tensor_x, tensor_y) - - if pg.rank() == 0: - task = pg.allreduce(tensor_x, core.ReduceOp.MAX) - task.wait() - assert np.array_equal(tensor_x, max_result) - else: - task = pg.allreduce(tensor_y, core.ReduceOp.MAX) - task.wait() - assert np.array_equal(tensor_y, max_result) - - print("test allreduce max api ok") - - # test broadcast - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - broadcast_result = paddle.assign(tensor_x) - if pg.rank() == 0: - task = pg.broadcast(tensor_x, 0) - task.synchronize() - paddle.device.cuda.synchronize() - assert task.is_completed() - assert np.array_equal(broadcast_result, tensor_x) - else: - task = pg.broadcast(tensor_y, 0) - task.synchronize() - paddle.device.cuda.synchronize() - assert task.is_completed() - assert np.array_equal(broadcast_result, tensor_y) - - print("test broadcast api ok") - - # test barrier - # rank 0 - if pg.rank() == 0: - task = pg.barrier() - task.wait() - # rank 1 - else: - task = pg.barrier() - task.wait() - - print("test barrier api ok\n") - exit(0) - - # test allgather - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - out_shape = list(self.shape) - out_shape[0] *= 2 - out = np.random.random(out_shape).astype(self.dtype) - tensor_out = paddle.to_tensor(out) - if pg.rank() == 0: - task = pg.all_gather(tensor_x, tensor_out) - task.wait() - paddle.device.cuda.synchronize() - # rank 1 - else: - task = pg.all_gather(tensor_y, tensor_out) - task.wait() - paddle.device.cuda.synchronize() - out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) - out_2 = paddle.slice( - tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] - ) - assert np.array_equal(tensor_x, out_1) - assert np.array_equal(tensor_y, out_2) - print("test allgather api ok\n") - - # test alltoall - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - out1 = np.random.random(self.shape).astype(self.dtype) - out2 = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - tensor_out1 = paddle.to_tensor(out1) - tensor_out2 = paddle.to_tensor(out2) - raw_tensor_x_2 = paddle.slice( - tensor_x, [0], [self.shape[0] // 2], [self.shape[0]] - ) - raw_tensor_y_1 = paddle.slice( - tensor_y, [0], [0], [self.shape[0] // 2] - ) - if pg.rank() == 0: - task = pg.alltoall(tensor_x, tensor_out1) - task.wait() - paddle.device.cuda.synchronize() - # rank 1 - else: - task = pg.alltoall(tensor_y, tensor_out2) - task.wait() - paddle.device.cuda.synchronize() - out1_2 = paddle.slice( - tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]] - ) - out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2]) - if pg.rank() == 0: - assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy()) - else: - assert np.array_equal(out2_1, raw_tensor_x_2) - print("test alltoall api ok\n") - - # test Reduce - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - sum_result = tensor_x + tensor_y - if pg.rank() == 0: - task = pg.reduce(tensor_x, 0) - task.wait() - paddle.device.cuda.synchronize() - # rank 1 - else: - task = pg.reduce(tensor_y, 0) - task.wait() - paddle.device.cuda.synchronize() - if pg.rank() == 0: - assert np.array_equal(tensor_x, sum_result) - print("test reduce sum api ok\n") - - # test Scatter - # rank 0 - in_shape = list(self.shape) - in_shape[0] *= 2 - x = np.random.random(in_shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - if pg.rank() == 0: - task = pg.scatter(tensor_x, tensor_y, 0) - task.wait() - paddle.device.cuda.synchronize() - # rank 1 - else: - task = pg.scatter(tensor_x, tensor_y, 0) - task.wait() - paddle.device.cuda.synchronize() - out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) - out2 = paddle.slice( - tensor_x, [0], [self.shape[0]], [self.shape[0] * 2] - ) - if pg.rank() == 0: - assert np.array_equal(tensor_y, out1) - else: - assert np.array_equal(tensor_y, out2) - print("test scatter api ok\n") - - -class TestProcessGroupFp16(TestProcessGroupFp32): - def setUp(self): - paddle.seed(2022) - random.seed(2022) - np.random.seed(2022) - self.config() - - def config(self): - self.dtype = "float16" - self.shape = (4, 20, 20) - - -if __name__ == "__main__": - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/npu/test_collective_process_group_hccl.py b/python/paddle/fluid/tests/unittests/npu/test_collective_process_group_hccl.py deleted file mode 100644 index 181ba75c11b..00000000000 --- a/python/paddle/fluid/tests/unittests/npu/test_collective_process_group_hccl.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import sys - -sys.path.append("..") -from test_parallel_dygraph_dataparallel import TestMultipleGpus - - -class TestProcessGroup(TestMultipleGpus): - def test_process_group_nccl(self): - self.run_mnist_2gpu('process_group_hccl.py') - - -if __name__ == "__main__": - unittest.main() -- GitLab