diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h index 795a1a91b5235f462735ff65d5d3f8e37db2d144..09be2ca5e8788e7be31da38780df48ddefb5e225 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.h +++ b/paddle/fluid/distributed/collective/ProcessGroup.h @@ -244,26 +244,12 @@ class ProcessGroup { "ProcessGroup%s does not support send", GetBackendName())); } - virtual std::shared_ptr Send( - std::vector&, int, bool) { // NOLINT - PADDLE_THROW(platform::errors::InvalidArgument( - "ProcessGroup%s does not support send with sync_op flag", - GetBackendName())); - } - virtual std::shared_ptr Recv( std::vector&, int) { // NOLINT PADDLE_THROW(platform::errors::InvalidArgument( "ProcessGroup%s does not support recv", GetBackendName())); } - virtual std::shared_ptr Recv( - std::vector&, int, bool) { // NOLINT - PADDLE_THROW(platform::errors::InvalidArgument( - "ProcessGroup%s does not support recv with sync_op flag", - GetBackendName())); - } - virtual std::shared_ptr AllGather( std::vector&, // NOLINT std::vector&) { // NOLINT @@ -287,14 +273,6 @@ class ProcessGroup { "ProcessGroup%s does not support AllToAll", GetBackendName())); } - virtual std::shared_ptr AllToAll( - std::vector&, // NOLINT - std::vector&, // NOLINT - bool) { - PADDLE_THROW(platform::errors::InvalidArgument( - "ProcessGroup%s does not support alltoall", GetBackendName())); - } - virtual std::shared_ptr Reduce( std::vector&, // NOLINT std::vector&, // NOLINT diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc index 74ebf80205964dde7e180156c43d8172437684bb..3c7bc0ec8429f4feef1abac7c0c8a8328ccc8ee5 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc @@ -626,35 +626,6 @@ std::shared_ptr ProcessGroupNCCL::PointToPoint( return task; } -void ProcessGroupNCCL::CheckSplitSizes(std::vector* split_sizes, - std::vector tensor_shape) { - int64_t len_size = (*split_sizes).size(); - if (len_size == 0) { - PADDLE_ENFORCE_EQ(tensor_shape[0] % size_ == 0, - true, - platform::errors::InvalidArgument( - "Tensor's dim[0] must be divisible by group size " - "when split_sizes not given.")); - (*split_sizes) - .insert((*split_sizes).end(), - size_, - static_cast(tensor_shape[0] / size_)); - } else { - PADDLE_ENFORCE_EQ( - len_size == size_, - true, - platform::errors::InvalidArgument( - "The length of split_sizes must be equal to group size.")); - auto sum_size = std::accumulate( - (*split_sizes).begin(), (*split_sizes).end(), static_cast(0)); - PADDLE_ENFORCE_EQ( - sum_size == tensor_shape[0], - true, - platform::errors::InvalidArgument( - "The sum of split_sizes must be equal to tensor's dim[0].")); - } -} - // TODO(sunyilun): methods below will be removed later void SyncDefaultStream(const std::vector& places, platform::DeviceEvent& nccl_event, // NOLINT @@ -676,17 +647,6 @@ std::shared_ptr ProcessGroupNCCL::CreateTask( places, rank, comm_type, inputs); } -std::shared_ptr ProcessGroupNCCL::CreateTask( - const std::vector& places, - int rank, - CommType comm_type, - const std::vector& inputs, - bool is_sync, - bool use_calc_stream) { - return std::make_shared( - places, rank, comm_type, inputs, is_sync, use_calc_stream); -} - ProcessGroupNCCL::NCCLTask::NCCLTask( const std::vector& places, int rank, @@ -696,17 +656,6 @@ ProcessGroupNCCL::NCCLTask::NCCLTask( comm_event_(places[0]), task_place_(places[0]) {} -ProcessGroupNCCL::NCCLTask::NCCLTask( - const std::vector& places, - int rank, - CommType comm_type, - const std::vector& inputs, - bool sync_op, - bool use_calc_stream) - : TaskStream(rank, inputs, comm_type, sync_op, use_calc_stream), - comm_event_(places[0]), - task_place_(places[0]) {} - // create NCCLManager cache for places_key void ProcessGroupNCCL::CreateNCCLManagerCache( const std::string& places_key, const std::vector& places) { @@ -759,85 +708,6 @@ void ProcessGroupNCCL::CreateNCCLManagerCache( places_to_ctx_.emplace(places_key, std::move(dev_ctx_raw)); } -template -std::shared_ptr ProcessGroupNCCL::Collective( - std::vector& inputs, - std::vector& outputs, - Fn fn, - CommType comm_type, - bool sync_op, - bool use_calc_stream) { - const auto& places = GetPlaceList(inputs); - const auto& key = GetKeyFromPlaces(places); - - { - std::lock_guard lock(mutex_); - if (place_to_comm_ctx_.find(key) == place_to_comm_ctx_.end()) { - CreateNCCLManagerCache(key, places); - } - } - - if (!use_calc_stream) { - SyncDefaultStream( - places, place_to_calc_event_.at(key), places_to_ctx_.at(key)); - } - - auto task = - CreateTask(places, rank_, comm_type, inputs, sync_op, use_calc_stream); - - platform::CUDADeviceGuard cuda_guard; - - { - platform::NCCLGroupGuard nccl_guard; - for (size_t i = 0; i < inputs.size(); ++i) { - cuda_guard.SetDevice(places[i]); - - gpuStream_t nccl_stream; - if (use_calc_stream) { - nccl_stream = - static_cast( - platform::DeviceContextPool::Instance().Get(places[i])) - ->stream(); - } else { - nccl_stream = places_to_ctx_.at(key)[i]->stream(); - } - - fn(inputs[i], - outputs[i], - places_to_ctx_.at(key)[i]->nccl_comm(), - nccl_stream); - } - } - - if (FLAGS_use_stream_safe_cuda_allocator) { - for (size_t i = 0; i < inputs.size(); ++i) { - cuda_guard.SetDevice(places[i]); - - gpuStream_t nccl_stream; - if (use_calc_stream) { - nccl_stream = - static_cast( - platform::DeviceContextPool::Instance().Get(places[i])) - ->stream(); - } else { - nccl_stream = places_to_ctx_.at(key)[i]->stream(); - } - - memory::RecordStream(inputs[i].Holder(), nccl_stream); - } - } - - // Adding stream event dependency only when use comm stream - if (!use_calc_stream) { - for (size_t i = 0; i < inputs.size(); ++i) { - cuda_guard.SetDevice(places[i]); - task->UpdateWaitChain(*places_to_ctx_.at(key)[i]); - } - } - - return task; -} - template std::shared_ptr ProcessGroupNCCL::Collective( std::vector& inputs, @@ -889,117 +759,6 @@ std::shared_ptr ProcessGroupNCCL::Collective( return task; } -template -void ProcessGroupNCCL::Collective(const phi::DenseTensor* in, - phi::DenseTensor* out, - Fn fn, - CommType op_type) { - std::vector places; - places.push_back(in->place()); - const std::string& key = GetKeyFromPlaces(places); - - { - std::lock_guard lock(mutex_); - if (place_to_comm_ctx_.find(key) == place_to_comm_ctx_.end()) { - CreateNCCLManagerCache(key, places); - } - } - - SyncDefaultStream( - places, place_to_calc_event_.at(key), places_to_ctx_.at(key)); - - // construct uninitialize guard for device - platform::CUDADeviceGuard cuda_guard; - - if (FLAGS_use_stream_safe_cuda_allocator) { - cuda_guard.SetDevice(places[0]); - memory::RecordStream(in->Holder(), places_to_ctx_.at(key)[0]->stream()); - } - - { - platform::NCCLGroupGuard nccl_guard; - cuda_guard.SetDevice(places[0]); - const auto& nccl_stream = places_to_ctx_.at(key)[0]->stream(); - fn(in, out, places_to_ctx_.at(key)[0]->nccl_comm(), nccl_stream); - } - - cuda_guard.SetDevice(places[0]); -} - -template -std::shared_ptr ProcessGroupNCCL::PointToPoint( - std::vector& tensors, - Fn fn, - int dst_rank, - CommType op_type, - bool sync_op, - bool use_calc_stream) { - const auto& places = GetPlaceList(tensors); - const auto& key = GetKeyFromPlaces(places); - - { - std::lock_guard lock(mutex_); - if (place_to_comm_ctx_.find(key) == place_to_comm_ctx_.end()) { - CreateNCCLManagerCache(key, places); - } - } - - if (!use_calc_stream) { - SyncDefaultStream( - places, place_to_calc_event_.at(key), places_to_ctx_.at(key)); - } - - auto task = - CreateTask(places, rank_, op_type, tensors, sync_op, use_calc_stream); - - platform::CUDADeviceGuard cuda_guard; - - { - platform::NCCLGroupGuard nccl_guard; - for (size_t i = 0; i < tensors.size(); ++i) { - cuda_guard.SetDevice(places[i]); - gpuStream_t nccl_stream; - if (use_calc_stream) { - nccl_stream = - static_cast( - platform::DeviceContextPool::Instance().Get(places[i])) - ->stream(); - } else { - nccl_stream = places_to_ctx_.at(key)[i]->stream(); - } - fn(tensors[i], - places_to_ctx_.at(key)[i]->nccl_comm(), - nccl_stream, - dst_rank); - } - } - - if (FLAGS_use_stream_safe_cuda_allocator) { - for (size_t i = 0; i < tensors.size(); ++i) { - cuda_guard.SetDevice(places[i]); - gpuStream_t nccl_stream; - if (use_calc_stream) { - nccl_stream = - static_cast( - platform::DeviceContextPool::Instance().Get(places[i])) - ->stream(); - } else { - nccl_stream = places_to_ctx_.at(key)[i]->stream(); - } - memory::RecordStream(tensors[i].Holder(), nccl_stream); - } - } - - if (!use_calc_stream) { - for (size_t i = 0; i < tensors.size(); ++i) { - cuda_guard.SetDevice(places[i]); - task->UpdateWaitChain(*places_to_ctx_.at(key)[i]); - } - } - - return task; -} - template std::shared_ptr ProcessGroupNCCL::PointToPoint( std::vector& tensors, @@ -1290,52 +1049,6 @@ std::shared_ptr ProcessGroupNCCL::AllToAll( CommType::ALLTOALL); } -std::shared_ptr ProcessGroupNCCL::AllToAll( - std::vector& in_tensors, - std::vector& out_tensors, - bool sync_op, - bool use_calc_stream) { - PADDLE_ENFORCE_EQ( - CheckTensorsInCudaPlace(in_tensors), - true, - platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); - PADDLE_ENFORCE_EQ( - CheckTensorsInCudaPlace(out_tensors), - true, - platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); - return Collective( - in_tensors, - out_tensors, - [&](phi::DenseTensor& input, - phi::DenseTensor& output, - ncclComm_t comm, - const gpuStream_t& stream) { - size_t offset = 0; - GroupStart(); - for (auto i = 0; i < size_; i++) { - PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend( - GetPointerByOffset(input.data(), offset, input.dtype()), - input.numel() / size_, - platform::ToNCCLDataType(input.dtype()), - i, - comm, - stream)); - PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv( - GetPointerByOffset(output.data(), offset, input.dtype()), - input.numel() / size_, - platform::ToNCCLDataType(input.dtype()), - i, - comm, - stream)); - offset += input.numel() / size_; - } - GroupEnd(); - }, - CommType::ALLTOALL, - sync_op, - use_calc_stream); -} - std::shared_ptr ProcessGroupNCCL::Reduce( std::vector& in_tensors, std::vector& out_tensors, diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h index c10c4370b4b23eded8c721e9d2914c4fbe2c0e4a..a52e5e61cd29559f2e9fe3e079ccd9a028c28d83 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h @@ -68,12 +68,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream { int rank, CommType CommType, const std::vector& inputs); - NCCLTask(const std::vector& places, - int rank, - CommType comm_type, - const std::vector& inputs, - bool sync_op, - bool use_calc_stream); private: bool block_cpu_in_wait_{false}; @@ -192,12 +186,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream { std::vector& in_tensors, std::vector& out_tensors) override; - std::shared_ptr AllToAll( - std::vector& in_tensors, - std::vector& out_tensors, - bool sync_op, - bool use_calc_stream) override; - std::shared_ptr Reduce( std::vector& tensors, std::vector& out_tensors, @@ -245,14 +233,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream { CommType op_type, const std::vector& inputs); - std::shared_ptr CreateTask( - const std::vector& places, - int rank, - CommType op_type, - const std::vector& inputs, - bool sync_op, - bool use_calc_stream); - template std::shared_ptr Collective( std::vector& inputs, // NOLINT @@ -260,21 +240,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream { Fn fn, CommType op_type); - template - std::shared_ptr Collective( - std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT - Fn fn, - CommType comm_type, - bool sync_op, - bool use_calc_stream); - - template - void Collective(const phi::DenseTensor*, - phi::DenseTensor*, - Fn fn, - CommType op_type); - template std::shared_ptr PointToPoint( std::vector& tensors, // NOLINT @@ -282,21 +247,9 @@ class ProcessGroupNCCL final : public ProcessGroupStream { int dst_rank, CommType op_type); - template - std::shared_ptr PointToPoint( - std::vector& tensors, // NOLINT - Fn fn, - int dst_rank, - CommType op_type, - bool sync_op, - bool use_calc_stream); - void CreateNCCLManagerCache(const std::string& places_key, const std::vector& places); - void CheckSplitSizes(std::vector* split_sizes, - std::vector tensor_shape); - private: std::shared_ptr store_; std::unordered_map diff --git a/paddle/fluid/distributed/collective/ProcessGroupStream.cc b/paddle/fluid/distributed/collective/ProcessGroupStream.cc index 9f7b3c1964e2373ae4c0c4f9a8eb0d76c4b74685..cd1e617a89e4cc70626517fb97ae904c3ee42429 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupStream.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupStream.cc @@ -236,42 +236,5 @@ std::shared_ptr ProcessGroupStream::Send( "ProcessGroup%s does not support send.", GetBackendName())); } -// TODO(sunyilun): methods below will be removed later -std::shared_ptr ProcessGroupStream::AllToAll( - std::vector& in_tensors, - std::vector& out_tensors, - bool sync_op) { - return AllToAll(in_tensors, - out_tensors, - sync_op, - /*use_calc_stream*/ false); -} - -std::shared_ptr ProcessGroupStream::AllToAll( - std::vector& in_tensors, - std::vector& out_tensors, - bool sync_op, - bool use_calc_stream) { - PADDLE_THROW(platform::errors::InvalidArgument( - "ProcessGroup%s does not support do alltoall", GetBackendName())); -} - -std::shared_ptr ProcessGroupStream::Recv( - std::vector& tensors, int src_rank, bool sync_op) { - return Recv(tensors, - src_rank, - sync_op, - /*use_calc_stream*/ false); -} - -std::shared_ptr ProcessGroupStream::Recv( - std::vector& tensors, - int src_rank, - bool sync_op, - bool use_calc_stream) { - PADDLE_THROW(platform::errors::InvalidArgument( - "ProcessGroup%s does not support do recv", GetBackendName())); -} - } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupStream.h b/paddle/fluid/distributed/collective/ProcessGroupStream.h index d1fd95953f1f021130f004b84dc4c9a131157b85..be76429580d100ead1f455f73e41309cbe4a46c8 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupStream.h +++ b/paddle/fluid/distributed/collective/ProcessGroupStream.h @@ -179,29 +179,6 @@ class ProcessGroupStream : public ProcessGroup { int64_t numel, bool sync_op, bool use_calc_stream); - - // TODO(sunyilun): methods below will be removed later - std::shared_ptr AllToAll( - std::vector& in_tensors, // NOLINT - std::vector& out_tensors, // NOLINT - bool sync_op) override; - - virtual std::shared_ptr AllToAll( - std::vector& in_tensors, // NOLINT - std::vector& out_tensors, // NOLINT - bool sync_op, - bool use_calc_stream); - - std::shared_ptr Recv( - std::vector& tensors, // NOLINT - int src_rank, - bool sync_op) override; - - virtual std::shared_ptr Recv( - std::vector& tensors, // NOLINT - int src_rank, - bool sync_op, - bool use_calc_stream); }; } // namespace distributed