Unverified · Commit 5fba2a98 · authored by Wen Sun, committed by GitHub

[Cherry-pick] Collective communication APIs (#46922)

* Support both use_calc_stream and sync_op in send recv APIs (#46023)

* Support both use_calc_stream and sync_op in allgather API (#46295)

* Support both use_calc_stream and sync_op in collective communication API (#46761)

* Move group and all reduce from collective to communication (#45848)

* Completes bfloat16 dtype for collective api in eager mode (#45844)

* Fix collective APIs cannot be recognized when building docs (#46962)
Co-authored-by: LiYuRio <63526175+LiYuRio@users.noreply.github.com>
Parent: 10225d22
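To make the behavioral change concrete, here is a minimal Python sketch of the sync_op flag these patches add to the collective APIs. The sketch is not part of the diff itself; it assumes a two-GPU job launched with paddle.distributed.launch, and the tensor values are borrowed from the all_reduce docstring further below.

# Minimal sketch of the sync_op semantics added in this commit
# (assumes a 2-GPU launch via paddle.distributed.launch).
import paddle
import paddle.distributed as dist

dist.init_parallel_env()
if dist.get_rank() == 0:
    data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
    data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])

# sync_op=True (default): the call blocks until the collective has finished.
dist.all_reduce(data)
print(data)  # [[5, 7, 9], [5, 7, 9]] on both GPUs

# sync_op=False: the call returns a task object, so communication can overlap
# with other work until task.wait() is called.
task = dist.all_reduce(data, sync_op=False)
task.wait()
print(data)  # [[10, 14, 18], [10, 14, 18]] after the second all_reduce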
......@@ -122,6 +122,16 @@ class ProcessGroup {
"ProcessGroup%s does not support broadcast", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const BroadcastOptions&,
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support broadcast with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) {
PADDLE_THROW(platform::errors::InvalidArgument(
......@@ -134,38 +144,89 @@ class ProcessGroup {
"ProcessGroup%s does not support send", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>&, int, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors, int) { // NOLINT
std::vector<phi::DenseTensor>&, int) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support receive", GetBackendName()));
"ProcessGroup%s does not support recv", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Send_Partial(phi::DenseTensor&,
int,
int,
int) { // NOLINT
virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>&, int, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send", GetBackendName()));
"ProcessGroup%s does not support recv with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Send_Partial(
phi::DenseTensor&, // NOLINT
int,
int64_t,
int64_t) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send_partial", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Send_Partial(
phi::DenseTensor&, int, int64_t, int64_t, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send_partial with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv_Partial(
phi::DenseTensor& tensors, int, int, int) { // NOLINT
phi::DenseTensor&, // NOLINT
int,
int64_t,
int64_t) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support receive", GetBackendName()));
"ProcessGroup%s does not support recv_partial", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv_Partial(
phi::DenseTensor&, int, int64_t, int64_t, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support recv_partial with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support AllGather", GetBackendName()));
"ProcessGroup%s does not support all_gather", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support all_gather with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
int offset,
int length) { // NOLINT
int64_t offset,
int64_t length) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support AllGather_Partial", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
int64_t offset,
int64_t length,
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support AllGather_Partial", GetBackendName()));
}
......@@ -177,6 +238,14 @@ class ProcessGroup {
"ProcessGroup%s does not support AllToAll", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support alltoall", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllToAll_Single(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
......@@ -186,26 +255,66 @@ class ProcessGroup {
"ProcessGroup%s does not support AllToAll_Single", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllToAllSingle(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<int64_t>&,
std::vector<int64_t>&,
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support alltoall_single", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
const ReduceOptions& opts) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support Reduce", GetBackendName()));
"ProcessGroup%s does not support reduce", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const ReduceOptions&,
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support reduce with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
const ScatterOptions&) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support scatter", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
const ScatterOptions&) { // NOLINT
const ScatterOptions&,
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support scatter with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> ReduceScatter(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
const ReduceScatterOptions&,
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support Scatter", GetBackendName()));
"ProcessGroup%s does not support reduce_scatter with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> _ReduceScatterBase(
phi::DenseTensor&, // NOLINT
phi::DenseTensor&, // NOLINT
const ReduceScatterOptions&) { // NOLINT
phi::DenseTensor&, // NOLINT
phi::DenseTensor&, // NOLINT
const ReduceScatterOptions&) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support ReduceScatter", GetBackendName()));
}
......
......@@ -267,8 +267,8 @@ void* XcclGetPointerByOffset(void* raw_pointer,
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int offset,
int length) {
int64_t offset,
int64_t length) {
PADDLE_ENFORCE_EQ(
CheckTensorsInCustomPlace(in_tensors, device_type_),
true,
......
......@@ -80,8 +80,8 @@ class ProcessGroupCustom : public ProcessGroup {
std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int offset,
int length) override;
int64_t offset,
int64_t length) override;
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& in_tensors,
......@@ -117,8 +117,8 @@ class ProcessGroupCustom : public ProcessGroup {
std::set<int> used_place_ids_;
private:
void BcastCustomId(std::vector<phi::ccl::CCLRootId>& ccl_ids,
int root, // NOLINT
void BcastCustomId(std::vector<phi::ccl::CCLRootId>& ccl_ids, // NOLINT
int root,
int server_fd);
void BroadcastUniqueCustomID(
......
......@@ -88,6 +88,9 @@ namespace distributed {
case experimental::DataType::BOOL: \
func<bool>(args); \
break; \
case experimental::DataType::BFLOAT16: \
func<bfloat16>(args); \
break; \
default: \
VLOG(0) << "Error: Unknown DataType."; \
exit(-1); \
......@@ -293,6 +296,14 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
const AllreduceOptions& opts) {
return AllReduce(inputs, outputs, opts, true);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
const AllreduceOptions& opts,
bool sync_op) {
auto tag = next_tag();
std::shared_ptr<GlooTask> task;
auto context = get_context();
......
......@@ -120,6 +120,12 @@ class ProcessGroupGloo : public ProcessGroup {
std::vector<phi::DenseTensor>& outputs,
const AllreduceOptions& opts = AllreduceOptions()) override;
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
const AllreduceOptions& opts,
bool sync_op) override;
std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) override;
......
......@@ -60,7 +60,7 @@ class ProcessGroupNCCL : public ProcessGroupStream {
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool is_sync,
bool sync_op,
bool use_calc_stream);
bool IsCompleted();
......@@ -98,6 +98,9 @@ class ProcessGroupNCCL : public ProcessGroupStream {
phi::DeviceContext* GetDeviceContext(const Place& place) const override;
phi::DeviceContext* GetDeviceContext(const Place& place,
bool use_calc_stream) const override;
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
......@@ -116,38 +119,93 @@ class ProcessGroupNCCL : public ProcessGroupStream {
std::vector<phi::DenseTensor>& out_tensors,
const BroadcastOptions& = BroadcastOptions()) override;
std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const BroadcastOptions& opts,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) override;
std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>& tensors, int dst_rank) override;
std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>& tensors,
int dst_rank,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors, int src_rank) override;
std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors,
int src_rank,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Send_Partial(phi::DenseTensor& tensors,
int dst_rank,
int offset,
int length) override;
int64_t offset,
int64_t length) override;
std::shared_ptr<ProcessGroup::Task> Send_Partial(
phi::DenseTensor& tensors,
int dst_rank,
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Recv_Partial(phi::DenseTensor& tensors,
int src_rank,
int offset,
int length) override;
int64_t offset,
int64_t length) override;
std::shared_ptr<ProcessGroup::Task> Recv_Partial(
phi::DenseTensor& tensors,
int src_rank,
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors) override;
std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int64_t offset,
int64_t length) override;
std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int offset,
int length) override;
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>& in,
std::vector<phi::DenseTensor>& out) override;
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors) override;
std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> AllToAll_Single(
std::vector<phi::DenseTensor>& in,
......@@ -155,15 +213,44 @@ class ProcessGroupNCCL : public ProcessGroupStream {
std::vector<int64_t>& in_sizes,
std::vector<int64_t>& out_sizes) override;
std::shared_ptr<ProcessGroup::Task> AllToAllSingle(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
std::vector<int64_t>& in_sizes,
std::vector<int64_t>& out_sizes,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>& tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ReduceOptions& opts) override;
std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ReduceOptions& opts,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> ReduceScatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ReduceScatterOptions& opts,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ScatterOptions& opts) override;
std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ScatterOptions&) override;
const ScatterOptions& opts,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> _ReduceScatterBase(
phi::DenseTensor&, // NOLINT
......@@ -180,9 +267,17 @@ class ProcessGroupNCCL : public ProcessGroupStream {
virtual std::shared_ptr<ProcessGroupNCCL::NCCLTask> CreateTask(
std::vector<Place> places,
int rank,
CommType opType,
CommType op_type,
const std::vector<phi::DenseTensor>& inputs);
virtual std::shared_ptr<ProcessGroupNCCL::NCCLTask> CreateTask(
const std::vector<Place>& places,
int rank,
CommType op_type,
const std::vector<phi::DenseTensor>& inputs,
bool sync_op,
bool use_calc_stream);
protected:
std::shared_ptr<Store> store_;
std::shared_ptr<NCCLCommManager> nccl_comm_;
......@@ -233,6 +328,15 @@ class ProcessGroupNCCL : public ProcessGroupStream {
int dst_rank,
CommType op_type);
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> PointToPoint(
std::vector<phi::DenseTensor>& tensors, // NOLINT
Fn fn,
int dst_rank,
CommType op_type,
bool sync_op,
bool use_calc_stream);
void CreateNCCLManagerCache(const std::string& places_key,
const std::vector<Place>& places);
......
......@@ -23,6 +23,31 @@ ProcessGroupStream::ProcessGroupStream(int rank,
int gid)
: ProcessGroup(rank, size, place, gid) {}
phi::DeviceContext* ProcessGroupStream::GetDeviceContext(
const Place& place, bool use_calc_stream) const {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support get device_context.", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllGather(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
bool sync_op) {
return AllGather(input_tensors,
output_tensors,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllGather(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do all_gather", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllReduce(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
......@@ -42,7 +67,248 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllReduce(
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do allreduce", GetBackendName()));
"ProcessGroup%s does not support do all_reduce", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllToAll(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
bool sync_op) {
return AllToAll(in_tensors,
out_tensors,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllToAll(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do alltoall", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllToAllSingle(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
std::vector<int64_t>& in_sizes,
std::vector<int64_t>& out_sizes,
bool sync_op) {
return AllToAllSingle(in_tensors,
out_tensors,
in_sizes,
out_sizes,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllToAllSingle(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
std::vector<int64_t>& in_sizes,
std::vector<int64_t>& out_sizes,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do alltoall_single", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Broadcast(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const BroadcastOptions& opts,
bool sync_op) {
return Broadcast(in_tensors,
out_tensors,
opts,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Broadcast(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const BroadcastOptions& opts,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do broadcast", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Reduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ReduceOptions& opts,
bool sync_op) {
return Reduce(in_tensors,
out_tensors,
opts,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Reduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ReduceOptions& opts,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do reduce", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::ReduceScatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ReduceScatterOptions& opts,
bool sync_op) {
return ReduceScatter(in_tensors,
out_tensors,
opts,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::ReduceScatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ReduceScatterOptions& opts,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do reduce_scatter", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Scatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ScatterOptions& opts,
bool sync_op) {
return Scatter(in_tensors,
out_tensors,
opts,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Scatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ScatterOptions& opts,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do scatter", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Send(
std::vector<phi::DenseTensor>& tensors, int dst_rank, bool sync_op) {
return Send(tensors,
dst_rank,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Send(
std::vector<phi::DenseTensor>& tensors,
int dst_rank,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do send", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Send_Partial(
phi::DenseTensor& tensors,
int dst_rank,
int64_t offset,
int64_t length,
bool sync_op) {
return Send_Partial(tensors,
dst_rank,
offset,
length,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Send_Partial(
phi::DenseTensor& tensors,
int dst_rank,
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do send_partial", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Recv(
std::vector<phi::DenseTensor>& tensors, int src_rank, bool sync_op) {
return Recv(tensors,
src_rank,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Recv(
std::vector<phi::DenseTensor>& tensors,
int src_rank,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do recv", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Recv_Partial(
phi::DenseTensor& tensors,
int src_rank,
int64_t offset,
int64_t length,
bool sync_op) {
return Recv_Partial(tensors,
src_rank,
offset,
length,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Recv_Partial(
phi::DenseTensor& tensors,
int src_rank,
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do recv_partial", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int64_t offset,
int64_t length,
bool sync_op) {
return AllGather_Partial(in_tensors,
out_tensors,
offset,
length,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do recv_partial", GetBackendName()));
}
} // namespace distributed
......
......@@ -54,6 +54,20 @@ class ProcessGroupStream : public ProcessGroup {
ProcessGroupStream(int rank, int size, const platform::Place& place, int gid);
virtual ~ProcessGroupStream() = default;
virtual phi::DeviceContext* GetDeviceContext(const Place& place,
bool use_calc_stream) const;
std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
......@@ -66,6 +80,151 @@ class ProcessGroupStream : public ProcessGroup {
const AllreduceOptions& options,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> AllToAllSingle(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
std::vector<int64_t>& in_sizes, // NOLINT
std::vector<int64_t>& out_sizes, // NOLINT
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> AllToAllSingle(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
std::vector<int64_t>& in_sizes, // NOLINT
std::vector<int64_t>& out_sizes, // NOLINT
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const BroadcastOptions& opts,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const BroadcastOptions& opts,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const ReduceOptions& opts,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const ReduceOptions& opts,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> ReduceScatter(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const ReduceScatterOptions& opts,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> ReduceScatter(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const ReduceScatterOptions& opts,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const ScatterOptions& opts,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const ScatterOptions& opts,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>& tensors, // NOLINT
int dst_rank,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>& tensors, // NOLINT
int dst_rank,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> Send_Partial(
phi::DenseTensor& tensors, // NOLINT
int dst_rank,
int64_t offset,
int64_t length,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> Send_Partial(
phi::DenseTensor& tensors, // NOLINT
int dst_rank,
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors, // NOLINT
int src_rank,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors, // NOLINT
int src_rank,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> Recv_Partial(
phi::DenseTensor& tensors, // NOLINT
int src_rank,
int64_t offset,
int64_t length,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> Recv_Partial(
phi::DenseTensor& tensors, // NOLINT
int src_rank,
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int64_t offset,
int64_t length,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
int64_t offset,
int64_t length,
bool sync_op,
bool use_calc_stream);
};
} // namespace distributed
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/platform/device_context.h"
#include "paddle/phi/api/include/tensor.h"
#include "paddle/phi/backends/device_guard.h"
#include "paddle/phi/backends/device_manager.h"
#include "paddle/phi/kernels/funcs/concat_and_split_functor.h"
namespace paddle {
namespace distributed {
template <typename DeviceContext, typename T>
struct ConcatDenseTensor {
void operator()(const DeviceContext *context,
const std::vector<phi::DenseTensor> &in,
phi::DenseTensor *out,
int axis = 0) {
phi::funcs::ConcatFunctor<DeviceContext, T> concat_functor;
concat_functor(*context, in, axis, out);
}
};
template <typename DeviceContext, typename T>
struct SplitDenseTensor {
void operator()(const DeviceContext *context,
const phi::DenseTensor &in,
std::vector<phi::DenseTensor *> *out,
int axis = 0) {
std::vector<const phi::DenseTensor *> shape_refer;
shape_refer.reserve(out->size());
for (auto *p_tensor : *out) {
shape_refer.emplace_back(p_tensor);
}
phi::funcs::SplitFunctor<DeviceContext, T> split_functor;
split_functor(*context, in, shape_refer, axis, out);
}
};
#ifdef PADDLE_WITH_CUSTOM_DEVICE
template <typename T>
struct ConcatDenseTensor<platform::CustomDeviceContext, T> {
void operator()(const platform::CustomDeviceContext *context,
const std::vector<phi::DenseTensor> &in,
phi::DenseTensor *out,
int axis = 0) {
auto *out_data = out->data<T>();
auto *device = phi::DeviceManager::GetDeviceWithPlace(context->GetPlace());
// offset counts elements of T, because out_data is a typed pointer;
// advancing by numel() matches the bytes copied so far.
size_t offset = 0;
for (const auto &tensor : in) {
const auto *in_data = tensor.data<T>();
auto sz = tensor.numel() * sizeof(T);
device->MemoryCopyD2D(out_data + offset, in_data, sz, nullptr);
offset += tensor.numel();
}
}
};
template <typename T>
struct SplitDenseTensor<platform::CustomDeviceContext, T> {
void operator()(const platform::CustomDeviceContext *context,
const phi::DenseTensor &in,
std::vector<phi::DenseTensor *> *out,
int axis = 0) {
auto *in_data = in.data<T>();
auto *device = phi::DeviceManager::GetDeviceWithPlace(context->GetPlace());
// offset counts elements of T, because in_data is a typed pointer;
// advancing by numel() matches the bytes copied so far.
size_t offset = 0;
for (auto *p_tensor : *out) {
auto *out_data = p_tensor->data<T>();
auto sz = p_tensor->numel() * sizeof(T);
device->MemoryCopyD2D(out_data, in_data + offset, sz, nullptr);
offset += p_tensor->numel();
}
}
};
#endif
template <typename DeviceContext>
void ConcatDenseTensorWithType(const DeviceContext *dev_ctx,
const std::vector<phi::DenseTensor> &t_list,
phi::DenseTensor *p_out,
phi::DataType type) {
switch (type) {
case phi::DataType::BOOL:
ConcatDenseTensor<DeviceContext, bool>()(dev_ctx, t_list, p_out);
break;
case phi::DataType::UINT8:
ConcatDenseTensor<DeviceContext, uint8_t>()(dev_ctx, t_list, p_out);
break;
case phi::DataType::INT8:
ConcatDenseTensor<DeviceContext, int8_t>()(dev_ctx, t_list, p_out);
break;
case phi::DataType::INT32:
ConcatDenseTensor<DeviceContext, int32_t>()(dev_ctx, t_list, p_out);
break;
case phi::DataType::INT64:
ConcatDenseTensor<DeviceContext, int64_t>()(dev_ctx, t_list, p_out);
break;
case phi::DataType::FLOAT16:
ConcatDenseTensor<DeviceContext, platform::float16>()(
dev_ctx, t_list, p_out);
break;
case phi::DataType::FLOAT32:
ConcatDenseTensor<DeviceContext, float>()(dev_ctx, t_list, p_out);
break;
case phi::DataType::FLOAT64:
ConcatDenseTensor<DeviceContext, double>()(dev_ctx, t_list, p_out);
break;
default:
PADDLE_THROW(platform::errors::Unimplemented(
"Data type (%s) is not supported when it concats tensors.", type));
}
}
template <typename DeviceContext>
void SplitDenseTensorWithType(const DeviceContext *dev_ctx,
const phi::DenseTensor &t_in,
std::vector<phi::DenseTensor *> *p_list,
phi::DataType type) {
switch (type) {
case phi::DataType::BOOL:
SplitDenseTensor<DeviceContext, bool>()(dev_ctx, t_in, p_list);
break;
case phi::DataType::UINT8:
SplitDenseTensor<DeviceContext, uint8_t>()(dev_ctx, t_in, p_list);
break;
case phi::DataType::INT8:
SplitDenseTensor<DeviceContext, int8_t>()(dev_ctx, t_in, p_list);
break;
case phi::DataType::INT32:
SplitDenseTensor<DeviceContext, int32_t>()(dev_ctx, t_in, p_list);
break;
case phi::DataType::INT64:
SplitDenseTensor<DeviceContext, int64_t>()(dev_ctx, t_in, p_list);
break;
case phi::DataType::FLOAT16:
SplitDenseTensor<DeviceContext, platform::float16>()(
dev_ctx, t_in, p_list);
break;
case phi::DataType::FLOAT32:
SplitDenseTensor<DeviceContext, float>()(dev_ctx, t_in, p_list);
break;
case phi::DataType::FLOAT64:
SplitDenseTensor<DeviceContext, double>()(dev_ctx, t_in, p_list);
break;
default:
PADDLE_THROW(platform::errors::Unimplemented(
"Data type (%s) is not supported when it splits tensors.", type));
}
}
void ConcatTensor(const phi::DeviceContext *dev_ctx,
const std::vector<phi::DenseTensor> &tensor_list,
const experimental::Tensor *tensor) {
auto *dense_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensor->impl()).get();
const auto &place = dev_ctx->GetPlace();
if (platform::is_gpu_place(place)) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
ConcatDenseTensorWithType(static_cast<const phi::GPUContext *>(dev_ctx),
tensor_list,
dense_tensor,
tensor->dtype());
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat tensor since it's not support GPU, please "
"recompile or reinstall Paddle with GPU support."));
#endif
} else if (platform::is_custom_place(place)) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
ConcatDenseTensorWithType(
static_cast<const platform::CustomDeviceContext *>(dev_ctx),
tensor_list,
dense_tensor,
tensor->dtype());
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat tensor since it's not compiled with "
"CUSTOM_DEVICE, please recompile or reinstall Paddle with "
"CUSTOM_DEVICE support."));
#endif
} else if (platform::is_cpu_place(place)) {
ConcatDenseTensorWithType(static_cast<const phi::CPUContext *>(dev_ctx),
tensor_list,
dense_tensor,
tensor->dtype());
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"Concat tensor not supported on place (%s)", place));
}
}
void SplitTensor(const phi::DeviceContext *dev_ctx,
const phi::DenseTensor &tensor,
const std::vector<experimental::Tensor> *tensor_list) {
std::vector<phi::DenseTensor *> dense_list;
for (auto &tensor : *tensor_list) {
auto p_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensor.impl()).get();
dense_list.emplace_back(p_tensor);
}
const auto &place = dev_ctx->GetPlace();
if (platform::is_gpu_place(place)) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
SplitDenseTensorWithType(static_cast<const phi::GPUContext *>(dev_ctx),
tensor,
&dense_list,
tensor.dtype());
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split tensor since it's not support GPU, please "
"recompile or reinstall Paddle with GPU support."));
#endif
} else if (platform::is_custom_place(place)) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
SplitDenseTensorWithType(
static_cast<const platform::CustomDeviceContext *>(dev_ctx),
tensor,
&dense_list,
tensor.dtype());
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split tensor since it's not compiled with CUSTOM_DEVICE, "
"please recompile or reinstall Paddle with CUSTOM_DEVICE support."));
#endif
} else if (platform::is_cpu_place(place)) {
SplitDenseTensorWithType(static_cast<const phi::CPUContext *>(dev_ctx),
tensor,
&dense_list,
tensor.dtype());
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"Split tensor not supported on place (%s)", place));
}
}
} // namespace distributed
} // namespace paddle
......@@ -59,7 +59,7 @@ inline ncclDataType_t ToNCCLDataType(framework::proto::VarType::Type type) {
return ncclUint8;
} else if (type == framework::proto::VarType::BOOL) {
return ncclUint8;
#if CUDNN_VERSION_MIN(8, 1, 0) && NCCL_VERSION_CODE >= 21000
#if NCCL_VERSION_CODE >= 21000
} else if (type == framework::proto::VarType::BF16) {
return ncclBfloat16;
#endif
......@@ -86,7 +86,7 @@ inline ncclDataType_t ToNCCLDataType(experimental::DataType type) {
return ncclInt8;
} else if (type == experimental::DataType::BOOL) {
return ncclUint8;
#if CUDNN_VERSION_MIN(8, 1, 0) && NCCL_VERSION_CODE >= 21000
#if NCCL_VERSION_CODE >= 21000
} else if (type == experimental::DataType::BFLOAT16) {
return ncclBfloat16;
#endif
......
......@@ -74,7 +74,7 @@ __all__ = [ # noqa
"gloo_release", "QueueDataset", "split", "CountFilterEntry",
"ShowClickEntry", "get_world_size", "get_group", "all_gather",
"all_gather_object", "InMemoryDataset", "barrier", "all_reduce", "alltoall",
"send", "reduce", "recv", "ReduceOp", "wait", "get_rank",
"ProbabilityEntry", "ParallelMode", "is_initialized", "isend", "irecv",
"reduce_scatter"
"alltoall_single", "send", "reduce", "recv", "ReduceOp", "wait", "get_rank",
"ProbabilityEntry", "ParallelMode", "is_initialized",
"destroy_process_group", "isend", "irecv", "reduce_scatter", "stream"
]
......@@ -52,54 +52,12 @@ from .fleet.layers.mpu.mp_ops import _c_softmax_with_cross_entropy
from .fleet.layers.mpu.mp_ops import _linear
from .fleet.layers.mpu.mp_ops import _parallel_linear
from .fleet.layers.mpu.mp_ops import _parallel_embedding
from .communication.comm_utils import ReduceOp
from .communication.group import Group, _add_new_group
from .communication.all_reduce import all_reduce
from .communication.reduce import _get_reduce_op, ReduceOp
__all__ = []
class Group():
"""
The abstract representation of group.
"""
def __init__(self, rank, rank_num, id=0, ranks=[], pg=None, name=None):
self.rank = rank
self.nranks = rank_num
self.id = id
self.ranks = ranks
self.pg = pg
self.name = name
def is_member(self):
if self.rank < 0:
return False
if self.nranks < 2:
return False
return True
def get_group_rank(self, rank):
if self.is_member() and rank in self.ranks:
return self.ranks.index(rank)
else:
return -1
@property
def process_group(self):
return self.pg
@property
def world_size(self):
return self.nranks if self.rank >= 0 else -1
def __repr__(self):
debug_str = "rank: {}, nranks: {}, id: {}, ranks: ".format(
self.rank, self.nranks, self.id)
debug_str += ", ".join(map(str, self.ranks))
debug_str += "; name: "
debug_str += self.name if self.name else "None"
return debug_str
_global_env = None
......@@ -147,9 +105,8 @@ def _get_group_map():
global _group_map
if _global_env_gid not in _group_map:
genv = _get_global_env()
_group_map[_global_env_gid] = Group(genv.rank,
genv.world_size,
ranks=list(range(genv.world_size)))
_group_map[_global_env_gid] = Group(genv.rank, 0,
list(range(genv.world_size)))
return _group_map
......@@ -197,19 +154,6 @@ def _new_ring_id():
return len(_get_group_map()) + max(_get_global_env().nrings, 9)
def _get_reduce_op(reduce_op, func_name):
if reduce_op == ReduceOp.SUM:
return core.ReduceOp.SUM
elif reduce_op == ReduceOp.MAX:
return core.ReduceOp.MAX
elif reduce_op == ReduceOp.MIN:
return core.ReduceOp.MIN
elif reduce_op == ReduceOp.PROD:
return core.ReduceOp.PRODUCT
else:
raise ValueError("Unknown reduce_op type for {}.".format(func_name))
def get_group(id=0):
"""
......@@ -451,10 +395,13 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout):
else:
rank = -1
pg = None
group = Group(rank, size, id=gid, ranks=ranks, pg=pg, name=group_name)
group = Group(rank, gid, ranks, pg=pg, name=group_name)
_group_map_by_name[group_name] = group
_group_map[gid] = group
_group_map_backend[group] = backend
# TODO: The call below uses the new group management mechanism and will
# replace the previous three maps in the future.
_add_new_group(group)
# TODO(shenliang03): This is a temporary solution to solve the problem of
# hang caused by tcp
......@@ -476,13 +423,13 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout):
ring_id = _new_ring_id()
if global_rank not in ranks:
gp = Group(-1, -1, ring_id, ranks)
gp = Group(-1, ring_id, ranks)
_group_map[ring_id] = gp
else:
ranks = sorted(ranks)
group_rank = ranks.index(global_rank)
group_size = len(ranks)
gp = Group(group_rank, group_size, ring_id, ranks)
gp = Group(group_rank, ring_id, ranks)
_group_map[ring_id] = gp
if group_size >= 2:
......@@ -531,7 +478,8 @@ def is_initialized():
Check whether the distributed environment has been initialized
Returns (bool): `True` if distributed environment has been initialized, otherwise `False`.
Returns:
`True` if distributed environment has been initialized, otherwise `False`.
Examples:
.. code-block:: python
......@@ -679,7 +627,7 @@ def broadcast(tensor, src, group=None, sync_op=True):
Args:
tensor (Tensor): The Tensor to send if current rank is the source, or the Tensor to receive otherwise. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
src (int): The source rank.
group (Group, optional): The group instance return by new_group or None for global default group.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
......@@ -748,104 +696,6 @@ def broadcast(tensor, src, group=None, sync_op=True):
})
def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True):
"""
Reduce a tensor over all ranks so that all get the result.
As shown below, one process is started with a GPU and the data of this process is represented
by its group rank. The reduce operator is sum. Through all_reduce operator,
each GPU will have the sum of the data from all GPUs.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allreduce.png
:width: 800
:alt: all_reduce
:align: center
Args:
tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
group (Group, optional): The group instance return by new_group or None for global default group.
sync_op (bool, optional): Whether this op is a sync op. Default value is True.
Returns:
None.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
dist.all_reduce(data)
print(data)
# [[5, 7, 9], [5, 7, 9]] (2 GPUs)
"""
if group is not None and not group.is_member():
return
if in_dygraph_mode():
op_type = _get_reduce_op(op, "all_reduce")
group = _get_default_group() if group is None else group
task = group.process_group.allreduce(tensor, op_type)
if sync_op:
task.wait()
return None
else:
return task
use_calc_stream = sync_op
ring_id = 0 if group is None else group.id
if _non_static_mode():
if op == ReduceOp.SUM:
return _legacy_C_ops.c_allreduce_sum_(tensor, 'use_calc_stream',
use_calc_stream, 'ring_id',
ring_id)
elif op == ReduceOp.MAX:
return _legacy_C_ops.c_allreduce_max_(tensor, 'use_calc_stream',
use_calc_stream, 'ring_id',
ring_id)
elif op == ReduceOp.MIN:
return _legacy_C_ops.c_allreduce_min_(tensor, 'use_calc_stream',
use_calc_stream, 'ring_id',
ring_id)
elif op == ReduceOp.PROD:
return _legacy_C_ops.c_allreduce_prod_(tensor, 'use_calc_stream',
use_calc_stream, 'ring_id',
ring_id)
else:
raise ValueError("Unknown parameter: {}.".format(op))
check_variable_and_dtype(tensor, 'tensor', [
'float16', 'float32', 'float64', 'int32', 'int64', 'int8', 'uint8',
'bool'
], 'all_reduce')
if op == ReduceOp.SUM:
op_type = 'c_allreduce_sum'
elif op == ReduceOp.MAX:
op_type = 'c_allreduce_max'
elif op == ReduceOp.MIN:
op_type = 'c_allreduce_min'
elif op == ReduceOp.PROD:
op_type = 'c_allreduce_prod'
if not isinstance(ring_id, int):
raise ValueError("The type of 'ring_id' for all_reduce should be int.")
helper = LayerHelper(op_type, **locals())
helper.append_op(type=op_type,
inputs={'X': [tensor]},
outputs={'Out': [tensor]},
attrs={
'ring_id': ring_id,
'use_calc_stream': use_calc_stream
})
def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True):
"""
......@@ -860,7 +710,7 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True):
Args:
tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
dst (int): The destination rank id.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
group (Group, optional): The group instance return by new_group or None for global default group.
......@@ -968,7 +818,7 @@ def all_gather(tensor_list, tensor, group=None, sync_op=True):
Args:
tensor_list (list): A list of output Tensors. Every element in the list must be a Tensor whose data type
should be float16, float32, float64, int32, int64, int8, uint8, bool, complex64 or complex128.
should be float16, float32, float64, int32, int64, int8, uint8, bool, bfloat16, complex64 or complex128.
tensor (Tensor): The Tensor to send. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool, complex64 or complex128.
group (Group, optional): The group instance return by new_group or None for global default group.
......@@ -1150,9 +1000,9 @@ def scatter(tensor, tensor_list=None, src=0, group=None, sync_op=True):
Args:
tensor (Tensor): The output Tensor. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
tensor_list (list|tuple): A list/tuple of Tensors to scatter. Every element in the list must be a Tensor whose data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool. Default value is None.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. Default value is None.
src (int): The source rank id. Default value is 0.
group (Group, optional): The group instance return by new_group or None for global default group.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
......@@ -1247,7 +1097,7 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True):
Args:
in_tensor_list (list): A list of input Tensors. Every element in the list must be a Tensor whose data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
out_tensor_list (list): A list of output Tensors. The data type of its elements should be the same as the
data type of the input Tensors.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
......@@ -1348,7 +1198,7 @@ def alltoall_single(in_tensor,
``alltoall_single`` is only supported in eager mode.
Args:
in_tensor (Tensor): Input tensor. The data type should be float16, float32, float64, int32, int64, int8, uint8 or bool.
in_tensor (Tensor): Input tensor. The data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
out_tensor (Tensor): Output Tensor. The data type should be the same as the data type of the input Tensor.
in_split_sizes (list[int], optional): Split sizes of ``in_tensor`` for dim[0]. If not given, dim[0] of ``in_tensor``
must be divisible by group size and ``in_tensor`` will be scattered averagely to all participators. Default: None.
......@@ -1437,7 +1287,7 @@ def send(tensor, dst=0, group=None, sync_op=True):
Args:
tensor (Tensor): The Tensor to send. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
dst (int): The destination rank id.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
......@@ -1503,7 +1353,7 @@ def recv(tensor, src=0, group=None, sync_op=True):
Args:
tensor (Tensor): The Tensor to receive. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
src (int): The source rank id.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
......@@ -1586,7 +1436,7 @@ def isend(tensor, dst, group=None):
Args:
tensor (Tensor): The Tensor to send. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
dst (int): The destination rank.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
......@@ -1636,7 +1486,7 @@ def irecv(tensor, src=None, group=None):
Args:
tensor (Tensor): The Tensor to receive. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
src (int): The source rank id.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
......@@ -1745,7 +1595,7 @@ def batch_isend_irecv(p2p_op_list):
corresponding tasks. NCCL are currently supported.
Args:
p2p_op_list: A list of point-to-point operations(type of each operator is
p2p_op_list (List[P2POp]): A list of point-to-point operations (the type of each operator is
``paddle.distributed.P2POp``). The order of the isend/irecv in the list
matters and it needs to match with corresponding isend/irecv on the
remote end.
......@@ -1819,9 +1669,9 @@ def reduce_scatter(tensor,
Reduces, then scatters a list of tensors to all processes in a group
Args:
tensor (Tensor): Output tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool.
tensor (Tensor): Output tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
tensor_list (list[Tensor]): List of tensors to reduce and scatter. Every element in the list must be a Tensor whose data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM.
group (Group, optional): The group instance return by new_group or None for global
default group. Default: None.
......@@ -1887,9 +1737,9 @@ def _reduce_scatter_base(output,
Reduces, then scatters a flattened tensor to all processes in a group.
Args:
output (Tensor): Output tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool.
input (Tensor): Input tensor that is of size output tensor size times world size. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
output (Tensor): Output tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
input (Tensor): Input tensor that is of size output tensor size times world size. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM.
group (ProcessGroup, optional): The process group to work on. If None,
the default process group will be used.
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid.framework as framework
from paddle.distributed.communication import stream as stream
from paddle.distributed.communication.reduce import ReduceOp
def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True):
"""
Reduce a tensor over all ranks so that all get the result.
As shown below, one process is started with a GPU and the data of this process is represented
by its group rank. The reduce operator is sum. Through all_reduce operator,
each GPU will have the sum of the data from all GPUs.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allreduce.png
:width: 800
:alt: all_reduce
:align: center
Args:
tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type
should be float16, float32, float64, int32, int64, int8, uint8 or bool.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
group (Group, optional): The group instance return by new_group or None for global default group.
sync_op (bool, optional): Whether this op is a sync op. Default value is True.
Returns:
Return a task object.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
dist.all_reduce(data)
print(data)
# [[5, 7, 9], [5, 7, 9]] (2 GPUs)
"""
if not framework._in_legacy_dygraph():
return stream.all_reduce(tensor,
op=op,
group=group,
sync_op=sync_op,
use_calc_stream=False)
# code below will be removed after we remove the old dygraph
use_calc_stream = sync_op
ring_id = 0 if group is None else group.id
if op == ReduceOp.SUM:
return paddle._legacy_C_ops.c_allreduce_sum_(tensor, 'use_calc_stream',
use_calc_stream, 'ring_id',
ring_id)
elif op == ReduceOp.MAX:
return paddle._legacy_C_ops.c_allreduce_max_(tensor, 'use_calc_stream',
use_calc_stream, 'ring_id',
ring_id)
elif op == ReduceOp.MIN:
return paddle._legacy_C_ops.c_allreduce_min_(tensor, 'use_calc_stream',
use_calc_stream, 'ring_id',
ring_id)
elif op == ReduceOp.PROD:
return paddle._legacy_C_ops.c_allreduce_prod_(tensor, 'use_calc_stream',
use_calc_stream,
'ring_id', ring_id)
else:
raise ValueError("Unknown parameter: {}.".format(op))
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class Group():
"""
The abstract representation of group.
"""
def __init__(self, rank_in_group, id, ranks, pg=None, name=None):
self._rank_in_group = rank_in_group
self._world_size = len(ranks) if rank_in_group >= 0 else -1
self._id = id
self._ranks = ranks
self._pg = pg
self._name = name
@property
def rank(self):
return self._rank_in_group
@property
def ranks(self):
return self._ranks
@property
def nranks(self):
return len(self._ranks)
@property
def name(self):
return self._name
@property
def process_group(self):
return self._pg
@property
def world_size(self):
return self._world_size
@property
def id(self):
return self._id
def is_member(self):
if self.rank < 0:
return False
if self.nranks < 2:
return False
return True
def get_group_rank(self, rank):
if self.is_member():
return self.ranks.index(rank)
else:
return -1
def __repr__(self):
debug_str = "rank: {}, nranks: {}, id: {}, ranks: ".format(
self.rank, self.nranks, self.id)
debug_str += ", ".join(map(str, self.ranks))
debug_str += "; name: "
debug_str += self.name if self.name else "None"
return debug_str
class _GroupManager():
global_group_id = 0
group_map_by_id = {}
def _get_global_group():
if _GroupManager.global_group_id not in _GroupManager.group_map_by_id:
raise RuntimeError("The global group is not initialized.")
return _GroupManager.group_map_by_id[_GroupManager.global_group_id]
def _add_new_group(group):
if group.id in _GroupManager.group_map_by_id:
raise RuntimeError("The group with id {} already exist.".format(
group.id))
_GroupManager.group_map_by_id[group.id] = group
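A hypothetical sketch (not part of this diff) of how the helpers above fit together; the ranks below are made up for illustration:
# This process is rank 0 inside a 2-rank group whose global ranks are [0, 1],
# communicating over ring id 0 (the id reserved for the global group).
group = Group(rank_in_group=0, id=0, ranks=[0, 1])
_add_new_group(group)                  # register it with _GroupManager
assert _get_global_group() is group    # id 0 is looked up as the global group
print(group)
# rank: 0, nranks: 2, id: 0, ranks: 0, 1; name: None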
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.framework as framework
import paddle.fluid.core as core
class ReduceOp:
"""
......@@ -48,3 +51,26 @@ class ReduceOp:
MIN = 2
PROD = 3
AVG = 4
def _get_reduce_op(reduce_op, func_name):
if framework.in_dygraph_mode():
if reduce_op == ReduceOp.SUM:
return core.ReduceOp.SUM
elif reduce_op == ReduceOp.MAX:
return core.ReduceOp.MAX
elif reduce_op == ReduceOp.MIN:
return core.ReduceOp.MIN
elif reduce_op == ReduceOp.PROD:
return core.ReduceOp.PRODUCT
else:
if reduce_op == ReduceOp.SUM:
return 'c_allreduce_sum'
elif reduce_op == ReduceOp.MAX:
return 'c_allreduce_max'
elif reduce_op == ReduceOp.MIN:
return 'c_allreduce_min'
elif reduce_op == ReduceOp.PROD:
return 'c_allreduce_prod'
raise ValueError("Unknown reduce_op type for {}.".format(func_name))
......@@ -12,6 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .all_gather import all_gather
from .all_reduce import all_reduce
from .alltoall import alltoall
from .alltoall_single import alltoall_single
from .broadcast import broadcast
from .reduce import reduce
from .reduce_scatter import reduce_scatter
from .recv import recv
from .scatter import scatter
from .send import send
__all__ = ["all_reduce"]
__all__ = [
"all_gather", "all_reduce", "alltoall", "alltoall_single", "broadcast",
"reduce", "reduce_scatter", "recv", "scatter", "send"
]
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid.framework as framework
from paddle.distributed import collective
def _check_tensor_shape(tensor, shape, nranks=1):
expect_shape = list(shape)
expect_shape[0] *= nranks
if list(tensor.shape) != expect_shape:
raise RuntimeError('The tensor for all_gather is not correctly-sized.')
def _check_tensor_list_shape(tensor_list, shape, nranks=1):
if len(tensor_list) != nranks:
raise RuntimeError(
'The tensor_list for all_gather is not correctly-sized.')
for tensor in tensor_list:
if tensor.shape != shape:
raise RuntimeError(
'The tensor_list for all_gather is not correctly-sized.')
def _all_gather_into_tensor_in_dygraph(out_tensor, in_tensor, group, sync_op,
use_calc_stream):
group = collective._get_default_group() if group is None else group
_check_tensor_shape(out_tensor, in_tensor.shape, group.nranks)
if use_calc_stream:
return group.process_group.allgather_into_tensor_on_calc_stream(
in_tensor, out_tensor)
task = group.process_group.allgather_into_tensor(in_tensor, out_tensor,
sync_op)
if sync_op:
task.wait()
return task
def _all_gather_in_dygraph(tensor_list, tensor, group, sync_op,
use_calc_stream):
group = collective._get_default_group() if group is None else group
if len(tensor_list) == 0:
tensor_list += [paddle.empty_like(tensor) for _ in range(group.nranks)]
else:
_check_tensor_list_shape(tensor_list, tensor.shape, group.nranks)
if use_calc_stream:
return group.process_group.allgather_on_calc_stream(tensor, tensor_list)
task = group.process_group.allgather(tensor, tensor_list, sync_op)
if sync_op:
task.wait()
return task
def all_gather(tensor_or_tensor_list,
tensor,
group=None,
sync_op=True,
use_calc_stream=False):
"""
Gather tensors across devices to a correctly-sized tensor or a tensor list.
Args:
tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The output. If it is a tensor, it should be correctly-sized. If it is a list, it
should be empty or contain correctly-sized tensors.
tensor (Tensor): The input tensor on each rank. The result will overwrite this tensor after communication. Support
float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
tensor_list = []
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.all_gather(tensor_list, data, sync_op=False)
task.wait()
print(tensor_list)
# [[[4, 5, 6], [4, 5, 6]], [[1, 2, 3], [1, 2, 3]]] (2 GPUs)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior.")
if framework.in_dygraph_mode():
if paddle.is_tensor(tensor_or_tensor_list):
return _all_gather_into_tensor_in_dygraph(tensor_or_tensor_list,
tensor, group, sync_op,
use_calc_stream)
else:
return _all_gather_in_dygraph(tensor_or_tensor_list, tensor, group,
sync_op, use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.all_gather is only supported in dygraph mode now."
)
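A short sketch (not part of this diff) of the single-tensor output form accepted by `tensor_or_tensor_list` above; with 2 GPUs, dim[0] of the output must be nranks times that of the input:
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
data = paddle.to_tensor([[1, 2, 3]]) * (dist.get_rank() + 1)  # shape [1, 3] per rank
gathered = paddle.empty([2, 3], dtype=data.dtype)             # shape [nranks, 3]
task = dist.stream.all_gather(gathered, data, sync_op=False)
task.wait()
print(gathered)
# [[1, 2, 3], [2, 4, 6]] (2 GPUs)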
......@@ -13,12 +13,16 @@
# limitations under the License.
import paddle.fluid.framework as framework
from ...collective import _get_default_group, _get_reduce_op, ReduceOp
import paddle.fluid.data_feeder as data_feeder
import paddle.fluid.layer_helper as layer_helper
from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp
from paddle.distributed.communication.group import _get_global_group
def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream):
op_type = _get_reduce_op(op, "all_reduce")
group = _get_default_group() if group is None else group
group = _get_global_group() if group is None else group
if use_calc_stream:
return group.process_group.allreduce_on_calc_stream(tensor, op_type)
......@@ -29,6 +33,32 @@ def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream):
return task
def _all_reduce_in_static_mode(tensor, op, group, sync_op, use_calc_stream):
data_feeder.check_variable_and_dtype(tensor, 'tensor', [
'float16', 'float32', 'float64', 'int32', 'int64', 'int8', 'uint8',
'bool'
], 'all_reduce')
op_type = _get_reduce_op(op, "all_reduce")
ring_id = 0 if group is None else group.id
if not isinstance(ring_id, int):
raise ValueError("The type of 'ring_id' for all_reduce should be int.")
# TODO: Support task and use task.wait in static mode
# Use use_calc_stream rather than sync_op
helper = layer_helper.LayerHelper(op_type, **locals())
helper.append_op(type=op_type,
inputs={'X': [tensor]},
outputs={'Out': [tensor]},
attrs={
'ring_id': ring_id,
'use_calc_stream': sync_op
})
return None
def all_reduce(tensor,
op=ReduceOp.SUM,
group=None,
......@@ -40,8 +70,8 @@ def all_reduce(tensor,
Args:
tensor (Tensor): The input tensor on each rank. The result will overwrite this tensor after communication. Support
float16, float32, float64, int32 or int64 as the input data type.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.Min|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This
......@@ -50,9 +80,6 @@ def all_reduce(tensor,
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
......@@ -84,7 +111,6 @@ def all_reduce(tensor,
if framework.in_dygraph_mode():
return _all_reduce_in_dygraph(tensor, op, group, sync_op,
use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.all_reduce is only supported in dygraph mode now."
)
else:
return _all_reduce_in_static_mode(tensor, op, group, sync_op,
use_calc_stream)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid.framework as framework
from paddle.distributed import collective
def _check_tensor_shape(tensor, shape, nranks=1):
if tensor.shape != shape:
raise RuntimeError('The tensor for alltoall is not correctly-sized.')
def _check_tensor_list_shape(tensor_list, shape, nranks=1):
if len(tensor_list) != nranks:
raise RuntimeError(
'The tensor_list for alltoall is not correctly-sized.')
for tensor in tensor_list:
if tensor.shape != shape:
raise RuntimeError(
'The tensor_list for alltoall is not correctly-sized.')
def _alltoall_tensor_in_dygraph(out_tensor, in_tensor, group, sync_op,
use_calc_stream):
group = collective._get_default_group() if group is None else group
_check_tensor_shape(out_tensor, in_tensor.shape, group.nranks)
if use_calc_stream:
return group.process_group.alltoall_tensor_on_calc_stream(
in_tensor, out_tensor)
task = group.process_group.alltoall_tensor(in_tensor, out_tensor, sync_op)
if sync_op:
task.wait()
return task
def _alltoall_in_dygraph(out_tensor_list, in_tensor_list, group, sync_op,
use_calc_stream):
group = collective._get_default_group() if group is None else group
if len(in_tensor_list) == 0:
raise RuntimeError("The input tensor_list should not be empty.")
if len(out_tensor_list) == 0:
out_tensor_list += [
paddle.empty_like(tensor) for tensor in in_tensor_list
]
else:
_check_tensor_list_shape(out_tensor_list, in_tensor_list[0].shape,
group.nranks)
if use_calc_stream:
return group.process_group.alltoall_on_calc_stream(
in_tensor_list, out_tensor_list)
task = group.process_group.alltoall(in_tensor_list, out_tensor_list,
sync_op)
if sync_op:
task.wait()
return task
def alltoall(out_tensor_or_tensor_list,
in_tensor_or_tensor_list,
group=None,
sync_op=True,
use_calc_stream=False):
"""
Scatter a tensor (or a tensor list) across devices and gather outputs to another tensor (or a tensor list, respectively).
Args:
out_tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The output. If it is a tensor, it should be correctly-sized.
If it is a list, it should be empty or contain correctly-sized tensors. Its data type should be the same as the input.
in_tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The input to scatter, provided by every rank.
If it is a tensor, it should be correctly-sized. If it is a list, it should contain correctly-sized tensors. Support
float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
out_tensor_list = []
if dist.get_rank() == 0:
data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]])
data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]])
else:
data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]])
data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]])
task = dist.stream.alltoall(out_tensor_list, [data1, data2], sync_op=False)
task.wait()
print(out_tensor_list)
# [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0)
# [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior.")
if out_tensor_or_tensor_list is None:
raise RuntimeError("The output should be specified.")
if in_tensor_or_tensor_list is None:
raise RuntimeError("The input should be specified.")
if framework.in_dygraph_mode():
out_is_tensor = paddle.is_tensor(out_tensor_or_tensor_list)
in_is_tensor = paddle.is_tensor(in_tensor_or_tensor_list)
if out_is_tensor and in_is_tensor:
return _alltoall_tensor_in_dygraph(out_tensor_or_tensor_list,
in_tensor_or_tensor_list, group,
sync_op, use_calc_stream)
elif not out_is_tensor and not in_is_tensor:
return _alltoall_in_dygraph(out_tensor_or_tensor_list,
in_tensor_or_tensor_list, group,
sync_op, use_calc_stream)
else:
raise RuntimeError(
"The output and input should be both tensor or tensor list.")
raise RuntimeError(
"paddle.distributed.stream.alltoall is only supported in dygraph mode now."
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.framework as framework
from paddle.distributed import collective
def _alltoall_single_in_dygraph(out_tensor, in_tensor, out_split_sizes,
in_split_sizes, group, sync_op,
use_calc_stream):
group = collective._get_default_group() if group is None else group
if out_split_sizes is None:
out_split_sizes = []
if in_split_sizes is None:
in_split_sizes = []
if use_calc_stream:
return group.process_group.alltoall_single_on_calc_stream(
in_tensor, out_tensor, in_split_sizes, out_split_sizes)
task = group.process_group.alltoall_single(in_tensor, out_tensor,
in_split_sizes, out_split_sizes,
sync_op)
if sync_op:
task.wait()
return task
def alltoall_single(out_tensor,
in_tensor,
out_split_sizes=None,
in_split_sizes=None,
group=None,
sync_op=True,
use_calc_stream=False):
"""
Split the input tensor, then scatter the split parts to the output tensor across devices.
Args:
out_tensor(Tensor): The output tensor. Its data type should be the same as the input.
in_tensor (Tensor): The input tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool.
out_split_sizes (List[int], optional): Split sizes of out_tensor for dim[0]. If not given, dim[0] of out_tensor must be divisible
by the group size and out_tensor will be gathered evenly from all participants. If none is given, use an empty list as default.
in_split_sizes (List[int], optional): Split sizes of in_tensor for dim[0]. If not given, dim[0] of in_tensor must be divisible
by the group size and in_tensor will be scattered evenly to all participants. If none is given, use an empty list as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
# case 1
output = paddle.empty([2], dtype="int64")
if local_rank == 0:
data = paddle.to_tensor([0, 1])
else:
data = paddle.to_tensor([2, 3])
task = dist.stream.alltoall_single(output, data, sync_op=False)
task.wait()
out = output.numpy()
# [0, 2] (2 GPUs, out for rank 0)
# [1, 3] (2 GPUs, out for rank 1)
# case 2
size = dist.get_world_size()
output = paddle.empty([(local_rank + 1) * size, size], dtype='float32')
if local_rank == 0:
data = paddle.to_tensor([[0., 0.], [0., 0.], [0., 0.]])
else:
data = paddle.to_tensor([[1., 1.], [1., 1.], [1., 1.]])
out_split_sizes = [local_rank + 1 for i in range(size)]
in_split_sizes = [i + 1 for i in range(size)]
task = dist.stream.alltoall_single(output,
data,
out_split_sizes,
in_split_sizes,
sync_op=False)
task.wait()
out = output.numpy()
# [[0., 0.], [1., 1.]] (2 GPUs, out for rank 0)
# [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] (2 GPUs, out for rank 1)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior.")
if framework.in_dygraph_mode():
return _alltoall_single_in_dygraph(out_tensor, in_tensor,
out_split_sizes, in_split_sizes,
group, sync_op, use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.alltoall_single is only supported in dygraph mode now."
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.framework as framework
from paddle.distributed import collective
def _broadcast_in_dygraph(tensor, src, group, sync_op, use_calc_stream):
group = collective._get_default_group() if group is None else group
if use_calc_stream:
return group.process_group.broadcast_on_calc_stream(tensor, src)
task = group.process_group.broadcast(tensor, src, sync_op)
if sync_op:
task.wait()
return task
def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
"""
Broadcast a tensor to all devices.
Args:
tensor (Tensor): The tensor to broadcast. Support float16, float32, float64, int32, int64, int8, uint8 or bool as its data type.
src (int, optional): Rank of the source device. If none is given, use `0` as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.broadcast(data, src=1, sync_op=False)
task.wait()
out = data.numpy()
# [[1, 2, 3], [1, 2, 3]] (2 GPUs)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be True in sync op behavior.")
if framework.in_dygraph_mode():
return _broadcast_in_dygraph(tensor, src, group, sync_op,
use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.broadcast is only supported in dygraph mode now."
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.distributed.collective as collective
import paddle.fluid.framework as framework
def _recv_in_dygraph(tensor, src, group, sync_op, use_calc_stream):
group = collective._get_default_group() if group is None else group
if use_calc_stream:
return group.process_group.recv_on_calc_stream(tensor, src)
task = group.process_group.recv(tensor, src, sync_op)
if sync_op:
task.wait()
return task
def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
"""
Receive a tensor from the source device.
Args:
tensor (Tensor): The tensor to receive. Support float16, float32, float64, int32, int64, int8, uint8 or bool as its data type.
src (int, optional): Rank of the source device. If none is given, use `0` as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
task = dist.stream.send(data, dst=1, sync_op=False)
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.recv(data, src=0, sync_op=False)
task.wait()
out = data.numpy()
# [[4, 5, 6], [4, 5, 6]] (2 GPUs)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be True in sync op behavior.")
if framework.in_dygraph_mode():
return _recv_in_dygraph(tensor, src, group, sync_op, use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.recv is only supported in dygraph mode now.")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.framework as framework
from paddle.distributed.communication.group import _get_global_group
from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp
def _reduce_in_dygraph(tensor, dst, op, group, sync_op, use_calc_stream):
op_type = _get_reduce_op(op, "reduce")
group = _get_global_group() if group is None else group
if use_calc_stream:
return group.process_group.reduce_on_calc_stream(tensor, dst, op_type)
task = group.process_group.reduce(tensor, dst, op_type, sync_op)
if sync_op:
task.wait()
return task
def reduce(tensor,
dst=0,
op=ReduceOp.SUM,
group=None,
sync_op=True,
use_calc_stream=False):
"""
Perform a specific reduction (for example, sum or max) on a tensor across devices and send the result to the destination device.
Args:
tensor (Tensor): The input tensor on each rank. The result will overwrite this tensor after communication. Support
float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type.
dst (int, optional): Rank of the destination device. If none is given, use `0` as default.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.reduce(data, dst=0, sync_op=False)
task.wait()
out = data.numpy()
# [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0)
# [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior.")
if framework.in_dygraph_mode():
return _reduce_in_dygraph(tensor, dst, op, group, sync_op,
use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.reduce is only supported in dygraph mode now."
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.distributed as dist
import paddle.fluid.framework as framework
from paddle.distributed.communication.group import _get_global_group
from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp
def _check_tensor_shape(tensor, shape, nranks=1):
expect_shape = list(shape)
expect_shape[0] //= nranks
if list(tensor.shape) != expect_shape:
raise RuntimeError(
"The in_tensor for reduce_scatter is not correctly-sized.")
def _check_tensor_list_shape(tensor_list, shape, nranks=1):
if len(tensor_list) != nranks:
raise RuntimeError(
f"The tensor_list for reduce_scatter is not correctly-sized.")
for tensor in tensor_list:
if tensor.shape != shape:
raise RuntimeError(
f"The tensor_list for reduce_scatter is not correctly-sized.")
def _reduce_scatter_tensor_in_dygraph(out_tensor,
in_tensor,
op,
group,
sync_op,
use_calc_stream,
caller="reduce_scatter"):
op_type = _get_reduce_op(op, caller)
group = _get_global_group() if group is None else group
_check_tensor_shape(out_tensor, in_tensor.shape, group.nranks)
if use_calc_stream:
return group.process_group.reduce_scatter_tensor_on_calc_stream(
in_tensor, out_tensor, op_type)
task = group.process_group.reduce_scatter_tensor(in_tensor, out_tensor,
op_type, sync_op)
if sync_op:
task.wait()
return task
def _reduce_scatter_in_dygraph(tensor, tensor_list, op, group, sync_op,
use_calc_stream):
op_type = _get_reduce_op(op, "reduce_scatter")
group = _get_global_group() if group is None else group
_check_tensor_list_shape(tensor_list, tensor.shape, group.nranks)
if use_calc_stream:
return group.process_group.reduce_scatter_on_calc_stream(
tensor_list, tensor, op_type)
task = group.process_group.reduce_scatter(tensor_list, tensor, op_type,
sync_op)
if sync_op:
task.wait()
return task
def reduce_scatter(tensor,
tensor_or_tensor_list,
op=ReduceOp.SUM,
group=None,
sync_op=True,
use_calc_stream=False):
"""
Reduce, then scatter a tensor (or a tensor list) across devices.
Args:
tensor (Tensor): The output tensor on each rank. The result will overwrite this tensor after communication. Support
float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type.
tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The input to reduce and scatter.
If it is a tensor, it should be correctly-sized. If it is a list, it should contain correctly-sized tensors.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data1 = paddle.to_tensor([0, 1])
data2 = paddle.to_tensor([2, 3])
else:
data1 = paddle.to_tensor([4, 5])
data2 = paddle.to_tensor([6, 7])
dist.stream.reduce_scatter(data1, [data1, data2])
out = data1.numpy()
# [4, 6] (2 GPUs, out for rank 0)
# [8, 10] (2 GPUs, out for rank 1)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior.")
if framework.in_dygraph_mode():
if paddle.is_tensor(tensor_or_tensor_list):
return _reduce_scatter_tensor_in_dygraph(tensor,
tensor_or_tensor_list, op,
group, sync_op,
use_calc_stream)
else:
return _reduce_scatter_in_dygraph(tensor, tensor_or_tensor_list, op,
group, sync_op, use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.reduce_scatter is only supported in dygraph mode now."
)
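A short sketch (not part of this diff) of passing a single flattened tensor instead of a list as `tensor_or_tensor_list`; with 2 GPUs, dim[0] of the output is that of the input divided by nranks:
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
data = paddle.to_tensor([1, 2, 3, 4]) * (dist.get_rank() + 1)
output = paddle.empty([2], dtype=data.dtype)
dist.stream.reduce_scatter(output, data)  # sums by default, then scatters
print(output)
# [3, 6] (2 GPUs, out for rank 0)
# [9, 12] (2 GPUs, out for rank 1)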
def _reduce_scatter_base(out_tensor,
in_tensor,
op=ReduceOp.SUM,
group=None,
sync_op=True,
use_calc_stream=False):
"""
Reduce, then scatter a flattened tensor across devices.
Args:
out_tensor (Tensor): The output tensor on each rank. The result will overwrite this tensor after communication. Support
float16, float32, float64, int32 or int64 as the input data type.
in_tensor (Tensor): The input tensor to reduce and scatter.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API will be deprecated in the future, and only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
    data = paddle.to_tensor([1, 2, 3, 4])
else:
    data = paddle.to_tensor([5, 6, 7, 8])
output = paddle.empty([2], dtype="int64")
dist.stream._reduce_scatter_base(output, data)
out = output.numpy()
# [6, 8] (2 GPUs, out for rank 0)
# [10, 12] (2 GPUs, out for rank 1)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior.")
if framework.in_dygraph_mode():
return _reduce_scatter_tensor_in_dygraph(out_tensor, in_tensor, op,
group, sync_op,
use_calc_stream,
"_reduce_scatter_base")
raise RuntimeError(
"paddle.distributed.stream._reduce_scatter_base is only supported in dygraph mode now."
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.distributed as dist
import paddle.fluid.framework as framework
from paddle.distributed import collective
def _check_tensor_shape(tensor, shape, nranks=1):
expect_shape = list(shape)
expect_shape[0] //= nranks
if list(tensor.shape) != expect_shape:
raise RuntimeError("The in_tensor for scatter is not correctly-sized.")
def _check_tensor_list_shape(tensor_list, shape, nranks=1):
if len(tensor_list) != nranks:
raise RuntimeError(
f"The tensor_list for scatter is not correctly-sized.")
for tensor in tensor_list:
if tensor.shape != shape:
raise RuntimeError(
f"The tensor_list for scatter is not correctly-sized.")
def _scatter_tensor_in_dygraph(out_tensor, in_tensor, src, group, sync_op,
use_calc_stream):
group = collective._get_default_group() if group is None else group
src_rank = group.get_group_rank(src)
if src_rank == -1:
raise RuntimeError("Src rank out of group.")
nranks = group.nranks
rank = dist.get_rank()
if rank == src_rank:
_check_tensor_shape(out_tensor, in_tensor.shape, nranks)
if use_calc_stream:
return group.process_group.scatter_tensor_on_calc_stream(
in_tensor, out_tensor, src)
task = group.process_group.scatter_tensor(in_tensor, out_tensor, src,
sync_op)
if sync_op:
task.wait()
return task
def _scatter_in_dygraph(tensor, tensor_list, src, group, sync_op,
use_calc_stream):
group = collective._get_default_group() if group is None else group
src_rank = group.get_group_rank(src)
if src_rank == -1:
raise RuntimeError("Src rank out of group.")
nranks = group.nranks
rank = dist.get_rank()
if rank == src_rank:
if len(tensor_list) == 0:
raise RuntimeError(
"The tensor_list should not be empty on src rank.")
_check_tensor_list_shape(tensor_list, tensor.shape, nranks)
else:
tensor_list = [tensor for _ in range(nranks)]
if use_calc_stream:
return group.process_group.scatter_on_calc_stream(
tensor_list, tensor, src)
task = group.process_group.scatter(tensor_list, tensor, src, sync_op)
if sync_op:
task.wait()
return task
def scatter(tensor,
tensor_or_tensor_list=None,
src=0,
group=None,
sync_op=True,
use_calc_stream=False):
"""
Scatter a tensor (or a tensor list) across devices.
Args:
tensor (Tensor): The output tensor on each rank. The result will overwrite this tensor after communication. Support
float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type.
tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The input to scatter (default is `None`, must be specified on the source rank).
If it is a tensor, it should be correctly-sized. If it is a list, it should contain correctly-sized tensors.
src (int, optional): Rank of the source device. If none is given, use `0` as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data1 = paddle.to_tensor([7, 8, 9])
data2 = paddle.to_tensor([10, 11, 12])
dist.stream.scatter(data1, src=1)
else:
data1 = paddle.to_tensor([1, 2, 3])
data2 = paddle.to_tensor([4, 5, 6])
dist.stream.scatter(data1, [data1, data2], src=1)
out = data1.numpy()
# [1, 2, 3] (2 GPUs, out for rank 0)
# [4, 5, 6] (2 GPUs, out for rank 1)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior.")
if tensor_or_tensor_list is None:
raise RuntimeError("The input should be specified.")
if framework.in_dygraph_mode():
if paddle.is_tensor(tensor_or_tensor_list):
return _scatter_tensor_in_dygraph(tensor, tensor_or_tensor_list,
src, group, sync_op,
use_calc_stream)
else:
return _scatter_in_dygraph(tensor, tensor_or_tensor_list, src,
group, sync_op, use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.scatter is only supported in dygraph mode now."
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.distributed.collective as collective
import paddle.fluid.framework as framework
def _send_in_dygraph(tensor, dst, group, sync_op, use_calc_stream):
group = collective._get_default_group() if group is None else group
if use_calc_stream:
return group.process_group.send_on_calc_stream(tensor, dst)
task = group.process_group.send(tensor, dst, sync_op)
if sync_op:
task.wait()
return task
def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False):
"""
Send a tensor to the destination device.
Args:
tensor (Tensor): The tensor to send. Support float16, float32, float64, int32, int64, int8, uint8 or bool as its data type.
dst (int, optional): Rank of the destination device. If none is given, use `0` as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on the calculation stream. If none is given, use false as default. This
option is designed for high-performance demands; be careful about turning it on unless you clearly understand its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
task = dist.stream.send(data, dst=1, sync_op=False)
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.recv(data, src=0, sync_op=False)
task.wait()
out = data.numpy()
# [[4, 5, 6], [4, 5, 6]] (2 GPUs)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be True in sync op behavior.")
if framework.in_dygraph_mode():
return _send_in_dygraph(tensor, dst, group, sync_op, use_calc_stream)
raise RuntimeError(
"paddle.distributed.stream.send is only supported in dygraph mode now.")
......@@ -378,8 +378,8 @@ class _CommunicateGroup(object):
def set_comm_group(self, group_name, group_rank, group_size, ring_id,
group_ranks):
group = paddle.distributed.collective.Group(group_rank, group_size,
ring_id, group_ranks)
group = paddle.distributed.collective.Group(group_rank, ring_id,
group_ranks)
self.groups[group_name] = group
def get_group(self, group_name):
......
......@@ -22,7 +22,7 @@ from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.data_feeder import check_variable_and_dtype
from paddle.fluid.dygraph import layers
from paddle.distributed import collective
from ....communication.comm_utils import ReduceOp
from ....communication.reduce import ReduceOp
from paddle.fluid.data_feeder import check_dtype
import paddle.fluid.dygraph_utils as dygraph_utils
......
......@@ -43,6 +43,7 @@ from paddle.distributed.collective import _set_default_store
from paddle.distributed.collective import _new_process_group_impl
from paddle.distributed.collective import Group
from paddle.distributed.collective import _set_group_map_backend
from paddle.distributed.communication.group import _add_new_group
__all__ = []
......@@ -258,15 +259,11 @@ def init_parallel_env():
_default_group_name,
pg_options=None)
ranks = list(range(world_size))
group = Group(rank,
world_size,
id=0,
ranks=ranks,
pg=pg,
name=_default_group_name)
group = Group(rank, 0, ranks, pg=pg, name=_default_group_name)
_set_group_map_by_name(_default_group_name, group)
_set_group_map(0, group)
_set_group_map_backend(group, backend)
_add_new_group(group)
parallel_helper._set_parallel_ctx(True)
paddle.distributed.barrier(group=group)
......
......@@ -71,14 +71,14 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_allreduce_api MODULES test_collective_allreduce_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_allreduce_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "180" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_alltoall_api MODULES test_collective_alltoall_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_alltoall_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
bash_test_modules(
......@@ -98,7 +98,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_alltoall_single_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_alltoall_single_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
......@@ -125,7 +125,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_broadcast_api MODULES test_collective_broadcast_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_broadcast_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "180" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
......@@ -154,7 +154,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_isend_irecv_api MODULES test_collective_isend_irecv_api
ENVS "http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_isend_irecv_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
......@@ -187,7 +187,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_reduce_api MODULES test_collective_reduce_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_reduce_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "180" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
bash_test_modules(
......@@ -207,7 +207,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_reduce_scatter_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_reduce_scatter_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
......@@ -221,7 +221,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_scatter_api MODULES test_collective_scatter_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_scatter_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "180" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
......@@ -235,7 +235,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_sendrecv_api MODULES test_collective_sendrecv_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_sendrecv_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
......@@ -268,17 +268,82 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_eager_dist_api MODULES test_eager_dist_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_eager_dist_api PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
test_communication_stream_allgather_api MODULES
test_communication_stream_allgather_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_allgather_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_new_group_api MODULES test_new_group_api ENVS
test_communication_stream_allreduce_api MODULES
test_communication_stream_allreduce_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_allreduce_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_alltoall_api MODULES
test_communication_stream_alltoall_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_alltoall_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_alltoall_single_api MODULES
test_communication_stream_alltoall_single_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_alltoall_single_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_broadcast_api MODULES
test_communication_stream_broadcast_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_broadcast_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_reduce_api MODULES
test_communication_stream_reduce_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_reduce_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_reduce_scatter_api MODULES
test_communication_stream_reduce_scatter_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_reduce_scatter_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_scatter_api MODULES
test_communication_stream_scatter_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_scatter_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_sendrecv_api MODULES
test_communication_stream_sendrecv_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_sendrecv_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_eager_dist_api MODULES test_eager_dist_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_new_group_api PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
set_tests_properties(test_eager_dist_api PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU
OR WITH_ROCM
......@@ -298,11 +363,10 @@ if((WITH_GPU
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_allreduce_api MODULES
test_communication_stream_allreduce_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_allreduce_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
test_new_group_api MODULES test_new_group_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_new_group_api PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_ROCM OR WITH_GPU) AND (LINUX))
bash_test_modules(
......
......@@ -15,6 +15,7 @@
from __future__ import print_function
import paddle
import paddle.distributed as dist
import paddle.fluid as fluid
import unittest
import test_collective_api_base as test_base
......@@ -27,10 +28,18 @@ class TestCollectiveAllgatherAPI(test_base.TestCollectiveAPIRunnerBase):
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
tensor_list = []
paddle.distributed.all_gather(tensor_list, tindata)
return [tensor.numpy() for tensor in tensor_list]
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
dist.all_gather(tensor_list, tindata)
return [
tensor.cast("float32").numpy() for tensor in tensor_list
]
else:
tindata = paddle.to_tensor(indata)
dist.all_gather(tensor_list, tindata)
return [tensor.numpy() for tensor in tensor_list]
if __name__ == "__main__":
......
......@@ -15,6 +15,7 @@
from __future__ import print_function
import paddle
import paddle.distributed as dist
import paddle.fluid as fluid
import unittest
import test_collective_api_base as test_base
......@@ -27,9 +28,15 @@ class TestCollectiveAllreduceAPI(test_base.TestCollectiveAPIRunnerBase):
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
paddle.distributed.all_reduce(tindata)
return [tindata.numpy()]
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
dist.all_reduce(tindata)
return [tindata.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
dist.all_reduce(tindata)
return [tindata.numpy()]
if __name__ == "__main__":
......
......@@ -25,30 +25,31 @@ from contextlib import closing
from six import string_types
import math
import paddle
import paddle.distributed as dist
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import test_collective_api_base as test_base
class TestCollectiveAllToAllAPI(TestCollectiveAPIRunnerBase):
class TestCollectiveAllToAllAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
tindata = paddle.split(tindata, 2, axis=0)
toutdata = []
paddle.distributed.alltoall(tindata, toutdata)
return [data.numpy() for data in toutdata]
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
tindata = paddle.split(tindata, 2, axis=0)
dist.alltoall(tindata, toutdata)
return [data.cast("float32").numpy() for data in toutdata]
else:
tindata = paddle.to_tensor(indata)
tindata = paddle.split(tindata, 2, axis=0)
dist.alltoall(tindata, toutdata)
return [data.numpy() for data in toutdata]
if __name__ == "__main__":
runtime_main(TestCollectiveAllToAllAPI, "alltoall")
test_base.runtime_main(TestCollectiveAllToAllAPI, "alltoall")
......@@ -15,6 +15,7 @@
from __future__ import print_function
import paddle
import paddle.distributed as dist
import paddle.fluid as fluid
import test_collective_api_base as test_base
......@@ -26,10 +27,17 @@ class TestCollectiveAllToAllSingleAPI(test_base.TestCollectiveAPIRunnerBase):
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
toutdata = paddle.to_tensor(indata)
paddle.distributed.alltoall_single(tindata, toutdata)
return [toutdata.numpy()]
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
toutdata = paddle.to_tensor(tindata, "float32").cast("uint16")
dist.alltoall_single(tindata, toutdata)
return [toutdata.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
toutdata = paddle.to_tensor(indata)
dist.alltoall_single(tindata, toutdata)
return [toutdata.numpy()]
if __name__ == "__main__":
......