Unverified commit f023d42f authored by R Roc, committed by GitHub

[NPU] PP for npu (#53501)

* revert p2p communication for xpu

* pp for npu

* update

* update

* fix xpuplace

* add ut for sync send

* Revert "fix xpuplace"

This reverts commit f89c1d7622426686bc153a3414a42c39e0f4a647.

* add ut for pp sync send

* rm unusable ut

* update
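The new path is controlled entirely by an environment variable. Below is a minimal sketch of enabling it for a pipeline-parallel run; the launch flags and the choice of training script are illustrative assumptions, not part of this PR:

```python
# Sketch: launch a 2-device pipeline-parallel job with the synchronous p2p send
# path introduced by this PR. PADDLE_P2P_SYNC_SEND is the switch added in
# p2p_communication.py below; the launch arguments and script path are placeholders.
import os
import subprocess

env = dict(os.environ)
env["PADDLE_P2P_SYNC_SEND"] = "1"  # "1" or "true" selects the sync-send ordering

subprocess.run(
    [
        "python",
        "-m",
        "paddle.distributed.launch",
        "--devices",
        "0,1",
        "hybrid_parallel_pp_layer.py",  # any pipeline-parallel script would do here
    ],
    env=env,
    check=True,
)
```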
Parent 6a279dfd
......@@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import paddle
......@@ -24,6 +27,9 @@ _hcg = None
_use_cache = False
_enable_partial_send_recv = True
_sync_send = os.environ.get("PADDLE_P2P_SYNC_SEND", "0")
_sync_send = _sync_send.lower() in ['1', 'true']
def initialize_p2p_groups(hcg, use_cache=True, enable_partial_send_recv=True):
global _hcg, _use_cache, _enable_partial_send_recv
......@@ -351,43 +357,111 @@ def _p2p_helper(
# TODO(Yuang Liu): use batch_isend_irecv replace all these comm ops
tasks = []
# start to p2p communicate
if _sync_send:
        # Some devices (NPU, for example) do not support asynchronous send ops,
        # so the order is recv_prev -> send_next -> recv_next -> send_prev.
        # When using this order, the environment variable
        # 'PADDLE_P2P_SYNC_SEND' should be set to True.
if tensor_recv_prev is not None:
if isinstance(tensor_recv_prev, tuple):
for d in tensor_recv_prev:
task = recv_partial(
d,
src=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.recv_prev_group,
use_calc_stream=sync_recv,
)
if sync_recv:
allgather_partial(
d,
nranks=mp_degree,
rank_id=mp_rank,
group=mp_group,
use_calc_stream=True,
)
else:
tasks.append(task)
else:
task = recv_partial(
tensor_recv_prev,
src=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.recv_prev_group,
use_calc_stream=sync_recv,
)
if sync_recv:
allgather_partial(
tensor_recv_prev,
nranks=mp_degree,
rank_id=mp_rank,
group=mp_group,
use_calc_stream=True,
)
else:
tasks.append(task)
if tensor_send_next is not None:
if isinstance(tensor_send_next, tuple):
for d in tensor_send_next:
paddle.distributed.wait(d, use_calc_stream=True)
send_partial(
d,
dst=1,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_next_group,
use_calc_stream=False,
)
else:
paddle.distributed.wait(tensor_send_next, use_calc_stream=True)
                send_partial(
                    tensor_send_next,
                    dst=1,
                    nranks=mp_degree,
                    rank_id=mp_rank,
                    group=_hcg.send_next_group,
                    use_calc_stream=False,
                )
if tensor_recv_next is not None:
if isinstance(tensor_recv_next, tuple):
for d in tensor_recv_next:
task = recv_partial(
d,
src=1,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.recv_next_group,
use_calc_stream=sync_recv,
)
if sync_recv:
allgather_partial(
d,
nranks=mp_degree,
rank_id=mp_rank,
group=mp_group,
use_calc_stream=True,
)
else:
tasks.append(task)
            else:
                task = recv_partial(
                    tensor_recv_next,
                    src=1,
                    nranks=mp_degree,
                    rank_id=mp_rank,
                    group=_hcg.recv_next_group,
                    use_calc_stream=sync_recv,
                )
                if sync_recv:
                    allgather_partial(
                        tensor_recv_next,
                        nranks=mp_degree,
                        rank_id=mp_rank,
                        group=mp_group,
                        use_calc_stream=True,
                    )
                else:
                    tasks.append(task)
......@@ -395,65 +469,154 @@ def _p2p_helper(
if tensor_send_prev is not None:
if isinstance(tensor_send_prev, tuple):
for d in tensor_send_prev:
paddle.distributed.wait(d, use_calc_stream=True)
send_partial(
d,
dst=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_prev_group,
use_calc_stream=False,
)
else:
paddle.distributed.wait(tensor_send_prev, use_calc_stream=True)
send_partial(
tensor_send_prev,
dst=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_prev_group,
use_calc_stream=False,
)
else:
if tensor_send_prev is not None:
if isinstance(tensor_send_prev, tuple):
for d in tensor_send_prev:
paddle.distributed.wait(d, use_calc_stream=True)
send_partial(
d,
dst=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_prev_group,
use_calc_stream=False,
)
else:
paddle.distributed.wait(tensor_send_prev, use_calc_stream=True)
send_partial(
tensor_send_prev,
dst=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_prev_group,
use_calc_stream=False,
)
if tensor_recv_prev is not None:
if isinstance(tensor_recv_prev, tuple):
for d in tensor_recv_prev:
task = recv_partial(
d,
src=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.recv_prev_group,
use_calc_stream=sync_recv,
)
if sync_recv:
allgather_partial(
d,
nranks=mp_degree,
rank_id=mp_rank,
group=mp_group,
use_calc_stream=True,
)
else:
tasks.append(task)
else:
task = recv_partial(
tensor_recv_prev,
src=0,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.recv_prev_group,
use_calc_stream=sync_recv,
)
if sync_recv:
allgather_partial(
tensor_recv_prev,
nranks=mp_degree,
rank_id=mp_rank,
group=mp_group,
use_calc_stream=True,
)
else:
tasks.append(task)
if tensor_send_next is not None:
if isinstance(tensor_send_next, tuple):
for d in tensor_send_next:
paddle.distributed.wait(d, use_calc_stream=True)
send_partial(
d,
dst=1,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.send_next_group,
use_calc_stream=False,
)
else:
paddle.distributed.wait(tensor_send_next, use_calc_stream=True)
send_partial(
                    tensor_send_next,
                    dst=1,
                    nranks=mp_degree,
                    rank_id=mp_rank,
                    group=_hcg.send_next_group,
                    use_calc_stream=False,
                )
if tensor_recv_next is not None:
if isinstance(tensor_recv_next, tuple):
for d in tensor_recv_next:
task = recv_partial(
d,
src=1,
nranks=mp_degree,
rank_id=mp_rank,
group=_hcg.recv_next_group,
use_calc_stream=sync_recv,
)
if sync_recv:
allgather_partial(
d,
nranks=mp_degree,
rank_id=mp_rank,
group=mp_group,
use_calc_stream=True,
)
else:
tasks.append(task)
            else:
                task = recv_partial(
                    tensor_recv_next,
                    src=1,
                    nranks=mp_degree,
                    rank_id=mp_rank,
                    group=_hcg.recv_next_group,
                    use_calc_stream=sync_recv,
                )
                if sync_recv:
                    allgather_partial(
                        tensor_recv_next,
                        nranks=mp_degree,
                        rank_id=mp_rank,
                        group=mp_group,
                        use_calc_stream=True,
                    )
                else:
                    tasks.append(task)
......@@ -462,25 +625,6 @@ def _p2p_helper(
if not sync_recv:
if framework.in_dygraph_mode():
# wait irecv tasks in eager dygraph mode with new comm library
......
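A small, self-contained sketch of the flag parsing added at the top of this file: the variable is read once from the environment and normalized to a boolean, so only "1" or "true" (case-insensitive) enable the synchronous path. The sample values below are illustrative only:

```python
# Demonstrates the PADDLE_P2P_SYNC_SEND parsing introduced above.
import os

for value in ["0", "1", "true", "True", "yes"]:
    os.environ["PADDLE_P2P_SYNC_SEND"] = value
    sync_send = os.environ.get("PADDLE_P2P_SYNC_SEND", "0").lower() in ["1", "true"]
    print(f"{value!r} -> sync_send={sync_send}")
# 'yes' maps to False: anything other than '1'/'true' keeps the default async ordering.
```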
......@@ -162,6 +162,19 @@ if((WITH_GPU) AND LOCAL_ALL_PLAT)
set_tests_properties(test_parallel_dygraph_pipeline_parallel
PROPERTIES TIMEOUT "500")
endif()
if((WITH_GPU OR WITH_XPU) AND LOCAL_ALL_PLAT)
bash_test_modules(
test_parallel_dygraph_pipeline_parallel_sync_send
START_BASH
../../dist_test.sh
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21992;http_proxy=;https_proxy=;PYTHONPATH=../..:${PADDLE_BINARY_DIR}/python;PADDLE_P2P_SYNC_SEND=1"
)
set_tests_properties(test_parallel_dygraph_pipeline_parallel_sync_send
PROPERTIES TIMEOUT "300")
endif()
if((WITH_GPU) AND LOCAL_ALL_PLAT)
bash_test_modules(
test_parallel_dygraph_pipeline_parallel_with_virtual_stage
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from test_parallel_dygraph_dataparallel import TestMultipleGpus
# os.environ["PADDLE_P2P_SYNC_SEND"] = "1"
class TestHybridPipeParallel(TestMultipleGpus):
def test_hybrid_parallel_pp_layer(self):
self.run_mnist_2gpu(
os.path.abspath('../../hybrid_parallel_pp_layer.py')
)
def test_hybrid_parallel_pp_tuple_inputs(self):
self.run_mnist_2gpu('hybrid_parallel_pp_embedding.py')
def test_hybrid_parallel_shared_weight(self):
self.run_mnist_2gpu('hybrid_parallel_shared_weight.py')
if __name__ == "__main__":
unittest.main()
......@@ -13,6 +13,7 @@ test_tcp_store,LINUX;APPLE,,,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=
test_dygraph_sharding_stage3_for_eager,,,350,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_communicator_half_async,,,120,DIST,test_runner.py,2,,FLAGS_communicator_send_queue_size=1;FLAGS_communicator_max_merge_var_num=1;http_proxy=;https_proxy=;PYTHONPATH=../..,WITH_NCCL
test_parallel_dygraph_pipeline_parallel,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_parallel_dygraph_pipeline_parallel_sync_send,,GPU;XPU,300,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..;PADDLE_P2P_SYNC_SEND=1,
test_parallel_dygraph_pipeline_parallel_with_virtual_stage,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_parallel_dygraph_pp_adaptor,,GPU,500,DIST,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_fleet_localsgd_meta_optimizer,LINUX,GPU;XPU,,,test_runner.py,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
......