diff --git a/paddle/fluid/distributed/collective/process_group_bkcl.cc b/paddle/fluid/distributed/collective/process_group_bkcl.cc
index b8cd285c87ff391cb1768e155d4a6ac287e4a40e..9c479ee054ba84c3d03d712d65f661abf43f344d 100644
--- a/paddle/fluid/distributed/collective/process_group_bkcl.cc
+++ b/paddle/fluid/distributed/collective/process_group_bkcl.cc
@@ -277,7 +277,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Reduce(
           const phi::DenseTensor& input,
           BKCLContext_t comm,
           const XPUStream& stream) {
-        phi::DenseTensor output_t(*output);
+        phi::DenseTensor output_t;
+        paddle::framework::TensorCopy(*output, platform::XPUPlace(), &output_t);
         const auto& place = input.place();
         auto* calc_ctx = static_cast<phi::XPUContext*>(
             platform::DeviceContextPool::Instance().Get(place));
diff --git a/paddle/fluid/pybind/process_group_utils.h b/paddle/fluid/pybind/process_group_utils.h
index 05434957547a4ab26e73ae868cb10225a5d716b9..d693092331672c6bbf0bc700ecb17825719f4757 100644
--- a/paddle/fluid/pybind/process_group_utils.h
+++ b/paddle/fluid/pybind/process_group_utils.h
@@ -129,6 +129,27 @@ void ConcatDenseTensorWithType(const DeviceContext &dev_ctx,
   }
 }
 
+#ifdef PADDLE_WITH_XPU
+template <>
+void ConcatDenseTensorWithType(const phi::XPUContext &dev_ctx,
+                               const std::vector<phi::DenseTensor> &t_list,
+                               phi::DenseTensor *p_out,
+                               phi::DataType type) {
+  switch (type) {
+    case phi::DataType::FLOAT16:
+      ConcatDenseTensor<phi::XPUContext, phi::dtype::float16>()(
+          dev_ctx, t_list, p_out);
+      break;
+    case phi::DataType::FLOAT32:
+      ConcatDenseTensor<phi::XPUContext, float>()(dev_ctx, t_list, p_out);
+      break;
+    default:
+      PADDLE_THROW(platform::errors::Unimplemented(
+          "Data type (%s) is not supported when it concats tensors.", type));
+  }
+}
+#endif
+
 template <typename DeviceContext>
 void SplitDenseTensorWithType(const DeviceContext &dev_ctx,
                               const phi::DenseTensor &t_in,
@@ -170,6 +191,27 @@ void SplitDenseTensorWithType(const DeviceContext &dev_ctx,
   }
 }
 
+#ifdef PADDLE_WITH_XPU
+template <>
+void SplitDenseTensorWithType(const phi::XPUContext &dev_ctx,
+                              const phi::DenseTensor &t_in,
+                              std::vector<phi::DenseTensor> *p_list,
+                              phi::DataType type) {
+  switch (type) {
+    case phi::DataType::FLOAT16:
+      SplitDenseTensor<phi::XPUContext, phi::dtype::float16>()(
+          dev_ctx, t_in, p_list);
+      break;
+    case phi::DataType::FLOAT32:
+      SplitDenseTensor<phi::XPUContext, float>()(dev_ctx, t_in, p_list);
+      break;
+    default:
+      PADDLE_THROW(platform::errors::Unimplemented(
+          "Data type (%s) is not supported when it splits tensors.", type));
+  }
+}
+#endif
+
 void ConcatTensor(const phi::DeviceContext &dev_ctx,
                   const std::vector<phi::DenseTensor> &tensor_list,
                   const experimental::Tensor *tensor) {
@@ -187,6 +229,17 @@ void ConcatTensor(const phi::DeviceContext &dev_ctx,
     PADDLE_THROW(platform::errors::PermissionDenied(
         "Paddle can't concat tensor since it's not support GPU, please "
         "recompile or reinstall Paddle with GPU support."));
+#endif
+  } else if (platform::is_xpu_place(place)) {
+#ifdef PADDLE_WITH_XPU
+    ConcatDenseTensorWithType(static_cast<const phi::XPUContext &>(dev_ctx),
+                              tensor_list,
+                              dense_tensor,
+                              tensor->dtype());
+#else
+    PADDLE_THROW(platform::errors::PermissionDenied(
+        "Paddle can't concat tensor since it's not support XPU, please "
+        "recompile or reinstall Paddle with XPU support."));
 #endif
   } else if (platform::is_custom_place(place)) {
 #ifdef PADDLE_WITH_CUSTOM_DEVICE
@@ -233,6 +286,17 @@ void SplitTensor(const phi::DeviceContext &dev_ctx,
     PADDLE_THROW(platform::errors::PermissionDenied(
         "Paddle can't split tensor since it's not support GPU, please "
         "recompile or reinstall Paddle with GPU support."));
+#endif
+  } else if (platform::is_xpu_place(place)) {
+#ifdef PADDLE_WITH_XPU
+    SplitDenseTensorWithType(static_cast<const phi::XPUContext &>(dev_ctx),
+                             tensor,
+                             &dense_list,
+                             tensor.dtype());
+#else
+    PADDLE_THROW(platform::errors::PermissionDenied(
+        "Paddle can't split tensor since it's not compiled with XPU, "
+        "please recompile or reinstall Paddle with XPU support."));
 #endif
   } else if (platform::is_custom_place(place)) {
 #ifdef PADDLE_WITH_CUSTOM_DEVICE
diff --git a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
index ff94376b23de647876afd6f8fbe01e05677f4006..6f112a76204c93506340f511ccddf061b5e5fabe 100644
--- a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
+++ b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
@@ -163,6 +163,7 @@ class TestProcessGroupFp32(unittest.TestCase):
             y = np.random.random(self.shape).astype(self.dtype)
             tensor_x = paddle.to_tensor(x)
             tensor_y = paddle.to_tensor(y)
+            old_tensor_y = paddle.to_tensor(y)
             sum_result = tensor_x + tensor_y
             if pg.rank() == 0:
                 task = dist.reduce(tensor_x, 0, sync_op=True)
@@ -174,6 +175,7 @@ class TestProcessGroupFp32(unittest.TestCase):
                 paddle.device.xpu.synchronize()
             if pg.rank() == 0:
                 assert np.array_equal(tensor_x, sum_result)
+                assert np.array_equal(tensor_y, old_tensor_y)
             sys.stdout.write("rank {}: test reduce sum api ok\n".format(pg.rank()))
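The unit-test change above can also be reproduced outside the unittest harness. Below is a minimal standalone sketch (not part of the patch) of the behaviour the new assertions check: after dist.reduce on XPU, the root rank holds the reduced sum, and a tensor snapshotted before the collective is left untouched. It assumes a 2-rank job started with `python -m paddle.distributed.launch` on XPU devices; the function name is illustrative only.

# reduce_preserves_input.py -- hedged repro sketch, not part of the patch
import numpy as np
import paddle
import paddle.distributed as dist


def check_reduce_sum(shape=(2, 10, 5), dtype="float32"):
    dist.init_parallel_env()
    paddle.set_device("xpu:%d" % paddle.distributed.ParallelEnv().dev_id)

    # same seed on every rank so both ranks generate identical x and y,
    # mirroring the seeding done in TestProcessGroupFp32.setUp
    np.random.seed(2022)
    paddle.seed(2022)

    x = np.random.random(shape).astype(dtype)
    y = np.random.random(shape).astype(dtype)
    tensor_x = paddle.to_tensor(x)
    tensor_y = paddle.to_tensor(y)
    old_tensor_y = paddle.to_tensor(y)  # snapshot taken before the collective
    sum_result = tensor_x + tensor_y

    if dist.get_rank() == 0:
        dist.reduce(tensor_x, 0, sync_op=True)
    else:
        dist.reduce(tensor_y, 0, sync_op=True)
    paddle.device.xpu.synchronize()

    if dist.get_rank() == 0:
        # the root sees the elementwise sum of both ranks' inputs
        assert np.array_equal(tensor_x, sum_result)
        # tensor_y was never handed to the collective on rank 0, so it must be
        # unchanged; this is the regression the TensorCopy change in
        # ProcessGroupBKCL::Reduce guards against (the old shallow copy let the
        # temporary share storage with a user tensor)
        assert np.array_equal(tensor_y, old_tensor_y)


if __name__ == "__main__":
    check_reduce_sum()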
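For the new XPU specializations of ConcatDenseTensorWithType / SplitDenseTensorWithType, the sketch below shows one Python-level path that would plausibly exercise them: gathering a list of FP16 or FP32 tensors, whose packing and unpacking is handled by the helpers in process_group_utils.h touched here. The routing through SplitTensor is an assumption based on this patch, not something the diff states; the launch setup and function name are likewise illustrative.

# gather_tensor_list.py -- hedged usage sketch, not part of the patch
import numpy as np
import paddle
import paddle.distributed as dist


def gather_tensor_list(dtype="float16", shape=(4, 8)):
    dist.init_parallel_env()
    paddle.set_device("xpu:%d" % paddle.distributed.ParallelEnv().dev_id)
    rank = dist.get_rank()

    # each rank contributes a constant tensor so the result is easy to verify
    local = paddle.to_tensor(
        np.full(shape, fill_value=rank + 1, dtype="float32")
    ).astype(dtype)

    gathered = []
    # unpacking the flat all_gather result into per-rank tensors is the step
    # assumed to go through SplitDenseTensorWithType on XPU
    dist.all_gather(gathered, local, sync_op=True)
    paddle.device.xpu.synchronize()

    # one tensor per participant, in rank order
    for src, t in enumerate(gathered):
        expected = np.full(shape, fill_value=src + 1, dtype="float32")
        np.testing.assert_allclose(t.astype("float32").numpy(), expected)


if __name__ == "__main__":
    gather_tensor_list("float16")  # FLOAT16 branch of the new specialization
    gather_tensor_list("float32")  # FLOAT32 branch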