From 26a83ed1e498155fda71ba24960ddb9d8ef56b3e Mon Sep 17 00:00:00 2001
From: sneaxiy
Date: Tue, 5 Sep 2023 20:06:50 +0800
Subject: [PATCH] hack event

---
 .../collective/process_group_nccl.cc          | 112 +++++++++++++++++-
 .../collective/process_group_nccl.h           |  21 ++++
 paddle/fluid/pybind/cuda_streams_py.cc        |   1 +
 paddle/fluid/pybind/distributed_py.cc         |  60 +++++++++-
 paddle/phi/api/profiler/event.h               |   7 ++
 paddle/phi/backends/event.cc                  |   8 ++
 paddle/phi/backends/event.h                   |   1 +
 paddle/phi/core/flags.cc                      |   4 +
 .../paddle/distributed/communication/group.py |   3 +
 .../pp_utils/p2p_communication.py             |   2 +
 10 files changed, 215 insertions(+), 4 deletions(-)

diff --git a/paddle/fluid/distributed/collective/process_group_nccl.cc b/paddle/fluid/distributed/collective/process_group_nccl.cc
index effd61a3b50..0fd1f6dfc28 100644
--- a/paddle/fluid/distributed/collective/process_group_nccl.cc
+++ b/paddle/fluid/distributed/collective/process_group_nccl.cc
@@ -27,6 +27,7 @@
 DECLARE_bool(benchmark);
 DECLARE_bool(nccl_blocking_wait);
 DECLARE_bool(use_stream_safe_cuda_allocator);
+DECLARE_bool(enable_process_group_event_record);
 
 // set this flag to `true` and recompile to enable dynamic checks
 constexpr bool FLAGS_enable_nccl_dynamic_check = false;
@@ -94,7 +95,96 @@ ProcessGroupNCCL::ProcessGroupNCCL(
     int rank,
     int size,
     int gid)
-    : ProcessGroupWithStream(rank, size, gid), store_(store) {}
+    : ProcessGroupWithStream(rank, size, gid), store_(store) {
+  events_.resize(phi::backends::gpu::GetGPUDeviceCount());
+}
+
+ProcessGroupNCCL::~ProcessGroupNCCL() {
+  for (const auto& e : events_) {
+    for (const auto& p : e.events) {
+      cudaEventDestroy(p.first);
+      cudaEventDestroy(p.second);
+    }
+  }
+}
+
+std::vector<float> ProcessGroupNCCL::GetEventTimeAndRelease() {
+  auto dev_id = phi::backends::gpu::GetCurrentDeviceId();
+  auto& e = events_[dev_id];
+  std::vector<float> times;
+  times.reserve(e.length);
+  for (size_t i = 0; i < e.length; ++i) {
+    PADDLE_ENFORCE_GPU_SUCCESS(cudaEventSynchronize(e.events[i].second));
+    float ms = 0.0f;
+    PADDLE_ENFORCE_GPU_SUCCESS(
+        cudaEventElapsedTime(&ms, e.events[i].first, e.events[i].second));
+    times.push_back(ms);
+  }
+  e.length = 0;
+  return times;
+}
+
+gpuEvent_t ProcessGroupNCCL::RecordStartEventOnCalcStream() {
+  if (!FLAGS_enable_process_group_event_record) {
+    return nullptr;
+  }
+
+  auto dev_id = phi::backends::gpu::GetCurrentDeviceId();
+  auto stream = static_cast(
+                    GetDeviceContext(phi::GPUPlace(dev_id), true))
+                    ->stream();
+  return RecordStartEvent(stream);
+}
+
+void ProcessGroupNCCL::RecordEndEventOnCalcStream(gpuEvent_t event) {
+  if (event == nullptr) {
+    return;
+  }
+
+  auto dev_id = phi::backends::gpu::GetCurrentDeviceId();
+  auto stream = static_cast(
+                    GetDeviceContext(phi::GPUPlace(dev_id), true))
+                    ->stream();
+  RecordEndEvent(event, stream);
+}
+
+gpuEvent_t ProcessGroupNCCL::RecordStartEvent(gpuStream_t stream) {
+  if (!FLAGS_enable_process_group_event_record) {
+    return nullptr;
+  }
+
+  if (s_group_call_counter > 0) {
+    return nullptr;
+  }
+
+  auto dev_id = phi::backends::gpu::GetCurrentDeviceId();
+  gpuEvent_t start_event, end_event;
+  auto& e = events_[dev_id];
+  if (e.events.size() <= e.length) {
+    VLOG(10) << "Create new events when cached event pair number is "
+             << e.events.size() << " , and used event pair number is "
+             << e.length;
+    e.events.resize(e.events.size() + 1);
+    auto& p = e.events[e.length++];
+    PADDLE_ENFORCE_GPU_SUCCESS(cudaEventCreate(&p.first));
+    PADDLE_ENFORCE_GPU_SUCCESS(cudaEventCreate(&p.second));
+    start_event = p.first;
+    end_event = p.second;
+  } else {
+    start_event = e.events[e.length].first;
+    end_event = e.events[e.length].second;
+    ++e.length;
+  }
+
+  PADDLE_ENFORCE_GPU_SUCCESS(cudaEventRecord(start_event, stream));
+  return end_event;
+}
+
+void ProcessGroupNCCL::RecordEndEvent(gpuEvent_t event, gpuStream_t stream) {
+  if (event != nullptr) {
+    PADDLE_ENFORCE_GPU_SUCCESS(cudaEventRecord(event, stream));
+  }
+}
 
 void ProcessGroupNCCL::GroupStart() {
   NCCL_CHECK(phi::dynload::ncclGroupStart());
@@ -228,6 +318,7 @@ std::shared_ptr ProcessGroupNCCL::AllReduce(
                 << ", sync_op: " << sync_op
                 << ", use_calc_stream: " << use_calc_stream;
 
+        auto event = RecordStartEvent(stream);
         NCCL_CHECK(
             phi::dynload::ncclAllReduce(in_tensor.data(),
                                         out_tensor->data(),
@@ -236,6 +327,7 @@ std::shared_ptr ProcessGroupNCCL::AllReduce(
                                         ToNCCLRedType(opts.reduce_op),
                                         comm,
                                         stream));
+        RecordEndEvent(event, stream);
       },
       in_tensor,
       CommType::ALLREDUCE,
@@ -310,6 +402,7 @@ std::shared_ptr ProcessGroupNCCL::AllToAll(
                 << ", sync_op: " << sync_op
                 << ", use_calc_stream: " << use_calc_stream;
 
+        auto event = RecordStartEvent(stream);
         GroupStart();
         for (auto i = 0; i < size_; i++) {
           in_numel = in_size_each_rank[i] * in_row_size;
@@ -335,6 +428,7 @@ std::shared_ptr ProcessGroupNCCL::AllToAll(
           out_offset += out_numel;
         }
         GroupEnd();
+        RecordEndEvent(event, stream);
       },
       in_tensor,
       CommType::ALLTOALL,
@@ -396,6 +490,7 @@ std::shared_ptr ProcessGroupNCCL::Broadcast(
                 << ", nranks: " << size_ << ", sync_op: " << sync_op
                 << ", use_calc_stream: " << use_calc_stream;
 
+        auto event = RecordStartEvent(stream);
         NCCL_CHECK(
             phi::dynload::ncclBroadcast(in_tensor.data(),
                                         out_tensor->data(),
@@ -404,6 +499,7 @@ std::shared_ptr ProcessGroupNCCL::Broadcast(
                                         root,
                                         comm,
                                         stream));
+        RecordEndEvent(event, stream);
       },
       in_tensor,
       CommType::BROADCAST,
@@ -444,6 +540,7 @@ std::shared_ptr ProcessGroupNCCL::Reduce(
                 << ", nranks: " << size_ << ", sync_op: " << sync_op
                 << ", use_calc_stream: " << use_calc_stream;
 
+        auto event = RecordStartEvent(stream);
         NCCL_CHECK(
             phi::dynload::ncclReduce(in_tensor.data(),
                                      out_tensor->data(),
@@ -453,6 +550,7 @@ std::shared_ptr ProcessGroupNCCL::Reduce(
                                      opts.root_rank,
                                      comm,
                                      stream));
+        RecordEndEvent(event, stream);
       },
       in_tensor,
       CommType::REDUCE,
@@ -492,6 +590,7 @@ std::shared_ptr ProcessGroupNCCL::ReduceScatter(
                 << ", sync_op: " << sync_op
                 << ", use_calc_stream: " << use_calc_stream;
 
+        auto event = RecordStartEvent(stream);
         NCCL_CHECK(phi::dynload::ncclReduceScatter(
             in_tensor.data(),
             out_tensor->data(),
@@ -500,6 +599,7 @@ std::shared_ptr ProcessGroupNCCL::ReduceScatter(
             ToNCCLRedType(opts.reduce_op),
             comm,
             stream));
+        RecordEndEvent(event, stream);
       },
       in_tensor,
       CommType::REDUCE_SCATTER,
@@ -543,6 +643,7 @@ std::shared_ptr ProcessGroupNCCL::Scatter(
         if (rank_ == opts.root_rank) {
           int64_t offset = 0;
           phi::DenseTensor partial_tensor;
+          auto event = RecordStartEvent(stream);
           GroupStart();
           for (auto i = 0; i < size_; i++) {
             partial_tensor = GetPartialTensor(in_tensor, offset, numel);
@@ -563,7 +664,9 @@ std::shared_ptr ProcessGroupNCCL::Scatter(
                                      comm,
                                      stream));
           GroupEnd();
+          RecordEndEvent(event, stream);
         } else {
+          auto event = RecordStartEvent(stream);
           NCCL_CHECK(
               phi::dynload::ncclRecv(out_tensor->data(),
                                      numel,
@@ -571,6 +674,7 @@ std::shared_ptr ProcessGroupNCCL::Scatter(
                                      opts.root_rank,
                                      comm,
                                      stream));
+          RecordEndEvent(event, stream);
         }
       },
       in_tensor,
@@ -627,6 +731,7 @@ std::shared_ptr ProcessGroupNCCL::Gather(
                 << ", nranks: " << size_ << ", sync_op: " << sync_op
                 << ", use_calc_stream: " << use_calc_stream;
 
+        auto event = RecordStartEvent(stream);
        GroupStart();
        // root receive from all devices
        if (rank_ == opts.root_rank) {
@@ -649,6 +754,7 @@ std::shared_ptr ProcessGroupNCCL::Gather(
                                    comm,
                                    stream));
        GroupEnd();
+       RecordEndEvent(event, stream);
      };
  return Collective(
      gather_func, in_tensor, CommType::GATHER, sync_op, use_calc_stream);
@@ -688,12 +794,14 @@ std::shared_ptr ProcessGroupNCCL::Recv(
                 << ", sync_op: " << sync_op
                 << ", use_calc_stream: " << use_calc_stream;
 
+        auto event = RecordStartEvent(stream);
         NCCL_CHECK(phi::dynload::ncclRecv(tensor->data(),
                                           tensor->numel(),
                                           phi::ToNCCLDataType(tensor->dtype()),
                                           rank_in_group,
                                           comm,
                                           stream));
+        RecordEndEvent(event, stream);
       },
       src_rank,
       *tensor,
@@ -735,6 +843,7 @@ std::shared_ptr ProcessGroupNCCL::Send(
                 << ", sync_op: " << sync_op
                 << ", use_calc_stream: " << use_calc_stream;
 
+        auto event = RecordStartEvent(stream);
         NCCL_CHECK(phi::dynload::ncclSend(
             tensor_maybe_partial.data(),
             tensor_maybe_partial.numel(),
@@ -742,6 +851,7 @@ std::shared_ptr ProcessGroupNCCL::Send(
             rank_in_group,
             comm,
             stream));
+        RecordEndEvent(event, stream);
       },
       dst_rank,
       tensor_maybe_partial,
diff --git a/paddle/fluid/distributed/collective/process_group_nccl.h b/paddle/fluid/distributed/collective/process_group_nccl.h
index 7fff7878dec..963844498c4 100644
--- a/paddle/fluid/distributed/collective/process_group_nccl.h
+++ b/paddle/fluid/distributed/collective/process_group_nccl.h
@@ -77,6 +77,8 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
                    int size,
                    int gid);
 
+  ~ProcessGroupNCCL();
+
   std::string GetBackendName() const override { return "NCCL"; }
 
   phi::DeviceContext* GetDeviceContext(const Place& place) const override;
@@ -169,6 +171,8 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
 
   ncclComm_t NCCLComm(const Place& place) const;
 
+  std::vector<float> GetEventTimeAndRelease();
+
 private:
  std::shared_ptr CreateTask(const Place& place,
                             int rank,
@@ -203,6 +207,16 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
                 bool sync_op,
                 bool use_calc_stream);
 
+ public:
+  gpuEvent_t RecordStartEventOnCalcStream();
+
+  void RecordEndEventOnCalcStream(gpuEvent_t event);
+
+ private:
+  gpuEvent_t RecordStartEvent(gpuStream_t stream);
+
+  void RecordEndEvent(gpuEvent_t event, gpuStream_t stream);
+
 private:
  std::shared_ptr store_;
 
@@ -212,6 +226,13 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
  std::unordered_map> place_to_comm_ctx_;
 
+  struct Events {
+    std::vector<std::pair<gpuEvent_t, gpuEvent_t>> events;
+    size_t length{0};
+  };
+
+  std::vector<Events> events_;
+
   // TODO(sunyilun): attrs below will be removed later
   std::mutex mutex_;
   static uint64_t s_group_call_counter;
diff --git a/paddle/fluid/pybind/cuda_streams_py.cc b/paddle/fluid/pybind/cuda_streams_py.cc
index 41202daa9c5..c15fde9c9b1 100644
--- a/paddle/fluid/pybind/cuda_streams_py.cc
+++ b/paddle/fluid/pybind/cuda_streams_py.cc
@@ -391,6 +391,7 @@ void BindCudaStream(py::module *m_ptr) {
 
           event.synchronize()
       )DOC")
+      .def("elapsed_time", &paddle::platform::CudaEvent::ElapsedTime)
 #endif
       .def(
           "__init__",
diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc
index 7273bc8d84e..9f58967e02a 100644
--- a/paddle/fluid/pybind/distributed_py.cc
+++ b/paddle/fluid/pybind/distributed_py.cc
@@ -1224,7 +1224,24 @@ void BindDistributed(py::module *m) {
               py::arg("src"),
               py::arg("num"),
               py::arg("id"),
-              py::call_guard<py::gil_scoped_release>());
+              py::call_guard<py::gil_scoped_release>())
+          .def("_record_start_event_on_calc_stream",
+               [](distributed::ProcessGroup &self) -> uintptr_t {
+                 PADDLE_THROW(phi::errors::Unimplemented(
+                     "Unsupported _record_start_event_on_calc_stream method."));
+               })
+          .def("_record_end_event_on_calc_stream",
+               [](distributed::ProcessGroupNCCL &self, uintptr_t event) {
+                 PADDLE_THROW(phi::errors::Unimplemented(
+                     "Unsupported _record_end_event_on_calc_stream method."));
+               })
+          .def(
+              "_get_event_time_and_release",
+              [](distributed::ProcessGroup &self, bool accumulate) {
+                PADDLE_THROW(phi::errors::Unimplemented(
+                    "Unsupported _get_event_time_and_release method."));
+              },
+              py::arg("accumulate") = true);
 
 #if defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)
   py::class_())
       .def_static("group_start", distributed::ProcessGroupNCCL::GroupStart)
-      .def_static("group_end", distributed::ProcessGroupNCCL::GroupEnd);
-
+      .def_static("group_end", distributed::ProcessGroupNCCL::GroupEnd)
+      .def(
+          "_record_start_event_on_calc_stream",
+          [](distributed::ProcessGroupNCCL &self) -> uintptr_t {
+            return reinterpret_cast<uintptr_t>(
+                self.RecordStartEventOnCalcStream());
+          },
+          py::call_guard<py::gil_scoped_release>())
+      .def(
+          "_record_end_event_on_calc_stream",
+          [](distributed::ProcessGroupNCCL &self, uintptr_t event) {
+            return self.RecordEndEventOnCalcStream(
+                reinterpret_cast<gpuEvent_t>(event));
+          },
+          py::call_guard<py::gil_scoped_release>())
+      .def(
+          "_get_event_time_and_release",
+          [](distributed::ProcessGroupNCCL &self,
+             bool accumulate) -> py::object {
+            std::vector<float> times;
+            double total_ms = 0.0;
+            {
+              py::gil_scoped_release release;
+              times = self.GetEventTimeAndRelease();
+              if (accumulate) {
+                total_ms = std::accumulate(times.begin(), times.end(), 0.0);
+              }
+            }
+            if (accumulate) {
+              return py::cast(total_ms);
+            } else {
+              py::list obj(times.size());
+              for (size_t i = 0; i < times.size(); ++i) {
+                obj[i] = py::cast(times[i]);
+              }
+              return obj;
+            }
+          },
+          py::arg("accumulate") = true);
 #endif
 
 #if defined(PADDLE_WITH_MPI)
diff --git a/paddle/phi/api/profiler/event.h b/paddle/phi/api/profiler/event.h
index b19f2048522..4078d460eeb 100644
--- a/paddle/phi/api/profiler/event.h
+++ b/paddle/phi/api/profiler/event.h
@@ -205,6 +205,13 @@ class CudaEvent {
   }
 
   gpuEvent_t GetRawCudaEvent() { return event_; }
+  float ElapsedTime(const CudaEvent &end_event) const {
+    float ms;
+    PADDLE_ENFORCE_GPU_SUCCESS(
+        cudaEventElapsedTime(&ms, event_, end_event.event_));
+    return ms;
+  }
+
  private:
 #ifdef PADDLE_WITH_HIP
   unsigned int flags_ = hipEventDefault;
diff --git a/paddle/phi/backends/event.cc b/paddle/phi/backends/event.cc
index 371e858a3fe..96fe22ca638 100644
--- a/paddle/phi/backends/event.cc
+++ b/paddle/phi/backends/event.cc
@@ -62,6 +62,14 @@ bool Event::Query() const { return device_->QueryEvent(this); }
 
 void Event::Synchronize() const { device_->SynchronizeEvent(this); }
 
+double Event::ElapsedTime(const Event& end_event) const {
+  auto s_event = static_cast<gpuEvent_t>(event_);
+  auto e_event = static_cast<gpuEvent_t>(end_event.event_);
+  float ms;
+  PADDLE_ENFORCE_GPU_SUCCESS(cudaEventElapsedTime(&ms, s_event, e_event));
+  return ms;
+}
+
 const Place& Event::GetPlace() const { return place_; }
 
 } // namespace event
diff --git a/paddle/phi/backends/event.h b/paddle/phi/backends/event.h
index a58083ff289..2e4e7b7c3d4 100644
--- a/paddle/phi/backends/event.h
+++ b/paddle/phi/backends/event.h
@@ -47,6 +47,7 @@ class Event {
   void Record(const stream::Stream* stream);
   bool Query() const;
   void Synchronize() const;
+  double ElapsedTime(const Event& end_event) const;
   const Place& GetPlace() const;
 
  private:
diff --git a/paddle/phi/core/flags.cc b/paddle/phi/core/flags.cc
index c8da29d82b9..661858e0f44 100644
--- a/paddle/phi/core/flags.cc
+++ b/paddle/phi/core/flags.cc
@@ -1247,3 +1247,7 @@ PADDLE_DEFINE_EXPORTED_bool(use_shm_cache,
 PADDLE_DEFINE_EXPORTED_string(tensor_operants_mode,
                               "eager",
                               "Tensor operants mode");
+
+PADDLE_DEFINE_EXPORTED_bool(enable_process_group_event_record,
+                            false,
+                            "Whether to enable process group event record.");
diff --git a/python/paddle/distributed/communication/group.py b/python/paddle/distributed/communication/group.py
index 5fff4440877..d71b5cff6c1 100644
--- a/python/paddle/distributed/communication/group.py
+++ b/python/paddle/distributed/communication/group.py
@@ -77,6 +77,9 @@ class Group:
         else:
             return -1
 
+    def _get_event_time_and_release(self, accumulate=True):
+        return self._pg._get_event_time_and_release(accumulate)
+
     def __repr__(self):
         debug_str = "rank: {}, nranks: {}, id: {}, ranks: ".format(
             self.rank, self.nranks, self.id
diff --git a/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py b/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py
index 9f8a032d588..024efa71729 100644
--- a/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py
+++ b/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py
@@ -278,6 +278,7 @@ def batch_send_recv_on_calc_stream(p2p_op_list):
         return
     group = _get_global_group() if group is None else group
     backend = group.backend
+    event = group.process_group._record_start_event_on_calc_stream()
     with _with_batch_p2p_guard(backend):
         for p2p_op in p2p_op_list:
             op = p2p_op.op
@@ -287,6 +288,7 @@
             nranks = p2p_op.nranks
             rank_id = p2p_op.rank_id
             op(tensor, comm_group, peer, nranks, rank_id)
+    group.process_group._record_end_event_on_calc_stream(event)
 
 
 def _process_p2p_tuple_or_tensor(
--
GitLab
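
Usage sketch (illustrative only, not part of the patch). The patch brackets each NCCL launch with a reusable pair of CUDA events on the communication stream when FLAGS_enable_process_group_event_record is on, and Group._get_event_time_and_release (added in group.py) reads the elapsed times back and releases the event pairs. The flag name comes from the new definition in flags.cc; setting it through the environment, the multi-GPU launch via paddle.distributed.launch, and the use of dist.new_group/dist.all_reduce are assumptions about the surrounding Paddle APIs, not something this patch prescribes. Recording is skipped inside ncclGroupStart/ncclGroupEnd sections (s_group_call_counter > 0).

    # Hedged sketch: run with e.g. `python -m paddle.distributed.launch --gpus 0,1 demo.py`
    import os

    # Assumption: the exported flag is switched on through the environment
    # before paddle is imported, like other FLAGS_* switches.
    os.environ["FLAGS_enable_process_group_event_record"] = "1"

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    group = dist.new_group(list(range(dist.get_world_size())))

    x = paddle.ones([1024, 1024], dtype="float32")
    for _ in range(10):
        dist.all_reduce(x, group=group)

    # Added by this patch: by default returns the summed GPU time (ms) of the
    # NCCL kernels recorded since the last call, then releases the cached
    # event pairs for reuse.
    total_ms = group._get_event_time_and_release()
    print("all_reduce GPU time:", total_ms, "ms")

    # accumulate=False would return the per-call list instead; here it is
    # empty because the events were already released by the call above.
    per_call_ms = group._get_event_time_and_release(accumulate=False)
    print(per_call_ms)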
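The one-line binding added in cuda_streams_py.cc exposes CudaEvent::ElapsedTime to Python. A hedged sketch of how it could be called follows; it assumes the method surfaces as elapsed_time on paddle.device.cuda.Event, that events need enable_timing=True for cudaEventElapsedTime to succeed, and that the existing Event(...)/record()/synchronize() APIs behave as documented — none of that is stated by the patch itself.

    # Hedged sketch of the new elapsed_time binding.
    import paddle

    start = paddle.device.cuda.Event(enable_timing=True)
    end = paddle.device.cuda.Event(enable_timing=True)

    x = paddle.randn([4096, 4096])

    start.record()               # record on the current CUDA stream
    y = paddle.matmul(x, x)      # some GPU work to be timed
    end.record()

    end.synchronize()            # wait for the end event before reading the time
    print(start.elapsed_time(end), "ms")  # maps to CudaEvent::ElapsedTime(end)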