Unverified commit 80ca78a2, authored by ronnywang, committed by GitHub

[CustomDevice] add custom ccl 2/2 (#44650)

* [CustomDevice] add custom ccl 2/2

* update

* update

* update launch
Parent commit: 377b3465
@@ -51,3 +51,16 @@ if(WITH_ASCEND_CL)
eager_api)
endif()
endif()
if(WITH_CUSTOM_DEVICE)
cc_library(
processgroup_custom
SRCS ProcessGroupCustom.cc CustomCCLTools.cc Common.cc
DEPS phi_backends
place
enforce
collective_helper
device_context
phi_api
eager_api)
endif()
new file: paddle/fluid/distributed/collective/CustomCCLTools.cc
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/CustomCCLTools.h"
#include "paddle/fluid/distributed/collective/Types.h"
namespace paddle {
namespace distributed {
phi::ccl::CCLReduceOp ToCustomCCLRedType(ReduceOp reduction) {
static const std::map<ReduceOp, phi::ccl::CCLReduceOp> red_type = {
{ReduceOp::MIN, phi::ccl::CCLReduceOp::MIN},
{ReduceOp::MAX, phi::ccl::CCLReduceOp::MAX},
{ReduceOp::SUM, phi::ccl::CCLReduceOp::SUM},
{ReduceOp::PRODUCT, phi::ccl::CCLReduceOp::PRODUCT},
};
auto it = red_type.find(reduction);
PADDLE_ENFORCE_EQ(
it != red_type.end(),
true,
platform::errors::InvalidArgument("Invalid hccl reduction. "
"Must be Min | Max | Prod | Sum"));
return it->second;
}
std::string SerializeCustomCCLUniqueId(const phi::ccl::CCLRootId& ccl_id) {
const uint8_t* bytes = ccl_id.data();
std::ostringstream oss;
for (size_t i = 0; i < ccl_id.size(); ++i) {
oss << std::hex << static_cast<int>(bytes[i]);
}
return oss.str();
}
} // namespace distributed
} // namespace paddle
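Note that SerializeCustomCCLUniqueId above prints each byte with std::hex but no field width, so the byte pairs (0x01, 0x23) and (0x12, 0x03) both serialize to "123". That is harmless here because the string is only used for VLOG output; a zero-padded variant (an illustration, not part of this commit, with a plain byte vector standing in for phi::ccl::CCLRootId) would be unambiguous:

  #include <cstdint>
  #include <iomanip>
  #include <sstream>
  #include <string>
  #include <vector>

  std::string SerializePadded(const std::vector<uint8_t>& id) {
    std::ostringstream oss;
    for (uint8_t b : id) {
      // setw(2)/setfill('0') keeps every byte exactly two hex digits wide.
      oss << std::hex << std::setw(2) << std::setfill('0')
          << static_cast<int>(b);
    }
    return oss.str();
  }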
new file: paddle/fluid/distributed/collective/CustomCCLTools.h
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <error.h>
#include <string>
#include "paddle/fluid/distributed/collective/Types.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/phi/backends/device_guard.h"
#include "paddle/phi/backends/device_manager.h"
namespace paddle {
namespace distributed {
class CustomEventManager {
public:
CustomEventManager() = default;
~CustomEventManager() {
if (is_created_) {
event_->Destroy();
}
}
CustomEventManager(const CustomEventManager&) = delete;
CustomEventManager& operator=(const CustomEventManager&) = delete;
CustomEventManager(CustomEventManager&& other) {
std::swap(is_created_, other.is_created_);
std::swap(device_index_, other.device_index_);
std::swap(device_type_, other.device_type_);
std::swap(event_, other.event_);
}
CustomEventManager& operator=(CustomEventManager&& other) {
std::swap(is_created_, other.is_created_);
std::swap(device_index_, other.device_index_);
std::swap(device_type_, other.device_type_);
std::swap(event_, other.event_);
return *this;
}
bool IsCreated() const { return is_created_; }
int8_t DeviceId() const { return device_index_; }
std::string DeviceType() const { return device_type_; }
phi::event::event_t GetRawCustomEvent() const { return event_->raw_event(); }
phi::event::Event* GetCustomEvent() const { return event_.get(); }
void Record(const paddle::platform::CustomDeviceContext& ctx) {
auto place = ctx.GetPlace();
auto device_type = place.GetDeviceType();
auto device_index = place.GetDeviceId();
if (!is_created_) {
CreateEvent(place);
}
PADDLE_ENFORCE_EQ(device_index,
device_index_,
platform::errors::PreconditionNotMet(
"CustomDeviceContext's device %d does not match "
"Event's device %d",
device_index,
device_index_));
PADDLE_ENFORCE_EQ(device_type,
device_type_,
platform::errors::PreconditionNotMet(
"CustomDeviceContext's device type %s does not match "
"Event's device type %s",
device_type,
device_type_));
phi::DeviceGuard guard(place);
phi::stream::Stream stream(place, ctx.stream());
event_->Record(&stream);
}
bool Query() const { return event_->Query(); }
void Block(const paddle::platform::CustomDeviceContext& ctx) const {
if (is_created_) {
auto place = ctx.GetPlace();
auto device_type = place.GetDeviceType();
auto device_index = place.GetDeviceId();
PADDLE_ENFORCE_EQ(device_index,
device_index_,
platform::errors::PreconditionNotMet(
"CustomDeviceContext's device %d does not match "
"Event's device %d",
device_index,
device_index_));
PADDLE_ENFORCE_EQ(device_type,
device_type_,
platform::errors::PreconditionNotMet(
"CustomDeviceContext's device type %s does not match "
"Event's device type %s",
device_type,
device_type_));
phi::DeviceGuard guard(place);
phi::stream::Stream stream(place, ctx.stream());
stream.WaitEvent(event_.get());
}
}
private:
bool is_created_{false};
std::shared_ptr<phi::event::Event> event_{nullptr};
int8_t device_index_{0};
std::string device_type_;
private:
void CreateEvent(const platform::Place& place) {
device_index_ = place.GetDeviceId();
device_type_ = place.GetDeviceType();
event_.reset(new phi::event::Event);
event_->Init(place);
is_created_ = true;
}
};
class CustomCCLCommManager {
public:
CustomCCLCommManager(const std::string& device_type,
phi::ccl::CCLComm ccl_comm)
: device_type_(device_type), ccl_comm_(ccl_comm) {}
CustomCCLCommManager() : CustomCCLCommManager("", nullptr) {}
~CustomCCLCommManager() noexcept {
std::unique_lock<std::mutex> lock(mutex_);
if (ccl_comm_) {
phi::DeviceManager::CCLDestroyComm(device_type_, ccl_comm_);
}
}
static std::shared_ptr<CustomCCLCommManager> Create(
const std::string& device_type,
int num_ranks,
int rank,
phi::ccl::CCLRootId* comm_id,
phi::ccl::CCLComm* ccl_comm) {
auto custom_ccl_manager = std::make_shared<CustomCCLCommManager>();
phi::DeviceManager::CCLCommInitRank(
device_type, num_ranks, comm_id, rank, ccl_comm);
custom_ccl_manager->device_type_ = device_type;
custom_ccl_manager->ccl_id_ = comm_id;
custom_ccl_manager->rank_ = rank;
custom_ccl_manager->ccl_comm_ = *ccl_comm;
return custom_ccl_manager;
}
phi::ccl::CCLRootId* GetCustomCCLId() const {
std::unique_lock<std::mutex> lock(mutex_);
return ccl_id_;
}
phi::ccl::CCLComm GetCustomCCLComm() const {
std::unique_lock<std::mutex> lock(mutex_);
return ccl_comm_;
}
CustomCCLCommManager(const CustomCCLCommManager&) = delete;
CustomCCLCommManager& operator=(const CustomCCLCommManager&) = delete;
CustomCCLCommManager& operator=(CustomCCLCommManager&& other) = delete;
CustomCCLCommManager(CustomCCLCommManager&& other) {
std::unique_lock<std::mutex> lock(other.mutex_);
std::swap(ccl_comm_, other.ccl_comm_);
}
protected:
std::string device_type_;
phi::ccl::CCLComm ccl_comm_;
phi::ccl::CCLRootId* ccl_id_{nullptr};
int rank_;
mutable std::mutex mutex_;
};
phi::ccl::CCLReduceOp ToCustomCCLRedType(ReduceOp reduction);
std::string SerializeCustomCCLUniqueId(const phi::ccl::CCLRootId& ccl_id);
} // namespace distributed
} // namespace paddle
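CustomEventManager lazily creates its underlying phi::event::Event on the first Record() call and pins it to that context's device; Block() then makes another stream wait on the recorded work without blocking the host. A minimal usage sketch (StreamHandoff is a hypothetical helper, assuming two contexts on the same custom device):

  #include "paddle/fluid/distributed/collective/CustomCCLTools.h"

  // Make `consumer` wait for work already enqueued on `producer`.
  void StreamHandoff(const paddle::platform::CustomDeviceContext& producer,
                     const paddle::platform::CustomDeviceContext& consumer) {
    paddle::distributed::CustomEventManager ev;
    ev.Record(producer);  // creates the event on producer's device, then records
    ev.Block(consumer);   // consumer's stream waits; the host does not block
    // ev.Query() polls completion without blocking, which is how
    // CustomTask::IsCompleted works in ProcessGroupCustom.cc below.
  }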
new file: paddle/fluid/distributed/collective/ProcessGroupCustom.cc
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h"
#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/CustomCCLTools.h"
#include "paddle/fluid/memory/malloc.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"
DECLARE_bool(xccl_blocking_wait);
constexpr int64_t kWaitBlockTimeout = 10;
namespace paddle {
namespace distributed {
void SyncDefaultStream(
const std::vector<Place>& places,
std::vector<CustomEventManager>& cclEvents, // NOLINT
std::vector<std::unique_ptr<CustomDeviceContext>>& dev_ctx) { // NOLINT
for (size_t i = 0; i < places.size(); ++i) {
auto* default_ctx = static_cast<platform::CustomDeviceContext*>(
platform::DeviceContextPool::Instance().Get(places[i]));
cclEvents[i].Record(*dev_ctx[i]);
cclEvents[i].Block(*default_ctx);
}
}
std::shared_ptr<ProcessGroupCustom::CustomTask> ProcessGroupCustom::CreateTask(
std::vector<Place> places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs) {
return std::make_shared<ProcessGroupCustom::CustomTask>(
places, rank, comm_type, inputs);
}
ProcessGroupCustom::CustomTask::CustomTask(
const std::vector<Place>& places,
int rank,
CommType CommType,
const std::vector<phi::DenseTensor>& inputs)
: Task(rank, inputs, CommType), places_(places) {
control_events_.resize(places.size());
cclComms_.resize(places.size());
}
ProcessGroupCustom::CustomTask::~CustomTask() {}
void ProcessGroupCustom::CustomTask::SetOutputs(
std::vector<phi::DenseTensor>& outputs) { // NOLINT
outputs_ = std::make_shared<std::vector<phi::DenseTensor>>(outputs);
}
void ProcessGroupCustom::CustomTask::SynchronizeStreams() {
for (size_t i = 0; i < places_.size(); ++i) {
auto* default_ctx = static_cast<platform::CustomDeviceContext*>(
platform::DeviceContextPool::Instance().Get(places_[i]));
phi::DeviceGuard guard(default_ctx->GetPlace());
phi::stream::Stream stream(default_ctx->GetPlace(), default_ctx->stream());
stream.WaitEvent(control_events_[i].GetCustomEvent());
}
}
bool ProcessGroupCustom::CustomTask::IsCompleted() {
for (size_t i = 0; i < places_.size(); ++i) {
if (!control_events_[i].Query()) {
return false;
}
}
return true;
}
bool ProcessGroupCustom::CustomTask::Wait(std::chrono::milliseconds timeout) {
SynchronizeStreams();
// Busy-poll the recorded events until every one completes; the timeout
// argument is currently unused.
while (!IsCompleted()) {
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitBlockTimeout));
}
return true;
}
// Same as Wait
void ProcessGroupCustom::CustomTask::Synchronize() { Wait(kWaitTimeout); }
ProcessGroupCustom::ProcessGroupCustom(const std::shared_ptr<Store>& store,
int rank,
int size,
const platform::Place& place,
int gid)
: ProcessGroup(rank, size, place, gid),
store_(store),
device_type_(place.GetDeviceType()) {
phi::DeviceManager::SetDevice(place_);
}
void ProcessGroupCustom::BroadcastUniqueCustomID(
std::vector<phi::ccl::CCLRootId>& ccl_ids) { // NOLINT
if (rank_ == 0) {
for (size_t i = 0; i < ccl_ids.size(); i++) {
auto key = "ProcessGroupCustom/ccl_ids/" + std::to_string(i);
store_->set(key, ccl_ids[i]);
}
} else {
for (size_t i = 0; i < ccl_ids.size(); i++) {
auto key = "ProcessGroupCustom/ccl_ids/" + std::to_string(i);
ccl_ids[i] = store_->get(key);
}
}
}
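BroadcastUniqueCustomID is a plain publish/fetch rendezvous through the TCP store: rank 0 writes each root id under a well-known key and every other rank reads it back (the real Store::get blocks until the key has been set). The same pattern in isolation, purely as an illustration, with an ordinary map and byte vector standing in for Store and phi::ccl::CCLRootId:

  #include <cstdint>
  #include <map>
  #include <string>
  #include <vector>

  using Bytes = std::vector<uint8_t>;             // stand-in for CCLRootId
  using ToyStore = std::map<std::string, Bytes>;  // stand-in for Store

  void Rendezvous(int rank, ToyStore* store, std::vector<Bytes>* ids) {
    for (size_t i = 0; i < ids->size(); ++i) {
      const auto key = "ProcessGroupCustom/ccl_ids/" + std::to_string(i);
      if (rank == 0) {
        (*store)[key] = (*ids)[i];   // rank 0 publishes the id it generated
      } else {
        (*ids)[i] = store->at(key);  // real store blocks until rank 0 sets it
      }
    }
  }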
// create CustomCCLManager cache for places_key
void ProcessGroupCustom::CreateCustomManagerCache(
const std::string& places_key, const std::vector<Place>& places) {
PADDLE_ENFORCE_EQ(places_key.empty(),
false,
platform::errors::PreconditionNotMet(
"Not able to create/get the CustomCCL Communicator since "
"the custom device place is not known"));
const std::string device_type = places.back().GetDeviceType();
std::vector<std::shared_ptr<CustomCCLCommManager>> ccl_comms;
ccl_comms.resize(places.size());
// using vector just for broadcast
std::vector<phi::ccl::CCLRootId> ccl_ids;
ccl_ids.resize(1);
auto& ccl_id = ccl_ids.front();
if (rank_ == 0) {
phi::DeviceManager::CCLGetUniqueId(device_type, &ccl_id);
}
BroadcastUniqueCustomID(ccl_ids);
VLOG(3) << "init custom ccl rank: " << rank_ << ", nranks: " << size_
<< ", place: " << places_key
<< ", custom ccl uniqueid: " << SerializeCustomCCLUniqueId(ccl_id);
std::vector<std::unique_ptr<CustomDeviceContext>> dev_ctx;
dev_ctx.resize(places.size());
std::unique_ptr<phi::ccl::CCLComm> comms(
new phi::ccl::CCLComm[places.size()]);
for (size_t i = 0; i < places.size(); ++i) {
phi::DeviceGuard guard(places[i]);
ccl_comms[i] = CustomCCLCommManager::Create(
device_type, GetSize(), GetRank(), &ccl_id, comms.get() + i);
dev_ctx[i].reset(new CustomDeviceContext(places[i]));
}
std::vector<CustomEventManager> events;
events.resize(places.size());
// These caches will be useful to process sync/wait/communicate
places_to_events_.emplace(places_key, std::move(events));
places_to_customcomm_.emplace(places_key, std::move(ccl_comms));
places_to_ctx_.emplace(places_key, std::move(dev_ctx));
}
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Collective(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
Fn fn,
CommType op_type) {
const auto places = GetPlaceList(inputs);
const auto key = GetKeyFromPlaces(places);
{
std::lock_guard<std::mutex> lock(mutex_);
if (places_to_customcomm_.find(key) == places_to_customcomm_.end()) {
CreateCustomManagerCache(key, places);
}
}
auto& ccl_comms = places_to_customcomm_[key];
SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);
auto task = CreateTask(places, rank_, op_type, inputs);
task->SetOutputs(outputs);
for (size_t i = 0; i < inputs.size(); ++i) {
phi::DeviceGuard guard(places[i]);
const auto& ccl_stream = places_to_ctx_[key][i]->stream();
phi::stream::Stream stream(places[i], ccl_stream);
fn(inputs[i], outputs[i], ccl_comms[i]->GetCustomCCLComm(), stream);
}
for (size_t i = 0; i < inputs.size(); ++i) {
phi::DeviceGuard guard(places[i]);
task->control_events_[i].Record(*places_to_ctx_[key][i]);
}
return task;
}
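Collective keys all cached state (communicators, events, device contexts) by the string derived from the place list, creating it once under the mutex and reusing it afterwards; only the cache lookup is locked, not the collective launch itself. The caching skeleton in isolation (a sketch with a stand-in Comm type, not the paddle implementation):

  #include <map>
  #include <memory>
  #include <mutex>
  #include <string>
  #include <vector>

  struct Comm { std::string key; };  // stand-in for the per-place CCL state

  class CommCache {
   public:
    std::shared_ptr<Comm> GetOrCreate(const std::vector<int>& device_ids) {
      std::string key;
      for (int id : device_ids) key += std::to_string(id) + ",";
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = cache_.find(key);
      if (it == cache_.end()) {
        // Expensive setup (CCLCommInitRank etc.) happens exactly once per key.
        it = cache_.emplace(key, std::make_shared<Comm>(Comm{key})).first;
      }
      return it->second;
    }

   private:
    std::mutex mutex_;
    std::map<std::string, std::shared_ptr<Comm>> cache_;
  };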
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllReduce(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const AllreduceOptions& opts) {
return Collective(
in_tensors,
out_tensors,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
phi::ccl::CCLComm comm,
const phi::stream::Stream& stream) {
return phi::DeviceManager::CCLAllReduce(
device_type_,
input.data(),
output.data(),
input.numel(),
phi::ccl::ToCCLDataType(input.dtype()),
ToCustomCCLRedType(opts.reduce_op),
comm,
stream);
},
CommType::ALLREDUCE);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Broadcast(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const BroadcastOptions& opts) {
return Collective(
in_tensors,
out_tensors,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
phi::ccl::CCLComm comm,
const phi::stream::Stream& stream) {
int root = opts.source_rank * in_tensors.size() + opts.source_root;
if (rank_ == root) {
return phi::DeviceManager::CCLBroadcast(
device_type_,
input.data(),
input.numel(),
phi::ccl::ToCCLDataType(input.dtype()),
root,
comm,
stream);
} else {
return phi::DeviceManager::CCLBroadcast(
device_type_,
output.data(),
output.numel(),
phi::ccl::ToCCLDataType(output.dtype()),
root,
comm,
stream);
}
},
CommType::BROADCAST);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Barrier(
const BarrierOptions& opts) {
// Only support single card single process
std::vector<phi::CustomPlace> places = {place_};
std::vector<phi::DenseTensor> barrierTensors;
barrierTensors.reserve(places.size());
for (auto& place : places) {
phi::DeviceGuard guard(place);
auto dt = full({1}, 0, phi::DataType::FLOAT32, place);
barrierTensors.push_back(
*std::dynamic_pointer_cast<phi::DenseTensor>(dt.impl()));
}
auto task = ProcessGroupCustom::AllReduce(barrierTensors, barrierTensors);
auto xccl_task = dynamic_cast<ProcessGroupCustom::CustomTask*>(task.get());
xccl_task->barrierTensors_ = std::move(barrierTensors);
return task;
}
} // namespace distributed
} // namespace paddle
new file: paddle/fluid/distributed/collective/ProcessGroupCustom.h
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/distributed/collective/CustomCCLTools.h"
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/platform/device/npu/npu_stream.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
namespace distributed {
using Place = paddle::platform::Place;
using CustomDeviceContext = paddle::platform::CustomDeviceContext;
class ProcessGroupCustom : public ProcessGroup {
public:
class CustomTask : public ProcessGroup::Task,
public std::enable_shared_from_this<CustomTask> {
public:
CustomTask(const std::vector<Place>& places,
int rank,
CommType CommType,
const std::vector<phi::DenseTensor>& inputs);
bool IsCompleted();
void SynchronizeStreams();
bool Wait(std::chrono::milliseconds timeout = kWaitTimeout);
void Synchronize();
void SetOutputs(std::vector<phi::DenseTensor>& outputs); // NOLINT
virtual ~CustomTask();
std::vector<CustomEventManager> control_events_;
std::vector<phi::DenseTensor> barrierTensors_;
protected:
std::vector<Place> places_;
std::vector<std::shared_ptr<CustomCCLCommManager>> cclComms_;
std::shared_ptr<std::vector<phi::DenseTensor>> outputs_;
private:
const std::string device_type_;
};
ProcessGroupCustom(const std::shared_ptr<Store>& store,
int rank,
int size,
const platform::Place& place,
int gid);
const std::string GetBackendName() const override {
return "XCCL_" + device_type_;
}
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const AllreduceOptions& = AllreduceOptions()) override;
std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const BroadcastOptions& = BroadcastOptions()) override;
std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) override;
protected:
virtual std::shared_ptr<ProcessGroupCustom::CustomTask> CreateTask(
std::vector<Place> places,
int rank,
CommType opType,
const std::vector<phi::DenseTensor>& inputs);
std::shared_ptr<Store> store_;
std::shared_ptr<CustomCCLCommManager> custom_comm_;
std::mutex mutex_;
std::unordered_map<std::string,
std::vector<std::shared_ptr<CustomCCLCommManager>>>
places_to_customcomm_;
std::unordered_map<std::string, std::vector<CustomEventManager>>
places_to_events_;
std::unordered_map<std::string,
std::vector<std::unique_ptr<CustomDeviceContext>>>
places_to_ctx_;
std::set<int> used_place_ids_;
private:
void BcastCustomId(std::vector<phi::ccl::CCLRootId>& ccl_ids,
int root, // NOLINT
int server_fd);
void BroadcastUniqueCustomID(
std::vector<phi::ccl::CCLRootId>& custom_ccl_ids); // NOLINT
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> Collective(
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
Fn fn,
CommType op_type);
void CreateCustomManagerCache(const std::string& places_key,
const std::vector<Place>& places);
const std::string device_type_;
};
} // namespace distributed
} // namespace paddle
@@ -73,6 +73,12 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
src_rank_(src_rank),
dst_rank_(dst_rank) {
return;
#ifdef PADDLE_WITH_CUSTOM_DEVICE
if (paddle::platform::is_custom_place(place_)) {
inner_pg_ = std::make_shared<ProcessGroupCustom>(
store, local_rank, local_size, place_, IGNORE_ID);
} else {
#endif
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
inner_pg_ = std::make_shared<ProcessGroupNCCL>(
store, local_rank, local_size, place_, IGNORE_ID);
@@ -83,6 +89,10 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
PADDLE_THROW(platform::errors::Unavailable(
"ProcessGroupHeter only supports NCCL, RCCL and HCCL now."));
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
}
#endif
if (local_rank_ == 0 && !with_switch_) {
auto opts = ProcessGroupGloo::GlooOptions::create();
opts->device = ProcessGroupGloo::createDefaultDevice();
@@ -45,6 +45,11 @@
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#endif
#if defined(PADDLE_WITH_CUSTOM_DEVICE)
#include "paddle/fluid/distributed/collective/CustomCCLTools.h"
#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h"
#endif
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_ASCEND_CL))
@@ -13,6 +13,8 @@
// limitations under the License.
#include "paddle/fluid/distributed/collective/reducer.h"
#include "paddle/phi/backends/device_guard.h"
#include "paddle/phi/backends/device_manager.h"
namespace paddle {
namespace distributed {
@@ -147,8 +149,8 @@ std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
}
template <typename DeviceContext, typename T>
-static void ConcatTensorsForAllReduce(
-const DeviceContext &context,
+struct ConcatTensorsForAllReduce {
+void operator()(const DeviceContext &context,
const std::vector<phi::DenseTensor> &dense_tensors_,
Tensor *p_dense_contents) {
operators::math::ConcatFunctor<DeviceContext, T> concat_functor_;
@@ -158,11 +160,12 @@ static void ConcatTensorsForAllReduce(
0,
std::dynamic_pointer_cast<phi::DenseTensor>(p_dense_contents->impl())
.get());
}
-}
+};
template <typename DeviceContext, typename T>
-static void SplitTensorsForAllReduce(
-const DeviceContext &context,
+struct SplitTensorsForAllReduce {
+void operator()(const DeviceContext &context,
Tensor *p_dense_contents,
std::vector<phi::DenseTensor> *p_dense_tensors) {
auto *in =
@@ -181,7 +184,55 @@ static void SplitTensorsForAllReduce(
operators::math::SplitFunctor<DeviceContext, T> split_functor_;
split_functor_(context, *in, shape_refer, 0, &outs);
}
-}
+};
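The change from free function templates to function-object templates above is what makes the custom-device override below possible: C++ permits partial specialization of class templates but not of function templates. A self-contained illustration of the idiom (stand-in types, not paddle code):

  #include <iostream>

  struct GenericCtx {};
  struct CustomCtx {};

  template <typename Ctx, typename T>
  struct Work {
    void operator()(const Ctx&) const { std::cout << "generic path\n"; }
  };

  // Partial specialization on the context type alone; this is only legal
  // for class templates, which is why the functions became structs.
  template <typename T>
  struct Work<CustomCtx, T> {
    void operator()(const CustomCtx&) const { std::cout << "custom path\n"; }
  };

  int main() {
    Work<GenericCtx, float>()(GenericCtx{});  // prints "generic path"
    Work<CustomCtx, float>()(CustomCtx{});    // prints "custom path"
  }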
#ifdef PADDLE_WITH_CUSTOM_DEVICE
// note(wangran16): A temporary solution for all backends.
template <typename T>
struct ConcatTensorsForAllReduce<platform::CustomDeviceContext, T> {
void operator()(const platform::CustomDeviceContext &context,
const std::vector<phi::DenseTensor> &dense_tensors_,
Tensor *p_dense_contents) {
phi::DeviceGuard guard(context.GetPlace());
auto *out =
std::dynamic_pointer_cast<phi::DenseTensor>(p_dense_contents->impl())
.get();
uint8_t *out_data = reinterpret_cast<uint8_t *>(out->data<T>());
auto *device = phi::DeviceManager::GetDeviceWithPlace(context.GetPlace());
size_t offset = 0;
for (const auto &tensor : dense_tensors_) {
const uint8_t *in_data =
reinterpret_cast<const uint8_t *>(tensor.data<T>());
auto sz = tensor.numel() * sizeof(T);
device->MemoryCopyD2D(out_data + offset, in_data, sz, nullptr);
offset += sz;
}
}
};
template <typename T>
struct SplitTensorsForAllReduce<platform::CustomDeviceContext, T> {
void operator()(const platform::CustomDeviceContext &context,
Tensor *p_dense_contents,
std::vector<phi::DenseTensor> *p_dense_tensors) {
auto *in =
std::dynamic_pointer_cast<phi::DenseTensor>(p_dense_contents->impl())
.get();
uint8_t *in_data = reinterpret_cast<uint8_t *>(in->data<T>());
auto *device = phi::DeviceManager::GetDeviceWithPlace(context.GetPlace());
size_t offset = 0;
for (auto &tensor : *p_dense_tensors) {
uint8_t *out_data = reinterpret_cast<uint8_t *>(tensor.data<T>());
auto sz = tensor.numel() * sizeof(T);
device->MemoryCopyD2D(out_data, in_data + offset, sz, nullptr);
offset += sz;
}
}
};
#endif
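The two specializations above replace the ConcatFunctor/SplitFunctor kernels with a sequence of device-to-device copies at byte offsets, since a generic custom backend only guarantees a memcpy-like primitive. A host-side analogue of the same offset arithmetic (a runnable sketch, not paddle code):

  #include <cstring>
  #include <vector>

  // Pack n buffers into one flat buffer (the "concat" direction).
  template <typename T>
  std::vector<T> Pack(const std::vector<std::vector<T>>& parts) {
    size_t total = 0;
    for (const auto& p : parts) total += p.size();
    std::vector<T> flat(total);
    size_t offset = 0;
    for (const auto& p : parts) {
      std::memcpy(flat.data() + offset, p.data(), p.size() * sizeof(T));
      offset += p.size();
    }
    return flat;
  }

  // Unpack back into pre-sized buffers (the "split" direction).
  template <typename T>
  void Unpack(const std::vector<T>& flat, std::vector<std::vector<T>>* parts) {
    size_t offset = 0;
    for (auto& p : *parts) {
      std::memcpy(p.data(), flat.data() + offset, p.size() * sizeof(T));
      offset += p.size();
    }
  }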
// context is used to select the stream for concat
template <typename DeviceContext>
@@ -192,15 +243,15 @@ static void ConcatTensorsWithType(
phi::DataType type) {
switch (type) {
case phi::DataType::FLOAT16:
-ConcatTensorsForAllReduce<DeviceContext, platform::float16>(
+ConcatTensorsForAllReduce<DeviceContext, platform::float16>()(
context, dense_tensors_, p_dense_contents);
break;
case phi::DataType::FLOAT32:
-ConcatTensorsForAllReduce<DeviceContext, float>(
+ConcatTensorsForAllReduce<DeviceContext, float>()(
context, dense_tensors_, p_dense_contents);
break;
case phi::DataType::FLOAT64:
-ConcatTensorsForAllReduce<DeviceContext, double>(
+ConcatTensorsForAllReduce<DeviceContext, double>()(
context, dense_tensors_, p_dense_contents);
break;
default:
@@ -219,15 +270,15 @@ static void SplitTensorsWithType(const DeviceContext &context,
phi::DataType type) {
switch (type) {
case phi::DataType::FLOAT16:
-SplitTensorsForAllReduce<DeviceContext, platform::float16>(
+SplitTensorsForAllReduce<DeviceContext, platform::float16>()(
context, p_dense_contents, p_dense_tensors);
break;
case phi::DataType::FLOAT32:
-SplitTensorsForAllReduce<DeviceContext, float>(
+SplitTensorsForAllReduce<DeviceContext, float>()(
context, p_dense_contents, p_dense_tensors);
break;
case phi::DataType::FLOAT64:
-SplitTensorsForAllReduce<DeviceContext, double>(
+SplitTensorsForAllReduce<DeviceContext, double>()(
context, p_dense_contents, p_dense_tensors);
break;
default:
@@ -249,6 +300,18 @@ void EagerGroup::ConcatTensors(const platform::Place &place) {
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat grad tensors since it's not compiled with NCCL,"
"Please recompile or reinstall Paddle with NCCL support."));
#endif
} else if (platform::is_custom_place(place)) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
auto *default_ctx = static_cast<platform::CustomDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place));
ConcatTensorsWithType(
*default_ctx, dense_tensors_, &dense_contents_, dtype_);
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat grad tensors since it's not compiled with "
"CUSTOM_DEVICE,"
"Please recompile or reinstall Paddle with CUSTOM_DEVICE support."));
#endif
} else if (platform::is_cpu_place(place)) {
auto *default_ctx = static_cast<phi::CPUContext *>(
@@ -272,6 +335,18 @@ void EagerGroup::SplitTensors(const platform::Place &place) {
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split grad tensor since it's not compiled with NCCL,"
"Please recompile or reinstall Paddle with NCCL support."));
#endif
} else if (platform::is_custom_place(place)) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
auto *default_ctx = static_cast<platform::CustomDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place));
SplitTensorsWithType(
*default_ctx, &dense_contents_, &dense_tensors_, dtype_);
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split grad tensor since it's not compiled with "
"CUSTOM_DEVICE,"
"Please recompile or reinstall Paddle with CUSTOM_DEVICE support."));
#endif
} else if (platform::is_cpu_place(place)) {
auto *default_ctx = static_cast<phi::CPUContext *>(
@@ -889,6 +964,16 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat grad tensors since it's not compiled with NCCL,"
"Please recompile or reinstall Paddle with NCCL support."));
#endif
} else if (platform::is_custom_place(inner_place_)) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
dev_ctx = static_cast<platform::CustomDeviceContext *>(
platform::DeviceContextPool::Instance().Get(inner_place_));
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat grad tensors since it's not compiled with "
"CUSTOM_DEVICE,"
"Please recompile or reinstall Paddle with CUSTOM_DEVICE support."));
#endif
} else if (platform::is_cpu_place(inner_place_)) {
dev_ctx = static_cast<phi::CPUContext *>(
@@ -155,6 +155,9 @@ if(NOT ON_INFER)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_heter)
endif()
endif()
if(WITH_CUSTOM_DEVICE)
set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_custom)
endif()
set(PYBIND_SRCS ${PYBIND_SRCS} distributed_py.cc)
endif()
@@ -39,6 +39,10 @@ limitations under the License. */
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#endif
#if defined(PADDLE_WITH_CUSTOM_DEVICE)
#include "paddle/fluid/distributed/collective/ProcessGroupCustom.h"
#endif
#if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL))
#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
@@ -458,6 +462,24 @@ void BindDistributed(py::module *m) {
py::arg("group_id") = 0,
py::call_guard<py::gil_scoped_release>());
#endif
#if defined(PADDLE_WITH_CUSTOM_DEVICE)
py::class_<distributed::ProcessGroupCustom,
std::shared_ptr<distributed::ProcessGroupCustom>>(
*m, "ProcessGroupCustom", ProcessGroup)
.def(py::init<const std::shared_ptr<distributed::Store> &,
int,
int,
const platform::CustomPlace &,
int>(),
py::arg("store"),
py::arg("rank"),
py::arg("world_size"),
py::arg("place"),
py::arg("group_id") = 0,
py::call_guard<py::gil_scoped_release>());
#endif
py::class_<distributed::ProcessGroup::Task,
@@ -146,7 +146,7 @@ _group_map_backend = {}
# Name of the default group for init_parallel_env
_default_group_name = "_default_pg"
-_valid_backend_list = ['nccl', 'gloo', 'hccl', 'heter']
+_valid_backend_list = ['nccl', 'gloo', 'hccl', 'heter', 'xccl']
_default_store = None # the default tcp store
_default_backend = None
@@ -271,6 +271,9 @@ def _new_process_group_impl(backend,
elif backend == "hccl":
place = core.NPUPlace(genv.device_id)
pg = core.ProcessGroupHCCL(store, rank, world_size, place, group_id)
elif backend == "xccl":
place = core.CustomPlace(genv.device_type, genv.device_id)
pg = core.ProcessGroupCustom(store, rank, world_size, place, group_id)
elif backend == "heter":
place = None
if core.is_compiled_with_cuda():
@@ -1965,10 +1965,13 @@ class ParameterServerLauncher(object):
def check_backend(backend):
-if backend not in ['nccl', 'gloo', 'bkcl', 'cncl', 'auto', 'hccl', 'heter']:
-raise ValueError("paddle.distributed initialize error, "
+if backend not in [
+'nccl', 'gloo', 'bkcl', 'cncl', 'auto', 'hccl', 'heter', 'xccl'
+]:
+raise ValueError(
+"paddle.distributed initialize error, "
"backend argument can only be one of "
-"'nccl', 'gloo', 'bkcl', 'auto', 'hccl', 'heter' "
+"'nccl', 'gloo', 'bkcl', 'auto', 'hccl', 'heter', 'xccl' "
"but got %s" % backend)
if backend == 'nccl' and not fluid.core.is_compiled_with_cuda():
@@ -13,6 +13,8 @@
# limitations under the License.
import os
import paddle.fluid as fluid
from paddle.device import get_available_custom_device
class DeviceType:
@@ -22,6 +24,7 @@ class DeviceType:
NPU = 'npu'
MLU = 'mlu'
IPU = 'ipu'
CUSTOM_DEVICE = 'custom_device'
class Device(object):
@@ -72,6 +75,8 @@ class Device(object):
return 'FLAGS_selected_mlus'
if self._dtype == DeviceType.IPU:
return 'FLAGS_selected_ipus'
if self._dtype == DeviceType.CUSTOM_DEVICE:
return 'FLAGS_selected_{}s'.format(os.getenv('PADDLE_XCCL_BACKEND'))
return 'FLAGS_selected_devices'
def get_selected_devices(self, devices=''):
@@ -84,11 +89,23 @@ class Device(object):
devs = [x.strip() for x in devices.split(',')]
return [str(self._labels.index(d)) for d in devs]
def get_custom_device_envs(self):
return {
'PADDLE_DISTRI_BACKEND': 'xccl',
'PADDLE_XCCL_BACKEND': os.getenv('PADDLE_XCCL_BACKEND'),
}
@classmethod
def parse_device(self):
dev = Device()
visible_devices = None
-if 'CUDA_VISIBLE_DEVICES' in os.environ or 'NVIDIA_VISIBLE_DEVICES' in os.environ:
+if 'PADDLE_XCCL_BACKEND' in os.environ:
+dev._dtype = DeviceType.CUSTOM_DEVICE
+visible_devices_str = '{}_VISIBLE_DEVICES'.format(
+os.getenv('PADDLE_XCCL_BACKEND').upper())
+if visible_devices_str in os.environ:
+visible_devices = os.getenv(visible_devices_str)
+elif 'CUDA_VISIBLE_DEVICES' in os.environ or 'NVIDIA_VISIBLE_DEVICES' in os.environ:
dev._dtype = DeviceType.GPU
visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv(
"NVIDIA_VISIBLE_DEVICES")
@@ -111,12 +128,27 @@
@classmethod
def detect_device(self):
import paddle.fluid as fluid
def get_custom_devices_count(device_type):
all_custom_devices = get_available_custom_device()
all_custom_devices = [
device.split(':')[0] for device in all_custom_devices
]
custom_devices_count = all_custom_devices.count(device_type)
return custom_devices_count
dev = Device()
num = 0
visible_devices = None
-if fluid.core.is_compiled_with_cuda():
+if 'PADDLE_XCCL_BACKEND' in os.environ:
+custom_device_type = os.getenv('PADDLE_XCCL_BACKEND')
+dev._dtype = DeviceType.CUSTOM_DEVICE
+num = get_custom_devices_count(custom_device_type)
+visible_devices_str = '{}_VISIBLE_DEVICES'.format(
+custom_device_type.upper())
+if visible_devices_str in os.environ:
+visible_devices = os.getenv(visible_devices_str)
+elif fluid.core.is_compiled_with_cuda():
dev._dtype = DeviceType.GPU
num = fluid.core.get_cuda_device_count()
visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv(
@@ -13,6 +13,7 @@
# limitations under the License.
from .controller import Controller, ControleMode
from ..context.device import DeviceType
import json
import os
......@@ -98,6 +99,8 @@ class CollectiveController(Controller):
"PADDLE_RANK_IN_NODE": str(i),
}
if len(selected_dev_list) > 0:
if self.ctx.node.device.dtype == DeviceType.CUSTOM_DEVICE:
e.update(self.ctx.node.device.get_custom_device_envs())
if self.pod.replicas == 1:
e.update({selected_dev_key: ",".join(selected_dev_list)})
else:
@@ -72,10 +72,10 @@ def _start_kv_server(port, http_server_d, size):
def _is_cpuonly(backend):
check_backend(backend)
-if backend in [
-'auto', 'nccl', 'bkcl', 'hccl', 'heter', 'cncl'
-] and (core.is_compiled_with_cuda() or core.is_compiled_with_xpu()
-or core.is_compiled_with_npu() or core.is_compiled_with_mlu()):
+if (backend in ['auto', 'nccl', 'bkcl', 'hccl', 'heter', 'cncl'] and
+(core.is_compiled_with_cuda() or core.is_compiled_with_xpu()
+or core.is_compiled_with_npu()
+or core.is_compiled_with_mlu())) or backend == 'xccl':
# passes 'auto' and can use cuda or xpu, use the default logics. so return False
return False
@@ -172,6 +172,11 @@ def init_parallel_env():
raise NotImplementedError(
"If you want to use CPU-only version, please use 'gloo' as backend")
if backend == "xccl":
FLAGS_selected_custom_devices = 'FLAGS_selected_{}s'.format(
parallel_env.device_type)
_check_var_exists(FLAGS_selected_custom_devices)
else:
if not is_cpu_only and core.is_compiled_with_cuda():
_check_var_exists("FLAGS_selected_gpus")
backend = "nccl" if backend == "auto" else backend
@@ -196,7 +201,10 @@
# directly, if they want to switch default place,
# they need to call a function to change default place,
# here just set correctly place to users
-if is_cpu_only:
+if backend == "xccl":
+place = core.CustomPlace(parallel_env.device_type,
+parallel_env.device_id)
+elif is_cpu_only:
place = core.CPUPlace()
elif core.is_compiled_with_cuda():
place = core.CUDAPlace(parallel_env.device_id)
@@ -118,8 +118,16 @@ class ParallelEnv(object):
def __init__(self):
self._rank = int(os.getenv("PADDLE_TRAINER_ID", "0"))
self._world_size = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
self._device_type = str(os.getenv("PADDLE_XCCL_BACKEND", ""))
# imperative only support one gpu or xpu
if self._device_type != "":
FLAGS_selected_custom_devices = 'FLAGS_selected_{}s'.format(
self._device_type)
selected_custom_devices = os.getenv(FLAGS_selected_custom_devices,
"0").split(",")
self._device_id = int(selected_custom_devices[0])
else:
if core.is_compiled_with_cuda():
selected_gpus = os.getenv("FLAGS_selected_gpus", "0").split(",")
self._device_id = int(selected_gpus[0])
@@ -199,6 +207,16 @@
"""
return self._device_id
@property
def device_type(self):
"""
The type of custom device for parallel training.
Its value is equal to the value of the environment variable ``PADDLE_XCCL_BACKEND``. The default value is an empty string.
"""
return self._device_type
@property
def current_endpoint(self):
"""
@@ -5,8 +5,16 @@ if(WITH_CUSTOM_DEVICE)
"test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
list(REMOVE_ITEM TEST_OPS "test_collective_process_group_xccl")
list(REMOVE_ITEM TEST_OPS test_collective_process_group_xccl)
foreach(TEST_OP ${TEST_OPS})
py_test(${TEST_OP} SRCS ${TEST_OP}.py)
endforeach()
bash_test_modules(
test_fleet_launch_custom_device START_BASH
test_fleet_launch_custom_device.sh ENVS
PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
set_tests_properties(test_custom_cpu_plugin PROPERTIES TIMEOUT 120)
set_tests_properties(test_fleet_launch_custom_device PROPERTIES TIMEOUT 120)
endif()
@@ -16,13 +16,13 @@
set -e
rm -rf PaddleCustomDevice && git clone https://github.com/PaddlePaddle/PaddleCustomDevice.git && pushd PaddleCustomDevice/backends/custom_cpu && mkdir build && pushd build && cmake .. && make -j8 && popd && popd
echo "begin test use custom_cpu"
-export FLAGS_selected_custom_cpus=0,1
+export CUSTOM_CPU_VISIBLE_DEVICES=0,1
export CUSTOM_DEVICE_ROOT=PaddleCustomDevice/backends/custom_cpu/build
-distributed_args="--ips=127.0.0.1 --backend=xccl --custom_device_type=custom_cpu --custom_devices=0,1 --run_mode=collective --log_dir=testlog"
+distributed_args="--devices=0,1"
python -m paddle.distributed.fleet.launch ${distributed_args} custom_device_multi_process_collective.py fleetlaunch_custom_cpu