Unverified commit 46c57674, authored by TaoTao Li, committed by GitHub

prepare for collective communicate upgrade in dygraph (#54417)

Parent 0cf841c9
@@ -125,7 +125,8 @@ void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type,
queue_group_->AddTask(op_func_type == OpFuncType::kGpuAsync, std::move(fn));
}
bool IsCommunicationOp(const std::string& op_name) {
bool IsCommunicationOp(const OperatorBase* op) {
const std::string& op_name = op->Type();
const std::set<std::string> special_comm_op_set = {
"send",
"recv",
@@ -137,6 +138,9 @@ bool IsCommunicationOp(const std::string& op_name) {
special_comm_op_set.count(op_name)) {
return true;
}
if (op->HasAttr("ring_id")) {
return true;
}
return false;
}
@@ -144,7 +148,7 @@ bool IsCommunicationOp(const Instruction& instr) {
if (!instr.OpBaseValid()) {
return false;
}
return IsCommunicationOp(instr.OpBase()->Type());
return IsCommunicationOp(instr.OpBase());
}
bool IsCpuOp(const Instruction& instr) {
@@ -579,7 +583,7 @@ void BuildOpFuncList(const platform::Place& place,
op_func_node.stream_priority_ = dist_attr->stream_priority();
op_func_node.scheduling_priority_ = dist_attr->scheduling_priority();
} else {
if (interpreter::IsCommunicationOp(op_type)) {
if (interpreter::IsCommunicationOp(op)) {
// NOTE(Ruibiao): Dispatching computation before communication improves
// multi-stream overlap when the time cost of communication less than
// that of the calculation (e.g., ResNet50_bs128_pure_fp16 N4C32
......
@@ -65,7 +65,7 @@ class AsyncWorkQueue {
std::unique_ptr<WorkQueueGroup> queue_group_;
};
bool IsCommunicationOp(const std::string& op_name);
bool IsCommunicationOp(const OperatorBase* op);
bool IsCommunicationOp(const Instruction& instr);
......
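For orientation, here is a minimal, self-contained sketch of the upgraded classification rule, using a stand-in OpLike type instead of the real framework::OperatorBase (only Type() and HasAttr() are assumed, and the real special_comm_op_set contains more entries than the truncated hunk above shows): besides the named special ops, any operator carrying a ring_id attribute is now treated as a communication op.

#include <set>
#include <string>

// Stand-in for framework::OperatorBase, for illustration only.
struct OpLike {
  std::string type;
  std::set<std::string> attrs;
  const std::string& Type() const { return type; }
  bool HasAttr(const std::string& name) const { return attrs.count(name) > 0; }
};

// Mirrors the new rule: a named special op, or any op that carries a
// ring_id attribute, counts as communication.
bool IsCommunicationOpSketch(const OpLike* op) {
  static const std::set<std::string> kSpecialCommOps = {"send", "recv"};
  if (kSpecialCommOps.count(op->Type()) > 0) {
    return true;
  }
  return op->HasAttr("ring_id");
}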
@@ -17,7 +17,7 @@ limitations under the License. */
namespace paddle {
namespace operators {
class AllToAllOp : public framework::OperatorWithKernel {
class AllToAllBaseOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
@@ -43,7 +43,7 @@ class AllToAllOp : public framework::OperatorWithKernel {
}
};
class AllToAllOpMaker : public framework::OpProtoAndCheckerMaker {
class AllToAllBaseOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() {
AddInput("X", "(Tensor) tensor send.");
@@ -67,7 +67,9 @@ Scatter tensors from all participators to all participators.
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_WITHOUT_GRADIENT(alltoall, ops::AllToAllOp, ops::AllToAllOpMaker)
REGISTER_OP_WITHOUT_GRADIENT(alltoall,
ops::AllToAllBaseOp,
ops::AllToAllBaseOpMaker)
PD_REGISTER_STRUCT_KERNEL(alltoall,
CPU,
......
@@ -19,6 +19,7 @@
#include "paddle/phi/core/dense_tensor.h"
#include "paddle/phi/core/distributed/check/nccl_dynamic_check.h"
#include "paddle/phi/core/distributed/check/static_check.h"
#include "paddle/phi/core/distributed/utils.h"
#include "paddle/phi/core/enforce.h"
#include "paddle/phi/core/utils/data_type.h"
@@ -61,6 +62,17 @@ void NCCLCommContext::Broadcast(phi::DenseTensor* out_tensor,
void NCCLCommContext::AllGather(phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
gpuStream_t stream) {
phi::distributed::CommStaticCheck::GatherLikeShape(*out_tensor,
in_tensor,
/*dst_rank*/ rank_,
/*cur_rank*/ rank_,
size_);
if (FLAGS_enable_nccl_dynamic_check) {
phi::distributed::NCCLDynamicCheck::CheckShape(*out_tensor,
/*root_rank*/ 0,
rank_,
nccl_comm_);
}
PADDLE_ENFORCE_GPU_SUCCESS(
phi::dynload::ncclAllGather(in_tensor.data(),
out_tensor->data(),
@@ -71,29 +83,43 @@ void NCCLCommContext::AllGather(phi::DenseTensor* out_tensor,
}
void NCCLCommContext::ReduceScatter(phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
ncclRedOp_t reduce_type,
gpuStream_t stream) {
int64_t out_size = in_tensor.numel() / GetSize();
phi::distributed::CommStaticCheck::ScatterLikeShape(*out_tensor,
in_tensor,
/*dst_rank*/ rank_,
/*cur_rank*/ rank_,
size_);
if (FLAGS_enable_nccl_dynamic_check) {
phi::distributed::NCCLDynamicCheck::CheckShape(*out_tensor,
/*root_rank*/ 0,
rank_,
nccl_comm_);
}
PADDLE_ENFORCE_GPU_SUCCESS(
phi::dynload::ncclReduceScatter(in_tensor.data(),
out_tensor->data(),
out_size,
out_tensor->numel(),
ToNCCLDataType(in_tensor.type()),
ncclSum,
reduce_type,
nccl_comm_,
stream));
}
void NCCLCommContext::Send(const phi::DenseTensor& in_tensor,
const int64_t& count,
const int& peer,
gpuStream_t stream) {
phi::distributed::CommStaticCheck::CheckShape(in_tensor, rank_, size_);
if (FLAGS_enable_nccl_dynamic_check) {
NCCLDynamicCheck::CheckShape(in_tensor, rank_, rank_, nccl_comm_);
}
PADDLE_ENFORCE_GPU_SUCCESS(
phi::dynload::ncclSend(in_tensor.data(),
in_tensor.numel(),
ToNCCLDataType(in_tensor.type()),
count,
ToNCCLDataType(in_tensor.dtype()),
peer,
nccl_comm_,
stream));
@@ -102,16 +128,18 @@ void NCCLCommContext::Send(const phi::DenseTensor& in_tensor,
}
void NCCLCommContext::Recv(phi::DenseTensor* out_tensor,
const int64_t& count,
const int& peer,
gpuStream_t stream) {
phi::distributed::CommStaticCheck::CheckShape(*out_tensor, rank_, size_);
if (FLAGS_enable_nccl_dynamic_check) {
NCCLDynamicCheck::CheckShape(*out_tensor, rank_, rank_, nccl_comm_);
NCCLDynamicCheck::CheckShape(*out_tensor, peer, rank_, nccl_comm_);
}
PADDLE_ENFORCE_GPU_SUCCESS(
phi::dynload::ncclRecv(out_tensor->data(),
out_tensor->numel(),
ToNCCLDataType(out_tensor->type()),
count,
ToNCCLDataType(out_tensor->dtype()),
peer,
nccl_comm_,
stream));
@@ -123,6 +151,17 @@ void NCCLCommContext::AllReduce(phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
ncclRedOp_t reduce_type,
gpuStream_t stream) {
phi::distributed::CommStaticCheck::SameShape(*out_tensor,
in_tensor,
/*dst_rank*/ rank_,
/*cur_rank*/ rank_,
size_);
if (FLAGS_enable_nccl_dynamic_check) {
phi::distributed::NCCLDynamicCheck::CheckShape(*out_tensor,
/*root_rank*/ 0,
rank_,
nccl_comm_);
}
PADDLE_ENFORCE_GPU_SUCCESS(
phi::dynload::ncclAllReduce(in_tensor.data(),
out_tensor->data(),
@@ -138,6 +177,17 @@ void NCCLCommContext::Reduce(phi::DenseTensor* out_tensor,
ncclRedOp_t reduce_type,
int root,
gpuStream_t stream) {
phi::distributed::CommStaticCheck::SameShape(*out_tensor,
in_tensor,
/*dst_rank*/ root,
/*cur_rank*/ rank_,
size_);
if (FLAGS_enable_nccl_dynamic_check) {
phi::distributed::NCCLDynamicCheck::CheckShape(*out_tensor,
/*root_rank*/ root,
rank_,
nccl_comm_);
}
PADDLE_ENFORCE_GPU_SUCCESS(
phi::dynload::ncclReduce(in_tensor.data(),
out_tensor->data(),
@@ -149,5 +199,10 @@ void NCCLCommContext::Reduce(phi::DenseTensor* out_tensor,
stream));
}
void NCCLCommContext::GroupStart() {
NCCL_CHECK(phi::dynload::ncclGroupStart());
}
void NCCLCommContext::GroupEnd() { NCCL_CHECK(phi::dynload::ncclGroupEnd()); }
} // namespace distributed
} // namespace phi
@@ -38,13 +38,18 @@ class NCCLCommContext final : public CommContext {
int root,
gpuStream_t stream);
void Send(const phi::DenseTensor& in_tensor,
const int64_t& count,
const int& peer,
gpuStream_t stream);
void Recv(phi::DenseTensor* out_tensor, const int& peer, gpuStream_t stream);
void Recv(phi::DenseTensor* out_tensor,
const int64_t& count,
const int& peer,
gpuStream_t stream);
void ReduceScatter(phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
ncclRedOp_t reduce_type,
gpuStream_t stream);
void AllGather(phi::DenseTensor* out_tensor,
@@ -62,6 +67,10 @@ class NCCLCommContext final : public CommContext {
int root,
gpuStream_t stream);
void GroupStart();
void GroupEnd();
private:
DISABLE_COPY_AND_ASSIGN(NCCLCommContext);
......
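To show how the extended interface fits together, here is a hedged usage sketch rather than code from the patch: Send and Recv now take an explicit element count, ReduceScatter takes the reduction type, and GroupStart/GroupEnd batch point-to-point calls into a single NCCL group. Only the member signatures above come from this commit; the function, variable names, and ranks below are illustrative, and the necessary headers are assumed to be included.

// Sketch: a batched neighbor exchange with the upgraded NCCLCommContext.
void ExchangeWithNeighbors(phi::distributed::NCCLCommContext* comm_ctx,
                           const phi::DenseTensor& send_buf,
                           phi::DenseTensor* recv_buf,
                           int prev_rank,
                           int next_rank,
                           gpuStream_t stream) {
  comm_ctx->GroupStart();                     // open one NCCL group
  comm_ctx->Send(send_buf, send_buf.numel(),  // element count is now explicit
                 /*peer=*/next_rank, stream);
  comm_ctx->Recv(recv_buf, recv_buf->numel(),
                 /*peer=*/prev_rank, stream);
  comm_ctx->GroupEnd();                       // issue both operations together
}

The matching call-site updates appear in the kernel hunks further down, e.g. comm_ctx->Send(x, x.numel(), peer, stream) in PSendKernel and comm_context->ReduceScatter(out, x, ncclSum, stream) in ReduceScatterKernel, where the reduction op is now passed in instead of being hard-coded.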
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/phi/core/dense_tensor.h"
namespace phi {
namespace distributed {
inline phi::DenseTensor GetPartialTensor(const phi::DenseTensor& tensor,
int64_t offset,
int64_t numel) {
phi::DenseTensor tensor_flattened;
tensor_flattened.ShareDataWith(tensor);
tensor_flattened.Resize({tensor.numel()});
return tensor_flattened.Slice(offset, offset + numel);
}
#define NCCL_CHECK(cmd) \
do { \
ncclResult_t r = cmd; \
if (r != ncclSuccess) { \
exit(EXIT_FAILURE); \
} \
} while (0)
} // namespace distributed
} // namespace phi
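GetPartialTensor is a small helper for carving a contiguous slice out of a tensor viewed as a flat buffer. Below is a hedged sketch of how it might combine with the count-based Send above; apart from GetPartialTensor and the NCCLCommContext members, every name here is illustrative and the required headers are assumed to be included.

// Sketch: send the peer-th of nranks equal chunks of a tensor.
void SendSliceToPeer(phi::distributed::NCCLCommContext* comm_ctx,
                     const phi::DenseTensor& full,
                     int peer,
                     int nranks,
                     gpuStream_t stream) {
  const int64_t chunk = full.numel() / nranks;  // elements per rank
  phi::DenseTensor part = phi::distributed::GetPartialTensor(
      full, /*offset=*/static_cast<int64_t>(peer) * chunk, /*numel=*/chunk);
  comm_ctx->Send(part, part.numel(), peer, stream);
}

Note that NCCL_CHECK as defined here simply calls exit(EXIT_FAILURE) on any non-success result; call sites that need a diagnostic message still go through PADDLE_ENFORCE_GPU_SUCCESS.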
@@ -48,7 +48,8 @@ DDim recv_shape_info(const Context& dev_ctx,
phi::DenseTensor* gpu_shape_size_tensor = new phi::DenseTensor(shape_dtype);
gpu_shape_size_tensor->Resize({1});
dev_ctx.Alloc(gpu_shape_size_tensor, shape_dtype);
comm_ctx->Recv(gpu_shape_size_tensor, peer, stream);
comm_ctx->Recv(
gpu_shape_size_tensor, gpu_shape_size_tensor->numel(), peer, stream);
// copy the shape size tensor to cpu
phi::DenseTensor* cpu_shape_size_tensor = new phi::DenseTensor(shape_dtype);
@@ -71,7 +72,7 @@ DDim recv_shape_info(const Context& dev_ctx,
phi::DenseTensor* gpu_shape_tensor = new phi::DenseTensor(shape_dtype);
gpu_shape_tensor->Resize({shape_size});
dev_ctx.Alloc(gpu_shape_tensor, shape_dtype);
comm_ctx->Recv(gpu_shape_tensor, peer, stream);
comm_ctx->Recv(gpu_shape_tensor, gpu_shape_tensor->numel(), peer, stream);
// copy the shape tensor to cpu
phi::DenseTensor* cpu_shape_tensor = new phi::DenseTensor(shape_dtype);
@@ -141,7 +142,7 @@ void PRecvKernel(const Context& dev_ctx,
out->Resize(new_dim);
}
dev_ctx.Alloc(out, dtype);
comm_ctx->Recv(out, peer, stream);
comm_ctx->Recv(out, out->numel(), peer, stream);
#else
PADDLE_THROW(
errors::PreconditionNotMet("PaddlePaddle should compile with GPU."
@@ -165,7 +166,7 @@ void PRecvArrayKernel(const Context& dev_ctx,
auto out = out_array->at(idx);
auto out_dims = out.dims();
dev_ctx.Alloc(&out, dtype);
comm_ctx->Recv(&out, peer, stream);
comm_ctx->Recv(&out, out.numel(), peer, stream);
VLOG(3) << "rank " << comm_ctx->GetRank() << " recv "
<< phi::product(out_dims) << " from " << peer;
}
......
@@ -65,7 +65,8 @@ void send_shape_info(const Context& dev_ctx,
cpu_shape_size_tensor.numel() * sizeof(int),
stream);
comm_ctx->Send(*gpu_shape_size_tensor, peer, stream);
comm_ctx->Send(
*gpu_shape_size_tensor, gpu_shape_size_tensor->numel(), peer, stream);
VLOG(3) << "send the shape size: " << shape_size << " to peer";
// step2: send the shape
@@ -87,7 +88,7 @@ void send_shape_info(const Context& dev_ctx,
cpu_shape_tensor.data(),
cpu_shape_tensor.numel() * sizeof(int),
stream);
comm_ctx->Send(*gpu_shape_tensor, peer, stream);
comm_ctx->Send(*gpu_shape_tensor, gpu_shape_tensor->numel(), peer, stream);
VLOG(3) << "send the shape: (" << dims << ") to peer";
}
@@ -131,7 +132,7 @@ void PSendKernel(const Context& dev_ctx,
send_shape_info<Context>(dev_ctx, x, comm_ctx, peer, stream);
}
comm_ctx->Send(x, peer, stream);
comm_ctx->Send(x, x.numel(), peer, stream);
#else
PADDLE_THROW(
errors::PreconditionNotMet("PaddlePaddle should compile with GPU."
@@ -153,7 +154,7 @@ void PSendArrayKernel(const Context& dev_ctx,
auto x = x_array.at(idx);
int numel = x.numel();
ncclDataType_t dtype = ToNCCLDataType(x.type());
comm_ctx->Send(x, peer, stream);
comm_ctx->Send(x, x.numel(), peer, stream);
VLOG(3) << "rank " << comm_ctx->GetRank() << " send "
<< phi::product(x.dims()) << " to " << peer;
}
......
@@ -50,7 +50,7 @@ void ReduceScatterKernel(const Context& dev_ctx,
out->Resize(out_dims);
dev_ctx.template Alloc<T>(out);
comm_context->ReduceScatter(out, x, stream);
comm_context->ReduceScatter(out, x, ncclSum, stream);
#else
PADDLE_THROW(
errors::PreconditionNotMet("PaddlePaddle should compile with GPU."));
......
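As a concrete instance of the sizing relation that ReduceScatter now checks via CommStaticCheck::ScatterLikeShape: with nranks participants, each rank's output must hold in.numel() / nranks elements. A small illustrative fragment follows (shapes and names assumed; allocation and the actual collective call are left as comments):

// Sketch: with 4 ranks and a {4096} input, every rank's reduce-scatter
// output holds 4096 / 4 = 1024 reduced elements.
void SizeReduceScatterOutput(const phi::DenseTensor& x, phi::DenseTensor* out) {
  const int nranks = 4;
  out->Resize({x.numel() / nranks});  // {4096} -> {1024}
  // dev_ctx.template Alloc<T>(out);                        // as in the kernel above
  // comm_context->ReduceScatter(out, x, ncclSum, stream);  // reduction type is explicit
}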