未验证 提交 9191e743 编写于 作者: L LiYuRio 提交者: GitHub

remove heter and hccl (#47918)

上级 05a47f39
...@@ -30,18 +30,6 @@ if(WITH_NCCL OR WITH_RCCL) ...@@ -30,18 +30,6 @@ if(WITH_NCCL OR WITH_RCCL)
device_context device_context
${DEVICE_EVENT_LIBS} ${DEVICE_EVENT_LIBS}
dense_tensor) dense_tensor)
if(WITH_DISTRIBUTE AND WITH_PSCORE)
if(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS "${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
set_source_files_properties(
ProcessGroupHeter.cc PROPERTIES COMPILE_FLAGS
${DISTRIBUTE_COMPILE_FLAGS})
endif()
cc_library(
processgroup_heter
SRCS ProcessGroupHeter.cc NCCLTools.cc Common.cc
DEPS place enforce collective_helper device_context phi_api eager_api)
endif()
endif() endif()
if(WITH_XPU_BKCL) if(WITH_XPU_BKCL)
...@@ -59,38 +47,6 @@ if(WITH_MPI) ...@@ -59,38 +47,6 @@ if(WITH_MPI)
DEPS collective_helper device_context) DEPS collective_helper device_context)
endif() endif()
if(WITH_ASCEND_CL)
cc_library(
processgroup_hccl
SRCS ProcessGroupHCCL.cc HCCLTools.cc Common.cc
DEPS place
npu_stream
enforce
collective_helper
device_context
phi_api
eager_api)
if(WITH_DISTRIBUTE AND WITH_PSCORE)
if(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS "${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
set_source_files_properties(
ProcessGroupHeter.cc PROPERTIES COMPILE_FLAGS
${DISTRIBUTE_COMPILE_FLAGS})
endif()
cc_library(
processgroup_heter
SRCS ProcessGroupHeter.cc HCCLTools.cc Common.cc
DEPS place
npu_stream
enforce
collective_helper
device_context
phi_api
eager_api)
endif()
endif()
if(WITH_CUSTOM_DEVICE) if(WITH_CUSTOM_DEVICE)
cc_library( cc_library(
processgroup_custom processgroup_custom
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/HCCLTools.h"
#include "paddle/fluid/memory/malloc.h"
#include "paddle/fluid/platform/device/npu/hccl_helper.h"
#include "paddle/fluid/platform/device/npu/npu_info.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"
DECLARE_bool(hccl_blocking_wait);
// DECLARE_bool(use_stream_safe_npu_allocator);
constexpr int64_t kWaitBlockTImeout = 10;
namespace paddle {
namespace distributed {
void SyncDefaultStream(
const std::vector<Place>& places,
std::vector<NPUEventManager>& hcclEvents, // NOLINT
std::vector<std::unique_ptr<NPUDeviceContext>>& dev_ctx) { // NOLINT
for (size_t i = 0; i < places.size(); ++i) {
auto* default_ctx = static_cast<platform::NPUDeviceContext*>(
platform::DeviceContextPool::Instance().Get(places[i]));
hcclEvents[i].Record(*dev_ctx[i]);
hcclEvents[i].Block(*default_ctx);
}
}
std::shared_ptr<ProcessGroupHCCL::HCCLTask> ProcessGroupHCCL::CreateTask(
std::vector<Place> places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs) {
return std::make_shared<ProcessGroupHCCL::HCCLTask>(
places, rank, comm_type, inputs);
}
ProcessGroupHCCL::HCCLTask::HCCLTask(
const std::vector<Place>& places,
int rank,
CommType CommType,
const std::vector<phi::DenseTensor>& inputs)
: Task(rank, inputs, CommType), places_(places) {
control_events_.resize(places.size());
hcclComms_.resize(places.size());
}
ProcessGroupHCCL::HCCLTask::~HCCLTask() {}
void ProcessGroupHCCL::HCCLTask::SetOutputs(
std::vector<phi::DenseTensor>& outputs) { // NOLINT
outputs_ = std::make_shared<std::vector<phi::DenseTensor>>(outputs);
}
void ProcessGroupHCCL::HCCLTask::SynchronizeStreams() {
for (size_t i = 0; i < places_.size(); ++i) {
auto* default_ctx = static_cast<platform::NPUDeviceContext*>(
platform::DeviceContextPool::Instance().Get(places_[i]));
platform::NPUStreamWaitEvent(default_ctx->stream(),
control_events_[i].GetRawNPUEvent());
}
}
bool ProcessGroupHCCL::HCCLTask::IsCompleted() {
for (size_t i = 0; i < places_.size(); ++i) {
if (!control_events_[i].Query()) {
return false;
}
}
return true;
}
// TODO(sandyhouse): Add timeout for wait, now timeout unused
bool ProcessGroupHCCL::HCCLTask::Wait(std::chrono::milliseconds timeout) {
SynchronizeStreams();
// NOTE(sandyhouse): It will block host for sync
while (!IsCompleted()) {
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitBlockTImeout));
}
return true;
}
// Same as Wait
void ProcessGroupHCCL::HCCLTask::Synchronize() { Wait(kWaitTimeout); }
ProcessGroupHCCL::ProcessGroupHCCL(const std::shared_ptr<Store>& store,
int rank,
int size,
const platform::Place& place,
int gid)
: ProcessGroup(rank, size, place, gid), store_(store) {
platform::SetNPUDeviceId(place_.device);
}
void ProcessGroupHCCL::BroadcastUniqueHCCLID(
std::vector<HcclRootInfo>& hccl_ids) { // NOLINT
if (rank_ == 0) {
for (size_t i = 0; i < hccl_ids.size(); i++) {
auto key = "ProcessGroupHCCL/hccl_ids/" + std::to_string(i);
auto hccl_id = std::vector<uint8_t>(
reinterpret_cast<uint8_t*>(&hccl_ids[i]),
reinterpret_cast<uint8_t*>(&hccl_ids[i]) + sizeof(HcclRootInfo));
store_->set(key, hccl_id);
}
} else {
for (size_t i = 0; i < hccl_ids.size(); i++) {
auto key = "ProcessGroupHCCL/hccl_ids/" + std::to_string(i);
auto ret = store_->get(key);
std::memcpy(&hccl_ids[i], ret.data(), ret.size());
}
}
}
// create HCCLManager cache for places_key
void ProcessGroupHCCL::CreateHCCLManagerCache(
const std::string& places_key, const std::vector<Place>& places) {
PADDLE_ENFORCE_EQ(places_key.empty(),
false,
platform::errors::PreconditionNotMet(
"Not able to create/get the HCCL Communicator since "
"the NPU place are not known"));
std::vector<std::shared_ptr<HCCLCommManager>> hccl_comms;
hccl_comms.resize(places.size());
// using vector just for broadcast
std::vector<HcclRootInfo> hccl_ids;
hccl_ids.resize(1);
auto& hccl_id = hccl_ids.front();
if (rank_ == 0) {
PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::HcclGetRootInfo(&hccl_id));
}
BroadcastUniqueHCCLID(hccl_ids);
VLOG(3) << "init hccl rank: " << rank_ << ", nranks: " << size_
<< ", place: " << places_key
<< ", hccl uniqueid: " << SerializeHCCLUniqueId(hccl_id);
std::vector<std::unique_ptr<NPUDeviceContext>> dev_ctx;
dev_ctx.resize(places.size());
std::unique_ptr<HcclComm[]> comms(new HcclComm[places.size()]);
for (size_t i = 0; i < places.size(); ++i) {
platform::NPUDeviceGuard guard(places[i].GetDeviceId());
hccl_comms[i] = HCCLCommManager::Create(
GetSize(), GetRank(), &hccl_id, comms.get() + i);
dev_ctx[i].reset(new NPUDeviceContext(places[i]));
}
std::vector<NPUEventManager> events;
events.resize(places.size());
// These caches will be useful to process sync/wait/communicate
places_to_events_.emplace(places_key, std::move(events));
places_to_hcclcomm_.emplace(places_key, std::move(hccl_comms));
places_to_ctx_.emplace(places_key, std::move(dev_ctx));
}
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupHCCL::Collective(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
Fn fn,
CommType op_type) {
const auto places = GetPlaceList(inputs);
const auto key = GetKeyFromPlaces(places);
{
std::lock_guard<std::mutex> lock(mutex_);
if (places_to_hcclcomm_.find(key) == places_to_hcclcomm_.end()) {
CreateHCCLManagerCache(key, places);
}
}
auto& hccl_comms = places_to_hcclcomm_[key];
SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);
auto task = CreateTask(places, rank_, op_type, inputs);
for (size_t i = 0; i < inputs.size(); ++i) {
platform::NPUDeviceGuard guard(places[i].GetDeviceId());
const auto& hccl_stream = places_to_ctx_[key][i]->stream();
fn(inputs[i], outputs[i], hccl_comms[i]->GetHcclComm(), hccl_stream);
}
for (size_t i = 0; i < inputs.size(); ++i) {
platform::NPUDeviceGuard guard(places[i].GetDeviceId());
task->control_events_[i].Record(*places_to_ctx_[key][i]);
}
return task;
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHCCL::AllReduce(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const AllreduceOptions& opts) {
return Collective(
in_tensors,
out_tensors,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
HcclComm comm,
const aclrtStream& stream) {
return platform::dynload::HcclAllReduce(
input.data(),
output.data(),
input.numel(),
platform::ToHCCLDataType(input.dtype()),
ToHCCLRedType(opts.reduce_op),
comm,
stream);
},
CommType::ALLREDUCE);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHCCL::Broadcast(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const BroadcastOptions& opts) {
// PADDLE_ENFORCE_EQ(
// CheckTensorsInNPUPlace(tensors), true,
// platform::errors::InvalidArgument("All inputs should be in
// CudaPlace."));
return Collective(
in_tensors,
out_tensors,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
HcclComm comm,
const aclrtStream& stream) {
int root = opts.source_rank * in_tensors.size() + opts.source_root;
if (rank_ == root) {
return platform::dynload::HcclBroadcast(
input.data(),
input.numel(),
platform::ToHCCLDataType(input.dtype()),
root,
comm,
stream);
} else {
return platform::dynload::HcclBroadcast(
output.data(),
output.numel(),
platform::ToHCCLDataType(output.dtype()),
root,
comm,
stream);
}
},
CommType::BROADCAST);
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/distributed/collective/HCCLTools.h"
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/platform/device/npu/npu_stream.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
namespace distributed {
using Place = paddle::platform::Place;
using NPUStream = platform::stream::NPUStream;
using NPUDeviceContext = paddle::platform::NPUDeviceContext;
class ProcessGroupHCCL : public ProcessGroup {
public:
class HCCLTask : public ProcessGroup::Task,
public std::enable_shared_from_this<HCCLTask> {
public:
HCCLTask(const std::vector<Place>& places,
int rank,
CommType CommType,
const std::vector<phi::DenseTensor>& inputs);
bool IsCompleted();
void SynchronizeStreams();
bool Wait(std::chrono::milliseconds timeout = kWaitTimeout);
void Synchronize();
void SetOutputs(std::vector<phi::DenseTensor>& outputs); // NOLINT
virtual ~HCCLTask();
std::vector<NPUEventManager> control_events_;
protected:
std::vector<Place> places_;
std::vector<std::shared_ptr<HCCLCommManager>> hcclComms_;
std::shared_ptr<std::vector<phi::DenseTensor>> outputs_;
private:
};
ProcessGroupHCCL(const std::shared_ptr<Store>& store,
int rank,
int size,
const platform::Place& place,
int gid);
std::string GetBackendName() const override { return "HCCL"; }
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const AllreduceOptions& = AllreduceOptions()) override;
std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const BroadcastOptions& = BroadcastOptions()) override;
protected:
virtual std::shared_ptr<ProcessGroupHCCL::HCCLTask> CreateTask(
std::vector<Place> places,
int rank,
CommType opType,
const std::vector<phi::DenseTensor>& inputs);
std::shared_ptr<Store> store_;
std::shared_ptr<HCCLCommManager> hccl_comm_;
std::mutex mutex_;
std::unordered_map<std::string, std::vector<std::shared_ptr<HCCLCommManager>>>
places_to_hcclcomm_;
std::unordered_map<std::string, std::vector<NPUEventManager>>
places_to_events_;
std::unordered_map<std::string,
std::vector<std::unique_ptr<NPUDeviceContext>>>
places_to_ctx_;
std::set<int> used_place_ids_;
private:
void BcastHCCLId(std::vector<HcclRootInfo>& hccl_ids, // NOLINT
int root, // NOLINT
int server_fd);
void BroadcastUniqueHCCLID(std::vector<HcclRootInfo>& hccl_ids); // NOLINT
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> Collective(
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
Fn fn,
CommType op_type);
void CreateHCCLManagerCache(const std::string& places_key,
const std::vector<Place>& places);
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#include <chrono>
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"
constexpr int64_t kWaitBlockTImeout = 10;
namespace paddle {
namespace distributed {
using Place = paddle::platform::Place;
int ProcessGroupHeter::send_count = 0;
int ProcessGroupHeter::recv_count = 0;
std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
return std::make_shared<ProcessGroupHeter::HeterTask>(
rank, comm_type, inputs);
}
ProcessGroupHeter::HeterTask::HeterTask(
int rank, CommType CommType, const std::vector<phi::DenseTensor>& inputs)
: Task(rank, inputs, CommType) {}
ProcessGroupHeter::HeterTask::~HeterTask() {}
bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }
// TODO(sheniang03): Add timeout for wait, now timeout unused
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
return true;
}
ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
int rank,
int size,
const platform::Place& place,
int gid,
int local_rank,
int local_size,
int gloo_rank,
int gloo_size,
bool with_switch,
std::string switch_endpoint,
int src_rank,
int dst_rank)
: ProcessGroup(rank, size, place, gid),
store_(store),
local_rank_(local_rank),
local_size_(local_size),
gloo_rank_(gloo_rank),
gloo_size_(gloo_size),
with_switch_(with_switch),
switch_endpoint_(switch_endpoint),
src_rank_(src_rank),
dst_rank_(dst_rank) {
return;
#ifdef PADDLE_WITH_CUSTOM
if (paddle::platform::is_custom_place(place_)) {
inner_pg_ = std::make_shared<ProcessGroupCustom>(
store, local_rank, local_size, place_, IGNORE_ID);
} else {
#endif
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
inner_pg_ = std::make_shared<ProcessGroupNCCL>(
store, local_rank, local_size, place_, IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
inner_pg_ = std::make_shared<ProcessGroupHCCL>(
store, local_rank, local_size, place_, IGNORE_ID);
#else
PADDLE_THROW(platform::errors::Unavailable(
"ProcessGroupHeter only supports NCCL, RCCL and HCCL now."));
#endif
#ifdef PADDLE_WITH_CUSTOM
}
#endif
if (local_rank_ == 0 && !with_switch_) {
auto opts = ProcessGroupGloo::GlooOptions::create();
opts->device = ProcessGroupGloo::createDefaultDevice();
inter_pg_ = std::make_shared<ProcessGroupGloo>(
store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts);
}
}
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
for (size_t i = 0; i < size; i++) {
*dst += *src;
dst++;
src++;
}
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors),
true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(out_tensors),
true,
platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif
// Step1: do allreduce in inner cluster
auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts);
task->Wait();
// Step2: copy tensors to CPU
if (local_rank_ == 0) {
std::vector<phi::DenseTensor> cpu_tensors;
cpu_tensors.reserve(in_tensors.size());
phi::DenseTensor cpu_tensor;
for (size_t i = 0; i < in_tensors.size(); i++) {
auto gpu_tensor = in_tensors[i];
cpu_tensor.Resize(gpu_tensor.dims());
framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
cpu_tensors.push_back(cpu_tensor);
}
// Step3: do inter cluster allreduce
if (with_switch_) {
if (local_rank_ == 0) {
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto dense_cpu_tensor = cpu_tensors[0];
std::vector<int64_t> send_size;
send_size.push_back(dense_cpu_tensor.numel());
int ret = client_->Send(
gid_,
{dense_cpu_tensor.name()},
send_size,
dense_cpu_tensor.data(),
dense_cpu_tensor.numel() *
framework::DataTypeSize(dense_cpu_tensor.dtype()));
PADDLE_ENFORCE_EQ(ret,
0,
platform::errors::PreconditionNotMet(
"Send to the switch module error."));
phi::DenseTensor cpu_tensor2;
cpu_tensor2.AllocateFrom(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
dense_cpu_tensor.dtype(),
dense_cpu_tensor.numel());
ret = client_->Recv(
gid_,
{dense_cpu_tensor.name()},
cpu_tensor2.data(),
cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype()));
PADDLE_ENFORCE_EQ(ret,
0,
platform::errors::PreconditionNotMet(
"Recv from the switch module error."));
switch (dense_cpu_tensor.dtype()) {
case DataType::FLOAT32:
_do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor.data()),
reinterpret_cast<float*>(cpu_tensor2.data()),
dense_cpu_tensor.numel());
break;
case DataType::FLOAT64:
_do_add<double>(reinterpret_cast<double*>(dense_cpu_tensor.data()),
reinterpret_cast<double*>(cpu_tensor2.data()),
dense_cpu_tensor.numel());
break;
case DataType::INT32:
_do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor.data()),
reinterpret_cast<int*>(cpu_tensor2.data()),
dense_cpu_tensor.numel());
break;
default:
PADDLE_THROW(platform::errors::PreconditionNotMet(
"Unsupported data type (%s) to do add.",
framework::DataType2String(dense_cpu_tensor.dtype())));
}
}
} else {
auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts);
gloo_task->Wait();
}
// Step4: copy cpu tensors to gpu
// copy cpu tensors to gpu
for (size_t i = 0; i < in_tensors.size(); i++) {
auto gpu_tensor = out_tensors[i];
auto cpu_tensor = cpu_tensors[i];
framework::TensorCopySync(cpu_tensor, cpu_tensor.place(), &gpu_tensor);
}
}
// Step5: broadcast among inner cluster
auto b_opts = BroadcastOptions();
b_opts.source_rank = 0;
auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
broadcast_task->Wait();
return CreateTask(rank_, CommType::ALLREDUCE, in_tensors);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const BroadcastOptions& opts) {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors),
true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(out_tensors),
true,
platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif
// Step1: do broadcast in inner cluster
auto b_opts = BroadcastOptions();
b_opts.source_rank = 0;
inner_pg_->Broadcast(in_tensors, out_tensors, b_opts);
if (local_rank_ == 0) {
std::vector<phi::DenseTensor> cpu_tensors;
cpu_tensors.reserve(in_tensors.size());
for (size_t i = 0; i < in_tensors.size(); i++) {
auto gpu_tensor = in_tensors[i];
phi::DenseTensor cpu_tensor;
cpu_tensor.Resize(gpu_tensor.dims());
framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
cpu_tensors.push_back(cpu_tensor);
}
if (with_switch_) {
if (local_rank_ == 0) {
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto dense_cpu_tensor = cpu_tensors[0];
if (gloo_rank_ == 0) {
std::vector<int64_t> send_size;
send_size.push_back(dense_cpu_tensor.numel());
int ret = client_->Send(
gid_,
{dense_cpu_tensor.name()},
send_size,
dense_cpu_tensor.data(),
dense_cpu_tensor.numel() *
framework::DataTypeSize(dense_cpu_tensor.dtype()));
PADDLE_ENFORCE_EQ(ret,
0,
platform::errors::PreconditionNotMet(
"Send to the switch module error."));
} else {
int ret = client_->Recv(
gid_,
{dense_cpu_tensor.name()},
dense_cpu_tensor.data(),
dense_cpu_tensor.numel() *
framework::DataTypeSize(dense_cpu_tensor.dtype()));
PADDLE_ENFORCE_EQ(ret,
0,
platform::errors::PreconditionNotMet(
"Receive from the switch module error."));
}
}
} else {
auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts);
gloo_task->Wait();
}
for (size_t i = 0; i < in_tensors.size(); i++) {
auto gpu_tensor = out_tensors[i];
auto cpu_tensor = cpu_tensors[i];
framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
}
}
auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
broadcast_task->Wait();
return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
std::vector<phi::DenseTensor>& in_tensors, int peer) {
PADDLE_ENFORCE_EQ(
in_tensors.size(),
1,
platform::errors::PreconditionNotMet(
"For each send operation, there can only be one tensor to send."));
// Copy Tensor to cpu
auto start = std::chrono::high_resolution_clock::now();
phi::DenseTensor cpu_tensor;
auto& gpu_tensor = in_tensors[0];
framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
PADDLE_ENFORCE_EQ(with_switch_,
true,
platform::errors::PreconditionNotMet(
"Gloo does not support the send operation."));
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
<< ") from gpu to cpu for send " << std::setw(9)
<< " is: " << diff.count() << " s" << std::endl;
// Send to switch
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
int64_t tensor_size =
cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype());
std::vector<int64_t> send_size;
send_size.push_back(tensor_size);
auto id = src_rank_ * 10000 + dst_rank_;
std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
std::string("_") + std::to_string(send_count++);
VLOG(2) << "tensor_name:" << tensor_name;
int ret = client_->Send(
gid_, {tensor_name}, send_size, cpu_tensor.data(), tensor_size);
PADDLE_ENFORCE_EQ(
ret,
0,
platform::errors::PreconditionNotMet("Send to the switch module error."));
return CreateTask(rank_, CommType::SEND, in_tensors);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
std::vector<phi::DenseTensor>& out_tensors, int peer) {
PADDLE_ENFORCE_EQ(
out_tensors.size(),
1,
platform::errors::PreconditionNotMet(
"For each rece operation, there can only be one tensor to receive."));
// Copy Tensor to cpu
phi::DenseTensor cpu_tensor;
auto& gpu_tensor = out_tensors[0];
cpu_tensor.Resize(gpu_tensor.dims());
cpu_tensor.set_layout(gpu_tensor.layout());
cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype());
PADDLE_ENFORCE_EQ(with_switch_,
true,
platform::errors::PreconditionNotMet(
"Gloo does not support the send operation."));
// recv from switch
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto id = src_rank_ * 10000 + dst_rank_;
std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
std::string("_") + std::to_string(recv_count++);
VLOG(2) << "tensor_name: " << tensor_name;
auto start = std::chrono::high_resolution_clock::now();
int ret = client_->Recv(
gid_,
{tensor_name},
cpu_tensor.data(),
cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()));
PADDLE_ENFORCE_EQ(ret,
0,
platform::errors::PreconditionNotMet(
"receive to the switch module error."));
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
double goodput = cpu_tensor.numel() *
framework::DataTypeSize(cpu_tensor.dtype()) / diff.count();
VLOG(2) << "Goodput: " << goodput << "B/s" << std::endl;
start = std::chrono::high_resolution_clock::now();
framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
end = std::chrono::high_resolution_clock::now();
diff = end - start;
VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
<< ") from cpu to gpu for recv " << std::setw(9)
<< " is: " << diff.count() << " s" << std::endl;
return CreateTask(rank_, CommType::RECV, out_tensors);
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
#include "paddle/fluid/platform/device_context.h"
#ifdef PADDLE_WITH_GLOO
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#endif
#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#include "paddle/fluid/platform/place.h"
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/distributed/collective/NCCLTools.h"
#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/distributed/collective/HCCLTools.h"
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#endif
#if defined(PADDLE_WITH_CUSTOM_DEVICE)
#include "paddle/fluid/distributed/collective/CustomCCLTools.h"
#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h"
#endif
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_ASCEND_CL))
#include "paddle/fluid/distributed/ps/service/heter_client.h"
#endif
#include "paddle/fluid/distributed/collective/Common.h"
namespace paddle {
namespace distributed {
using Place = paddle::platform::Place;
class ProcessGroupHeter : public ProcessGroup {
public:
class HeterTask : public ProcessGroup::Task,
public std::enable_shared_from_this<HeterTask> {
public:
HeterTask(int rank,
CommType CommType,
const std::vector<phi::DenseTensor>&);
bool IsCompleted();
void SynchronizeStreams() {}
bool Wait(std::chrono::milliseconds timeout = kWaitTimeout);
void Synchronize() {}
virtual ~HeterTask();
};
ProcessGroupHeter(const std::shared_ptr<Store>& store,
int rank,
int size,
const platform::Place& place,
int gid,
int local_rank,
int local_size,
int gloo_rank,
int gloo_size,
bool with_switch,
std::string switch_endpoints,
int src_rank,
int dst_rank);
std::string GetBackendName() const override { return "HETER_BACKEND"; }
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>&,
std::vector<phi::DenseTensor>&,
const AllreduceOptions& = AllreduceOptions()) override;
std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>&,
std::vector<phi::DenseTensor>&,
const BroadcastOptions& = BroadcastOptions()) override;
std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>& in_tensors, int peer) override;
std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& out_tensors, int peer) override;
protected:
virtual std::shared_ptr<ProcessGroupHeter::HeterTask> CreateTask(
int rank, CommType opType, const std::vector<phi::DenseTensor>& inputs);
private:
std::shared_ptr<Store> store_;
std::shared_ptr<ProcessGroup> inner_pg_;
std::shared_ptr<ProcessGroupGloo> inter_pg_;
int local_rank_;
int local_size_;
int gloo_rank_;
int gloo_size_;
bool with_switch_;
std::string switch_endpoint_;
int src_rank_;
int dst_rank_;
static int send_count;
static int recv_count;
};
} // namespace distributed
} // namespace paddle
...@@ -158,9 +158,6 @@ if(WITH_PYTHON) ...@@ -158,9 +158,6 @@ if(WITH_PYTHON)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup eager_reducer) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup eager_reducer)
if(WITH_NCCL OR WITH_RCCL) if(WITH_NCCL OR WITH_RCCL)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_nccl) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_nccl)
if(WITH_PSCORE)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_heter)
endif()
endif() endif()
if(WITH_XPU_BKCL) if(WITH_XPU_BKCL)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_bkcl) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_bkcl)
...@@ -171,12 +168,6 @@ if(WITH_PYTHON) ...@@ -171,12 +168,6 @@ if(WITH_PYTHON)
if(WITH_MPI) if(WITH_MPI)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_mpi) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_mpi)
endif() endif()
if(WITH_ASCEND_CL)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_hccl)
if(WITH_PSCORE)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_heter)
endif()
endif()
if(WITH_CUSTOM_DEVICE) if(WITH_CUSTOM_DEVICE)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_custom) set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_custom)
endif() endif()
......
...@@ -41,19 +41,10 @@ limitations under the License. */ ...@@ -41,19 +41,10 @@ limitations under the License. */
#include "paddle/fluid/distributed/collective/ProcessGroupMPI.h" #include "paddle/fluid/distributed/collective/ProcessGroupMPI.h"
#endif #endif
#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#endif
#if defined(PADDLE_WITH_CUSTOM_DEVICE) #if defined(PADDLE_WITH_CUSTOM_DEVICE)
#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h" #include "paddle/fluid/distributed/collective/ProcessGroupCustom.h"
#endif #endif
#if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL))
#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#endif
#if defined(PADDLE_WITH_GLOO) #if defined(PADDLE_WITH_GLOO)
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h" #include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
#include "paddle/fluid/distributed/store/tcp_store.h" #include "paddle/fluid/distributed/store/tcp_store.h"
...@@ -1258,62 +1249,6 @@ void BindDistributed(py::module *m) { ...@@ -1258,62 +1249,6 @@ void BindDistributed(py::module *m) {
py::call_guard<py::gil_scoped_release>()); py::call_guard<py::gil_scoped_release>());
#endif #endif
#if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL))
py::class_<distributed::ProcessGroupHeter,
std::shared_ptr<distributed::ProcessGroupHeter>>(
*m, "ProcessGroupHeter", ProcessGroup)
.def(py::init<const std::shared_ptr<distributed::Store> &,
int,
int,
#if defined(PADDLE_WITH_ASCEND_CL)
const platform::NPUPlace &,
#else
const platform::CUDAPlace &,
#endif
int,
int,
int,
int,
int,
bool,
std::string,
int,
int>(),
py::arg("store"),
py::arg("rank"),
py::arg("world_size"),
py::arg("place"),
py::arg("gid") = 0,
py::arg("local_rank") = 0,
py::arg("local_size") = 1,
py::arg("gloo_rank") = 0,
py::arg("gloo_size") = 1,
py::arg("with_switch") = false,
py::arg("switch_endpoint") = "",
py::arg("src_rank") = "",
py::arg("dst_rank") = "",
py::call_guard<py::gil_scoped_release>());
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
py::class_<distributed::ProcessGroupHCCL,
std::shared_ptr<distributed::ProcessGroupHCCL>>(
*m, "ProcessGroupHCCL", ProcessGroup)
.def(py::init<const std::shared_ptr<distributed::Store> &,
int,
int,
const platform::NPUPlace &,
int>(),
py::arg("store"),
py::arg("rank"),
py::arg("world_size"),
py::arg("place"),
py::arg("group_id") = 0,
py::call_guard<py::gil_scoped_release>());
#endif
#if defined(PADDLE_WITH_CUSTOM_DEVICE) #if defined(PADDLE_WITH_CUSTOM_DEVICE)
py::class_<distributed::ProcessGroupCustom, py::class_<distributed::ProcessGroupCustom,
std::shared_ptr<distributed::ProcessGroupCustom>>( std::shared_ptr<distributed::ProcessGroupCustom>>(
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
# limitations under the License. # limitations under the License.
import numpy as np import numpy as np
import os
import pickle import pickle
import io import io
import datetime import datetime
...@@ -148,15 +147,9 @@ def _new_process_group_impl( ...@@ -148,15 +147,9 @@ def _new_process_group_impl(
group_name, group_name,
pg_options, pg_options,
group_id=0, group_id=0,
src_rank=None,
dst_rank=None,
): ):
pg = None pg = None
genv = _get_global_env() genv = _get_global_env()
if backend != 'heter':
assert src_rank is None and dst_rank is None, (
"src_rank and dst_rank " "can only be set for heter backend."
)
assert backend in _valid_backend_list, "Unsupported backend: %s." % backend assert backend in _valid_backend_list, "Unsupported backend: %s." % backend
if backend == "gloo": if backend == "gloo":
place = core.CPUPlace() place = core.CPUPlace()
...@@ -164,52 +157,12 @@ def _new_process_group_impl( ...@@ -164,52 +157,12 @@ def _new_process_group_impl(
elif backend == "nccl": elif backend == "nccl":
place = core.CUDAPlace(genv.device_id) place = core.CUDAPlace(genv.device_id)
pg = core.ProcessGroupNCCL(store, rank, world_size, place, group_id) pg = core.ProcessGroupNCCL(store, rank, world_size, place, group_id)
elif backend == "hccl":
place = core.NPUPlace(genv.device_id)
pg = core.ProcessGroupHCCL(store, rank, world_size, place, group_id)
elif backend == "xccl": elif backend == "xccl":
place = core.CustomPlace(genv.device_type, genv.device_id) place = core.CustomPlace(genv.device_type, genv.device_id)
pg = core.ProcessGroupCustom(store, rank, world_size, place, group_id) pg = core.ProcessGroupCustom(store, rank, world_size, place, group_id)
elif backend == "bkcl": elif backend == "bkcl":
place = core.XPUPlace(genv.device_id) place = core.XPUPlace(genv.device_id)
pg = core.ProcessGroupBKCL(store, rank, world_size, place, group_id) pg = core.ProcessGroupBKCL(store, rank, world_size, place, group_id)
elif backend == "heter":
place = None
if core.is_compiled_with_cuda():
place = core.CUDAPlace(genv.device_id)
elif core.is_compiled_with_npu():
place = core.NPUPlace(genv.device_id)
cluster_id = int(os.getenv("CLUSTER_ID", "-1"))
assert cluster_id >= 0, "please set the CLUSTER_ID variable."
cluster_size = os.getenv("CLUSTER_SIZE", None)
assert cluster_size, "please set the CLUSTER_SIZE variable."
cluster_size = cluster_size.split(",")
cluster_size = [int(s) for s in cluster_size]
switch_ep = os.getenv("CLUSTER_SWITCH", None)
assert switch_ep, "please set the CLUSTER_SWITCH variable."
cluster_size_cumsum = np.cumsum(cluster_size)
cluster_offset = (
0 if cluster_id == 0 else cluster_size_cumsum[cluster_id - 1]
)
global_rank = cluster_offset + rank
global_world_size = cluster_size_cumsum[-1]
global_rank, global_world_size = _get_global_config(backend, rank)
pg = core.ProcessGroupHeter(
store,
rank=global_rank,
world_size=global_world_size,
place=place,
gid=group_id,
local_rank=rank,
local_size=world_size,
gloo_rank=cluster_id,
gloo_size=len(cluster_size),
with_switch=True,
switch_endpoint=switch_ep,
src_rank=src_rank,
dst_rank=dst_rank,
)
return pg return pg
...@@ -316,10 +269,8 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): ...@@ -316,10 +269,8 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout):
) )
size = len(ranks) size = len(ranks)
ranks = sorted(ranks) ranks = sorted(ranks)
if backend == 'heter' or (size > 1 and global_rank in ranks): if size > 1 and global_rank in ranks:
rank = 0 if backend == 'heter' else ranks.index(global_rank) rank = 0 if backend == 'heter' else ranks.index(global_rank)
src_rank = ranks[0] if backend == 'heter' else None
dst_rank = ranks[1] if backend == 'heter' else None
pg = _new_process_group_impl( pg = _new_process_group_impl(
backend, backend,
_default_store, _default_store,
...@@ -328,8 +279,6 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): ...@@ -328,8 +279,6 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout):
group_name, group_name,
pg_options=None, pg_options=None,
group_id=gid, group_id=gid,
src_rank=src_rank,
dst_rank=dst_rank,
) )
else: else:
rank = -1 rank = -1
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import random
import numpy as np
import os
import shutil
import paddle
from paddle.fluid import core
from datetime import timedelta
import paddle.fluid.core as core
from paddle.fluid.framework import _test_eager_guard
from paddle.fluid.dygraph.parallel import ParallelEnv
def init_process_group(strategy=None):
nranks = ParallelEnv().nranks
rank = ParallelEnv().local_rank
is_master = True if rank == 0 else False
store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks)
pg_group = core.ProcessGroupHCCL(store, rank, nranks)
return pg_group
class TestProcessGroupFp32(unittest.TestCase):
def setUp(self):
paddle.seed(2022)
random.seed(2022)
np.random.seed(2022)
self.config()
def config(self):
self.dtype = "float32"
self.shape = (2, 10, 5)
def test_create_process_group_nccl(self):
with _test_eager_guard():
paddle.set_device(
'npu:%d' % paddle.distributed.ParallelEnv().dev_id
)
pg = init_process_group()
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.allreduce(tensor_x)
task.wait()
assert np.array_equal(tensor_x, sum_result)
else:
task = pg.allreduce(tensor_y)
task.wait()
assert np.array_equal(tensor_y, sum_result)
print("test allreduce sum api ok")
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
max_result = paddle.maximum(tensor_x, tensor_y)
if pg.rank() == 0:
task = pg.allreduce(tensor_x, core.ReduceOp.MAX)
task.wait()
assert np.array_equal(tensor_x, max_result)
else:
task = pg.allreduce(tensor_y, core.ReduceOp.MAX)
task.wait()
assert np.array_equal(tensor_y, max_result)
print("test allreduce max api ok")
# test broadcast
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
broadcast_result = paddle.assign(tensor_x)
if pg.rank() == 0:
task = pg.broadcast(tensor_x, 0)
task.synchronize()
paddle.device.cuda.synchronize()
assert task.is_completed()
assert np.array_equal(broadcast_result, tensor_x)
else:
task = pg.broadcast(tensor_y, 0)
task.synchronize()
paddle.device.cuda.synchronize()
assert task.is_completed()
assert np.array_equal(broadcast_result, tensor_y)
print("test broadcast api ok")
# test barrier
# rank 0
if pg.rank() == 0:
task = pg.barrier()
task.wait()
# rank 1
else:
task = pg.barrier()
task.wait()
print("test barrier api ok\n")
exit(0)
# test allgather
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
out_shape = list(self.shape)
out_shape[0] *= 2
out = np.random.random(out_shape).astype(self.dtype)
tensor_out = paddle.to_tensor(out)
if pg.rank() == 0:
task = pg.all_gather(tensor_x, tensor_out)
task.wait()
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.all_gather(tensor_y, tensor_out)
task.wait()
paddle.device.cuda.synchronize()
out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
out_2 = paddle.slice(
tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
)
assert np.array_equal(tensor_x, out_1)
assert np.array_equal(tensor_y, out_2)
print("test allgather api ok\n")
# test alltoall
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
out1 = np.random.random(self.shape).astype(self.dtype)
out2 = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
tensor_out1 = paddle.to_tensor(out1)
tensor_out2 = paddle.to_tensor(out2)
raw_tensor_x_2 = paddle.slice(
tensor_x, [0], [self.shape[0] // 2], [self.shape[0]]
)
raw_tensor_y_1 = paddle.slice(
tensor_y, [0], [0], [self.shape[0] // 2]
)
if pg.rank() == 0:
task = pg.alltoall(tensor_x, tensor_out1)
task.wait()
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.alltoall(tensor_y, tensor_out2)
task.wait()
paddle.device.cuda.synchronize()
out1_2 = paddle.slice(
tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]]
)
out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2])
if pg.rank() == 0:
assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy())
else:
assert np.array_equal(out2_1, raw_tensor_x_2)
print("test alltoall api ok\n")
# test Reduce
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.reduce(tensor_x, 0)
task.wait()
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.reduce(tensor_y, 0)
task.wait()
paddle.device.cuda.synchronize()
if pg.rank() == 0:
assert np.array_equal(tensor_x, sum_result)
print("test reduce sum api ok\n")
# test Scatter
# rank 0
in_shape = list(self.shape)
in_shape[0] *= 2
x = np.random.random(in_shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
if pg.rank() == 0:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
paddle.device.cuda.synchronize()
out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
out2 = paddle.slice(
tensor_x, [0], [self.shape[0]], [self.shape[0] * 2]
)
if pg.rank() == 0:
assert np.array_equal(tensor_y, out1)
else:
assert np.array_equal(tensor_y, out2)
print("test scatter api ok\n")
class TestProcessGroupFp16(TestProcessGroupFp32):
def setUp(self):
paddle.seed(2022)
random.seed(2022)
np.random.seed(2022)
self.config()
def config(self):
self.dtype = "float16"
self.shape = (4, 20, 20)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import sys
sys.path.append("..")
from test_parallel_dygraph_dataparallel import TestMultipleGpus
class TestProcessGroup(TestMultipleGpus):
def test_process_group_nccl(self):
self.run_mnist_2gpu('process_group_hccl.py')
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册