Unverified commit bddeecd1, authored by Galaxy1458, committed by GitHub

test,test=develop (#53301)

Parent 336bc20b
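This commit tags the unused parameters of ProcessGroup's default virtual implementations (which simply throw Unimplemented errors) with the UNUSED macro, so that warning-as-error builds do not trip over -Wunused-parameter. As a rough sketch only — the macro name UNUSED comes from the diff, but its exact definition and the ExampleGroup class below are illustrative assumptions, not Paddle's actual code — such a marker is commonly built on the GCC/Clang unused attribute or the C++17 [[maybe_unused]] attribute and placed right after the parameter name:

#include <stdexcept>

// Illustrative sketch only: one common way an UNUSED parameter marker is
// defined. Paddle's real definition may differ from this assumption.
#if defined(__GNUC__) || defined(__clang__)
#define UNUSED __attribute__((unused))
#elif defined(__cplusplus) && __cplusplus >= 201703L
#define UNUSED [[maybe_unused]]
#else
#define UNUSED
#endif

// Hypothetical base class mirroring the pattern in the diff: the default
// implementation ignores its arguments, so each parameter is tagged UNUSED
// to silence -Wunused-parameter (often combined with -Werror) while the
// signature stays unchanged for subclasses that do use the arguments.
class ExampleGroup {
 public:
  virtual ~ExampleGroup() = default;

  virtual bool Wait(int timeout_ms UNUSED) { return false; }

  virtual void Broadcast(int root_rank UNUSED, bool sync_op UNUSED) {
    throw std::runtime_error("Broadcast is not supported by this backend");
  }
};

A subclass that actually implements Broadcast overrides it with parameters it really uses, so the annotation is needed only on the throwing defaults shown in the diff below.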
@@ -59,20 +59,20 @@ class ProcessGroup {
virtual ~Task() = default;
virtual bool IsCompleted();
virtual bool Wait(std::chrono::milliseconds timeout = kWaitTimeout) {
virtual bool Wait(std::chrono::milliseconds timeout UNUSED = kWaitTimeout) {
return false;
}
virtual void Synchronize() {}
virtual void UpdateWaitChain(const phi::DeviceContext& ctx) {}
virtual void UpdateWaitChain(const phi::DeviceContext& ctx UNUSED) {}
bool IsSync() const { return sync_op_; }
// TODO(sunyilun): methods below will be removed later
Task(int rank,
const std::vector<phi::DenseTensor>& inputs,
const std::vector<phi::DenseTensor>& inputs UNUSED,
CommType comm_type)
: rank_(rank), comm_type_(comm_type) {}
Task(int rank,
const std::vector<phi::DenseTensor>& inputs,
const std::vector<phi::DenseTensor>& inputs UNUSED,
CommType comm_type,
bool sync_op)
: rank_(rank), comm_type_(comm_type), sync_op_(sync_op) {}
@@ -97,14 +97,15 @@ class ProcessGroup {
virtual std::string GetBackendName() const = 0;
virtual phi::DeviceContext* GetDeviceContext(const Place& place) const {
virtual phi::DeviceContext* GetDeviceContext(
const Place& place UNUSED) const {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support get device_context.",
GetBackendName()));
}
virtual phi::DeviceContext* GetDeviceContext(const Place& place,
bool use_calc_stream) const {
virtual phi::DeviceContext* GetDeviceContext(
const Place& place UNUSED, bool use_calc_stream UNUSED) const {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support get device_context.",
GetBackendName()));
@@ -112,123 +113,127 @@ class ProcessGroup {
// without stream APIs
virtual std::shared_ptr<ProcessGroup::Task> AllGather(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
bool sync_op) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support all_gather with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllGather(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
int64_t offset,
int64_t numel,
bool sync_op) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
int64_t offset UNUSED,
int64_t numel UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support all_gather with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const AllreduceOptions& opts,
bool sync_op) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const AllreduceOptions& opts UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support all_reduce with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const std::vector<int64_t>& out_size_each_rank,
const std::vector<int64_t>& in_size_each_rank,
bool sync_op) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const std::vector<int64_t>& out_size_each_rank UNUSED,
const std::vector<int64_t>& in_size_each_rank UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support all_to_all with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) {
const BarrierOptions& UNUSED = BarrierOptions()) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support barrier.", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const BroadcastOptions& opts,
bool sync_op) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const BroadcastOptions& opts UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support broadcast with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Reduce(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const ReduceOptions& opts,
bool sync_op) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const ReduceOptions& opts UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support reduce with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> ReduceScatter(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const ReduceScatterOptions& opts,
bool sync_op) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const ReduceScatterOptions& opts UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support reduce_scatter with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Scatter(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const ScatterOptions& opts,
bool sync_op) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const ScatterOptions& opts UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support scatter with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
int src_rank,
bool sync_op) {
virtual std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor
UNUSED,
int src_rank UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support recv with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
int src_rank,
int64_t offset,
int64_t numel,
bool sync_op) {
virtual std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor
UNUSED,
int src_rank UNUSED,
int64_t offset UNUSED,
int64_t numel UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support recv with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Send(
const phi::DenseTensor& tensor, int dst_rank, bool sync_op) {
const phi::DenseTensor& tensor UNUSED,
int dst_rank UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support send with sync_op flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Send(
const phi::DenseTensor& tensor,
int dst_rank,
int64_t offset,
int64_t numel,
bool sync_op) {
const phi::DenseTensor& tensor UNUSED,
int dst_rank UNUSED,
int64_t offset UNUSED,
int64_t numel UNUSED,
bool sync_op UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support send with sync_op flag.",
GetBackendName()));
@@ -236,10 +241,10 @@ class ProcessGroup {
// stream APIs
virtual std::shared_ptr<ProcessGroup::Task> AllGather(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support all_gather "
"with sync_op and use_calc_stream flag.",
@@ -247,12 +252,12 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> AllGather(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
int64_t offset,
int64_t numel,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
int64_t offset UNUSED,
int64_t numel UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support all_gather "
"with sync_op and use_calc_stream flag.",
@@ -260,11 +265,11 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const AllreduceOptions& opts,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const AllreduceOptions& opts UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support all_reduce "
"with sync_op and use_calc_stream flag.",
@@ -272,12 +277,12 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const std::vector<int64_t>& out_size_each_rank,
const std::vector<int64_t>& in_size_each_rank,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const std::vector<int64_t>& out_size_each_rank UNUSED,
const std::vector<int64_t>& in_size_each_rank UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support all_to_all "
"with sync_op and use_calc_stream flag.",
@@ -285,11 +290,11 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const BroadcastOptions& opts,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const BroadcastOptions& opts UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support broadcast "
"with sync_op and use_calc_stream flag.",
@@ -297,11 +302,11 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> Reduce(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const ReduceOptions& opts,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const ReduceOptions& opts UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support reduce "
"with sync_op and use_calc_stream flag.",
@@ -309,11 +314,11 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> ReduceScatter(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const ReduceScatterOptions& opts,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const ReduceScatterOptions& opts UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(phi::errors::Unimplemented(
"ProcessGroup%s does not support reduce_scatter "
"with sync_op and use_calc_stream flag.",
@@ -321,11 +326,11 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> Scatter(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const ScatterOptions& opts,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const ScatterOptions& opts UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support scatter "
"with sync_op and use_calc_stream flag.",
@@ -333,11 +338,11 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> Gather(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const GatherOptions& opts,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor* out_tensor UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const GatherOptions& opts UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support gather "
"with sync_op and use_calc_stream flag.",
@@ -345,33 +350,35 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> Gather(
std::vector<phi::DenseTensor>* gather_tensors_ptr,
const phi::DenseTensor& in_tensor,
const GatherOptions& opts,
bool sync_op,
bool use_calc_stream) {
std::vector<phi::DenseTensor>* gather_tensors_ptr UNUSED,
const phi::DenseTensor& in_tensor UNUSED,
const GatherOptions& opts UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support gather "
"with sync_op and use_calc_stream flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
int src_rank,
bool sync_op,
bool use_calc_stream) {
virtual std::shared_ptr<ProcessGroup::Task> Recv(
phi::DenseTensor* tensor UNUSED,
int src_rank UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support recv with "
"sync_op and use_calc_stream flag.",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
int src_rank,
int64_t offset,
int64_t numel,
bool sync_op,
bool use_calc_stream) {
virtual std::shared_ptr<ProcessGroup::Task> Recv(
phi::DenseTensor* tensor UNUSED,
int src_rank UNUSED,
int64_t offset UNUSED,
int64_t numel UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support recv "
"with sync_op and use_calc_stream flag.",
@@ -379,10 +386,10 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> Send(
const phi::DenseTensor& tensor,
int dst_rank,
bool sync_op,
bool use_calc_stream) {
const phi::DenseTensor& tensor UNUSED,
int dst_rank UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support send "
"with sync_op and use_calc_stream flag.",
@@ -390,12 +397,12 @@ class ProcessGroup {
}
virtual std::shared_ptr<ProcessGroup::Task> Send(
const phi::DenseTensor& tensor,
int dst_rank,
int64_t offset,
int64_t numel,
bool sync_op,
bool use_calc_stream) {
const phi::DenseTensor& tensor UNUSED,
int dst_rank UNUSED,
int64_t offset UNUSED,
int64_t numel UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support send "
"with sync_op and use_calc_stream flag.",
@@ -407,7 +414,7 @@ class ProcessGroup {
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const AllreduceOptions& = AllreduceOptions()) {
const AllreduceOptions& UNUSED = AllreduceOptions()) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support allreduce", GetBackendName()));
}
@@ -415,7 +422,7 @@ class ProcessGroup {
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const AllreduceOptions&,
const AllreduceOptions& UNUSED,
bool) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support allreduce with sync_op flag",
@@ -426,7 +433,7 @@ class ProcessGroup {
virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const BroadcastOptions& = BroadcastOptions()) {
const BroadcastOptions& UNUSED = BroadcastOptions()) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support broadcast", GetBackendName()));
}
@@ -434,7 +441,7 @@ class ProcessGroup {
virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const BroadcastOptions&,
const BroadcastOptions& UNUSED,
bool) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support broadcast with sync_op flag",
......