From ade51aa57d3d4357535cdad972307d8854a70d74 Mon Sep 17 00:00:00 2001
From: Ghost Screaming
Date: Wed, 30 Aug 2023 14:18:27 +0800
Subject: [PATCH] [Auto Parallel] Compatible new comm library upgrade (#56604)

* Verify fluid operator support for the new comm library

* u

* u

* u

* Compatible new comm library upgrade for c_allgather, c_reduce, c_reduce_scatter and c_scatter.

* Remove useless comments in process_group.py

* Polish code style.

* Fix some problems.

* Remove use of fluid API in phi comm_context_manager.

* Add PADDLE_WITH_CUDA and PADDLE_WITH_NCCL macro judgement.

* Fix bug of HIP architecture.

* Fix some problems.

  1. Remove useless logging.
  2. Fix conditional compilation for HIP.
  3. Fix problems of test_pass_generation_pipeline.py. It calls
     paddle.distributed.init_parallel_env() first, then auto.Engine calls
     _init_comm(), which calls process_group.instantiate(). However,
     init_parallel_env() calls paddle.distributed.barrier(), which calls
     CreateNCCLEnvCache and creates the corresponding NCCLCommContext.
     Because dev_id is not set at that point, NCCLCommContext's dev_ctx is
     not initialized.

* Fix some problems.

* Polish code.

* Polish code.

* Revert compatible upgrade for communication operators. Their upgrades will be submitted in another PR.

* Remove StaticTCPStore.

* Remove useless modification.

* Remove useless set_cuda_device_id.

* Polish code.

* Remove fluid header files in phi files.

* Remove useless comments.

* Fix problems of HIP arch.

* Fix some problems.

* Polish code.

* Polish code style.

---------

Co-authored-by: hitywt
---
 paddle/fluid/platform/init.cc                 | 40 ++++++++++++
 paddle/fluid/pybind/communication.cc          |  6 +-
 paddle/phi/common/memory_utils.cc             | 28 ++++++++
 paddle/phi/common/memory_utils.h              | 64 +++++++++++++++++++
 .../core/distributed/comm_context_manager.cc  | 52 +++++++++++----
 .../core/distributed/comm_context_manager.h   |  5 +-
 .../phi/core/distributed/nccl_comm_context.cc | 24 +++++++
 .../phi/core/distributed/nccl_comm_context.h  | 36 +++++++++++
 .../auto_parallel/static/process_group.py     | 21 ++++--
 python/paddle/distributed/collective.py       |  2 +-
 python/paddle/distributed/parallel.py         | 13 ++--
 11 files changed, 262 insertions(+), 29 deletions(-)

diff --git a/paddle/fluid/platform/init.cc b/paddle/fluid/platform/init.cc
index 2ae413db5e6..2cc3dc7124a 100644
--- a/paddle/fluid/platform/init.cc
+++ b/paddle/fluid/platform/init.cc
@@ -57,6 +57,11 @@ limitations under the License. */
 #include "paddle/phi/common/memory_utils.h"
 #include "paddle/phi/core/custom_kernel.h"
 
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
+    (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
+#include "paddle/fluid/platform/device/gpu/gpu_resource_pool.h"
+#endif
+
 PHI_DECLARE_int32(paddle_num_threads);
 PADDLE_DEFINE_EXPORTED_int32(
     multiple_of_cupti_buffer_size,
@@ -440,6 +445,41 @@ void InitMemoryMethod() {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
   memory_method->gpu_memory_usage = paddle::platform::GpuMemoryUsage;
 #endif
+
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
+    (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
+  // TODO(GhostScreaming): Use phi methods later.
+  memory_method->get_allocator =
+      [](int device_id, phi::gpuStream_t stream) -> phi::Allocator * {
+    return paddle::memory::allocation::AllocatorFacade::Instance()
+        .GetAllocator(phi::GPUPlace(device_id), stream)
+        .get();
+  };
+  memory_method->get_host_allocator = []() -> phi::Allocator * {
+    return paddle::memory::allocation::AllocatorFacade::Instance()
+        .GetAllocator(phi::CPUPlace())
+        .get();
+  };
+  memory_method->get_zero_allocator = [](int device_id) -> phi::Allocator * {
+    return paddle::memory::allocation::AllocatorFacade::Instance()
+        .GetZeroAllocator(phi::GPUPlace(device_id))
+        .get();
+  };
+  memory_method->get_host_zero_allocator = []() -> phi::Allocator * {
+    return paddle::memory::allocation::AllocatorFacade::Instance()
+        .GetZeroAllocator(phi::CPUPlace())
+        .get();
+  };
+  memory_method->get_pinned_allocator = []() -> phi::Allocator * {
+    return paddle::memory::allocation::AllocatorFacade::Instance()
+        .GetAllocator(phi::GPUPinnedPlace())
+        .get();
+  };
+  memory_method->get_new_cuda_event = [](int device_id) {
+    return paddle::platform::CudaEventResourcePool::Instance().New(device_id);
+  };
+#endif
+
   memory_method->emplace_device_contexts =
       paddle::platform::EmplaceDeviceContexts;
   memory_method->init_devices = InitDevices;
diff --git a/paddle/fluid/pybind/communication.cc b/paddle/fluid/pybind/communication.cc
index bf58f1d6ac0..59ad5779111 100644
--- a/paddle/fluid/pybind/communication.cc
+++ b/paddle/fluid/pybind/communication.cc
@@ -42,14 +42,14 @@ void BindCommContextManager(py::module *m) {
   py::class_<phi::distributed::CommContextManager,
              std::shared_ptr<phi::distributed::CommContextManager>>(
       *m, "CommContextManager")
+      .def_static("set_device_id",
+                  &phi::distributed::CommContextManager::SetDeviceId,
+                  py::call_guard<py::gil_scoped_release>())
 #if defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)
       .def_static(
           "create_nccl_comm_context",
           &phi::distributed::CommContextManager::CreateNCCLCommContext,
           py::call_guard<py::gil_scoped_release>())
-      .def_static("set_cuda_device_id",
-                  &phi::distributed::CommContextManager::SetCUDADeviceId,
-                  py::call_guard<py::gil_scoped_release>())
 #endif
 #if defined(PADDLE_WITH_GLOO)
       .def_static(
diff --git a/paddle/phi/common/memory_utils.cc b/paddle/phi/common/memory_utils.cc
index f9ef6060492..1af8cc442a1 100644
--- a/paddle/phi/common/memory_utils.cc
+++ b/paddle/phi/common/memory_utils.cc
@@ -90,6 +90,34 @@ void EmplaceDeviceContexts(
       stream_priority);
 }
 
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
+    (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
+const phi::Allocator* GetAllocator(int device_id, phi::gpuStream_t stream) {
+  return MemoryUtils::Instance().GetAllocator(device_id, stream);
+}
+
+const phi::Allocator* GetHostAllocator() {
+  return MemoryUtils::Instance().GetHostAllocator();
+}
+
+const phi::Allocator* GetZeroAllocator(int device_id) {
+  return MemoryUtils::Instance().GetZeroAllocator(device_id);
+}
+
+const phi::Allocator* GetHostZeroAllocator() {
+  return MemoryUtils::Instance().GetHostZeroAllocator();
+}
+
+const phi::Allocator* GetPinnedAllocator() {
+  return MemoryUtils::Instance().GetPinnedAllocator();
+}
+
+std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> GetCudaEvent(
+    int device_id) {
+  return MemoryUtils::Instance().GetCudaEvent(device_id);
+}
+#endif
+
 }  // namespace memory_utils
 
 }  // namespace phi
diff --git a/paddle/phi/common/memory_utils.h b/paddle/phi/common/memory_utils.h
index f6a4afcea2f..5f4766f8b6b 100644
--- a/paddle/phi/common/memory_utils.h
+++ b/paddle/phi/common/memory_utils.h
@@ -24,6 +24,15 @@
 #include "paddle/phi/core/macros.h"
 #include "paddle/phi/core/stream.h"
 
+#ifdef PADDLE_WITH_CUDA
+#include <cuda.h>
+#include <cuda_runtime.h>
+#endif
+
+#ifdef PADDLE_WITH_HIP
+#include <hip/hip_runtime.h>
+#endif
+
 namespace phi {
 
 struct MemoryInterface {
@@ -150,6 +159,17 @@ struct MemoryInterface {
       const std::vector<phi::Place>& places,
       bool disable_setting_default_stream_for_allocator,
       int stream_priority);
+
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
+    (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
+  phi::Allocator* (*get_allocator)(int device_id, phi::gpuStream_t stream);
+  phi::Allocator* (*get_host_allocator)();
+  phi::Allocator* (*get_zero_allocator)(int device_id);
+  phi::Allocator* (*get_host_zero_allocator)();
+  phi::Allocator* (*get_pinned_allocator)();
+  std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> (
+      *get_new_cuda_event)(int device_id);
+#endif
 };
 
 class MemoryUtils {
@@ -323,6 +343,34 @@ class MemoryUtils {
             "Fluid. You can call InitMemoryMethod() for initialization."));
   }
 
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
+    (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
+  const phi::Allocator* GetAllocator(int device_id, phi::gpuStream_t stream) {
+    return memory_method_->get_allocator(device_id, stream);
+  }
+
+  const phi::Allocator* GetHostAllocator() {
+    return memory_method_->get_host_allocator();
+  }
+
+  const phi::Allocator* GetZeroAllocator(int device_id) {
+    return memory_method_->get_zero_allocator(device_id);
+  }
+
+  const phi::Allocator* GetHostZeroAllocator() {
+    return memory_method_->get_host_zero_allocator();
+  }
+
+  const phi::Allocator* GetPinnedAllocator() {
+    return memory_method_->get_pinned_allocator();
+  }
+
+  std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> GetCudaEvent(
+      int device_id) {
+    return memory_method_->get_new_cuda_event(device_id);
+  }
+#endif
+
  private:
   MemoryUtils() = default;
 
@@ -385,6 +433,22 @@ void EmplaceDeviceContexts(
     bool disable_setting_default_stream_for_allocator,
     int stream_priority);
 
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
+    (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
+const Allocator* GetAllocator(int device_id, phi::gpuStream_t stream);
+
+const Allocator* GetHostAllocator();
+
+const Allocator* GetZeroAllocator(int device_id);
+
+const Allocator* GetHostZeroAllocator();
+
+const Allocator* GetPinnedAllocator();
+
+std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> GetCudaEvent(
+    int device_id);
+#endif
+
 class Buffer {
  public:
   explicit Buffer(const phi::Place& place) : place_(place) {}
diff --git a/paddle/phi/core/distributed/comm_context_manager.cc b/paddle/phi/core/distributed/comm_context_manager.cc
index 7bbf0612ab3..385bbb137cf 100644
--- a/paddle/phi/core/distributed/comm_context_manager.cc
+++ b/paddle/phi/core/distributed/comm_context_manager.cc
@@ -12,44 +12,53 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#if defined(PADDLE_WITH_GLOO)
-#include <gloo/rendezvous/prefix_store.h>
-
-#include "paddle/phi/core/distributed/gloo_comm_context.h"
-#include "paddle/phi/core/distributed/gloo_utils.h"
-#include "paddle/phi/core/distributed/store/gloo_store.h"
-#endif
-
 #include "paddle/phi/core/distributed/comm_context_manager.h"
 
 #include <memory>
 #include <string>
 
+#include "glog/logging.h"
 #include "paddle/phi/backends/gpu/gpu_info.h"
 #include "paddle/phi/core/distributed/store/store.h"
 #include "paddle/phi/core/enforce.h"
 
+#if defined(PADDLE_WITH_GLOO)
+#include <gloo/rendezvous/prefix_store.h>
+#include "paddle/phi/core/distributed/gloo_comm_context.h"
+#include "paddle/phi/core/distributed/gloo_utils.h"
+#include "paddle/phi/core/distributed/store/gloo_store.h"
+#endif
+
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/phi/common/memory_utils.h"
 #include "paddle/phi/core/distributed/nccl_comm_context.h"
 #endif
 
 #ifdef PADDLE_WITH_CUSTOM_DEVICE
-#include "glog/logging.h"
 #include "paddle/phi/core/distributed/xccl_comm_context.h"
 #endif
 
 namespace phi {
 namespace distributed {
 
+int CommContextManager::device_id = -1;
+
+void CommContextManager::SetDeviceId(int dev_id) {
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
-void CommContextManager::SetCUDADeviceId(int dev_id) {
   phi::backends::gpu::SetDeviceId(dev_id);
+  CommContextManager::device_id = dev_id;
+#endif
 }
 
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
 void CommContextManager::CreateNCCLCommContext(
     const std::shared_ptr<Store>& store,
     const std::string& unique_comm_key,
     int rank,
     int size) {
+  auto& comm_context_manager = CommContextManager::GetInstance();
+  if (comm_context_manager.Has(unique_comm_key)) {
+    return;
+  }
   ncclUniqueId nccl_id;
   if (rank == 0) {
     PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclGetUniqueId(&nccl_id));
@@ -68,7 +77,28 @@ void CommContextManager::CreateNCCLCommContext(
 
   auto nccl_comm_context =
       std::make_unique<NCCLCommContext>(rank, size, nccl_id);
-  auto& comm_context_manager = CommContextManager::GetInstance();
+
+  if (CommContextManager::device_id != -1) {
+    std::unique_ptr<phi::GPUContext> dev_ctx(
+        new phi::GPUContext(phi::GPUPlace(CommContextManager::device_id)));
+    dev_ctx->SetAllocator(phi::memory_utils::GetAllocator(
+        CommContextManager::device_id, dev_ctx->stream()));
+    dev_ctx->SetHostAllocator(phi::memory_utils::GetHostAllocator());
+    dev_ctx->SetZeroAllocator(
+        phi::memory_utils::GetZeroAllocator(CommContextManager::device_id));
+    dev_ctx->SetHostZeroAllocator(phi::memory_utils::GetHostZeroAllocator());
+    dev_ctx->SetPinnedAllocator(phi::memory_utils::GetPinnedAllocator());
+    dev_ctx->PartialInitWithAllocator();
+    auto compute_event =
+        phi::memory_utils::GetCudaEvent(CommContextManager::device_id);
+    auto comm_event =
+        phi::memory_utils::GetCudaEvent(CommContextManager::device_id);
+
+    nccl_comm_context->SetDevContext(std::move(dev_ctx));
+    nccl_comm_context->SetComputeEvent(std::move(compute_event));
+    nccl_comm_context->SetCommEvent(std::move(comm_event));
+  }
+
   comm_context_manager.SetStore(store);
   comm_context_manager.Emplace(unique_comm_key, std::move(nccl_comm_context));
 }
diff --git a/paddle/phi/core/distributed/comm_context_manager.h b/paddle/phi/core/distributed/comm_context_manager.h
index 6d82e89f92b..55fa831c270 100644
--- a/paddle/phi/core/distributed/comm_context_manager.h
+++ b/paddle/phi/core/distributed/comm_context_manager.h
@@ -46,13 +46,13 @@ class CommContextManager {
 
   bool Has(const std::string& unique_comm_key) const;
 
+  static void SetDeviceId(int dev_id);
+
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
   static void CreateNCCLCommContext(const std::shared_ptr<Store>& store,
                                     const std::string& unique_comm_key,
                                     int rank,
                                     int size);
-
-  static void SetCUDADeviceId(int dev_id);
 #endif
 
 #if defined(PADDLE_WITH_GLOO)
@@ -76,6 +76,7 @@ class CommContextManager {
       id_to_comm_context_;
   std::shared_ptr<Store> store_;
+  static int device_id;
 };
 
 }  // namespace distributed
diff --git a/paddle/phi/core/distributed/nccl_comm_context.cc b/paddle/phi/core/distributed/nccl_comm_context.cc
index 2ad3ece71a5..90b6a4c447c 100644
--- a/paddle/phi/core/distributed/nccl_comm_context.cc
+++ b/paddle/phi/core/distributed/nccl_comm_context.cc
@@ -37,6 +37,30 @@ NCCLCommContext::NCCLCommContext(int rank, int size, ncclUniqueId nccl_id)
 
 ncclComm_t NCCLCommContext::GetNcclComm() { return nccl_comm_; }
 
+gpuStream_t NCCLCommContext::GetStream() { return dev_ctx_->stream(); }
+
+phi::GPUContext* NCCLCommContext::GetDevContext() { return dev_ctx_.get(); }
+
+void NCCLCommContext::SetDevContext(
+    std::unique_ptr<phi::GPUContext>&& dev_ctx) {
+  dev_ctx_ = std::move(dev_ctx);
+}
+
+gpuEvent_t NCCLCommContext::GetComputeEvent() { return compute_event_.get(); }
+
+void NCCLCommContext::SetComputeEvent(
+    std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type>&&
+        compute_event) {
+  compute_event_ = std::move(compute_event);
+}
+
+gpuEvent_t NCCLCommContext::GetCommEvent() { return comm_event_.get(); }
+
+void NCCLCommContext::SetCommEvent(
+    std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type>&& comm_event) {
+  comm_event_ = std::move(comm_event);
+}
+
 void NCCLCommContext::Broadcast(phi::DenseTensor* out_tensor,
                                 const phi::DenseTensor& in_tensor,
                                 int root,
diff --git a/paddle/phi/core/distributed/nccl_comm_context.h b/paddle/phi/core/distributed/nccl_comm_context.h
index b8f14cef131..fdd45793a63 100644
--- a/paddle/phi/core/distributed/nccl_comm_context.h
+++ b/paddle/phi/core/distributed/nccl_comm_context.h
@@ -13,6 +13,16 @@
 // limitations under the License.
 
 #pragma once
 
+#ifdef PADDLE_WITH_CUDA
+#include <cuda.h>
+#include <cuda_runtime.h>
+#endif
+
+#ifdef PADDLE_WITH_HIP
+#include <hip/hip_runtime.h>
+#endif
+
+#include "paddle/phi/backends/gpu/gpu_context.h"
 #include "paddle/phi/backends/gpu/gpu_decls.h"
 #include "paddle/phi/core/distributed/comm_context.h"
 #include "paddle/phi/core/macros.h"
@@ -30,9 +40,27 @@ namespace distributed {
 
 class NCCLCommContext final : public CommContext {
  public:
   NCCLCommContext(int rank, int size, ncclUniqueId nccl_id);
+  ~NCCLCommContext() {}
 
   ncclComm_t GetNcclComm();
 
+  gpuStream_t GetStream();
+
+  gpuEvent_t GetComputeEvent();
+
+  void SetComputeEvent(
+      std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type>&&
+          compute_event);
+
+  gpuEvent_t GetCommEvent();
+
+  void SetCommEvent(
+      std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type>&& comm_event);
+
+  phi::GPUContext* GetDevContext();
+
+  void SetDevContext(std::unique_ptr<phi::GPUContext>&& dev_ctx);
+
   void Broadcast(phi::DenseTensor* out_tensor,
                  const phi::DenseTensor& in_tensor,
                  int root,
@@ -75,6 +103,14 @@ class NCCLCommContext final : public CommContext {
   DISABLE_COPY_AND_ASSIGN(NCCLCommContext);
 
   ncclComm_t nccl_comm_;
+
+  std::unique_ptr<phi::GPUContext> dev_ctx_;
+
+  // used for comm wait compute, compute_stream-->event-->comm_stream
+  std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> compute_event_;
+
+  // used for compute wait comm, comm_stream-->event-->compute_stream
+  std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> comm_event_;
 };
 
 }  // namespace distributed
diff --git a/python/paddle/distributed/auto_parallel/static/process_group.py b/python/paddle/distributed/auto_parallel/static/process_group.py
index 285184db14c..9bbe1fbcf48 100644
--- a/python/paddle/distributed/auto_parallel/static/process_group.py
+++ b/python/paddle/distributed/auto_parallel/static/process_group.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License
 
+import os
 from collections import OrderedDict
 
 import paddle
@@ -146,9 +147,6 @@ class ProcessGroup:
         global_rank = genv.rank
 
         if self.nranks >= 2 and global_rank in self.ranks:
-            logger.info(
-                f"group_id: {self.id}, ranks: {self.ranks}, nranks: {self.nranks}, trainer_endpoints: {genv.current_endpoint}"
-            )
             strategy = core.ParallelStrategy()
             strategy.nranks = self.nranks
             strategy.local_rank = self.local_rank(global_rank)
@@ -159,9 +157,22 @@ class ProcessGroup:
             strategy.nrings = 1
             if core.is_compiled_with_cuda():
                 place = core.CUDAPlace(genv.device_id)
-                core.NCCLParallelContext(strategy, place).init_with_ring_id(
-                    ring_id
+                use_new_comm = os.getenv(
+                    "FLAGS_dynamic_static_unified_comm", "0"
                 )
+                if use_new_comm in ["1", "True", "true"]:
+                    store = core.create_or_get_global_tcp_store()
+                    core.CommContextManager.set_device_id(genv.device_id)
+                    core.CommContextManager.create_nccl_comm_context(
+                        store,
+                        str(ring_id),
+                        strategy.local_rank,
+                        strategy.nranks,
+                    )
+                else:
+                    core.NCCLParallelContext(strategy, place).init_with_ring_id(
+                        ring_id
+                    )
             elif core.is_compiled_with_xpu():
                 place = core.XPUPlace(genv.device_id)
                 core.BKCLParallelContext(strategy, place).init_with_ring_id(
diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index b1d61196e05..144be980a75 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -330,7 +330,7 @@ def _init_parallel_env(backend):
             store, "0", rank, world_size
         )
     elif backend == "nccl":
-        core.CommContextManager.set_cuda_device_id(dev_id)
+        core.CommContextManager.set_device_id(dev_id)
         core.CommContextManager.create_nccl_comm_context(
             store, "0", rank, world_size
         )
diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py
index 217920debe4..67452c1a4e1 100644
--- a/python/paddle/distributed/parallel.py
+++ b/python/paddle/distributed/parallel.py
@@ -1084,13 +1084,7 @@ def init_parallel_env():
         master_port = int(master_port)
         is_master = rank == 0
         stop_check_timeout = int(os.getenv("FLAGS_stop_check_timeout", "900"))
-        default_store = core.TCPStore(
-            master_addr,
-            master_port,
-            is_master,
-            world_size,
-            timeout=stop_check_timeout,
-        )
+        default_store = core.create_or_get_global_tcp_store()
         _set_default_store(default_store)
         pg = _new_process_group_impl(
             backend,
@@ -1108,6 +1102,11 @@
     _add_new_group(group)
     parallel_helper._set_parallel_ctx(True)
+    # barrier will call CreateNCCLEnvCache which will call CreateNCCLCommContext.
+    # Set device_id to prevent creating null dev_ctx.
+    # TODO(mine): support XPU and other backends.
+    if backend in ["nccl", 'xccl', 'bkcl']:
+        core.CommContextManager.set_device_id(parallel_env.device_id)
     paddle.distributed.barrier(group=group)
     return group
 
-- 
GitLab
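
Usage sketch (appended for review, not part of the patch): a minimal example of how the new comm-context path above is expected to be driven from Python. The flag name and the core.CommContextManager bindings come from the diff; the launch setup (NCCL build, one process per GPU, paddle.distributed.launch, the demo.py filename) is assumed for illustration.

    # Run with: python -m paddle.distributed.launch --gpus "0,1" demo.py
    import os

    # Opt in to the unified dynamic/static communication library (default "0").
    os.environ["FLAGS_dynamic_static_unified_comm"] = "1"

    import paddle
    import paddle.distributed as dist

    # With this patch, init_parallel_env() calls
    # core.CommContextManager.set_device_id(...) before barrier(), so the
    # NCCLCommContext created by CreateNCCLEnvCache has a fully initialized
    # dev_ctx instead of a null one.
    dist.init_parallel_env()

    x = paddle.ones([2, 2]) * dist.get_rank()
    dist.all_reduce(x)  # runs on the NCCL comm context created during init
    print(x.numpy())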