未验证 提交 df113208 编写于 作者: L lilong12 提交者: GitHub

add send/recv for ProcessGroupHeter (#42318)

上级 a384828d
......@@ -100,7 +100,11 @@ if(APPLE AND WITH_ARM)
endif()
if(WITH_ASCEND_CL AND NOT WITH_ASCEND_CXX11)
if(WITH_ARM_BRPC)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=1")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=0")
endif()
endif()
if(WIN32)
......@@ -386,7 +390,7 @@ if(WITH_DISTRIBUTE)
if(LINUX)
set(WITH_GLOO ON CACHE STRING "Enable GLOO when compiling WITH_DISTRIBUTE=ON." FORCE)
endif()
if(WITH_ASCEND_CL)
if(WITH_ASCEND_CL AND NOT WITH_ARM_BRPC)
# disable WITH_PSCORE for NPU before include third_party
MESSAGE(WARNING "Disable WITH_PSCORE when compiling with NPU. Force WITH_PSCORE=OFF.")
set(WITH_PSCORE OFF CACHE BOOL "Disable WITH_PSCORE when compiling with NPU" FORCE)
......
......@@ -158,6 +158,10 @@ if(WITH_IPU)
)
endif()
if(WITH_ASCEND_CL AND WITH_ARM_BRPC)
set(COMMON_FLAGS ${COMMON_FLAGS} -faligned-new)
endif()
if(NOT APPLE)
if((${CMAKE_CXX_COMPILER_VERSION} VERSION_GREATER 8.0) OR (WITH_ROCM))
set(COMMON_FLAGS
......
......@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#include <chrono>
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
......@@ -24,6 +25,8 @@ namespace paddle {
namespace distributed {
using Place = paddle::platform::Place;
int ProcessGroupHeter::send_count = 0;
int ProcessGroupHeter::recv_count = 0;
std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
......@@ -47,7 +50,8 @@ bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
ProcessGroupHeter::ProcessGroupHeter(
const std::shared_ptr<Store>& store, int rank, int size,
const platform::Place& place, int gid, int local_rank, int local_size,
int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint)
int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint,
int src_rank, int dst_rank)
: ProcessGroup(rank, size, place, gid),
store_(store),
local_rank_(local_rank),
......@@ -55,7 +59,10 @@ ProcessGroupHeter::ProcessGroupHeter(
gloo_rank_(gloo_rank),
gloo_size_(gloo_size),
with_switch_(with_switch),
switch_endpoint_(switch_endpoint) {
switch_endpoint_(switch_endpoint),
src_rank_(src_rank),
dst_rank_(dst_rank) {
return;
#if defined(PADDLE_WITH_NCCL)
inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
place_, IGNORE_ID);
......@@ -246,5 +253,100 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
std::vector<phi::DenseTensor>& in_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif
PADDLE_ENFORCE_EQ(
in_tensors.size(), 1,
platform::errors::PreconditionNotMet(
"For each send operation, there can only be one tensor to send."));
// Copy Tensor to cpu
auto start = std::chrono::high_resolution_clock::now();
phi::DenseTensor cpu_tensor;
auto& gpu_tensor = in_tensors[0];
framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
PADDLE_ENFORCE_EQ(with_switch_, true,
platform::errors::PreconditionNotMet(
"Gloo does not support the send operation."));
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
<< ") from gpu to cpu for send " << std::setw(9)
<< " is: " << diff.count() << " s" << std::endl;
// Send to switch
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
int64_t tensor_size =
cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype());
std::vector<int64_t> send_size;
send_size.push_back(tensor_size);
auto id = src_rank_ * 10000 + dst_rank_;
std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
std::string("_") + std::to_string(send_count++);
VLOG(2) << "tensor_name:" << tensor_name;
int ret = client_->Send(gid_, {tensor_name}, send_size, cpu_tensor.data(),
tensor_size);
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Send to the switch module error."));
return CreateTask(rank_, CommType::SEND, in_tensors);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
std::vector<phi::DenseTensor>& out_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(out_tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif
PADDLE_ENFORCE_EQ(
out_tensors.size(), 1,
platform::errors::PreconditionNotMet(
"For each rece operation, there can only be one tensor to receive."));
// Copy Tensor to cpu
phi::DenseTensor cpu_tensor;
auto& gpu_tensor = out_tensors[0];
cpu_tensor.Resize(gpu_tensor.dims());
cpu_tensor.set_layout(gpu_tensor.layout());
cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype());
PADDLE_ENFORCE_EQ(with_switch_, true,
platform::errors::PreconditionNotMet(
"Gloo does not support the send operation."));
// recv from switch
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto id = src_rank_ * 10000 + dst_rank_;
std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
std::string("_") + std::to_string(recv_count++);
VLOG(2) << "tensor_name: " << tensor_name;
auto start = std::chrono::high_resolution_clock::now();
int ret = client_->Recv(
gid_, {tensor_name}, cpu_tensor.data(),
cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"receive to the switch module error."));
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
double goodput = cpu_tensor.numel() *
framework::DataTypeSize(cpu_tensor.dtype()) / diff.count();
VLOG(2) << "Goodput: " << goodput << "B/s" << std::endl;
start = std::chrono::high_resolution_clock::now();
framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
end = std::chrono::high_resolution_clock::now();
diff = end - start;
VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
<< ") from gpu to cpu for recv " << std::setw(9)
<< " is: " << diff.count() << " s" << std::endl;
return CreateTask(rank_, CommType::RECV, out_tensors);
}
} // namespace distributed
} // namespace paddle
......@@ -83,7 +83,8 @@ class ProcessGroupHeter : public ProcessGroup {
ProcessGroupHeter(const std::shared_ptr<Store>& store, int rank, int size,
const platform::Place& place, int gid, int local_rank,
int local_size, int gloo_rank, int gloo_size,
bool with_switch, std::string switch_endpoints);
bool with_switch, std::string switch_endpoints,
int src_rank, int dst_rank);
const std::string GetBackendName() const override {
return std::string(HETER_BACKEND_NAME);
......@@ -97,6 +98,12 @@ class ProcessGroupHeter : public ProcessGroup {
std::vector<phi::DenseTensor>&, std::vector<phi::DenseTensor>&,
const BroadcastOptions& = BroadcastOptions()) override;
std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>& in_tensors, int peer) override;
std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& out_tensors, int peer) override;
protected:
virtual std::shared_ptr<ProcessGroupHeter::HeterTask> CreateTask(
int rank, CommType opType, const std::vector<phi::DenseTensor>& inputs);
......@@ -112,6 +119,10 @@ class ProcessGroupHeter : public ProcessGroup {
int gloo_size_;
bool with_switch_;
std::string switch_endpoint_;
int src_rank_;
int dst_rank_;
static int send_count;
static int recv_count;
};
} // namespace distributed
......
......@@ -19,6 +19,9 @@ limitations under the License. */
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/phi/api/include/tensor.h"
namespace paddle {
namespace operators {
......@@ -42,6 +45,20 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
gpuStream_t stream = nullptr;
auto place = ctx.GetPlace();
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(rid)) {
// Use ProcessGroup
distributed::ProcessGroup *pg = map->get(rid);
std::vector<phi::DenseTensor> out_tensor;
auto out_shape = ctx.Attr<std::vector<int>>("out_shape");
auto out = ctx.Output<framework::LoDTensor>("Out");
auto out_dims = out->dims();
out->mutable_data<T>(out_dims, place);
out_tensor.emplace_back(*out);
auto task = pg->Recv(out_tensor, peer);
return;
}
auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
......
......@@ -18,6 +18,8 @@ limitations under the License. */
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/npu/hccl_helper.h"
#endif
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/phi/api/include/tensor.h"
namespace paddle {
namespace operators {
......@@ -35,6 +37,15 @@ class CRecvOpASCENDKernel : public framework::OpKernel<T> {
platform::ToHCCLDataType(framework::TransToProtoVarType(out->dtype()));
int ring_id = ctx.Attr<int>("ring_id");
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(ring_id)) {
// Use ProcessGroup
distributed::ProcessGroup* pg = map->get(ring_id);
std::vector<phi::DenseTensor> out_tensor;
out_tensor.emplace_back(*out);
auto task = pg->Recv(out_tensor, 0);
return;
}
auto place = ctx.GetPlace();
auto comm =
paddle::platform::HCCLCommContext::Instance().Get(ring_id, place);
......
......@@ -18,6 +18,8 @@ limitations under the License. */
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/phi/api/include/tensor.h"
namespace paddle {
namespace operators {
......@@ -39,6 +41,16 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
peer, 0,
platform::errors::InvalidArgument(
"The peer (%d) for send_v2 op must be non-negative.", peer));
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(rid)) {
// Use ProcessGroup
distributed::ProcessGroup* pg = map->get(rid);
std::vector<phi::DenseTensor> in_tensor;
auto x = ctx.Input<framework::LoDTensor>("X");
in_tensor.push_back(*x);
auto task = pg->Send(in_tensor, peer);
return;
}
gpuStream_t stream = nullptr;
auto place = ctx.GetPlace();
auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
......
......@@ -18,6 +18,8 @@ limitations under the License. */
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/npu/hccl_helper.h"
#endif
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/phi/api/include/tensor.h"
namespace paddle {
namespace operators {
......@@ -34,6 +36,16 @@ class CSendOpASCENDKernel : public framework::OpKernel<T> {
platform::ToHCCLDataType(framework::TransToProtoVarType(x->dtype()));
int ring_id = ctx.Attr<int>("ring_id");
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(ring_id)) {
// Use ProcessGroup
distributed::ProcessGroup* pg = map->get(ring_id);
std::vector<phi::DenseTensor> in_tensor;
auto x = ctx.Input<framework::LoDTensor>("X");
in_tensor.push_back(*x);
auto task = pg->Send(in_tensor, 1);
return;
}
auto place = ctx.GetPlace();
auto comm =
paddle::platform::HCCLCommContext::Instance().Get(ring_id, place);
......
......@@ -258,13 +258,13 @@ void BindDistributed(py::module *m) {
#else
const platform::CUDAPlace &,
#endif
int, int, int, int, int, bool, std::string>(),
int, int, int, int, int, bool, std::string, int, int>(),
py::arg("store"), py::arg("rank"), py::arg("world_size"),
py::arg("place"), py::arg("gid") = 0, py::arg("local_rank") = 0,
py::arg("local_size") = 1, py::arg("gloo_rank") = 0,
py::arg("gloo_size") = 1, py::arg("with_switch") = false,
py::arg("switch_endpoint") = "",
py::call_guard<py::gil_scoped_release>());
py::arg("switch_endpoint") = "", py::arg("src_rank") = "",
py::arg("dst_rank") = "", py::call_guard<py::gil_scoped_release>());
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
......
......@@ -263,7 +263,7 @@ def _new_process_group_impl(backend,
rank=global_rank,
world_size=global_world_size,
place=place,
gid=0,
gid=group_id,
local_rank=rank,
local_size=world_size,
gloo_rank=cluster_id,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册