From 89951472c9cfc1ab1ea1f324a2aaec395f597795 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Wed, 27 Apr 2022 11:35:22 +0800 Subject: [PATCH] add the support for allreduce_prod for new dygraph (#42284) --- python/paddle/distributed/collective.py | 25 ++++++----- python/paddle/distributed/parallel.py | 5 ++- .../tests/unittests/process_group_nccl.py | 44 +++++++++++++++++++ 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index b92b2a3c15..b2d146297d 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -350,18 +350,19 @@ def new_group(ranks=None, backend=None): global _default_group_name gid = _new_ring_id() group_name = _default_group_name + str(gid) - global_group = _get_default_group() - global_rank = global_group.rank - global_ranks = global_group.ranks - backend = _default_backend if backend is None else backend - if ranks is None: - ranks = global_ranks - assert len(ranks) <= len(global_ranks), ( - "Size of new group must be less than or " - "equal to that of the default global group.") + if ranks is None or len(ranks) > 1: + global_group = _get_default_group() + global_rank = global_group.rank + global_ranks = global_group.ranks + backend = _default_backend if backend is None else backend + if ranks is None: + ranks = global_ranks + assert len(ranks) <= len(global_ranks), ( + "Size of new group must be less than or " + "equal to that of the default global group.") size = len(ranks) ranks = sorted(ranks) - if global_rank in ranks and size > 1: + if size > 1 and global_rank in ranks: rank = ranks.index(global_rank) pg = _new_process_group_impl( backend, @@ -642,6 +643,8 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): op_type = core.ReduceOp.MAX elif op == ReduceOp.MIN: op_type = core.ReduceOp.MIN + elif op == ReduceOp.PROD: + op_type = core.ReduceOp.PRODUCT else: raise ValueError("Unknown reduce_op type for allreduce.") group = _get_default_group() if group is None else group @@ -744,6 +747,8 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): op_type = core.ReduceOp.MAX elif op == ReduceOp.MIN: op_type = core.ReduceOp.MIN + elif op == ReduceOp.PROD: + op_type = core.ReduceOp.PRODUCT else: raise ValueError("Unknown reduce_op type for reduce.") group = _get_default_group() if group is None else group diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index f0365cab8c..53d35a251c 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -219,8 +219,9 @@ def init_parallel_env(): "required to create a process group.") master_addr = os.getenv("MASTER_ADDR", None) master_port = os.getenv("MASTER_PORT", None) - endpoints = None - if not master_addr or not master_port: + endpoints = ":".join( + [master_addr, master_port]) if master_addr and master_port else None + if endpoints is None: endpoints = os.getenv("PADDLE_MASTER", None) if endpoints is None: endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS").split(',')[0] diff --git a/python/paddle/fluid/tests/unittests/process_group_nccl.py b/python/paddle/fluid/tests/unittests/process_group_nccl.py index 7aa83ad907..3667633d3b 100644 --- a/python/paddle/fluid/tests/unittests/process_group_nccl.py +++ b/python/paddle/fluid/tests/unittests/process_group_nccl.py @@ -122,6 +122,29 @@ class TestProcessGroupFp32(unittest.TestCase): print("test allreduce min api ok") + # test allreduce prod + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + prod_result = np.multiply(x, y) + + if pg.rank() == 0: + task = dist.all_reduce( + tensor_x, dist.ReduceOp.PROD, use_calc_stream=False) + task.wait() + assert np.array_equal(tensor_x, prod_result) + else: + task = dist.all_reduce( + tensor_y, dist.ReduceOp.PROD, use_calc_stream=False) + task.wait() + assert np.array_equal(tensor_y, prod_result) + + print("test allreduce prod api ok") + # test broadcast # rank 0 x = np.random.random(self.shape).astype(self.dtype) @@ -332,6 +355,27 @@ class TestProcessGroupFp32(unittest.TestCase): print("test reduce min api ok") + # test reduce product + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + prod_result = np.multiply(x, y) + + if pg.rank() == 0: + task = dist.reduce( + tensor_x, 0, dist.ReduceOp.PROD, use_calc_stream=False) + task.wait() + assert np.array_equal(tensor_x, prod_result) + else: + task = dist.reduce( + tensor_y, 0, dist.ReduceOp.PROD, use_calc_stream=False) + task.wait() + + print("test reduce prod api ok") # test Scatter # rank 0 in_shape = list(self.shape) -- GitLab