diff --git a/cmake/external/xpu.cmake b/cmake/external/xpu.cmake
index 3c78db71d45356b9d2a4303ad04afd0ae368c8eb..e0c8b1bbfe64c7376ff623249626433054cf5053 100644
--- a/cmake/external/xpu.cmake
+++ b/cmake/external/xpu.cmake
@@ -8,7 +8,7 @@
 set(XPU_API_LIB_NAME "libxpuapi.so")
 set(XPU_RT_LIB_NAME "libxpurt.so")
 set(XPU_BASE_DATE "20230220")
-set(XPU_XCCL_BASE_VERSION "1.0.9")
+set(XPU_XCCL_BASE_VERSION "1.0.10")
 
 if(NOT DEFINED XPU_BASE_URL)
   set(XPU_BASE_URL_WITHOUT_DATE
diff --git a/paddle/fluid/distributed/collective/process_group_bkcl.cc b/paddle/fluid/distributed/collective/process_group_bkcl.cc
index 0bfa04932c3f1414ac5a3073cf150c78a728965d..4f49ea9596e6e92a7034e2ed5cd5c32d0386343e 100644
--- a/paddle/fluid/distributed/collective/process_group_bkcl.cc
+++ b/paddle/fluid/distributed/collective/process_group_bkcl.cc
@@ -367,6 +367,34 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Reduce(
       use_calc_stream);
 }
 
+std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::ReduceScatter(
+    phi::DenseTensor* out_tensor,
+    const phi::DenseTensor& in_tensor,
+    const ReduceScatterOptions& opts,
+    bool sync_op,
+    bool use_calc_stream) {
+  return Collective(
+      out_tensor,
+      in_tensor,
+      [&](phi::DenseTensor* output,
+          const phi::DenseTensor& input,
+          BKCLContext_t comm,
+          const XPUStream& stream) {
+        return bkcl_reduce_scatter(
+            comm,
+            input.data(),
+            output->data(),
+            output->numel(),
+            platform::ToBKCLDataType(
+                framework::TransToProtoVarType(input.type())),
+            ToBKCLRedType(opts.reduce_op),
+            stream);
+      },
+      CommType::REDUCE_SCATTER,
+      sync_op,
+      use_calc_stream);
+}
+
 std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Barrier(
     const BarrierOptions& opts) {
   PADDLE_ENFORCE_GE(opts.device_id,
diff --git a/paddle/fluid/distributed/collective/process_group_bkcl.h b/paddle/fluid/distributed/collective/process_group_bkcl.h
index 1ecf8c9c0ff96af52038fa6be540263de913297b..ea89dbdd6d87d1ece680ced65a8d889fa8d36a2a 100644
--- a/paddle/fluid/distributed/collective/process_group_bkcl.h
+++ b/paddle/fluid/distributed/collective/process_group_bkcl.h
@@ -115,6 +115,13 @@
       bool sync_op,
       bool use_calc_stream) override;
 
+  std::shared_ptr<ProcessGroup::Task> ReduceScatter(
+      phi::DenseTensor* out_tensor,
+      const phi::DenseTensor& in_tensor,
+      const ReduceScatterOptions& opts,
+      bool sync_op,
+      bool use_calc_stream) override;
+
   std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
                                            int src_rank,
                                            int64_t offset,
diff --git a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
index 49fe7c97d0a1043cdbc7219f8b39b07314ec1d9f..b5a0655ad9ca1d5f465e48ef99f4398b3113b439 100644
--- a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
+++ b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
@@ -175,6 +175,34 @@ class TestProcessGroupFp32(unittest.TestCase):
             assert np.array_equal(tensor_y, old_tensor_y)
         sys.stdout.write("rank {}: test reduce sum api ok\n".format(pg.rank()))
 
+        # test reduce_scatter
+        in_shape = list(self.shape)
+        in_shape[0] *= 2
+        x = np.random.random(in_shape).astype(self.dtype)
+        y = np.random.random(in_shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        tensor_y = paddle.to_tensor(y)
+        need_result = tensor_x + tensor_y
+        need_result0 = paddle.slice(need_result, [0], [0], [self.shape[0]])
+        need_result1 = paddle.slice(
+            need_result, [0], [self.shape[0]], [in_shape[0]]
+        )
+        out = np.random.random(self.shape).astype(self.dtype)
+        tensor_out = paddle.to_tensor(out)
+        if pg.rank() == 0:
+            task = dist.reduce_scatter(tensor_out, tensor_x, sync_op=True)
+        else:
+            task = dist.reduce_scatter(tensor_out, tensor_y, sync_op=False)
+            task.wait()
+            paddle.device.xpu.synchronize()
+        if pg.rank() == 0:
+            assert np.array_equal(need_result0, tensor_out)
+        else:
+            assert np.array_equal(need_result1, tensor_out)
+        sys.stdout.write(
+            "rank {}: test reduce_scatter sum api ok\n".format(pg.rank())
+        )
+
         # test send async api
         # rank 0
         x = np.random.random(self.shape).astype(self.dtype)
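
For context (not part of the patch): a minimal standalone sketch of how the new BKCL reduce_scatter path can be exercised from Python, mirroring the unit test above. It assumes two XPU cards, one process per card; shapes and fill values are illustrative. Only dist.reduce_scatter, task.wait(), and paddle.device.xpu.synchronize() — the calls used by the test in this diff — plus standard Paddle APIs are used.

import numpy as np
import paddle
import paddle.distributed as dist

# Launch one process per XPU card (e.g. via paddle.distributed.launch).
dist.init_parallel_env()
rank = dist.get_rank()

# Each rank contributes a (4, 8) input filled with rank + 1; reduce_scatter
# sums the inputs element-wise and hands each rank one (2, 8) slice of the sum.
tensor_in = paddle.to_tensor(np.full((4, 8), float(rank + 1), dtype="float32"))
tensor_out = paddle.zeros([2, 8], dtype="float32")

task = dist.reduce_scatter(tensor_out, tensor_in, sync_op=False)
task.wait()
paddle.device.xpu.synchronize()

# With two ranks, every element of tensor_out is 1.0 + 2.0 = 3.0.
print("rank", rank, tensor_out.numpy()[0][:4])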