未验证 提交 4ba49af5 编写于 作者: S ShenLiang 提交者: GitHub

add barrier for new group (#32572)

上级 fcd18ef1
......@@ -160,6 +160,46 @@ def get_group(id=0):
return gm[group] if group in gm else None
def barrier(group=None):
"""
Barrier among all participators in the group.
Args:
group (Group): The group instance return by new_group or None for global default group.
Returns:
None.
Examples:
.. code-block:: python
import paddle
from paddle.distributed import init_parallel_env
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
init_parallel_env()
paddle.distributed.barrier()
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
op_type = 'barrier'
temp = fill_constant([1], dtype="int32", value="1")
if in_dygraph_mode():
return core.ops.barrier(temp, temp, 'ring_id', ring_id)
if not isinstance(ring_id, int):
raise ValueError("The type of 'group' for barrier must be int.")
helper = LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
inputs={'X': [temp]},
outputs={'Out': [temp]},
attrs={'ring_id': ring_id})
def new_group(ranks=None, backend=None):
"""
......@@ -220,7 +260,8 @@ def new_group(ranks=None, backend=None):
core.NCCLParallelContext(strategy, place).init_with_ring_id(ring_id)
else:
assert False, ("no cuda device found")
# need to barrier to construct group
barrier(gp)
return gp
......@@ -838,46 +879,6 @@ def _mp_allreduce(tensor,
raise NotImplementedError("No support _mp_allreduce in dygraph mode.")
def barrier(group=None):
"""
Barrier among all participators in the group.
Args:
group (Group): The group instance return by new_group or None for global default group.
Returns:
None.
Examples:
.. code-block:: python
import paddle
from paddle.distributed import init_parallel_env
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
init_parallel_env()
paddle.distributed.barrier()
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
op_type = 'barrier'
temp = fill_constant([1], dtype="int32", value="1")
if in_dygraph_mode():
return core.ops.barrier(temp, temp, 'ring_id', ring_id)
if not isinstance(ring_id, int):
raise ValueError("The type of 'group' for barrier must be int.")
helper = LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
inputs={'X': [temp]},
outputs={'Out': [temp]},
attrs={'ring_id': ring_id})
def _parallel_linear(x,
num_rows,
num_cols,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册