From df1132080e12635ad866bff8ef20c5f3b48f3a89 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Fri, 6 May 2022 14:27:46 +0800 Subject: [PATCH] add send/recv for ProcessGroupHeter (#42318) --- CMakeLists.txt | 8 +- cmake/flags.cmake | 4 + .../collective/ProcessGroupHeter.cc | 106 +++++++++++++++++- .../collective/ProcessGroupHeter.h | 13 ++- .../operators/collective/recv_v2_op.cu.cc | 17 +++ .../operators/collective/recv_v2_op_npu.cc | 11 ++ .../operators/collective/send_v2_op.cu.cc | 12 ++ .../operators/collective/send_v2_op_npu.cc | 12 ++ paddle/fluid/pybind/distributed_py.cc | 6 +- python/paddle/distributed/collective.py | 2 +- 10 files changed, 182 insertions(+), 9 deletions(-) mode change 100755 => 100644 paddle/fluid/distributed/collective/ProcessGroupHeter.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 9002cb287e..ff49ba164d 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -100,7 +100,11 @@ if(APPLE AND WITH_ARM) endif() if(WITH_ASCEND_CL AND NOT WITH_ASCEND_CXX11) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=0") + if(WITH_ARM_BRPC) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=1") + else() + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=0") + endif() endif() if(WIN32) @@ -386,7 +390,7 @@ if(WITH_DISTRIBUTE) if(LINUX) set(WITH_GLOO ON CACHE STRING "Enable GLOO when compiling WITH_DISTRIBUTE=ON." FORCE) endif() - if(WITH_ASCEND_CL) + if(WITH_ASCEND_CL AND NOT WITH_ARM_BRPC) # disable WITH_PSCORE for NPU before include third_party MESSAGE(WARNING "Disable WITH_PSCORE when compiling with NPU. Force WITH_PSCORE=OFF.") set(WITH_PSCORE OFF CACHE BOOL "Disable WITH_PSCORE when compiling with NPU" FORCE) diff --git a/cmake/flags.cmake b/cmake/flags.cmake index c1a7ba6d90..f9cac0579f 100644 --- a/cmake/flags.cmake +++ b/cmake/flags.cmake @@ -158,6 +158,10 @@ if(WITH_IPU) ) endif() +if(WITH_ASCEND_CL AND WITH_ARM_BRPC) + set(COMMON_FLAGS ${COMMON_FLAGS} -faligned-new) +endif() + if(NOT APPLE) if((${CMAKE_CXX_COMPILER_VERSION} VERSION_GREATER 8.0) OR (WITH_ROCM)) set(COMMON_FLAGS diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc old mode 100755 new mode 100644 index ba57342081..31f9b26e73 --- a/paddle/fluid/distributed/collective/ProcessGroupHeter.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/distributed/collective/ProcessGroupHeter.h" +#include #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #include "paddle/fluid/platform/place.h" #include "paddle/phi/api/include/api.h" @@ -24,6 +25,8 @@ namespace paddle { namespace distributed { using Place = paddle::platform::Place; +int ProcessGroupHeter::send_count = 0; +int ProcessGroupHeter::recv_count = 0; std::shared_ptr ProcessGroupHeter::CreateTask( int rank, CommType comm_type, const std::vector& inputs) { @@ -47,7 +50,8 @@ bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) { ProcessGroupHeter::ProcessGroupHeter( const std::shared_ptr& store, int rank, int size, const platform::Place& place, int gid, int local_rank, int local_size, - int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint) + int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint, + int src_rank, int dst_rank) : ProcessGroup(rank, size, place, gid), store_(store), local_rank_(local_rank), @@ -55,7 +59,10 @@ ProcessGroupHeter::ProcessGroupHeter( gloo_rank_(gloo_rank), gloo_size_(gloo_size), with_switch_(with_switch), - switch_endpoint_(switch_endpoint) { + switch_endpoint_(switch_endpoint), + src_rank_(src_rank), + dst_rank_(dst_rank) { + return; #if defined(PADDLE_WITH_NCCL) inner_pg_ = std::make_shared(store, local_rank, local_size, place_, IGNORE_ID); @@ -246,5 +253,100 @@ std::shared_ptr ProcessGroupHeter::Broadcast( return CreateTask(rank_, CommType::BROADCAST, in_tensors); } +std::shared_ptr ProcessGroupHeter::Send( + std::vector& in_tensors, int peer) { +#if defined(PADDLE_WITH_NCCL) + PADDLE_ENFORCE_EQ( + CheckTensorsInCudaPlace(in_tensors), true, + platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); +#endif + + PADDLE_ENFORCE_EQ( + in_tensors.size(), 1, + platform::errors::PreconditionNotMet( + "For each send operation, there can only be one tensor to send.")); + // Copy Tensor to cpu + auto start = std::chrono::high_resolution_clock::now(); + phi::DenseTensor cpu_tensor; + auto& gpu_tensor = in_tensors[0]; + framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor); + PADDLE_ENFORCE_EQ(with_switch_, true, + platform::errors::PreconditionNotMet( + "Gloo does not support the send operation.")); + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration diff = end - start; + VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims() + << ") from gpu to cpu for send " << std::setw(9) + << " is: " << diff.count() << " s" << std::endl; + + // Send to switch + HeterClient* client_ = + HeterClient::GetInstance({switch_endpoint_}, {}, 0).get(); + int64_t tensor_size = + cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()); + std::vector send_size; + send_size.push_back(tensor_size); + auto id = src_rank_ * 10000 + dst_rank_; + std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) + + std::string("_") + std::to_string(send_count++); + VLOG(2) << "tensor_name:" << tensor_name; + int ret = client_->Send(gid_, {tensor_name}, send_size, cpu_tensor.data(), + tensor_size); + PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( + "Send to the switch module error.")); + return CreateTask(rank_, CommType::SEND, in_tensors); +} + +std::shared_ptr ProcessGroupHeter::Recv( + std::vector& out_tensors, int peer) { +#if defined(PADDLE_WITH_NCCL) + PADDLE_ENFORCE_EQ( + CheckTensorsInCudaPlace(out_tensors), true, + platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); +#endif + + PADDLE_ENFORCE_EQ( + out_tensors.size(), 1, + platform::errors::PreconditionNotMet( + "For each rece operation, there can only be one tensor to receive.")); + + // Copy Tensor to cpu + phi::DenseTensor cpu_tensor; + auto& gpu_tensor = out_tensors[0]; + cpu_tensor.Resize(gpu_tensor.dims()); + cpu_tensor.set_layout(gpu_tensor.layout()); + cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype()); + + PADDLE_ENFORCE_EQ(with_switch_, true, + platform::errors::PreconditionNotMet( + "Gloo does not support the send operation.")); + // recv from switch + HeterClient* client_ = + HeterClient::GetInstance({switch_endpoint_}, {}, 0).get(); + auto id = src_rank_ * 10000 + dst_rank_; + std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) + + std::string("_") + std::to_string(recv_count++); + VLOG(2) << "tensor_name: " << tensor_name; + auto start = std::chrono::high_resolution_clock::now(); + int ret = client_->Recv( + gid_, {tensor_name}, cpu_tensor.data(), + cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype())); + PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( + "receive to the switch module error.")); + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration diff = end - start; + double goodput = cpu_tensor.numel() * + framework::DataTypeSize(cpu_tensor.dtype()) / diff.count(); + VLOG(2) << "Goodput: " << goodput << "B/s" << std::endl; + start = std::chrono::high_resolution_clock::now(); + framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor); + end = std::chrono::high_resolution_clock::now(); + diff = end - start; + VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims() + << ") from gpu to cpu for recv " << std::setw(9) + << " is: " << diff.count() << " s" << std::endl; + return CreateTask(rank_, CommType::RECV, out_tensors); +} + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupHeter.h b/paddle/fluid/distributed/collective/ProcessGroupHeter.h index 640acdfb6a..89b0f078b4 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupHeter.h +++ b/paddle/fluid/distributed/collective/ProcessGroupHeter.h @@ -83,7 +83,8 @@ class ProcessGroupHeter : public ProcessGroup { ProcessGroupHeter(const std::shared_ptr& store, int rank, int size, const platform::Place& place, int gid, int local_rank, int local_size, int gloo_rank, int gloo_size, - bool with_switch, std::string switch_endpoints); + bool with_switch, std::string switch_endpoints, + int src_rank, int dst_rank); const std::string GetBackendName() const override { return std::string(HETER_BACKEND_NAME); @@ -97,6 +98,12 @@ class ProcessGroupHeter : public ProcessGroup { std::vector&, std::vector&, const BroadcastOptions& = BroadcastOptions()) override; + std::shared_ptr Send( + std::vector& in_tensors, int peer) override; + + std::shared_ptr Recv( + std::vector& out_tensors, int peer) override; + protected: virtual std::shared_ptr CreateTask( int rank, CommType opType, const std::vector& inputs); @@ -112,6 +119,10 @@ class ProcessGroupHeter : public ProcessGroup { int gloo_size_; bool with_switch_; std::string switch_endpoint_; + int src_rank_; + int dst_rank_; + static int send_count; + static int recv_count; }; } // namespace distributed diff --git a/paddle/fluid/operators/collective/recv_v2_op.cu.cc b/paddle/fluid/operators/collective/recv_v2_op.cu.cc index 784385d79b..96b27a833f 100644 --- a/paddle/fluid/operators/collective/recv_v2_op.cu.cc +++ b/paddle/fluid/operators/collective/recv_v2_op.cu.cc @@ -19,6 +19,9 @@ limitations under the License. */ #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #endif +#include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/phi/api/include/tensor.h" + namespace paddle { namespace operators { @@ -42,6 +45,20 @@ class RecvOpV2CUDAKernel : public framework::OpKernel { gpuStream_t stream = nullptr; auto place = ctx.GetPlace(); + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(rid)) { + // Use ProcessGroup + distributed::ProcessGroup *pg = map->get(rid); + std::vector out_tensor; + auto out_shape = ctx.Attr>("out_shape"); + auto out = ctx.Output("Out"); + auto out_dims = out->dims(); + out->mutable_data(out_dims, place); + + out_tensor.emplace_back(*out); + auto task = pg->Recv(out_tensor, peer); + return; + } auto comm = platform::NCCLCommContext::Instance().Get(rid, place); if (ctx.Attr("use_calc_stream")) { auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); diff --git a/paddle/fluid/operators/collective/recv_v2_op_npu.cc b/paddle/fluid/operators/collective/recv_v2_op_npu.cc index 6a2244b910..c31f1210f0 100644 --- a/paddle/fluid/operators/collective/recv_v2_op_npu.cc +++ b/paddle/fluid/operators/collective/recv_v2_op_npu.cc @@ -18,6 +18,8 @@ limitations under the License. */ #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/npu/hccl_helper.h" #endif +#include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/phi/api/include/tensor.h" namespace paddle { namespace operators { @@ -35,6 +37,15 @@ class CRecvOpASCENDKernel : public framework::OpKernel { platform::ToHCCLDataType(framework::TransToProtoVarType(out->dtype())); int ring_id = ctx.Attr("ring_id"); + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(ring_id)) { + // Use ProcessGroup + distributed::ProcessGroup* pg = map->get(ring_id); + std::vector out_tensor; + out_tensor.emplace_back(*out); + auto task = pg->Recv(out_tensor, 0); + return; + } auto place = ctx.GetPlace(); auto comm = paddle::platform::HCCLCommContext::Instance().Get(ring_id, place); diff --git a/paddle/fluid/operators/collective/send_v2_op.cu.cc b/paddle/fluid/operators/collective/send_v2_op.cu.cc index 3e565d1b97..add352306f 100644 --- a/paddle/fluid/operators/collective/send_v2_op.cu.cc +++ b/paddle/fluid/operators/collective/send_v2_op.cu.cc @@ -18,6 +18,8 @@ limitations under the License. */ #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #endif +#include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/phi/api/include/tensor.h" namespace paddle { namespace operators { @@ -39,6 +41,16 @@ class SendOpV2CUDAKernel : public framework::OpKernel { peer, 0, platform::errors::InvalidArgument( "The peer (%d) for send_v2 op must be non-negative.", peer)); + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(rid)) { + // Use ProcessGroup + distributed::ProcessGroup* pg = map->get(rid); + std::vector in_tensor; + auto x = ctx.Input("X"); + in_tensor.push_back(*x); + auto task = pg->Send(in_tensor, peer); + return; + } gpuStream_t stream = nullptr; auto place = ctx.GetPlace(); auto comm = platform::NCCLCommContext::Instance().Get(rid, place); diff --git a/paddle/fluid/operators/collective/send_v2_op_npu.cc b/paddle/fluid/operators/collective/send_v2_op_npu.cc index 3bc5487371..2d7382f3df 100644 --- a/paddle/fluid/operators/collective/send_v2_op_npu.cc +++ b/paddle/fluid/operators/collective/send_v2_op_npu.cc @@ -18,6 +18,8 @@ limitations under the License. */ #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/npu/hccl_helper.h" #endif +#include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/phi/api/include/tensor.h" namespace paddle { namespace operators { @@ -34,6 +36,16 @@ class CSendOpASCENDKernel : public framework::OpKernel { platform::ToHCCLDataType(framework::TransToProtoVarType(x->dtype())); int ring_id = ctx.Attr("ring_id"); + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(ring_id)) { + // Use ProcessGroup + distributed::ProcessGroup* pg = map->get(ring_id); + std::vector in_tensor; + auto x = ctx.Input("X"); + in_tensor.push_back(*x); + auto task = pg->Send(in_tensor, 1); + return; + } auto place = ctx.GetPlace(); auto comm = paddle::platform::HCCLCommContext::Instance().Get(ring_id, place); diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc index ab8bf0529d..6636fc8aca 100644 --- a/paddle/fluid/pybind/distributed_py.cc +++ b/paddle/fluid/pybind/distributed_py.cc @@ -258,13 +258,13 @@ void BindDistributed(py::module *m) { #else const platform::CUDAPlace &, #endif - int, int, int, int, int, bool, std::string>(), + int, int, int, int, int, bool, std::string, int, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), py::arg("place"), py::arg("gid") = 0, py::arg("local_rank") = 0, py::arg("local_size") = 1, py::arg("gloo_rank") = 0, py::arg("gloo_size") = 1, py::arg("with_switch") = false, - py::arg("switch_endpoint") = "", - py::call_guard()); + py::arg("switch_endpoint") = "", py::arg("src_rank") = "", + py::arg("dst_rank") = "", py::call_guard()); #endif #if defined(PADDLE_WITH_ASCEND_CL) diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index b2d146297d..e33a3dba66 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -263,7 +263,7 @@ def _new_process_group_impl(backend, rank=global_rank, world_size=global_world_size, place=place, - gid=0, + gid=group_id, local_rank=rank, local_size=world_size, gloo_rank=cluster_id, -- GitLab