未验证 提交 ade51aa5 编写于 作者: G Ghost Screaming 提交者: GitHub

[Auto Parallel] Compatible new comm library upgrade (#56604)

* for verify

fluid operator support new comm library

* u

* u

* u

* compatiable new comm library upgrade for c_allgather, c_reduce, c_reduce_scatter and c_scatter.

* Remove useless comments in process_group.py

* Polish code style.

* Fix some problems.

* Remove use fluid api in phi comm_context_manager.

* Add PPADDLE_WITH_CUDA and PADDLE_WITH_NCCL micro judgement.

* Fix bug of HIP architecture.

* Fix some problems.
1. remove useless loggings.
2. Fix conditional compilation for HIP.
3. Fix problems of test_pass_generation_pipeline.py. It calls paddle.distributed.init_parallel_env() at first,
then auto.Engine calls _init_comm(), which will calls process_group.instantiate(). However, init_parallel_env() will call
paddle.distributed.barrier(), it will call CreateNCCLEnvCache and create corresponding NCCLCommContext. But dev_id is not
set, as a result, NCCLCommContext's dev_ctx is not initialized.

* Fix some problems.

* Polish code.

* Polish code.

* Revert compatiable upgrade for communication operators. Their upgrades
will be submitted in another PR.

* Remove StaticTCPStore.

* Remove useless modification.

* Remove useless set_cuda_device_id.

* Polish code.

* Remove fluid header files in phi files.

* Remove useless comments.

* Fix problems of hip arch.

* Fix some problems.

* Polish code.

* Polish code style.

---------
Co-authored-by: TaoTao Li's avatarhitywt <yuwentao126@126.com>
上级 ded10442
......@@ -57,6 +57,11 @@ limitations under the License. */
#include "paddle/phi/common/memory_utils.h"
#include "paddle/phi/core/custom_kernel.h"
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
#include "paddle/fluid/platform/device/gpu/gpu_resource_pool.h"
#endif
PHI_DECLARE_int32(paddle_num_threads);
PADDLE_DEFINE_EXPORTED_int32(
multiple_of_cupti_buffer_size,
......@@ -440,6 +445,41 @@ void InitMemoryMethod() {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
memory_method->gpu_memory_usage = paddle::platform::GpuMemoryUsage;
#endif
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
// TODO(GhostScreaming): Use phi methods later.
memory_method->get_allocator =
[](int device_id, phi::gpuStream_t stream) -> phi::Allocator * {
return paddle::memory::allocation::AllocatorFacade::Instance()
.GetAllocator(phi::GPUPlace(device_id), stream)
.get();
};
memory_method->get_host_allocator = []() -> phi::Allocator * {
return paddle::memory::allocation::AllocatorFacade::Instance()
.GetAllocator(phi::CPUPlace())
.get();
};
memory_method->get_zero_allocator = [](int device_id) -> phi::Allocator * {
return paddle::memory::allocation::AllocatorFacade::Instance()
.GetZeroAllocator(phi::GPUPlace(device_id))
.get();
};
memory_method->get_host_zero_allocator = []() -> phi::Allocator * {
return paddle::memory::allocation::AllocatorFacade::Instance()
.GetZeroAllocator(phi::CPUPlace())
.get();
};
memory_method->get_pinned_allocator = []() -> phi::Allocator * {
return paddle::memory::allocation::AllocatorFacade::Instance()
.GetAllocator(phi::GPUPinnedPlace())
.get();
};
memory_method->get_new_cuda_event = [](int device_id) {
return paddle::platform::CudaEventResourcePool::Instance().New(device_id);
};
#endif
memory_method->emplace_device_contexts =
paddle::platform::EmplaceDeviceContexts;
memory_method->init_devices = InitDevices;
......
......@@ -42,14 +42,14 @@ void BindCommContextManager(py::module *m) {
py::class_<phi::distributed::CommContextManager,
std::shared_ptr<phi::distributed::CommContextManager>>(
*m, "CommContextManager")
.def_static("set_device_id",
&phi::distributed::CommContextManager::SetDeviceId,
py::call_guard<py::gil_scoped_release>())
#if defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)
.def_static(
"create_nccl_comm_context",
&phi::distributed::CommContextManager::CreateNCCLCommContext,
py::call_guard<py::gil_scoped_release>())
.def_static("set_cuda_device_id",
&phi::distributed::CommContextManager::SetCUDADeviceId,
py::call_guard<py::gil_scoped_release>())
#endif
#if defined(PADDLE_WITH_GLOO)
.def_static(
......
......@@ -90,6 +90,34 @@ void EmplaceDeviceContexts(
stream_priority);
}
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
const phi::Allocator* GetAllocator(int device_id, phi::gpuStream_t stream) {
return MemoryUtils::Instance().GetAllocator(device_id, stream);
}
const phi::Allocator* GetHostAllocator() {
return MemoryUtils::Instance().GetHostAllocator();
}
const phi::Allocator* GetZeroAllocator(int device_id) {
return MemoryUtils::Instance().GetZeroAllocator(device_id);
}
const phi::Allocator* GetHostZeroAllocator() {
return MemoryUtils::Instance().GetHostZeroAllocator();
}
const phi::Allocator* GetPinnedAllocator() {
return MemoryUtils::Instance().GetPinnedAllocator();
}
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> GetCudaEvent(
int device_id) {
return MemoryUtils::Instance().GetCudaEvent(device_id);
}
#endif
} // namespace memory_utils
} // namespace phi
......@@ -24,6 +24,15 @@
#include "paddle/phi/core/macros.h"
#include "paddle/phi/core/stream.h"
#ifdef PADDLE_WITH_CUDA
#include <cuda.h>
#include <cuda_runtime.h>
#endif
#ifdef PADDLE_WITH_HIP
#include <hip/hip_runtime.h>
#endif
namespace phi {
struct MemoryInterface {
......@@ -150,6 +159,17 @@ struct MemoryInterface {
const std::vector<phi::Place>& places,
bool disable_setting_default_stream_for_allocator,
int stream_priority);
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
phi::Allocator* (*get_allocator)(int device_id, phi::gpuStream_t stream);
phi::Allocator* (*get_host_allocator)();
phi::Allocator* (*get_zero_allocator)(int device_id);
phi::Allocator* (*get_host_zero_allocator)();
phi::Allocator* (*get_pinned_allocator)();
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> (
*get_new_cuda_event)(int device_id);
#endif
};
class MemoryUtils {
......@@ -323,6 +343,34 @@ class MemoryUtils {
"Fluid. You can call InitMemoryMethod() for initialization."));
}
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
const phi::Allocator* GetAllocator(int device_id, phi::gpuStream_t stream) {
return memory_method_->get_allocator(device_id, stream);
}
const phi::Allocator* GetHostAllocator() {
return memory_method_->get_host_allocator();
}
const phi::Allocator* GetZeroAllocator(int device_id) {
return memory_method_->get_zero_allocator(device_id);
}
const phi::Allocator* GetHostZeroAllocator() {
return memory_method_->get_host_zero_allocator();
}
const phi::Allocator* GetPinnedAllocator() {
return memory_method_->get_pinned_allocator();
}
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> GetCudaEvent(
int device_id) {
return memory_method_->get_new_cuda_event(device_id);
}
#endif
private:
MemoryUtils() = default;
......@@ -385,6 +433,22 @@ void EmplaceDeviceContexts(
bool disable_setting_default_stream_for_allocator,
int stream_priority);
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL))
const Allocator* GetAllocator(int device_id, phi::gpuStream_t stream);
const Allocator* GetHostAllocator();
const Allocator* GetZeroAllocator(int device_id);
const Allocator* GetHostZeroAllocator();
const Allocator* GetPinnedAllocator();
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> GetCudaEvent(
int device_id);
#endif
class Buffer {
public:
explicit Buffer(const phi::Place& place) : place_(place) {}
......
......@@ -12,44 +12,53 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#if defined(PADDLE_WITH_GLOO)
#include <gloo/rendezvous/prefix_store.h>
#include "paddle/phi/core/distributed/gloo_comm_context.h"
#include "paddle/phi/core/distributed/gloo_utils.h"
#include "paddle/phi/core/distributed/store/gloo_store.h"
#endif
#include "paddle/phi/core/distributed/comm_context_manager.h"
#include <memory>
#include <string>
#include "glog/logging.h"
#include "paddle/phi/backends/gpu/gpu_info.h"
#include "paddle/phi/core/distributed/store/store.h"
#include "paddle/phi/core/enforce.h"
#if defined(PADDLE_WITH_GLOO)
#include <gloo/rendezvous/prefix_store.h>
#include "paddle/phi/core/distributed/gloo_comm_context.h"
#include "paddle/phi/core/distributed/gloo_utils.h"
#include "paddle/phi/core/distributed/store/gloo_store.h"
#endif
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/phi/common/memory_utils.h"
#include "paddle/phi/core/distributed/nccl_comm_context.h"
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
#include "glog/logging.h"
#include "paddle/phi/core/distributed/xccl_comm_context.h"
#endif
namespace phi {
namespace distributed {
int CommContextManager::device_id = -1;
void CommContextManager::SetDeviceId(int dev_id) {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
void CommContextManager::SetCUDADeviceId(int dev_id) {
phi::backends::gpu::SetDeviceId(dev_id);
CommContextManager::device_id = dev_id;
#endif
}
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
void CommContextManager::CreateNCCLCommContext(
const std::shared_ptr<Store>& store,
const std::string& unique_comm_key,
int rank,
int size) {
auto& comm_context_manager = CommContextManager::GetInstance();
if (comm_context_manager.Has(unique_comm_key)) {
return;
}
ncclUniqueId nccl_id;
if (rank == 0) {
PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclGetUniqueId(&nccl_id));
......@@ -68,7 +77,28 @@ void CommContextManager::CreateNCCLCommContext(
auto nccl_comm_context =
std::make_unique<NCCLCommContext>(rank, size, nccl_id);
auto& comm_context_manager = CommContextManager::GetInstance();
if (CommContextManager::device_id != -1) {
std::unique_ptr<phi::GPUContext> dev_ctx(
new phi::GPUContext(phi::GPUPlace(CommContextManager::device_id)));
dev_ctx->SetAllocator(phi::memory_utils::GetAllocator(
CommContextManager::device_id, dev_ctx->stream()));
dev_ctx->SetHostAllocator(phi::memory_utils::GetHostAllocator());
dev_ctx->SetZeroAllocator(
phi::memory_utils::GetZeroAllocator(CommContextManager::device_id));
dev_ctx->SetHostZeroAllocator(phi::memory_utils::GetHostZeroAllocator());
dev_ctx->SetPinnedAllocator(phi::memory_utils::GetPinnedAllocator());
dev_ctx->PartialInitWithAllocator();
auto compute_event =
phi::memory_utils::GetCudaEvent(CommContextManager::device_id);
auto comm_event =
phi::memory_utils::GetCudaEvent(CommContextManager::device_id);
nccl_comm_context->SetDevContext(std::move(dev_ctx));
nccl_comm_context->SetComputeEvent(std::move(compute_event));
nccl_comm_context->SetCommEvent(std::move(comm_event));
}
comm_context_manager.SetStore(store);
comm_context_manager.Emplace(unique_comm_key, std::move(nccl_comm_context));
}
......
......@@ -46,13 +46,13 @@ class CommContextManager {
bool Has(const std::string& unique_comm_key) const;
static void SetDeviceId(int dev_id);
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
static void CreateNCCLCommContext(const std::shared_ptr<Store>& store,
const std::string& unique_comm_key,
int rank,
int size);
static void SetCUDADeviceId(int dev_id);
#endif
#if defined(PADDLE_WITH_GLOO)
......@@ -76,6 +76,7 @@ class CommContextManager {
std::unordered_map<std::string, std::unique_ptr<CommContext>>
id_to_comm_context_;
std::shared_ptr<Store> store_;
static int device_id;
};
} // namespace distributed
......
......@@ -37,6 +37,30 @@ NCCLCommContext::NCCLCommContext(int rank, int size, ncclUniqueId nccl_id)
ncclComm_t NCCLCommContext::GetNcclComm() { return nccl_comm_; }
gpuStream_t NCCLCommContext::GetStream() { return dev_ctx_->stream(); }
phi::GPUContext* NCCLCommContext::GetDevContext() { return dev_ctx_.get(); }
void NCCLCommContext::SetDevContext(
std::unique_ptr<phi::GPUContext>&& dev_ctx) {
dev_ctx_ = std::move(dev_ctx);
}
gpuEvent_t NCCLCommContext::GetComputeEvent() { return compute_event_.get(); }
void NCCLCommContext::SetComputeEvent(
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type>&&
compute_event) {
compute_event_ = std::move(compute_event);
}
gpuEvent_t NCCLCommContext::GetCommEvent() { return comm_event_.get(); }
void NCCLCommContext::SetCommEvent(
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type>&& comm_event) {
comm_event_ = std::move(comm_event);
}
void NCCLCommContext::Broadcast(phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
int root,
......
......@@ -13,6 +13,16 @@
// limitations under the License.
#pragma once
#ifdef PADDLE_WITH_CUDA
#include <cuda.h>
#include <cuda_runtime.h>
#endif
#ifdef PADDLE_WITH_HIP
#include <hip/hip_runtime.h>
#endif
#include "paddle/phi/backends/gpu/gpu_context.h"
#include "paddle/phi/backends/gpu/gpu_decls.h"
#include "paddle/phi/core/distributed/comm_context.h"
#include "paddle/phi/core/macros.h"
......@@ -30,9 +40,27 @@ namespace distributed {
class NCCLCommContext final : public CommContext {
public:
NCCLCommContext(int rank, int size, ncclUniqueId nccl_id);
~NCCLCommContext() {}
ncclComm_t GetNcclComm();
gpuStream_t GetStream();
gpuEvent_t GetComputeEvent();
void SetComputeEvent(
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type>&&
compute_event);
gpuEvent_t GetCommEvent();
void SetCommEvent(
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type>&& comm_event);
phi::GPUContext* GetDevContext();
void SetDevContext(std::unique_ptr<phi::GPUContext>&& dev_ctx);
void Broadcast(phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
int root,
......@@ -75,6 +103,14 @@ class NCCLCommContext final : public CommContext {
DISABLE_COPY_AND_ASSIGN(NCCLCommContext);
ncclComm_t nccl_comm_;
std::unique_ptr<phi::GPUContext> dev_ctx_;
// used for comm wait compute, compute_stream-->event-->comm_stream
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> compute_event_;
// used for compute wait comm, comm_stream-->event-->compute_stream
std::shared_ptr<std::remove_pointer<phi::gpuEvent_t>::type> comm_event_;
};
} // namespace distributed
......
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License
import os
from collections import OrderedDict
import paddle
......@@ -146,9 +147,6 @@ class ProcessGroup:
global_rank = genv.rank
if self.nranks >= 2 and global_rank in self.ranks:
logger.info(
f"group_id: {self.id}, ranks: {self.ranks}, nranks: {self.nranks}, trainer_endpoints: {genv.current_endpoint}"
)
strategy = core.ParallelStrategy()
strategy.nranks = self.nranks
strategy.local_rank = self.local_rank(global_rank)
......@@ -159,6 +157,19 @@ class ProcessGroup:
strategy.nrings = 1
if core.is_compiled_with_cuda():
place = core.CUDAPlace(genv.device_id)
use_new_comm = os.getenv(
"FLAGS_dynamic_static_unified_comm", "0"
)
if use_new_comm in ["1", "True", "true"]:
store = core.create_or_get_global_tcp_store()
core.CommContextManager.set_device_id(genv.device_id)
core.CommContextManager.create_nccl_comm_context(
store,
str(ring_id),
strategy.local_rank,
strategy.nranks,
)
else:
core.NCCLParallelContext(strategy, place).init_with_ring_id(
ring_id
)
......
......@@ -330,7 +330,7 @@ def _init_parallel_env(backend):
store, "0", rank, world_size
)
elif backend == "nccl":
core.CommContextManager.set_cuda_device_id(dev_id)
core.CommContextManager.set_device_id(dev_id)
core.CommContextManager.create_nccl_comm_context(
store, "0", rank, world_size
)
......
......@@ -1084,13 +1084,7 @@ def init_parallel_env():
master_port = int(master_port)
is_master = rank == 0
stop_check_timeout = int(os.getenv("FLAGS_stop_check_timeout", "900"))
default_store = core.TCPStore(
master_addr,
master_port,
is_master,
world_size,
timeout=stop_check_timeout,
)
default_store = core.create_or_get_global_tcp_store()
_set_default_store(default_store)
pg = _new_process_group_impl(
backend,
......@@ -1108,6 +1102,11 @@ def init_parallel_env():
_add_new_group(group)
parallel_helper._set_parallel_ctx(True)
# barrier will call CreateNCCLEnvCache which will call CreateNCCLCommContext.
# Set device_id to prevent creating null dev_ctx.
# TODO(mine): support XPU and other backends.
if backend in ["nccl", 'xccl', 'bkcl']:
core.CommContextManager.set_device_id(parallel_env.device_id)
paddle.distributed.barrier(group=group)
return group
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册