Unverified commit 92faeedf authored by lilong12, committed by GitHub

Pg heter cloud (#40911)

Parent ec510bfd
......@@ -6,8 +6,15 @@ if (WITH_DISTRIBUTE)
endif()
if(WITH_NCCL)
cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
if (WITH_DISTRIBUTE)
cc_library(processgroup_heter SRCS ProcessGroupHeter.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
endif()
endif()
if(WITH_ASCEND_CL)
cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
if (WITH_DISTRIBUTE)
cc_library(processgroup_heter SRCS ProcessGroupHeter.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
endif()
endif()
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/Common.h"
namespace paddle {
namespace distributed {
std::vector<Place> GetPlaceList(const std::vector<Tensor>& tensors) {
std::vector<Place> places;
places.reserve(tensors.size());
for (auto& tensor : tensors) {
places.push_back(tensor.inner_place());
}
return places;
}
std::string GetKeyFromPlaces(const std::vector<Place>& places) {
std::string placeList;
for (auto& place : places) {
std::stringstream tmp;
tmp << place;
if (placeList.empty()) {
placeList += tmp.str();
} else {
placeList += "," + tmp.str();
}
}
return placeList;
}
static bool CheckTensorsInPlace(const std::vector<Tensor>& tensors,
const PlaceType type) {
return std::all_of(tensors.cbegin(), tensors.cend(),
[&](const Tensor& t) { return t.place() == type; });
}
bool CheckTensorsInCudaPlace(const std::vector<Tensor>& tensors) {
return CheckTensorsInPlace(tensors, PlaceType::kGPU);
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
namespace paddle {
namespace distributed {
using Tensor = paddle::experimental::Tensor;
using Place = paddle::platform::Place;
// Get the list of devices from list of tensors
std::vector<Place> GetPlaceList(const std::vector<Tensor>& tensors);
// Get the deviceList String from the list of devices
std::string GetKeyFromPlaces(const std::vector<Place>& places);
bool CheckTensorsInCudaPlace(const std::vector<Tensor>& tensors);
} // namespace distributed
} // namespace paddle
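A minimal usage sketch, assuming a caller that already holds the input tensors, of how these helpers combine into a cache key for per-place communicators (the function name below is hypothetical, not part of this commit):

// Sketch only: assumed context, combining the Common.h helpers.
void PrepareCollective(const std::vector<paddle::distributed::Tensor>& tensors) {
  using paddle::distributed::CheckTensorsInCudaPlace;
  using paddle::distributed::GetKeyFromPlaces;
  using paddle::distributed::GetPlaceList;
  // All inputs must already live on GPU before a CUDA collective is issued.
  PADDLE_ENFORCE_EQ(CheckTensorsInCudaPlace(tensors), true,
                    paddle::platform::errors::InvalidArgument(
                        "All inputs should be in CudaPlace."));
  // One Place per tensor, e.g. [gpu:0, gpu:1].
  auto places = GetPlaceList(tensors);
  // "gpu:0,gpu:1" - a stable string key for caching communicators per device set.
  std::string key = GetKeyFromPlaces(places);
  (void)key;
}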
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/HCCLTools.h"
#include "paddle/fluid/distributed/collective/Types.h"
namespace paddle {
namespace distributed {
HcclReduceOp ToHCCLRedType(ReduceOp reduction) {
static const std::map<ReduceOp, HcclReduceOp> red_type = {
{ReduceOp::MIN, HCCL_REDUCE_MIN},
{ReduceOp::MAX, HCCL_REDUCE_MAX},
{ReduceOp::SUM, HCCL_REDUCE_SUM},
{ReduceOp::PRODUCT, HCCL_REDUCE_PROD},
};
auto it = red_type.find(reduction);
PADDLE_ENFORCE_EQ(
it != red_type.end(), true,
platform::errors::InvalidArgument("Invalid hccl reduction. "
"Must be Min | Max | Prod | Sum"));
return it->second;
}
std::string SerializeHCCLUniqueId(const HcclRootInfo& hcclID) {
const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&hcclID);
std::ostringstream oss;
for (size_t i = 0; i < sizeof(hcclID); ++i) {
oss << std::hex << static_cast<int>(bytes[i]);
}
return oss.str();
}
} // namespace distributed
} // namespace paddle
......@@ -18,6 +18,7 @@
#include <string>
#include "boost/variant.hpp"
#include "paddle/fluid/distributed/collective/Types.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/collective_helper.h"
......@@ -170,5 +171,8 @@ class HCCLCommManager {
mutable std::mutex mutex_;
};
HcclReduceOp ToHCCLRedType(ReduceOp reduction);
std::string SerializeHCCLUniqueId(const HcclRootInfo& hcclID);
} // namespace distributed
} // namespace paddle
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/NCCLTools.h"
#include "paddle/fluid/distributed/collective/Types.h"
namespace paddle {
namespace distributed {
ncclRedOp_t ToNCCLRedType(ReduceOp reduction) {
static const std::map<ReduceOp, ncclRedOp_t> red_type = {
{ReduceOp::MIN, ncclMin},
{ReduceOp::MAX, ncclMax},
{ReduceOp::SUM, ncclSum},
{ReduceOp::PRODUCT, ncclProd},
};
auto it = red_type.find(reduction);
PADDLE_ENFORCE_EQ(it != red_type.end(), true,
platform::errors::InvalidArgument(
"Invalid nccl reduction. Must be ncclMin | ncclMax | "
"ncclProd | ncclSum"));
return it->second;
}
std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID) {
const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&ncclID);
std::ostringstream oss;
for (auto i = 0; i < NCCL_UNIQUE_ID_BYTES; ++i) {
oss << std::hex << static_cast<int>(bytes[i]);
}
return oss.str();
}
} // namespace distributed
} // namespace paddle
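A hedged sketch of where SerializeNCCLUniqueId typically fits: turning the opaque ncclUniqueId into a printable hex string, for example for logging or for building a rendezvous key. The surrounding setup is illustrative, not taken from this commit:

// Sketch only: relies on the dynload and enforce headers already pulled in
// by NCCLTools.h; the logging level is arbitrary.
ncclUniqueId nccl_id;
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGetUniqueId(&nccl_id));
VLOG(3) << "generated nccl unique id: "
        << paddle::distributed::SerializeNCCLUniqueId(nccl_id);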
......@@ -26,6 +26,8 @@
#include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/distributed/collective/Types.h"
namespace paddle {
namespace distributed {
......@@ -194,5 +196,8 @@ class NCCLCommManager {
mutable std::mutex mutex_;
};
ncclRedOp_t ToNCCLRedType(ReduceOp reduction);
std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID);
} // namespace distributed
} // namespace paddle
......@@ -34,7 +34,13 @@ bool ProcessGroup::Task::Wait(std::chrono::milliseconds timeout) {
void ProcessGroup::Task::Synchronize() {}
ProcessGroup::ProcessGroup(int rank, int size) : rank_(rank), size_(size) {}
ProcessGroup::ProcessGroup(int rank, int size, int gid)
: rank_(rank), size_(size) {
if (gid != IGNORE_ID) {
auto map = ProcessGroupMapFromGid::getInstance();
map->insert(gid, this);
}
}
} // namespace distributed
} // namespace paddle
......@@ -31,6 +31,7 @@ constexpr auto kWaitTimeout = std::chrono::milliseconds(0);
namespace paddle {
namespace distributed {
constexpr int IGNORE_ID = -1;
using Tensor = paddle::experimental::Tensor;
enum class CommType : std::uint8_t {
......@@ -49,14 +50,6 @@ enum class CommType : std::uint8_t {
UNKNOWN = 100,
};
struct ProcessGroupStrategy {
int nranks_{1};
int local_rank_{0};
std::vector<std::string> trainer_endpoints_{};
std::string current_endpoint_{""};
int nrings_{1};
};
class ProcessGroup {
public:
class Task {
......@@ -76,7 +69,7 @@ class ProcessGroup {
bool is_completed_ = false;
};
explicit ProcessGroup(int rank, int size);
explicit ProcessGroup(int rank, int size, int gid);
virtual ~ProcessGroup() {}
int GetRank() const { return rank_; }
......@@ -99,6 +92,12 @@ class ProcessGroup {
"ProcessGroup%s does not support broadcast", GetBackendName()));
}
virtual void Broadcast(const phi::DenseTensor* in, phi::DenseTensor* out) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support broadcast for static",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) {
PADDLE_THROW(platform::errors::InvalidArgument(
......@@ -151,5 +150,38 @@ class ProcessGroup {
const int size_;
};
class ProcessGroupMapFromGid {
public:
bool has(int gid) {
auto it = map_.find(gid);
return it != map_.end();
}
void insert(int gid, ProcessGroup* pg) {
PADDLE_ENFORCE_EQ(has(gid), false,
platform::errors::PreconditionNotMet(
"The process group with id %d already exists.", gid));
map_[gid] = pg;
}
ProcessGroup* get(int gid) {
PADDLE_ENFORCE_EQ(has(gid), true,
platform::errors::PreconditionNotMet(
"The process group with id %d does not exist.", gid));
return map_.find(gid)->second;
}
static std::shared_ptr<ProcessGroupMapFromGid> getInstance() {
static auto s_instance = std::make_shared<ProcessGroupMapFromGid>();
return s_instance;
}
ProcessGroupMapFromGid() = default;
~ProcessGroupMapFromGid() = default;
private:
std::unordered_map<int, ProcessGroup*> map_;
};
} // namespace distributed
} // namespace paddle
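A brief sketch of the registration/lookup flow the new gid parameter enables: constructing a process group with a gid other than IGNORE_ID registers it in ProcessGroupMapFromGid, and it can later be fetched by the same id (the store, ranks, and ids below are illustrative):

// Sketch only: `store` is an assumed std::shared_ptr<distributed::Store>;
// rank/size/gid values are made up.
auto pg = std::make_shared<paddle::distributed::ProcessGroupNCCL>(
    store, /*rank=*/0, /*size=*/8, /*gid=*/1);

// Later, e.g. inside an operator kernel, look the group up by the same id.
auto map = paddle::distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(1)) {
  paddle::distributed::ProcessGroup* group = map->get(1);
  (void)group;  // dispatch collectives through `group`
}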
......@@ -173,8 +173,10 @@ ProcessGroupGloo::GlooTask::GlooTask(int rank,
ProcessGroupGloo::ProcessGroupGloo(
const std::shared_ptr<paddle::distributed::Store>& store, int rank,
int world_size, const std::shared_ptr<GlooOptions> options)
: ProcessGroup(rank, world_size), _tag(0), _store(new GlooStore(store)) {
int world_size, int gid, const std::shared_ptr<GlooOptions> options)
: ProcessGroup(rank, world_size, gid),
_tag(0),
_store(new GlooStore(store)) {
_context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
auto prefix_store =
::gloo::rendezvous::PrefixStore(std::to_string(0), *_store);
......
......@@ -101,7 +101,7 @@ class ProcessGroupGloo : public ProcessGroup {
explicit ProcessGroupGloo(
const std::shared_ptr<paddle::distributed::Store>& store, int rank,
int world_size, std::shared_ptr<GlooOptions> options);
int world_size, int gid, std::shared_ptr<GlooOptions> options);
~ProcessGroupGloo() = default;
......
......@@ -13,6 +13,8 @@
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/HCCLTools.h"
#include "paddle/fluid/memory/malloc.h"
#include "paddle/fluid/platform/device/npu/hccl_helper.h"
#include "paddle/fluid/platform/device_context.h"
......@@ -28,55 +30,6 @@ constexpr int64_t kWaitBlockTImeout = 10;
namespace paddle {
namespace distributed {
static HcclReduceOp ToHCCLRedType(ReduceOp reduction) {
static const std::map<ReduceOp, HcclReduceOp> red_type = {
{ReduceOp::MIN, HCCL_REDUCE_MIN},
{ReduceOp::MAX, HCCL_REDUCE_MAX},
{ReduceOp::SUM, HCCL_REDUCE_SUM},
{ReduceOp::PRODUCT, HCCL_REDUCE_PROD},
};
auto it = red_type.find(reduction);
PADDLE_ENFORCE_EQ(
it != red_type.end(), true,
platform::errors::InvalidArgument("Invalid hccl reduction. "
"Must be Min | Max | Prod | Sum"));
return it->second;
}
std::string SerializeHCCLUniqueId(const HcclRootInfo& hcclID) {
const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&hcclID);
std::ostringstream oss;
for (size_t i = 0; i < sizeof(hcclID); ++i) {
oss << std::hex << static_cast<int>(bytes[i]);
}
return oss.str();
}
// Get the list of devices from list of tensors
std::vector<Place> GetPlaceList(const std::vector<Tensor>& tensors) {
std::vector<Place> places;
places.reserve(tensors.size());
for (auto& tensor : tensors) {
places.push_back(tensor.inner_place());
}
return places;
}
// Get the deviceList String from the list of devices
std::string GetKeyFromPlaces(const std::vector<Place>& places) {
std::string placeList;
for (auto& place : places) {
std::stringstream tmp;
tmp << place;
if (placeList.empty()) {
placeList += tmp.str();
} else {
placeList += "," + tmp.str();
}
}
return placeList;
}
// bool CheckTensorsInNPUPlace(const std::vector<Tensor>& tensors) {
// return std::all_of(tensors.cbegin(), tensors.cend(), [&](const Tensor& t) {
// return t.place() == platform::DeviceType::NPU;
......@@ -150,8 +103,8 @@ bool ProcessGroupHCCL::HCCLTask::Wait(std::chrono::milliseconds timeout) {
void ProcessGroupHCCL::HCCLTask::Synchronize() { Wait(kWaitTimeout); }
ProcessGroupHCCL::ProcessGroupHCCL(const std::shared_ptr<Store>& store,
int rank, int size)
: ProcessGroup(rank, size), store_(store) {}
int rank, int size, int gid)
: ProcessGroup(rank, size, gid), store_(store) {}
void ProcessGroupHCCL::BroadcastUniqueHCCLID(
std::vector<HcclRootInfo>& hccl_ids) { // NOLINT
......
......@@ -70,7 +70,8 @@ class ProcessGroupHCCL : public ProcessGroup {
private:
};
ProcessGroupHCCL(const std::shared_ptr<Store>& store, int rank, int size);
ProcessGroupHCCL(const std::shared_ptr<Store>& store, int rank, int size,
int gid);
const std::string GetBackendName() const override {
return std::string(HCCL_BACKEND_NAME);
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"
constexpr int64_t kWaitBlockTImeout = 10;
namespace paddle {
namespace distributed {
using Place = paddle::platform::Place;
std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
int rank, CommType comm_type, const std::vector<Tensor>& inputs) {
return std::make_shared<ProcessGroupHeter::HeterTask>(rank, comm_type,
inputs);
}
ProcessGroupHeter::HeterTask::HeterTask(int rank, CommType CommType,
const std::vector<Tensor>& inputs)
: Task(rank, inputs, CommType) {}
ProcessGroupHeter::HeterTask::~HeterTask() {}
bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }
// TODO(sheniang03): Add timeout for wait, now timeout unused
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
return true;
}
ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
int rank, int size, int gid,
int local_rank, int local_size,
int gloo_rank, int gloo_size,
bool with_switch,
std::string switch_endpoint)
: ProcessGroup(rank, size, gid),
store_(store),
local_rank_(local_rank),
local_size_(local_size),
gloo_rank_(gloo_rank),
gloo_size_(gloo_size),
with_switch_(with_switch) {
#if defined(PADDLE_WITH_NCCL)
inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
IGNORE_ID);
#else
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
if (with_switch_) {
// TODO(sandyhouse) starts a client to connect the cloud switch module
// std::shared_ptr<HeterClient> client_ =
// HeterClient::GetInstance({switch_endpoint}, {}, 0);
} else if (local_rank_ == 0) {
auto opts = ProcessGroupGloo::GlooOptions::create();
opts->device = ProcessGroupGloo::createDefaultDevice();
inter_pg_ = std::make_shared<ProcessGroupGloo>(store, gloo_rank_,
gloo_size_, IGNORE_ID, opts);
}
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
std::vector<Tensor>& tensors, const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif
// Step1: do allreduce in inner cluster
auto task = inner_pg_->AllReduce(tensors, opts);
task->Wait();
// Step2: copy tensors to CPU
if (local_rank_ == 0) {
std::vector<Tensor> cpu_tensors(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
dense_cpu_tensor->Resize(tensors[i].dims());
framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
dense_cpu_tensor.get());
}
// Step3: do inter cluster allreduce
if (with_switch_) {
// TODO(sandyhouse) send to and recv from switch, and do add
} else {
auto gloo_task = inter_pg_->AllReduce(cpu_tensors, opts);
gloo_task->Wait();
}
// Step4: copy cpu tensors to gpu
// TODO(sandyhouse)
// copy cpu tensors to gpu
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
// framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
// dense_gpu_tensor.get());
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
dense_gpu_tensor.get());
}
}
// Step5: broadcast among inner cluster
auto b_opts = BroadcastOptions();
b_opts.source_root = 0;
auto broadcast_task = inner_pg_->Broadcast(tensors, b_opts);
broadcast_task->Wait();
return CreateTask(rank_, CommType::ALLREDUCE, tensors);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
std::vector<Tensor>& tensors, const BroadcastOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif
// Step1: do broadcast in inner cluster
auto b_opts = BroadcastOptions();
b_opts.source_root = 0;
inner_pg_->Broadcast(tensors, b_opts);
if (local_rank_ == 0) {
std::vector<Tensor> cpu_tensors(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
dense_cpu_tensor->Resize(tensors[i].dims());
framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
dense_cpu_tensor.get());
}
if (with_switch_) {
// TODO(sandyhouse) send to and recv
} else {
auto gloo_task = inter_pg_->Broadcast(cpu_tensors, opts);
gloo_task->Wait();
}
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
// framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
// dense_gpu_tensor.get());
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
dense_gpu_tensor.get());
}
}
auto broadcast_task = inner_pg_->Broadcast(tensors, b_opts);
broadcast_task->Wait();
return CreateTask(rank_, CommType::BROADCAST, tensors);
}
void ProcessGroupHeter::Broadcast(const phi::DenseTensor* in,
phi::DenseTensor* out) {
// Step1: do broadcast in inner cluster
inner_pg_->Broadcast(in, out);
if (local_rank_ == 0) {
Tensor cpu_tensor;
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensor.impl());
dense_cpu_tensor->Resize(in->dims());
framework::TensorCopySync(*in, platform::CPUPlace(),
dense_cpu_tensor.get());
if (with_switch_) {
// TODO(sandyhouse) send to and recv
} else {
std::vector<Tensor> cpu_tensors = {cpu_tensor};
// auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
// gloo_task->Wait();
inter_pg_->Broadcast(cpu_tensors);
}
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
out);
}
inner_pg_->Broadcast(out, out);
}
} // namespace distributed
} // namespace paddle
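A hedged construction sketch for the new backend: a ProcessGroupHeter pairs an inner intra-cluster NCCL/HCCL group over the local devices with, on local rank 0, an inter-cluster Gloo group (or the cloud switch when with_switch is set). All numeric arguments below are illustrative:

// Sketch only: `store` and `tensors` are assumed to exist; sizes are made up.
auto heter_pg = std::make_shared<paddle::distributed::ProcessGroupHeter>(
    store, /*rank=*/0, /*size=*/16, /*gid=*/1,
    /*local_rank=*/0, /*local_size=*/8,
    /*gloo_rank=*/0, /*gloo_size=*/2,
    /*with_switch=*/false, /*switch_endpoint=*/"");

// AllReduce runs: inner allreduce -> copy to CPU on local rank 0 ->
// inter-cluster allreduce -> copy back to GPU -> inner broadcast.
auto task = heter_pg->AllReduce(tensors);  // tensors: std::vector<Tensor> on GPU
task->Wait();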
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
// #include "paddle/fluid/distributed/ps/service/heter_client.h"
#include "paddle/fluid/platform/device_context.h"
#ifdef PADDLE_WITH_GLOO
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#endif
#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/stream/cuda_stream.h"
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/distributed/collective/NCCLTools.h"
#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/dynload/nccl.h"
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/distributed/collective/HCCLTools.h"
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#endif
#include "paddle/fluid/distributed/collective/Common.h"
constexpr const char* HETER_BACKEND_NAME = "HETER_BACKEND";
namespace paddle {
namespace distributed {
using Place = paddle::platform::Place;
class ProcessGroupHeter : public ProcessGroup {
public:
class HeterTask : public ProcessGroup::Task,
public std::enable_shared_from_this<HeterTask> {
public:
HeterTask(int rank, CommType CommType, const std::vector<Tensor>& inputs);
bool IsCompleted();
void SynchronizeStreams() {}
bool Wait(std::chrono::milliseconds timeout = kWaitTimeout);
void Synchronize() {}
virtual ~HeterTask();
};
ProcessGroupHeter(const std::shared_ptr<Store>& store, int rank, int size,
int gid, int local_rank, int local_size, int gloo_rank,
int gloo_size, bool with_switch,
std::string switch_endpoints);
const std::string GetBackendName() const override {
return std::string(HETER_BACKEND_NAME);
}
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<Tensor>& tensors,
const AllreduceOptions& = AllreduceOptions()) override;
std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<Tensor>& tensors,
const BroadcastOptions& = BroadcastOptions()) override;
void Broadcast(const phi::DenseTensor* in, phi::DenseTensor* out) override;
protected:
virtual std::shared_ptr<ProcessGroupHeter::HeterTask> CreateTask(
int rank, CommType opType, const std::vector<Tensor>& inputs);
private:
std::shared_ptr<Store> store_;
std::shared_ptr<ProcessGroup> inner_pg_;
std::shared_ptr<ProcessGroupGloo> inter_pg_;
int local_rank_;
int local_size_;
int gloo_rank_;
int gloo_size_;
bool with_switch_;
};
} // namespace distributed
} // namespace paddle
......@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
......@@ -26,61 +27,6 @@ constexpr int64_t kWaitBlockTImeout = 10;
namespace paddle {
namespace distributed {
static ncclRedOp_t ToNCCLRedType(ReduceOp reduction) {
static const std::map<ReduceOp, ncclRedOp_t> red_type = {
{ReduceOp::MIN, ncclMin},
{ReduceOp::MAX, ncclMax},
{ReduceOp::SUM, ncclSum},
{ReduceOp::PRODUCT, ncclProd},
};
auto it = red_type.find(reduction);
PADDLE_ENFORCE_EQ(it != red_type.end(), true,
platform::errors::InvalidArgument(
"Invalid nccl reduction. Must be ncclMin | ncclMax | "
"ncclProd | ncclSum"));
return it->second;
}
std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID) {
const uint8_t* bytes = reinterpret_cast<const uint8_t*>(&ncclID);
std::ostringstream oss;
for (auto i = 0; i < NCCL_UNIQUE_ID_BYTES; ++i) {
oss << std::hex << static_cast<int>(bytes[i]);
}
return oss.str();
}
// Get the list of devices from list of tensors
std::vector<Place> GetPlaceList(const std::vector<Tensor>& tensors) {
std::vector<Place> places;
places.reserve(tensors.size());
for (auto& tensor : tensors) {
places.push_back(tensor.inner_place());
}
return places;
}
// Get the deviceList String from the list of devices
std::string GetKeyFromPlaces(const std::vector<Place>& places) {
std::string placeList;
for (auto& place : places) {
std::stringstream tmp;
tmp << place;
if (placeList.empty()) {
placeList += tmp.str();
} else {
placeList += "," + tmp.str();
}
}
return placeList;
}
bool CheckTensorsInCudaPlace(const std::vector<Tensor>& tensors) {
return std::all_of(tensors.cbegin(), tensors.cend(), [&](const Tensor& t) {
return t.place() == PlaceType::kGPU;
});
}
void SyncDefaultStream(
const std::vector<Place>& places,
std::vector<EventManager>& ncclEvents, // NOLINT
......@@ -157,8 +103,8 @@ bool ProcessGroupNCCL::NCCLTask::Wait(std::chrono::milliseconds timeout) {
void ProcessGroupNCCL::NCCLTask::Synchronize() { Wait(kWaitTimeout); }
ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr<Store>& store,
int rank, int size)
: ProcessGroup(rank, size), store_(store) {}
int rank, int size, int gid)
: ProcessGroup(rank, size, gid), store_(store) {}
void ProcessGroupNCCL::BroadcastUniqueNCCLID(
std::vector<ncclUniqueId>& nccl_ids) { // NOLINT
......
......@@ -76,7 +76,8 @@ class ProcessGroupNCCL : public ProcessGroup {
private:
};
ProcessGroupNCCL(const std::shared_ptr<Store>& store, int rank, int size);
ProcessGroupNCCL(const std::shared_ptr<Store>& store, int rank, int size,
int gid);
const std::string GetBackendName() const override {
return std::string(NCCL_BACKEND_NAME);
......
......@@ -18,6 +18,8 @@ limitations under the License. */
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/phi/api/include/tensor.h"
namespace paddle {
namespace operators {
......@@ -36,6 +38,13 @@ class CBroadcastOpCUDAKernel : public framework::OpKernel<T> {
int rid = ctx.Attr<int>("ring_id");
auto place = ctx.GetPlace();
auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(rid)) {
// Use ProcessGroup
distributed::ProcessGroup* pg = map->get(rid);
pg->Broadcast(x, out);
return;
}
gpuStream_t stream = nullptr;
if (ctx.Attr<bool>("use_calc_stream")) {
......
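The lookup-and-dispatch pattern in the hunk above is what other collective kernels could reuse; here it is isolated and annotated, assuming the kernel exposes the same ring_id attribute and DenseTensor input/output:

// Sketch only: ring_id doubles as the process-group id when a group has been
// registered; otherwise the kernel falls back to the comm-context path.
int rid = ctx.Attr<int>("ring_id");
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(rid)) {
  distributed::ProcessGroup* pg = map->get(rid);
  pg->Broadcast(x, out);  // x/out: the kernel's input/output DenseTensors
  return;
}
// ... existing NCCLCommContext-based implementation ...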
......@@ -213,16 +213,20 @@ void BindDistributed(py::module *m) {
py::class_<distributed::ProcessGroupNCCL,
std::shared_ptr<distributed::ProcessGroupNCCL>>(
*m, "ProcessGroupNCCL", ProcessGroup)
.def(py::init<const std::shared_ptr<distributed::Store> &, int, int>(),
py::call_guard<py::gil_scoped_release>());
.def(py::init<const std::shared_ptr<distributed::Store> &, int, int,
int>(),
py::arg("store"), py::arg("rank"), py::arg("world_size"),
py::arg("group_id") = 0, py::call_guard<py::gil_scoped_release>());
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
py::class_<distributed::ProcessGroupHCCL,
std::shared_ptr<distributed::ProcessGroupHCCL>>(
*m, "ProcessGroupHCCL", ProcessGroup)
.def(py::init<const std::shared_ptr<distributed::Store> &, int, int>(),
py::call_guard<py::gil_scoped_release>());
.def(py::init<const std::shared_ptr<distributed::Store> &, int, int,
int>(),
py::arg("store"), py::arg("rank"), py::arg("world_size"),
py::arg("group_id") = 0, py::call_guard<py::gil_scoped_release>());
#endif
py::class_<distributed::ProcessGroup::Task,
......@@ -238,10 +242,10 @@ void BindDistributed(py::module *m) {
py::class_<ProcessGroupGloo, std::shared_ptr<ProcessGroupGloo>>(
*m, "ProcessGroupGloo", ProcessGroup)
.def(py::init<const std::shared_ptr<paddle::distributed::Store> &, int,
int, std::shared_ptr<GlooOptions> &>(),
int, int, std::shared_ptr<GlooOptions> &>(),
py::call_guard<py::gil_scoped_release>())
.def(py::init([](const std::shared_ptr<paddle::distributed::Store> &store,
int rank, int world_size) {
int rank, int world_size, int gid) {
auto opts = GlooOptions::create();
char *ifname = getenv(GLOO_SOCKET_IFNAME_ENV.c_str());
if (ifname && strlen(ifname) > 1) {
......@@ -251,10 +255,10 @@ void BindDistributed(py::module *m) {
opts->device = ProcessGroupGloo::createDefaultDevice();
}
return std::make_shared<ProcessGroupGloo>(store, rank, world_size,
opts);
gid, opts);
}),
py::arg("store"), py::arg("rank"), py::arg("world_size"),
py::call_guard<py::gil_scoped_release>())
py::arg("group_id") = 0, py::call_guard<py::gil_scoped_release>())
.def_static("create_default_device",
&ProcessGroupGloo::createDefaultDevice);
#endif
......