Unverified commit ce4775cd, authored by LiYuRio, committed by GitHub

add stream.all_reduce API and ProcessGroupStream (#45282)

Parent 13a0ea4c
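
A minimal usage sketch of the paddle.distributed.stream.all_reduce API introduced by this commit, based on the docstring and tests further down in this diff; it assumes a two-GPU dygraph run launched with paddle.distributed.launch.

# Illustrative usage of the new stream.all_reduce API (not part of the commit).
# Assumes: `python -m paddle.distributed.launch --devices 0,1 demo.py`.
import paddle
import paddle.distributed as dist

dist.init_parallel_env()
if dist.get_rank() == 0:
    data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
    data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])

# Asynchronous all_reduce on the dedicated communication stream; the returned
# task must be waited on before the result is used.
task = dist.stream.all_reduce(data, sync_op=False)
task.wait()

# Synchronous all_reduce placed directly on the calculation stream, which
# avoids a stream switch; only valid together with sync_op=True.
dist.stream.all_reduce(data, sync_op=True, use_calc_stream=True)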
@@ -2,10 +2,14 @@ cc_library(
processgroup
SRCS ProcessGroup.cc
DEPS dense_tensor)
cc_library(
processgroup_stream
SRCS ProcessGroupStream.cc
DEPS dense_tensor)
cc_library(
eager_reducer
SRCS reducer.cc
-DEPS eager_api processgroup phi_api string_helper)
+DEPS eager_api processgroup processgroup_stream phi_api string_helper)
if(WITH_DISTRIBUTE)
cc_library(
@@ -18,7 +22,12 @@ if(WITH_NCCL OR WITH_RCCL)
cc_library(
processgroup_nccl
SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc
-DEPS processgroup place enforce collective_helper device_context
+DEPS processgroup
processgroup_stream
place
enforce
collective_helper
device_context
dense_tensor)
if(WITH_DISTRIBUTE AND WITH_PSCORE)
if(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
...
@@ -18,10 +18,16 @@ namespace paddle {
namespace distributed {
ProcessGroup::Task::Task(int rank,
-const std::vector<phi::DenseTensor>& inputTensors,
+const std::vector<phi::DenseTensor>& inputs,
CommType comm_type)
: rank_(rank), comm_type_(comm_type) {}
ProcessGroup::Task::Task(int rank,
const std::vector<phi::DenseTensor>& inputs,
CommType comm_type,
bool sync_op)
: rank_(rank), comm_type_(comm_type), sync_op_(sync_op) {}
ProcessGroup::Task::~Task() = default;
bool ProcessGroup::Task::IsCompleted() {
...
@@ -55,19 +55,27 @@ class ProcessGroup {
class Task {
public:
Task(int rank,
-const std::vector<phi::DenseTensor>& inputTensors,
-CommType opType = CommType::UNKNOWN);
+const std::vector<phi::DenseTensor>& inputs,
+CommType comm_type);
Task(int rank,
const std::vector<phi::DenseTensor>& inputs,
CommType comm_type,
bool sync_op);
virtual ~Task();
virtual bool IsCompleted();
virtual bool Wait(std::chrono::milliseconds timeout = kWaitTimeout);
virtual void Synchronize();
bool IsSync() const { return sync_op_; }
protected:
const int rank_;
-CommType comm_type_;
+CommType comm_type_{CommType::UNKNOWN};
std::mutex mutex_;
-bool is_completed_ = false;
+bool is_completed_{false};
private:
bool sync_op_{true};
};
explicit ProcessGroup(int rank,
@@ -82,6 +90,7 @@ class ProcessGroup {
virtual const std::string GetBackendName() const = 0;
// TODO(liyurui): This API will be moved later
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& /* input tensors */,   // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */,  // NOLINT
@@ -90,6 +99,16 @@ class ProcessGroup {
"ProcessGroup%s does not support allreduce", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const AllreduceOptions&,
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support allreduce with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& /* input tensors */,   // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */,  // NOLINT
...
@@ -55,7 +55,20 @@ ProcessGroupNCCL::NCCLTask::NCCLTask(
int rank,
CommType CommType,
const std::vector<phi::DenseTensor>& inputs)
-: Task(rank, inputs, CommType), places_(places) {
+: TaskStream(rank, inputs, CommType), places_(places) {
control_events_.resize(places.size());
ncclComms_.resize(places.size());
}
ProcessGroupNCCL::NCCLTask::NCCLTask(
const std::vector<Place>& places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool sync_op,
bool use_calc_stream)
: TaskStream(rank, inputs, comm_type, sync_op, use_calc_stream),
places_(places) {
control_events_.resize(places.size());
ncclComms_.resize(places.size());
}
@@ -116,6 +129,13 @@ void ProcessGroupNCCL::CheckSplitSizes(std::vector<int64_t>* split_sizes,
// TODO(sheniang03): Add timeout for wait, now timeout unused
bool ProcessGroupNCCL::NCCLTask::Wait(std::chrono::milliseconds timeout) {
// Warning here when use calc stream but also invoke waiting explicitly.
if (UseCalcStream()) {
VLOG(3) << "Warning: The communication is on calc stream, wait here is "
"useless.";
return true;
}
SynchronizeStreams();
if (FLAGS_nccl_blocking_wait) {
// NOTE(shenliang03): It will block host for sync
@@ -146,7 +166,7 @@ ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr<Store>& store,
int size,
const platform::Place& place,
int gid)
-: ProcessGroup(rank, size, place, gid), store_(store) {
+: ProcessGroupStream(rank, size, place, gid), store_(store) {
platform::SetDeviceId(place_.device);
}
@@ -223,6 +243,81 @@ void ProcessGroupNCCL::CreateNCCLManagerCache(
places_to_ctx_.emplace(places_key, std::move(dev_ctx));
}
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
Fn fn,
CommType comm_type,
bool sync_op,
bool use_calc_stream) {
const auto& places = GetPlaceList(inputs);
const auto& key = GetKeyFromPlaces(places);
{
std::lock_guard<std::mutex> lock(mutex_);
if (places_to_ncclcomm_.find(key) == places_to_ncclcomm_.end()) {
CreateNCCLManagerCache(key, places);
}
}
auto& nccl_comms = places_to_ncclcomm_[key];
SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);
auto task = std::make_shared<ProcessGroupNCCL::NCCLTask>(
places, rank_, comm_type, inputs, sync_op, use_calc_stream);
platform::CUDADeviceGuard cuda_guard;
{
platform::NCCLGroupGuard nccl_guard;
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);
gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_[key][i]->stream();
}
fn(inputs[i], outputs[i], nccl_comms[i]->GetNcclComm(), nccl_stream);
}
}
if (FLAGS_use_stream_safe_cuda_allocator) {
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);
gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_[key][i]->stream();
}
memory::RecordStream(inputs[i].Holder(), nccl_stream);
}
}
// Adding stream event dependency only when use comm stream
if (!use_calc_stream) {
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);
task->control_events_[i].Record(*places_to_ctx_[key][i]);
}
}
return task;
}
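
The overload above decides, per place, whether the NCCL kernel launches on the default calculation stream or on the cached communication stream, and it records cross-stream events only in the latter case. A self-contained toy model of that branching (plain Python, not Paddle code; all names are illustrative):

from dataclasses import dataclass, field
from typing import List

@dataclass
class ToyTask:
    use_calc_stream: bool
    recorded_events: List[str] = field(default_factory=list)

def toy_collective(places, use_calc_stream):
    task = ToyTask(use_calc_stream)
    # The kernel for each place goes on the calc stream or its comm stream.
    streams = [f"calc_stream({p})" if use_calc_stream else f"comm_stream({p})"
               for p in places]
    if not use_calc_stream:
        # Cross-stream events are recorded only for the comm-stream path, so the
        # calculation stream can later be made to wait on the collective.
        task.recorded_events = [f"event({p})" for p in places]
    return task, streams

task, streams = toy_collective(["gpu:0", "gpu:1"], use_calc_stream=False)
print(streams)               # ['comm_stream(gpu:0)', 'comm_stream(gpu:1)']
print(task.recorded_events)  # ['event(gpu:0)', 'event(gpu:1)']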
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
std::vector<phi::DenseTensor>& inputs,
@@ -386,6 +481,37 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
CommType::ALLREDUCE);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const AllreduceOptions& opts,
bool sync_op,
bool use_calc_stream) {
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors),
true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
return Collective(
in_tensors,
out_tensors,
[&](const phi::DenseTensor& input,
phi::DenseTensor& output,
ncclComm_t comm,
const gpuStream_t& stream) {
return platform::dynload::ncclAllReduce(
input.data(),
output.data(),
input.numel(),
platform::ToNCCLDataType(input.type()),
ToNCCLRedType(opts.reduce_op),
comm,
stream);
},
CommType::ALLREDUCE,
sync_op,
use_calc_stream);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
@@ -432,7 +558,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
new paddle::experimental::DefaultAllocator(place));
barrierTensors.emplace_back(allocator.get(), meta);
}
-auto task = ProcessGroupNCCL::AllReduce(barrierTensors, barrierTensors);
+auto task = ProcessGroupNCCL::AllReduce(
+    barrierTensors, barrierTensors, AllreduceOptions());
auto nccl_task = dynamic_cast<ProcessGroupNCCL::NCCLTask*>(task.get());
nccl_task->barrierTensors_ = std::move(barrierTensors);
return task;
...
@@ -21,7 +21,7 @@
#include <unordered_map>
#include <vector>
-#include "paddle/fluid/distributed/collective/ProcessGroup.h"
+#include "paddle/fluid/distributed/collective/ProcessGroupStream.h"
#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/device_context.h"
@@ -46,9 +46,9 @@ namespace distributed {
using Place = paddle::platform::Place;
-class ProcessGroupNCCL : public ProcessGroup {
+class ProcessGroupNCCL : public ProcessGroupStream {
public:
-class NCCLTask : public ProcessGroup::Task,
+class NCCLTask : public ProcessGroupStream::TaskStream,
public std::enable_shared_from_this<NCCLTask> {
public:
NCCLTask(const std::vector<Place>& places,
@@ -56,6 +56,13 @@ class ProcessGroupNCCL : public ProcessGroup {
CommType CommType,
const std::vector<phi::DenseTensor>& inputs);
NCCLTask(const std::vector<Place>& places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool is_sync,
bool use_calc_stream);
bool IsCompleted();
void SynchronizeStreams();
@@ -89,6 +96,14 @@ class ProcessGroupNCCL : public ProcessGroup {
return std::string(NCCL_BACKEND_NAME);
}
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const AllreduceOptions& options,
bool sync_op,
bool use_calc_stream) override;
// TODO(liyurui): This API will be moved later
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
@@ -194,6 +209,15 @@ class ProcessGroupNCCL : public ProcessGroup {
Fn fn,
CommType op_type);
template <typename Fn>
std::shared_ptr<ProcessGroupStream::Task> Collective(
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
Fn fn,
CommType comm_type,
bool sync_op,
bool use_calc_stream);
template <typename Fn>
void Collective(const phi::DenseTensor*,
phi::DenseTensor*,
...
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupStream.h"
namespace paddle {
namespace distributed {
ProcessGroupStream::ProcessGroupStream(int rank,
int size,
const platform::Place& place,
int gid)
: ProcessGroup(rank, size, place, gid) {}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllReduce(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
const AllreduceOptions& options,
bool sync_op) {
return AllReduce(input_tensors,
output_tensors,
options,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllReduce(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
const AllreduceOptions& options,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do allreduce", GetBackendName()));
}
} // namespace distributed
} // namespace paddle
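
The two definitions above capture the layering: the sync_op-only entry point defaults to the dedicated communication stream, while the stream-aware overload is what concrete backends override. A Python analogue of the same pattern (method names are illustrative; the C++ uses two overloads of AllReduce):

class ProcessGroupBase:
    def backend_name(self):
        return "Base"

class ProcessGroupStreamSketch(ProcessGroupBase):
    # Entry point without the stream flag: default to the communication stream.
    def all_reduce(self, ins, outs, opts, sync_op):
        return self.all_reduce_on_stream(ins, outs, opts, sync_op,
                                         use_calc_stream=False)

    # Concrete backends (e.g. the NCCL process group) override this method.
    def all_reduce_on_stream(self, ins, outs, opts, sync_op, use_calc_stream):
        raise NotImplementedError(
            f"ProcessGroup{self.backend_name()} does not support allreduce")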
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
namespace paddle {
namespace distributed {
// NOTE(liyurui): Notice that some backends use `stream` as an abstract
// conception of hardware resource. We provide this base class allowing users to
// put communications on the calculation stream. In some scenarios, we found this
// will save the time of switching streams.
class ProcessGroupStream : public ProcessGroup {
public:
class TaskStream : public ProcessGroup::Task {
public:
// TODO(liyurui): This constructor is temporary here for compatible reason,
// will be deleted soon.
TaskStream(int rank,
const std::vector<phi::DenseTensor>& inputs,
CommType comm_type)
: Task(rank, inputs, comm_type) {}
TaskStream(int rank,
const std::vector<phi::DenseTensor>& inputs,
CommType comm_type,
bool sync_op,
bool use_calc_stream)
: Task(rank, inputs, comm_type, sync_op),
use_calc_stream_(use_calc_stream) {}
virtual ~TaskStream() = default;
protected:
bool UseCalcStream() const { return use_calc_stream_; }
private:
bool use_calc_stream_{false};
};
ProcessGroupStream(int rank, int size, const platform::Place& place, int gid);
virtual ~ProcessGroupStream() = default;
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
const AllreduceOptions& options,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
const AllreduceOptions& options,
bool sync_op,
bool use_calc_stream);
};
} // namespace distributed
} // namespace paddle
@@ -22,6 +22,7 @@ limitations under the License. */
#endif
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/ProcessGroupStream.h"
#include "paddle/fluid/distributed/collective/Types.h"
#include "paddle/fluid/distributed/collective/reducer.h"
#include "paddle/fluid/framework/lod_tensor.h"
@@ -134,6 +135,25 @@ void BindDistributed(py::module *m) {
py::arg("op") = distributed::ReduceOp::SUM,
py::call_guard<py::gil_scoped_release>())
.def(
"allreduce",
[](distributed::ProcessGroup &self,
py::handle py_tensor,
distributed::ReduceOp op,
bool sync_op) {
auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0);
distributed::AllreduceOptions opts;
opts.reduce_op = op;
auto dense =
std::dynamic_pointer_cast<phi::DenseTensor>(tensor.impl());
std::vector<phi::DenseTensor> tensors = {*dense};
return self.AllReduce(tensors, tensors, opts, sync_op);
},
py::arg("tensor"),
py::arg("op"),
py::arg("sync_op"),
py::call_guard<py::gil_scoped_release>())
.def(
"broadcast",
[](distributed::ProcessGroup &self,
@@ -384,11 +404,36 @@ void BindDistributed(py::module *m) {
py::arg("op") = distributed::ReduceOp::SUM,
py::call_guard<py::gil_scoped_release>());
auto ProcessGroupStream =
py::class_<distributed::ProcessGroupStream,
std::shared_ptr<distributed::ProcessGroupStream>>(
*m, "ProcessGroupStream", ProcessGroup)
.def(
"allreduce_on_calc_stream",
[](distributed::ProcessGroupStream &self,
py::handle py_tensor,
distributed::ReduceOp op) {
auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0);
distributed::AllreduceOptions opts;
opts.reduce_op = op;
auto dense =
std::dynamic_pointer_cast<phi::DenseTensor>(tensor.impl());
std::vector<phi::DenseTensor> tensors = {*dense};
return self.AllReduce(tensors,
tensors,
opts,
/*sync_op*/ true,
/*use_calc_stream*/ true);
},
py::arg("tensor"),
py::arg("op"),
py::call_guard<py::gil_scoped_release>());
#if defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)
auto processGroupNCCL =
py::class_<distributed::ProcessGroupNCCL,
std::shared_ptr<distributed::ProcessGroupNCCL>>(
-*m, "ProcessGroupNCCL", ProcessGroup)
+*m, "ProcessGroupNCCL", ProcessGroupStream)
.def(py::init<const std::shared_ptr<distributed::Store> &,
int,
int,
@@ -485,6 +530,7 @@ void BindDistributed(py::module *m) {
py::class_<distributed::ProcessGroup::Task,
std::shared_ptr<distributed::ProcessGroup::Task>>(*m, "task")
.def("is_completed", &distributed::ProcessGroup::Task::IsCompleted)
.def("is_sync", &distributed::ProcessGroup::Task::IsSync)
.def("wait",
&distributed::ProcessGroup::Task::Wait,
py::arg("timeout") = kWaitTimeout,
...
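
The task bindings above expose is_completed, the new is_sync, and wait to Python; a short sketch of how an asynchronous collective is expected to use them (illustrative; assumes a multi-rank dygraph launch):

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
tensor = paddle.ones([2, 3])

task = dist.stream.all_reduce(tensor, sync_op=False)
print(task.is_sync())       # False: the task was created with sync_op=False
print(task.is_completed())  # non-blocking status query
task.wait()                 # make the calculation stream wait for the collective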
@@ -51,6 +51,8 @@ from .collective import batch_isend_irecv # noqa: F401
from .collective import P2POp # noqa: F401
from .collective import reduce_scatter # noqa: F401
from .communication import * # noqa: F401
from .auto_parallel import shard_op # noqa: F401
from .auto_parallel import shard_tensor # noqa: F401
...
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = ["stream"]
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .all_reduce import all_reduce
__all__ = ["all_reduce"]
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.framework as framework
from ...collective import _get_default_group, _get_reduce_op, ReduceOp
def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream):
op_type = _get_reduce_op(op, "all_reduce")
group = _get_default_group() if group is None else group
if use_calc_stream:
return group.process_group.allreduce_on_calc_stream(tensor, op_type)
task = group.process_group.allreduce(tensor, op_type, sync_op)
if sync_op:
task.wait()
return task
def all_reduce(tensor,
op=ReduceOp.SUM,
group=None,
sync_op=True,
use_calc_stream=False):
"""
Perform a specific reduction (for example, sum, max) on inputs across devices.
Args:
tensor (Tensor): The input tensor on each rank. The result will overwrite this tensor after communication. Supports
float16, float32, float64, int32 or int64 as the input data type.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction to apply. If none is given, use ReduceOp.SUM as default.
group (Group, optional): The group to communicate in. If none is given, use the global group as default.
sync_op (bool, optional): Indicates whether the communication is synchronous. If none is given, use True as default.
use_calc_stream (bool, optional): Indicates whether the communication is done on the calculation stream. If none is given, use False as default. This
option is designed for high-performance scenarios; only enable it if you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
data = None
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.all_reduce(data, sync_op=False)
task.wait()
out = data.numpy()
# [[5, 7, 9], [5, 7, 9]]
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior.")
if framework.in_dygraph_mode():
return _all_reduce_in_dygraph(tensor, op, group, sync_op,
use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.all_reduce is only supported in dygraph mode now."
)
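
For reference, the guards above mean the calc-stream fast path cannot be combined with asynchronous behavior; continuing the docstring example above (data and dist already defined), a call like the following raises:

# Illustrative only: use_calc_stream=True requires sync_op=True.
try:
    dist.stream.all_reduce(data, sync_op=False, use_calc_stream=True)
except RuntimeError as e:
    print(e)  # "use_calc_stream can only be true in sync op behavior."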
@@ -272,5 +272,13 @@ if((WITH_GPU
)
set_tests_properties(test_gen_nccl_id_op PROPERTIES RUN_SERIAL 1)
endif()
if((WITH_GPU) AND (LINUX))
py_test_modules(
test_communication_stream_allreduce_api MODULES
test_communication_stream_allreduce_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_allreduce_api
PROPERTIES TIMEOUT "120" RUN_SERIAL 1)
endif()
add_subdirectory(fleet)
add_subdirectory(multinode)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.distributed as dist
import test_communication_api_base as test_base
import test_collective_api_base as test_collective_base
class StreamAllReduceTestCase():
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now.")
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(shape=self._shape,
dtype=self._dtype,
seed=seed))
rank = dist.get_rank()
tensor = paddle.to_tensor(test_data_list[rank])
task = dist.stream.all_reduce(tensor,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream)
if not self._sync_op:
task.wait()
result = test_data_list[0]
for i in range(1, len(test_data_list)):
result += test_data_list[i]
assert np.allclose(tensor, result, rtol=1e-05, atol=1e-05)
if __name__ == "__main__":
StreamAllReduceTestCase().run_test_case()
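
The script above is driven entirely by environment variables that the unit test below sets before launching it; a sketch of reproducing one configuration by hand, mirroring CommunicationTestDistBase.run_test_case (illustrative, assumes two visible GPUs):

import os
import subprocess
import sys

# Manual run of a single configuration (normally done by the unittest below).
envs = dict(os.environ,
            sync_op="True", use_calc_stream="False",
            backend="nccl", shape="(100, 200)", dtype="float32",
            seeds="[10, 11]", CUDA_VISIBLE_DEVICES="0,1")
subprocess.run([sys.executable, "-u", "-m", "paddle.distributed.launch",
                "--devices", "0,1",
                "communication_stream_allreduce_api_dygraph.py"],
               env=envs, check=True)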
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import sys
import tempfile
import itertools
import subprocess
import os
import shutil
class CommunicationTestDistBase(unittest.TestCase):
def setUp(self, save_log_dir=None, num_of_devices=2, timeout=120):
self._python_interp = sys.executable
self._save_log_dir = save_log_dir
self._log_dir = tempfile.TemporaryDirectory()
self._num_of_devices = num_of_devices
self._device_list = [str(i) for i in range(num_of_devices)]
self._timeout = timeout
self._seeds = [i + 10 for i in range(num_of_devices)]
self._devices = ','.join(self._device_list)
def run_test_case(self, script_file, user_defined_envs=None):
runtime_envs = os.environ
runtime_envs.update(user_defined_envs)
runtime_envs["CUDA_VISIBLE_DEVICES"] = self._devices
start_command = f"{self._python_interp} -u -m paddle.distributed.launch --log_dir {self._log_dir.name} --devices {self._devices} {script_file}"
start_command_list = start_command.strip().split()
try:
self._launcher = subprocess.run(start_command_list,
env=runtime_envs,
timeout=self._timeout,
check=True)
except subprocess.TimeoutExpired as err:
raise TimeoutError(
"Timeout while running command {}, try to set a longer period, {} is not enough."
.format(err.cmd, err.timeout))
except subprocess.CalledProcessError as err:
raise RuntimeError(
"Error occurs when running this test case. The return code of command {} is {}"
.format(err.cmd, err.returncode))
def tearDown(self):
if self._save_log_dir:
temp_log_dir_name = os.path.basename(self._log_dir.name)
dir_name = os.path.join(self._save_log_dir, temp_log_dir_name)
if not os.path.isdir(dir_name):
print("The running logs will copy to {}".format(dir_name))
shutil.copytree(self._log_dir.name, dir_name)
else:
raise RuntimeError(
"Directory {} exists, failed to save log.".format(dir_name))
def gen_product_envs_list(default_envs, changeable_envs):
envs_list = list()
for values in itertools.product(*changeable_envs.values()):
envs = dict(zip(changeable_envs.keys(), values))
envs.update(default_envs)
envs_list.append(envs)
return envs_list
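
A small worked example of what gen_product_envs_list produces, with values like those used by the allreduce test below:

default_envs = {"backend": "nccl", "dtype": "float32"}
changeable_envs = {"sync_op": ["True", "False"],
                   "use_calc_stream": ["True", "False"]}
envs_list = gen_product_envs_list(default_envs, changeable_envs)
print(len(envs_list))  # 4 combinations
print(envs_list[0])    # {'sync_op': 'True', 'use_calc_stream': 'True',
                       #  'backend': 'nccl', 'dtype': 'float32'}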
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import itertools
import test_communication_api_base as test_base
class TestCommunicationStreamAllreduceAPI(test_base.CommunicationTestDistBase):
def setUp(self):
super(TestCommunicationStreamAllreduceAPI, self).setUp(num_of_devices=2,
timeout=120)
self._default_envs = {
"backend": "nccl",
"shape": "(100, 200)",
"dtype": "float32",
"seeds": str(self._seeds)
}
self._changeable_envs = {
"sync_op": ["True", "False"],
"use_calc_stream": ["True", "False"]
}
def test_allreduce_stream(self):
envs_list = test_base.gen_product_envs_list(self._default_envs,
self._changeable_envs)
for envs in envs_list:
if eval(envs["use_calc_stream"]) and not eval(envs["sync_op"]):
continue
self.run_test_case("communication_stream_allreduce_api_dygraph.py",
user_defined_envs=envs)
def tearDown(self):
super(TestCommunicationStreamAllreduceAPI, self).tearDown()
if __name__ == '__main__':
unittest.main()
@@ -32,3 +32,4 @@ test_collective_wait,linux,gpu;rocm,300,DIST,test_runner.py,2,1,http_proxy=;http
test_eager_dist_api,linux,gpu;rocm,120,DIST,test_runner.py,2,1,http_proxy=;https_proxy=;PYTHONPATH=..,
test_new_group_api,linux,gpu;rocm,120,DIST,test_runner.py,2,1,http_proxy=;https_proxy=;PYTHONPATH=..,
test_gen_nccl_id_op,,gpu;rocm;ASCEND;ASCEND_CL,,DIST,../dist_test.sh,2,1,http_proxy=;https_proxy=;PYTHONPATH=..,
test_communication_stream_allreduce_api,linux,gpu;rocm,120,DIST,,2,1,PYTHONPATH=..;http_proxy=;https_proxy=,
@@ -270,6 +270,8 @@ packages=['paddle',
'paddle.dataset',
'paddle.reader',
'paddle.distributed',
'paddle.distributed.communication',
'paddle.distributed.communication.stream',
'paddle.distributed.metric',
'paddle.distributed.ps',
'paddle.distributed.ps.utils',
...