Unverified commit 5d7e1c91, authored by Yuang Liu, committed by GitHub

fix dygraph pp + mp nan after async send/recv (#45869)

Parent: a5836222
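The change moves the mp-group partial all-gather out of the send/recv section of `_p2p_helper`: in eager dygraph mode every async isend/irecv task is waited on first, and only then are the received partial tensors all-gathered (the explicit `paddle.distributed.wait` before each send is now limited to `_in_legacy_dygraph()`). Reading the diff, the NaNs in pipeline-parallel + model-parallel runs most likely came from the all-gather consuming a receive buffer before the asynchronous recv had finished filling it. The snippet below is a minimal, self-contained sketch of that race and of the fixed ordering; `async_recv_partial` and `allgather_partial` here are hypothetical stand-ins, not Paddle APIs.

import math
import threading
import time


def async_recv_partial(buffer, value, delay):
    """Simulate an async partial recv: `buffer` is filled only after `delay` seconds."""

    def fill():
        time.sleep(delay)
        buffer[0] = value

    task = threading.Thread(target=fill)
    task.start()
    return task  # caller must join() (i.e. wait) before reading `buffer`


def allgather_partial(buffer):
    """Simulate the partial all-gather: it simply reads the local shard."""
    return buffer[0]


if __name__ == "__main__":
    # Buggy ordering: the all-gather runs right after the async recv is launched,
    # so it reads the still-uninitialized buffer (NaN).
    buf = [math.nan]
    task = async_recv_partial(buf, value=1.0, delay=0.1)
    print("without wait:", allgather_partial(buf))  # NaN
    task.join()

    # Fixed ordering (what this commit does): wait on the recv task first,
    # then run the partial all-gather.
    buf = [math.nan]
    task = async_recv_partial(buf, value=1.0, delay=0.1)
    task.join()  # corresponds to task.wait() in the diff
    print("with wait:", allgather_partial(buf))  # 1.0

The diff that follows applies the same ordering inside `_p2p_helper`.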
@@ -327,7 +327,8 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
     if tensor_send_prev is not None:
         if isinstance(tensor_send_prev, tuple):
             for d in tensor_send_prev:
-                paddle.distributed.wait(d, use_calc_stream=True)
+                if _in_legacy_dygraph():
+                    paddle.distributed.wait(d, use_calc_stream=True)
                 tasks.append(
                     send_partial(d,
                                  dst=0,
@@ -336,7 +337,8 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
                                  group=_hcg.send_prev_group,
                                  use_calc_stream=False))
         else:
-            paddle.distributed.wait(tensor_send_prev, use_calc_stream=True)
+            if _in_legacy_dygraph():
+                paddle.distributed.wait(tensor_send_prev, use_calc_stream=True)
             tasks.append(
                 send_partial(tensor_send_prev,
                              dst=0,
@@ -355,12 +357,6 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
                                  rank_id=mp_rank,
                                  group=_hcg.recv_prev_group,
                                  use_calc_stream=True))
-                tasks.append(
-                    allgather_partial(d,
-                                      nranks=mp_degree,
-                                      rank_id=mp_rank,
-                                      group=mp_group,
-                                      use_calc_stream=True))
         else:
             tasks.append(
                 recv_partial(tensor_recv_prev,
@@ -369,17 +365,12 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
                              rank_id=mp_rank,
                              group=_hcg.recv_prev_group,
                              use_calc_stream=True))
-            tasks.append(
-                allgather_partial(tensor_recv_prev,
-                                  nranks=mp_degree,
-                                  rank_id=mp_rank,
-                                  group=mp_group,
-                                  use_calc_stream=True))
 
     if tensor_send_next is not None:
         if isinstance(tensor_send_next, tuple):
             for d in tensor_send_next:
-                paddle.distributed.wait(d, use_calc_stream=True)
+                if _in_legacy_dygraph():
+                    paddle.distributed.wait(d, use_calc_stream=True)
                 tasks.append(
                     send_partial(d,
                                  dst=1,
@@ -388,7 +379,8 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
                                  group=_hcg.send_next_group,
                                  use_calc_stream=False))
         else:
-            paddle.distributed.wait(tensor_send_next, use_calc_stream=True)
+            if _in_legacy_dygraph():
+                paddle.distributed.wait(tensor_send_next, use_calc_stream=True)
             tasks.append(
                 send_partial(tensor_send_next,
                              dst=1,
@@ -407,12 +399,6 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
                                  rank_id=mp_rank,
                                  group=_hcg.recv_next_group,
                                  use_calc_stream=True))
-                tasks.append(
-                    allgather_partial(d,
-                                      nranks=mp_degree,
-                                      rank_id=mp_rank,
-                                      group=mp_group,
-                                      use_calc_stream=True))
 
         else:
             tasks.append(
@@ -423,17 +409,40 @@ def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):
                              group=_hcg.recv_next_group,
                              use_calc_stream=True))
-            tasks.append(
-                allgather_partial(tensor_recv_next,
-                                  nranks=mp_degree,
-                                  rank_id=mp_rank,
-                                  group=mp_group,
-                                  use_calc_stream=True))
 
     if in_dygraph_mode():
-        # wait tasks in new dygraph mode with new comm library
+        # wait isend/irecv tasks in eager dygraph mode with new comm library
         for task in tasks:
-            if task is not None:
-                task.wait()
+            assert task is not None
+            task.wait()
+
+    tensors_for_all_gather = []
+    if tensor_recv_prev is not None:
+        if isinstance(tensor_recv_prev, tuple):
+            for d in tensor_recv_prev:
+                tensors_for_all_gather.append(d)
+        else:
+            tensors_for_all_gather.append(tensor_recv_prev)
+    if tensor_recv_next is not None:
+        if isinstance(tensor_recv_next, tuple):
+            for d in tensor_recv_next:
+                tensors_for_all_gather.append(d)
+        else:
+            tensors_for_all_gather.append(tensor_recv_next)
+
+    tasks = []
+    for tensor in tensors_for_all_gather:
+        tasks.append(
+            allgather_partial(tensor,
+                              nranks=mp_degree,
+                              rank_id=mp_rank,
+                              group=mp_group,
+                              use_calc_stream=True))
+
+    for task in tasks:
+        # wait partial all gather tasks
+        if task is not None:
+            task.wait()
 
     return tensor_recv_prev, tensor_recv_next
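Two smaller changes ride along with the reordering. In eager dygraph mode the wait loop now asserts that every send/recv task is non-None instead of silently skipping None entries, while the new all-gather wait loop keeps the `if task is not None` check, presumably because `allgather_partial` can return something other than a waitable task when no cross-rank gather is needed (an inference from the guard, not stated in the diff). And the explicit `paddle.distributed.wait(..., use_calc_stream=True)` before each send is now issued only under `_in_legacy_dygraph()`, since the eager path waits on the returned tasks instead.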