From f064ead63dd0643222ae626a7647f6a30f91aaea Mon Sep 17 00:00:00 2001 From: Roc <30228238+sljlp@users.noreply.github.com> Date: Mon, 1 Aug 2022 16:06:43 +0800 Subject: [PATCH] [CI] CI for Distributed (#44085) --- CMakeLists.txt | 1 + .../fluid/tests/unittests/CMakeLists.txt | 27 ++ python/paddle/fluid/tests/unittests/common.py | 60 ++++ .../tests/unittests/dygraph_hybrid_dp.py | 67 ++++ .../tests/unittests/dygraph_hybrid_dpppmp.py | 213 +++++++++++++ .../tests/unittests/dygraph_hybrid_fp16.py | 225 +++++++++++++ .../unittests/dygraph_hybrid_recompute.py | 212 ++++++++++++ .../mn_dygraph_group_sharded_stage3.py | 301 ++++++++++++++++++ .../unittests/mn_dygraph_sharding_stage2.py | 251 +++++++++++++++ .../tests/unittests/multinode_dist_test.sh | 107 +++++++ .../unittests/test_collective_multi_nodes.py | 120 +++++++ .../test_multinode_dygraph_hybrid_dp.py | 40 +++ .../test_multinode_dygraph_hybrid_dpppmp.py | 50 +++ .../test_multinode_dygraph_sharding.py | 45 +++ 14 files changed, 1719 insertions(+) create mode 100644 python/paddle/fluid/tests/unittests/common.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_hybrid_dp.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_hybrid_dpppmp.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_hybrid_fp16.py create mode 100644 python/paddle/fluid/tests/unittests/dygraph_hybrid_recompute.py create mode 100644 python/paddle/fluid/tests/unittests/mn_dygraph_group_sharded_stage3.py create mode 100644 python/paddle/fluid/tests/unittests/mn_dygraph_sharding_stage2.py create mode 100644 python/paddle/fluid/tests/unittests/multinode_dist_test.sh create mode 100644 python/paddle/fluid/tests/unittests/test_collective_multi_nodes.py create mode 100755 python/paddle/fluid/tests/unittests/test_multinode_dygraph_hybrid_dp.py create mode 100755 python/paddle/fluid/tests/unittests/test_multinode_dygraph_hybrid_dpppmp.py create mode 100755 python/paddle/fluid/tests/unittests/test_multinode_dygraph_sharding.py diff --git a/CMakeLists.txt b/CMakeLists.txt index ea0011762d..762fd17909 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -243,6 +243,7 @@ include(simd) option(WITH_AVX "Compile PaddlePaddle with AVX intrinsics" ${AVX_FOUND}) option(WITH_PYTHON "Compile PaddlePaddle with python interpreter" ON) option(WITH_TESTING "Compile PaddlePaddle with unit testing" OFF) +option(WITH_MULTINODE_TESTING "Test multinode apis and ops" OFF) option(WITH_MKL "Compile PaddlePaddle with MKL support." ${AVX_FOUND}) option(WITH_SYSTEM_BLAS "Use system blas library" OFF) option(WITH_DISTRIBUTE "Compile with distributed support" OFF) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index e34616f5fe..06fa8eedef 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -7,6 +7,12 @@ set(GC_ENVS FLAGS_eager_delete_tensor_gb=0.0 FLAGS_fast_eager_deletion_mode=1 FLAGS_memory_fraction_of_eager_deletion=1.0) set(dist_ENVS http_proxy="" https_proxy="") +file( + GLOB MULTINODE_DIST_TEST_OPS + RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" + "test_multinode_*.py") +string(REPLACE ".py" "" MULTINODE_DIST_TEST_OPS "${MULTINODE_DIST_TEST_OPS}") + file( GLOB DIST_TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" @@ -78,6 +84,11 @@ list(APPEND DIST_TEST_OPS test_collective_batch_isend_irecv) list(APPEND DIST_TEST_OPS test_collective_reduce_scatter) set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS}) #remove distribute unittests. 
+ +foreach(TEST_OP ${MULTINODE_DIST_TEST_OPS}) + list(APPEND MIXED_DIST_TEST_OPS ${TEST_OP}) +endforeach() + list(APPEND MIXED_DIST_TEST_OPS test_dgc_op) list(APPEND MIXED_DIST_TEST_OPS test_dgc_momentum_op) list(APPEND MIXED_DIST_TEST_OPS test_dgc_optimizer) @@ -135,6 +146,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_auto_parallel_reshard_mppp) list(APPEND MIXED_DIST_TEST_OPS test_auto_parallel_reshard_dpmppp) list(APPEND MIXED_DIST_TEST_OPS test_auto_parallel_cost_model) list(APPEND MIXED_DIST_TEST_OPS test_tcp_store) +list(APPEND MIXED_DIST_TEST_OPS test_dygraph_hybrid_dp) foreach(TEST_OP ${MIXED_DIST_TEST_OPS}) list(REMOVE_ITEM TEST_OPS ${TEST_OP}) endforeach() @@ -958,6 +970,21 @@ if(WITH_DISTRIBUTE) PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) endif() + # add new dist test + if(WITH_DISTRIBUTE AND WITH_MULTINODE_TESTING) + foreach(TEST_OP ${MULTINODE_DIST_TEST_OPS}) + bash_test_modules( + ${TEST_OP} + START_BASH + multinode_dist_test.sh + LABELS + "RUN_TYPE=EXCLUSIVE" + ENVS + "PADDLE_DIST_UT_PORT=${dist_ut_port}") + endforeach() + + endif() + # port range (20000, 23000) is reserved for dist-ops foreach(TEST_OP ${DIST_TEST_OPS}) bash_test_modules( diff --git a/python/paddle/fluid/tests/unittests/common.py b/python/paddle/fluid/tests/unittests/common.py new file mode 100644 index 0000000000..cb2ff2ea09 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/common.py @@ -0,0 +1,60 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle.distributed import fleet + + +def init_parallel_env(mode, global_batch_size, seed=1024): + ''' + Args: + mode:(str) DP1-MP1-PP1-SH1-O1 + ''' + + def parse_mode(mode): + assert "DP" == mode[:2] + assert "-MP" in mode + assert "-PP" in mode + assert "-SH" in mode + assert "-O" in mode + modes = mode.split("-") + DP = int(modes[0][2:]) + MP = int(modes[1][2:]) + PP = int(modes[2][2:]) + SH = int(modes[3][2:]) + Ostage = int(modes[4][1:]) + return DP, MP, PP, SH, Ostage + + DP, MP, PP, SH, Ostage = parse_mode(mode) + + strategy = fleet.DistributedStrategy() + strategy.hybrid_configs = { + "dp_degree": DP, + "mp_degree": MP, + "pp_degree": PP, + "sharding_degree": SH + } + + accumulate_steps = 1 + + if PP > 1: + strategy.pipeline_configs = { + "accumulate_steps": accumulate_steps, + "micro_batch_size": global_batch_size // DP // accumulate_steps + } + + # set control in tensor parallel + strategy.tensor_parallel_configs = {"tensor_init_seed": seed} + fleet.init(is_collective=True, strategy=strategy) + + return fleet.get_hybrid_communicate_group() diff --git a/python/paddle/fluid/tests/unittests/dygraph_hybrid_dp.py b/python/paddle/fluid/tests/unittests/dygraph_hybrid_dp.py new file mode 100644 index 0000000000..47294c141f --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_hybrid_dp.py @@ -0,0 +1,67 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import numpy as np +import argparse +import os +import sys +import signal +import time +import socket +from contextlib import closing +from six import string_types +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import paddle.distributed.fleet as fleet +from paddle.fluid.incubate.fleet.base import role_maker +import unittest +from multiprocessing import Process +import paddle.fluid.layers as layers +from functools import reduce +from test_collective_multi_nodes import TestCollectiveAPIRunnerBase, runtime_main + + +class TestDygrapgHybridDP(TestCollectiveAPIRunnerBase): + + def __init__(self): + pass + + def check_pass(self, *args, **kwargs): + from common import init_parallel_env + import paddle + from paddle.distributed import fleet + hcg = init_parallel_env("DP16-MP1-PP1-SH1-O1", 2) + import numpy as np + dp_group = hcg.get_data_parallel_group() + np.random.seed(1024) + data = np.random.random((10 * dp_group.nranks, 100)).reshape( + (dp_group.nranks, -1, 100)) + data_part = paddle.to_tensor(data[dp_group.rank]) + paddle.distributed.collective.all_reduce(data_part) + data_reduced = data_part + data_sumed = np.sum(data, axis=0) + assert np.allclose(data_sumed, + data_reduced.numpy(), + rtol=1e-8, + atol=1e-8) + + +if __name__ == "__main__": + runtime_main(TestDygrapgHybridDP, "dp") diff --git a/python/paddle/fluid/tests/unittests/dygraph_hybrid_dpppmp.py b/python/paddle/fluid/tests/unittests/dygraph_hybrid_dpppmp.py new file mode 100644 index 0000000000..256f733162 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_hybrid_dpppmp.py @@ -0,0 +1,213 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import numpy as np +import argparse +import os +import sys +import signal +import time +import socket +from contextlib import closing +from six import string_types +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import paddle.distributed.fleet as fleet +from paddle.fluid.incubate.fleet.base import role_maker +import unittest +from multiprocessing import Process +import paddle.fluid.layers as layers +from functools import reduce +from test_collective_multi_nodes import TestCollectiveAPIRunnerBase, runtime_main +from paddle import nn +import numpy as np + + +def weight_init(mp, shape, col=True, seed=1024): + np.random.seed(seed) + w = np.random.normal(0, 0.02, size=shape) + if mp is None: + _w = w + else: + if col: + step = shape[1] // mp.nranks + _w = w[:, mp.rank * step:mp.rank * step + step] + else: + step = shape[0] // mp.nranks + _w = w[mp.rank * step:mp.rank * step + step, :] + return paddle.fluid.initializer.NumpyArrayInitializer(_w) + + +class Criterion(nn.Layer): + + def __init__(self): + super(Criterion, self).__init__() + self.loss_func = nn.MSELoss(reduction="mean") + + def forward(self, pred, label): + loss = self.loss_func(pred, label) + return loss + + +class ModelPipeline(fleet.meta_parallel.PipelineLayer): + + def __init__(self, hcg): + paddle.seed(1024) + dp_linear = nn.Linear(32, 128) + self.layers_pp = [] + self.topology = hcg.topology() + self.layers_pp.append(dp_linear) + mp = hcg.get_model_parallel_group() + for i in range(6): + if mp is not None and mp.nranks > 1: + mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear( + 128, + 512, + weight_attr=weight_init(mp, (128, 512), True, 1204 + i), + has_bias=True, + gather_output=False) + mp_linear_2 = fleet.meta_parallel.RowParallelLinear( + 512, + 128, + weight_attr=weight_init(mp, (512, 128), False, 2012 + i), + has_bias=True, + input_is_parallel=True) + else: + mp_linear_1 = nn.Linear(128, + 512, + weight_attr=weight_init( + None, (128, 512), True, 1204 + i)) + mp_linear_2 = nn.Linear(512, + 128, + weight_attr=weight_init( + None, (512, 128), True, 2012 + i)) + act = nn.ReLU6() + layer_seq = nn.Sequential(mp_linear_1, mp_linear_2, act) + self.layers_pp.append(layer_seq) + + out = nn.Linear(128, 32) + self.layers_pp.append(out) + super(ModelPipeline, self).__init__(layers=self.layers_pp, + loss_fn=Criterion(), + topology=self.topology) + + +class Model(nn.Layer): + + def __init__(self, hcg): + super(Model, self).__init__() + paddle.seed(1024) + dp_linear = nn.Linear(32, 128) + self.layers_pp = [] + self.layers_pp.append(dp_linear) + mp = hcg.get_model_parallel_group() if hcg else None + for i in range(6): + if mp is not None and mp.nranks > 1: + mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear( + 128, + 512, + weight_attr=weight_init(mp, (128, 512), True, 1204 + i), + has_bias=True, + gather_output=False) + mp_linear_2 = fleet.meta_parallel.RowParallelLinear( + 512, + 128, + weight_attr=weight_init(mp, (512, 128), False, 2012 + i), + has_bias=True, + input_is_parallel=True) + else: + mp_linear_1 = nn.Linear(128, + 512, + weight_attr=weight_init( + None, (128, 512), True, 1204 + i)) + mp_linear_2 = nn.Linear(512, + 128, + weight_attr=weight_init( + None, (512, 128), True, 2012 + i)) + act = nn.ReLU6() + layer_seq = nn.Sequential(mp_linear_1, mp_linear_2, act) + self.layers_pp.append(layer_seq) + + out = nn.Linear(128, 32) + 
self.layers_pp.append(out) + self.layers = nn.Sequential(*self.layers_pp) + + def forward(self, x): + return self.layers(x) + + +class TestDygrapgHybridDPPPMP(TestCollectiveAPIRunnerBase): + + def __init__(self): + pass + + def check_pass(self, *args, **kwargs): + + from common import init_parallel_env + import paddle + from paddle.distributed import fleet + hcg = init_parallel_env("DP4-MP2-PP2-SH1-O1", 64) + pp_degree = hcg.get_pipe_parallel_world_size() + import numpy as np + crit = Criterion() + if pp_degree <= 1: + model = Model(hcg) + else: + model = ModelPipeline(hcg) + + model_base = Model(None) + + optimizer = paddle.optimizer.Adam(learning_rate=0.01, + parameters=model.parameters()) + optimizer_base = paddle.optimizer.Adam( + learning_rate=0.01, parameters=model_base.parameters()) + + model = fleet.distributed_model(model) + optimizer = fleet.distributed_optimizer(optimizer) + loss_hybrid_arr = [] + loss_base_arr = [] + + x = paddle.to_tensor(np.random.random((16, 32))).astype("float32") + y = paddle.to_tensor(np.random.random((16, 32))).astype("float32") + + for _ in range(5): + if pp_degree > 1: + loss = model.train_batch([x, y], optimizer=optimizer) + else: + output = model(x) + loss = crit(output, y) + loss.backward() + optimizer.step() + optimizer.clear_grad() + + # baseline loss + output_base = model_base(x) + loss_base = crit(output_base, y) + loss_base.backward() + optimizer_base.step() + optimizer_base.clear_grad() + + loss_base_arr.append(loss_base.numpy()) + loss_hybrid_arr.append(loss.numpy()) + assert np.allclose(loss_base_arr, loss_hybrid_arr, rtol=1e-5, atol=1e-5) + + +if __name__ == "__main__": + runtime_main(TestDygrapgHybridDPPPMP, "dpppmp") diff --git a/python/paddle/fluid/tests/unittests/dygraph_hybrid_fp16.py b/python/paddle/fluid/tests/unittests/dygraph_hybrid_fp16.py new file mode 100644 index 0000000000..be036de31f --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_hybrid_fp16.py @@ -0,0 +1,225 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import numpy as np +import argparse +import os +import sys +import signal +import time +import socket +from contextlib import closing +from six import string_types +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import paddle.distributed.fleet as fleet +from paddle.fluid.incubate.fleet.base import role_maker +import unittest +from multiprocessing import Process +import paddle.fluid.layers as layers +from functools import reduce +from test_collective_multi_nodes import TestCollectiveAPIRunnerBase, runtime_main +from paddle import nn +import numpy as np + + +def weight_init(mp, shape, col=True, seed=1024): + np.random.seed(seed) + w = np.random.normal(0, 0.02, size=shape) + if mp is None: + _w = w + else: + if col: + step = shape[1] // mp.nranks + _w = w[:, mp.rank * step:mp.rank * step + step] + else: + step = shape[0] // mp.nranks + _w = w[mp.rank * step:mp.rank * step + step, :] + return paddle.fluid.initializer.NumpyArrayInitializer(_w) + + +class Criterion(nn.Layer): + + def __init__(self): + super(Criterion, self).__init__() + self.loss_func = nn.MSELoss(reduction="mean") + + def forward(self, pred, label): + loss = self.loss_func(pred, label) + return loss + + +class ModelPipeline(fleet.meta_parallel.PipelineLayer): + + def __init__(self, hcg): + paddle.seed(1024) + dp_linear = nn.Linear(32, 128) + self.layers_pp = [] + self.topology = hcg.topology() + self.layers_pp.append(dp_linear) + mp = hcg.get_model_parallel_group() + for i in range(6): + if mp is not None and mp.nranks > 1: + mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear( + 128, + 512, + weight_attr=weight_init(mp, (128, 512), True, 1204 + i), + has_bias=True, + gather_output=False) + mp_linear_2 = fleet.meta_parallel.RowParallelLinear( + 512, + 128, + weight_attr=weight_init(mp, (512, 128), False, 2012 + i), + has_bias=True, + input_is_parallel=True) + else: + mp_linear_1 = nn.Linear(128, + 512, + weight_attr=weight_init( + None, (128, 512), True, 1204 + i)) + mp_linear_2 = nn.Linear(512, + 128, + weight_attr=weight_init( + None, (512, 128), True, 2012 + i)) + act = nn.ReLU6() + layer_seq = nn.Sequential(mp_linear_1, mp_linear_2, act) + self.layers_pp.append(layer_seq) + + out = nn.Linear(128, 32) + self.layers_pp.append(out) + super(ModelPipeline, self).__init__(layers=self.layers_pp, + loss_fn=Criterion(), + topology=self.topology) + + +class Model(nn.Layer): + + def __init__(self, hcg): + super(Model, self).__init__() + paddle.seed(1024) + dp_linear = nn.Linear(32, 128) + self.layers_pp = [] + self.layers_pp.append(dp_linear) + mp = hcg.get_model_parallel_group() if hcg else None + for i in range(6): + if mp is not None and mp.nranks > 1: + mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear( + 128, + 512, + weight_attr=weight_init(mp, (128, 512), True, 1204 + i), + has_bias=True, + gather_output=False) + mp_linear_2 = fleet.meta_parallel.RowParallelLinear( + 512, + 128, + weight_attr=weight_init(mp, (512, 128), False, 2012 + i), + has_bias=True, + input_is_parallel=True) + else: + mp_linear_1 = nn.Linear(128, + 512, + weight_attr=weight_init( + None, (128, 512), True, 1204 + i)) + mp_linear_2 = nn.Linear(512, + 128, + weight_attr=weight_init( + None, (512, 128), True, 2012 + i)) + act = nn.ReLU6() + layer_seq = nn.Sequential(mp_linear_1, mp_linear_2, act) + self.layers_pp.append(layer_seq) + + out = nn.Linear(128, 32) + 
self.layers_pp.append(out) + self.layers = nn.Sequential(*self.layers_pp) + + def forward(self, x): + return self.layers(x) + + +class TestDygraphHybridFp16(TestCollectiveAPIRunnerBase): + + def __init__(self): + pass + + def check_pass(self, *args, **kwargs): + + from common import init_parallel_env + import paddle + from paddle.distributed import fleet + hcg = init_parallel_env("DP4-MP2-PP2-SH1-O1", 64) + pp_degree = hcg.get_pipe_parallel_world_size() + import numpy as np + crit = Criterion() + if pp_degree <= 1: + model = Model(hcg) + else: + model = ModelPipeline(hcg) + + model_base = Model(None) + + optimizer = paddle.optimizer.Adam(learning_rate=0.01, + parameters=model.parameters(), + multi_precision=True) + optimizer_base = paddle.optimizer.Adam( + learning_rate=0.01, parameters=model_base.parameters()) + + scaler = paddle.amp.GradScaler(init_loss_scaling=4096) + scaler = fleet.distributed_scaler(scaler) + model = paddle.amp.decorate(models=model, + level='O2', + save_dtype='float32') + + model = fleet.distributed_model(model) + optimizer = fleet.distributed_optimizer(optimizer) + + loss_hybrid_arr = [] + loss_base_arr = [] + x = paddle.to_tensor(np.random.random((16, 32))).astype("float32") + y = paddle.to_tensor(np.random.random((16, 32))).astype("float32") + + for _ in range(2): + if pp_degree > 1: + with paddle.amp.auto_cast(True, level='O2'): + loss = model.train_batch([x, y], + optimizer=optimizer, + scaler=scaler) + else: + with paddle.amp.auto_cast(True, level='O2'): + output = model(x) + loss = crit(output, y) + scaler.scale(loss).backward() + scaler.minimize(optimizer, loss) + optimizer.clear_grad() + + # baseline loss + with paddle.amp.auto_cast(True, level='O2'): + output_base = model_base(x) + loss_base = crit(output_base, y) + loss_base.backward() + optimizer_base.step() + optimizer_base.clear_grad() + + loss_base_arr.append(loss_base.numpy()) + loss_hybrid_arr.append(loss) + assert np.allclose(loss_base_arr, loss_hybrid_arr, rtol=1e-3, atol=1e-3) + + +if __name__ == "__main__": + runtime_main(TestDygraphHybridFp16, "dpppmp") diff --git a/python/paddle/fluid/tests/unittests/dygraph_hybrid_recompute.py b/python/paddle/fluid/tests/unittests/dygraph_hybrid_recompute.py new file mode 100644 index 0000000000..20196a98eb --- /dev/null +++ b/python/paddle/fluid/tests/unittests/dygraph_hybrid_recompute.py @@ -0,0 +1,212 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import numpy as np +import argparse +import os +import sys +import signal +import time +import socket +from contextlib import closing +from six import string_types +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import paddle.distributed.fleet as fleet +from paddle.fluid.incubate.fleet.base import role_maker +import unittest +from multiprocessing import Process +import paddle.fluid.layers as layers +from functools import reduce +from test_collective_multi_nodes import TestCollectiveAPIRunnerBase, runtime_main +from paddle import nn +import numpy as np + +from paddle.distributed.fleet.utils import recompute + + +def weight_init(mp, shape, col=True, seed=1024): + np.random.seed(seed) + w = np.random.normal(0, 0.02, size=shape) + if mp is None: + _w = w + else: + if col: + step = shape[1] // mp.nranks + _w = w[:, mp.rank * step:mp.rank * step + step] + else: + step = shape[0] // mp.nranks + _w = w[mp.rank * step:mp.rank * step + step, :] + return paddle.fluid.initializer.NumpyArrayInitializer(_w) + + +class Criterion(nn.Layer): + + def __init__(self): + super(Criterion, self).__init__() + self.loss_func = nn.MSELoss(reduction="mean") + + def forward(self, pred, label): + loss = self.loss_func(pred, label) + return loss + + +class RecomputeMatmulBlock(nn.Layer): + + def __init__(self, mp, seed, m, n, k): + super(RecomputeMatmulBlock, self).__init__() + self.mp = mp + if mp is not None and mp.nranks > 1: + mp_linear_1 = fleet.meta_parallel.ColumnParallelLinear( + m, + n, + weight_attr=weight_init(mp, (m, n), True, seed), + has_bias=True, + gather_output=False) + mp_linear_2 = fleet.meta_parallel.RowParallelLinear( + n, + k, + weight_attr=weight_init(mp, (n, k), False, seed + 1), + has_bias=True, + input_is_parallel=True) + else: + mp_linear_1 = nn.Linear(m, + n, + weight_attr=weight_init( + None, (m, n), True, seed)) + mp_linear_2 = nn.Linear(n, + k, + weight_attr=weight_init( + None, (n, k), True, seed + 1)) + self.layers = nn.Sequential(mp_linear_1, mp_linear_2) + + def forward(self, x): + if self.mp: + return recompute(self.layers, x) + else: + return self.layers(x) + + +RecomputeBlock = RecomputeMatmulBlock + + +class ModelPipeline(fleet.meta_parallel.PipelineLayer): + + def __init__(self, hcg): + paddle.seed(1024) + dp_linear = nn.Linear(32, 64) + self.layers_pp = [] + self.topology = hcg.topology() + self.layers_pp.append(dp_linear) + mp = hcg.get_model_parallel_group() + for i in range(6): + mp_layer = RecomputeBlock(mp, 1024 + i, 64, 128, 64) + act = nn.ReLU6() + layer_seq = nn.Sequential(mp_layer, act) + self.layers_pp.append(layer_seq) + + out = nn.Linear(64, 32) + self.layers_pp.append(out) + super(ModelPipeline, self).__init__(layers=self.layers_pp, + loss_fn=Criterion(), + topology=self.topology) + + +class Model(nn.Layer): + + def __init__(self, hcg): + super(Model, self).__init__() + paddle.seed(1024) + dp_linear = nn.Linear(32, 64) + self.layers_pp = [] + self.layers_pp.append(dp_linear) + mp = hcg.get_model_parallel_group() if hcg else None + for i in range(6): + mp_layer = RecomputeBlock(mp, 1024 + i, 64, 128, 64) + act = nn.ReLU6() + layer_seq = nn.Sequential(mp_layer, act) + self.layers_pp.append(layer_seq) + + out = nn.Linear(64, 32) + self.layers_pp.append(out) + self.layers = nn.Sequential(*self.layers_pp) + + def forward(self, x): + return self.layers(x) + + +class 
TestDygrapgHybridRecompute(TestCollectiveAPIRunnerBase): + + def __init__(self): + pass + + def check_pass(self, *args, **kwargs): + + from common import init_parallel_env + import paddle + from paddle.distributed import fleet + hcg = init_parallel_env("DP4-MP2-PP2-SH1-O1", 64) + pp_degree = hcg.get_pipe_parallel_world_size() + import numpy as np + crit = Criterion() + if pp_degree <= 1: + model = Model(hcg) + else: + model = ModelPipeline(hcg) + + model_base = Model(None) + + optimizer = paddle.optimizer.Adam(learning_rate=0.01, + parameters=model.parameters()) + optimizer_base = paddle.optimizer.Adam( + learning_rate=0.01, parameters=model_base.parameters()) + + model = fleet.distributed_model(model) + optimizer = fleet.distributed_optimizer(optimizer) + loss_hybrid_arr = [] + loss_base_arr = [] + + x = paddle.to_tensor(np.random.random((16, 32))).astype("float32") + y = paddle.to_tensor(np.random.random((16, 32))).astype("float32") + + for _ in range(5): + if pp_degree > 1: + loss = model.train_batch([x, y], optimizer=optimizer) + else: + output = model(x) + loss = crit(output, y) + loss.backward() + optimizer.step() + optimizer.clear_grad() + + # baseline loss + output_base = model_base(x) + loss_base = crit(output_base, y) + loss_base.backward() + optimizer_base.step() + optimizer_base.clear_grad() + + loss_base_arr.append(loss_base.numpy()) + loss_hybrid_arr.append(loss) + assert np.allclose(loss_base_arr, loss_hybrid_arr, rtol=1e-5, atol=1e-5) + + +if __name__ == "__main__": + runtime_main(TestDygrapgHybridRecompute, "dpppmp") diff --git a/python/paddle/fluid/tests/unittests/mn_dygraph_group_sharded_stage3.py b/python/paddle/fluid/tests/unittests/mn_dygraph_group_sharded_stage3.py new file mode 100644 index 0000000000..6e742f1b2f --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mn_dygraph_group_sharded_stage3.py @@ -0,0 +1,301 @@ +# -*- coding: UTF-8 -*- + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import shutil +import tempfile +import numpy as np +import argparse +import ast +import time +import paddle +import paddle.fluid as fluid +from paddle.fluid.dygraph.nn import Linear +from paddle.distributed import fleet +from paddle.fluid.dygraph import nn +from paddle.fluid.framework import _test_eager_guard + +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3 +from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler + +epoch = 10 +paddle.seed(2022) +np.random.seed(2022) +base_lr = 0.1 +momentum_rate = 0.9 +l2_decay = 1e-4 + + +class MLP(fluid.Layer): + + def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): + super(MLP, self).__init__() + + self._linear1 = Linear(linear_size, linear_size) + self._linear2 = Linear(linear_size, linear_size) + self._linear3 = Linear(linear_size, linear_size) + self._linear4 = Linear(linear_size, linear_size) + self._linear5 = Linear(linear_size, 10) + + def forward(self, inputs): + y = self._linear1(inputs) + y = self._linear2(y) + y = self._linear3(y) + y = self._linear4(y) + y = self._linear5(y) + return y + + +def reader_decorator(linear_size=1000): + + def __reader__(): + for _ in range(100): + img = np.random.rand(linear_size).astype('float32') + label = np.ones(1).astype('int64') + yield img, label + + return __reader__ + + +def optimizer_setting(model, use_pure_fp16, opt_group=False): + clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) + optimizer = paddle.optimizer.Momentum( + parameters=[{ + "params": list(model.parameters()) + }] if opt_group else list(model.parameters()), + learning_rate=0.001, + weight_decay=0.00001, + grad_clip=clip, + multi_precision=use_pure_fp16) + + return optimizer + + +def train_mlp(model, + sharding_stage, + use_pure_fp16=False, + accumulate_grad=False, + batch_size=100, + opt_group=False, + sync_comm=False, + test_minimize=False, + save_model=False): + group = paddle.distributed.new_group( + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) + if opt_group: + optimizer = optimizer_setting(model=model, + use_pure_fp16=use_pure_fp16, + opt_group=opt_group) + else: + optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) + + if use_pure_fp16: + model = paddle.amp.decorate(models=model, + level='O2', + save_dtype='float32') + scaler = paddle.amp.GradScaler(init_loss_scaling=32768) + scaler = GroupShardedScaler(scaler) + if sharding_stage == 2: + optimizer = GroupShardedOptimizerStage2( + params=optimizer._parameter_list, optim=optimizer, group=group) + model = GroupShardedStage2(model, + optimizer, + group=group, + buffer_max_size=2**21) + elif sharding_stage == 3: + model = GroupShardedStage3(model, + optimizer=optimizer, + group=group, + sync_comm=sync_comm, + segment_size=2**15) + + # check optimizer.minimize() error + if test_minimize: + try: + optimizer.minimize() + except: + print( + "====== Find sharding_stage3_optimizer.minimize() error ======") + return + + train_reader = paddle.batch(reader_decorator(), + batch_size=batch_size, + drop_last=True) + + train_loader = paddle.io.DataLoader.from_generator(capacity=32, + use_double_buffer=True, + iterable=True, + return_list=True, + use_multiprocess=True) + train_loader.set_sample_list_generator(train_reader) 
+ + for eop in range(epoch): + model.train() + for batch_id, data in enumerate(train_loader()): + img, label = data + label.stop_gradient = True + img.stop_gradient = True + with paddle.amp.auto_cast(True, level='O2'): + out = model(img) + loss = paddle.nn.functional.cross_entropy(input=out, + label=label) + avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) + + if batch_size == 20: + avg_loss = avg_loss / 5 + + if not use_pure_fp16: + avg_loss.backward() + else: + scaler.scale(avg_loss).backward() + + if not accumulate_grad: + if not use_pure_fp16: + optimizer.step() + else: + scaler.step(optimizer) + scaler.update() + optimizer.clear_grad() + if accumulate_grad: + if not use_pure_fp16: + optimizer.step() + else: + scaler.step(optimizer) + scaler.update() + optimizer.clear_grad() + if sharding_stage == 3: + model.get_all_parameters() + + if save_model: + return model, optimizer + return model.parameters() + + +def test_stage2_stage3(): + paddle.distributed.init_parallel_env() + mlp, mlp1, mlp2, mlp3, mlp4, mlp5, mlp6, mlp7, mlp8, mlp9, mlp10 = MLP( + ), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP(), MLP() + state_dict = mlp.state_dict() + mlp1.set_state_dict(state_dict) + mlp2.set_state_dict(state_dict) + mlp3.set_state_dict(state_dict) + mlp4.set_state_dict(state_dict) + mlp5.set_state_dict(state_dict) + mlp6.set_state_dict(state_dict) + mlp7.set_state_dict(state_dict) + mlp8.set_state_dict(state_dict) + mlp9.set_state_dict(state_dict) + mlp10.set_state_dict(state_dict) + + # fp32 + stage2_params = train_mlp(mlp1, + sharding_stage=2, + use_pure_fp16=False, + opt_group=False) + stage3_params = train_mlp(mlp2, + sharding_stage=3, + use_pure_fp16=False, + opt_group=False) + + for i in range(len(stage2_params)): + np.testing.assert_allclose(stage2_params[i].numpy(), + stage3_params[i].numpy(), + rtol=1e-6, + atol=1e-6) + + # fp32 accumulate grad + stage3_params = train_mlp(mlp3, + sharding_stage=3, + use_pure_fp16=False, + accumulate_grad=True, + opt_group=True) + stage3_params_add = train_mlp(mlp4, + sharding_stage=3, + use_pure_fp16=False, + accumulate_grad=True, + batch_size=20, + opt_group=True) + for i in range(len(stage3_params)): + np.testing.assert_allclose(stage3_params[i].numpy(), + stage3_params_add[i].numpy(), + rtol=1e-6, + atol=1e-4) + + # fp16 + stage2_params = train_mlp(mlp5, + sharding_stage=2, + use_pure_fp16=True, + opt_group=False) + stage3_params = train_mlp(mlp6, + sharding_stage=3, + use_pure_fp16=True, + opt_group=False) + for i in range(len(stage2_params)): + np.testing.assert_allclose(stage2_params[i].numpy(), + stage3_params[i].numpy(), + rtol=1e-4, + atol=1e-3) + + # fp16 sync_comm + stage3_params = train_mlp(mlp7, + sharding_stage=3, + use_pure_fp16=True, + opt_group=False) + stage3_params_re = train_mlp(mlp8, + sharding_stage=3, + use_pure_fp16=True, + opt_group=False, + sync_comm=True) + for i in range(len(stage3_params)): + np.testing.assert_allclose(stage3_params[i].numpy(), + stage3_params_re[i].numpy(), + rtol=1e-6) + + # save/load model + output_dir = tempfile.mkdtemp() + try: + model_file = os.path.join(output_dir, "model.pdmodel") + optimizer_file = os.path.join(output_dir, "model.pdopt") + model_stage3, optimizer_stage3 = train_mlp(mlp9, + sharding_stage=3, + use_pure_fp16=False, + opt_group=False, + save_model=True) + paddle.save(model_stage3.state_dict(), model_file) + paddle.save(optimizer_stage3.state_dict(), optimizer_file) + m_state_dict = paddle.load(model_file) + opt_state_dict = paddle.load(optimizer_file) + 
model_stage3.set_state_dict(m_state_dict) + optimizer_stage3.set_state_dict(opt_state_dict) + except Exception as e: + shutil.rmtree(output_dir) + raise e + else: + shutil.rmtree(output_dir) + + # check optimizer.minimize() error + train_mlp(mlp10, + sharding_stage=3, + use_pure_fp16=False, + opt_group=False, + test_minimize=True) + + +if __name__ == '__main__': + with _test_eager_guard(): + test_stage2_stage3() diff --git a/python/paddle/fluid/tests/unittests/mn_dygraph_sharding_stage2.py b/python/paddle/fluid/tests/unittests/mn_dygraph_sharding_stage2.py new file mode 100644 index 0000000000..6aa8889218 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mn_dygraph_sharding_stage2.py @@ -0,0 +1,251 @@ +# -*- coding: UTF-8 -*- + +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil +import numpy as np +import argparse +import tempfile +import ast +import time +import paddle +import paddle.fluid as fluid +from paddle.fluid.dygraph.nn import Linear +from paddle.distributed import fleet +from paddle.fluid.dygraph import nn +from paddle.fluid.framework import _test_eager_guard + +from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2 +from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2 + +seed = 2022 +epoch = 2 +linear_size = 1000 + +strategy = fleet.DistributedStrategy() +strategy.hybrid_configs = { + "dp_degree": 16, + "mp_degree": 1, + "pp_degree": 1, + "sharding_degree": 1 +} + +np.random.seed(seed) +paddle.seed(seed) + + +class MLP(fluid.Layer): + + def __init__(self, linear_size=1000, param_attr=None, bias_attr=None): + super(MLP, self).__init__() + + self._linear1 = Linear(linear_size, linear_size) + self._linear2 = Linear(linear_size, linear_size) + self._linear3 = Linear(linear_size, linear_size) + self._linear4 = Linear(linear_size, linear_size) + self._linear5 = Linear(linear_size, 10) + + def forward(self, inputs): + y = self._linear1(inputs) + y = self._linear2(y) + y = self._linear3(y) + y = self._linear4(y) + y = self._linear5(y) + return y + + +def reader_decorator(linear_size=1000): + + def __reader__(): + for _ in range(100): + img = np.random.rand(linear_size).astype('float32') + label = np.ones(1).astype('int64') + yield img, label + + return __reader__ + + +def optimizer_setting(model, use_pure_fp16, opt_group=False): + clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) + optimizer = paddle.optimizer.AdamW(parameters=[{ + "params": model.parameters() + }] if opt_group else model.parameters(), + learning_rate=0.001, + weight_decay=0.00001, + grad_clip=clip, + multi_precision=use_pure_fp16) + + return optimizer + + +def train_mlp(model, + sharding_stage, + batch_size=100, + use_pure_fp16=False, + accumulate_grad=False, + opt_group=False, + save_model=False): + if sharding_stage == "dp": + hcg = fleet.get_hybrid_communicate_group() + group = hcg.get_check_parallel_group() + else: + 
group = paddle.distributed.new_group( + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) + if opt_group: + optimizer = optimizer_setting(model=model, + use_pure_fp16=use_pure_fp16, + opt_group=opt_group) + else: + optimizer = optimizer_setting(model=model, use_pure_fp16=use_pure_fp16) + + if sharding_stage == 2: + optimizer = ShardingOptimizerStage2(params=model.parameters(), + optim=optimizer, + group=group) + + model = ShardingStage2(model, + optimizer, + group=group, + buffer_max_size=2**21) + else: + optimizer = fleet.distributed_optimizer(optimizer) + model = fleet.distributed_model(model) + + train_reader = paddle.batch(reader_decorator(), + batch_size=batch_size, + drop_last=True) + + train_loader = paddle.io.DataLoader.from_generator(capacity=32, + use_double_buffer=True, + iterable=True, + return_list=True, + use_multiprocess=True) + train_loader.set_sample_list_generator(train_reader) + + if sharding_stage == 2: + model.to(device="gpu") + + for eop in range(epoch): + model.train() + + for batch_id, data in enumerate(train_loader()): + img, label = data + label.stop_gradient = True + img.stop_gradient = True + + out = model(img) + loss = paddle.nn.functional.cross_entropy(input=out, label=label) + + avg_loss = paddle.mean(x=loss.cast(dtype=paddle.float32)) + if batch_size == 20: + avg_loss = avg_loss / 5 + avg_loss.backward() + + if not accumulate_grad: + optimizer.step() + optimizer.clear_grad() + + if accumulate_grad: + optimizer.step() + optimizer.clear_grad() + + if save_model: + return model, optimizer + return model.parameters() + + +def test_dp_stage2(): + mlp = MLP() + state_dict = mlp.state_dict() + mlp1 = MLP() + mlp2 = MLP() + mlp3 = MLP() + mlp4 = MLP() + mlp5 = MLP() + mlp6 = MLP() + mlp1.set_state_dict(state_dict) + mlp2.set_state_dict(state_dict) + mlp3.set_state_dict(state_dict) + mlp4.set_state_dict(state_dict) + mlp5.set_state_dict(state_dict) + mlp6.set_state_dict(state_dict) + + # DP VS stage2 + dp_params = train_mlp(mlp1, + sharding_stage="dp", + use_pure_fp16=False, + opt_group=False) + stage2_params = train_mlp(mlp2, + sharding_stage=2, + use_pure_fp16=False, + opt_group=False) + for i in range(len(dp_params)): + np.testing.assert_allclose(dp_params[i].numpy(), + stage2_params[i].numpy(), + rtol=1e-6, + atol=5e-4) + + # stage2 accumulate grad + stage2_params = train_mlp(mlp3, sharding_stage=2, accumulate_grad=True) + stage2_accumulate_grad = train_mlp(mlp4, + sharding_stage=2, + batch_size=20, + accumulate_grad=True) + for i in range(len(stage2_params)): + np.testing.assert_allclose(stage2_params[i].numpy(), + stage2_accumulate_grad[i].numpy(), + rtol=1e-5, + atol=1e-5) + + # stage2 param list VS param group + stage2_params = train_mlp(mlp5, + sharding_stage=2, + use_pure_fp16=False, + opt_group=True) + for i in range(len(dp_params)): + np.testing.assert_allclose(dp_params[i].numpy(), + stage2_params[i].numpy(), + rtol=1e-6, + atol=5e-4) + + # save/load model + output_dir = tempfile.mkdtemp() + try: + model_file = os.path.join(output_dir, "model.pdmodel") + optimizer_file = os.path.join(output_dir, "model.pdopt") + model_stage2, optimizer_stage2 = train_mlp(mlp6, + sharding_stage=2, + use_pure_fp16=False, + opt_group=False, + save_model=True) + paddle.save(model_stage2.state_dict(), model_file) + paddle.save(optimizer_stage2.state_dict(), optimizer_file) + m_state_dict = paddle.load(model_file) + opt_state_dict = paddle.load(optimizer_file) + model_stage2.set_state_dict(m_state_dict) + optimizer_stage2.set_state_dict(opt_state_dict) + except Exception 
as e: + shutil.rmtree(output_dir) + raise e + else: + shutil.rmtree(output_dir) + + +if __name__ == '__main__': + with _test_eager_guard(): + pass + fleet.init(is_collective=True, strategy=strategy) + test_dp_stage2() diff --git a/python/paddle/fluid/tests/unittests/multinode_dist_test.sh b/python/paddle/fluid/tests/unittests/multinode_dist_test.sh new file mode 100644 index 0000000000..8ea1937f83 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/multinode_dist_test.sh @@ -0,0 +1,107 @@ +#!/bin/bash + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +unset https_proxy http_proxy +export FLAGS_rpc_disable_reuse_port=1 + +export MPIRUN=${EXE_MPIRUN:-""} # MPIRUN="mpirun" + +name=${TEST_TARGET_NAME} +TEST_TIMEOUT=${TEST_TIMEOUT} + +if [[ ${name}"x" == "x" ]]; then + echo "can't find ${name}, please set ${TEST_TARGET_NAME} first" + exit 1 +fi + +if [[ ${TEST_TIMEOUT}"x" == "x" ]]; then + echo "can't find ${TEST_TIMEOUT}, please set ${TEST_TIMEOUT} first" + exit 1 +fi + + +# rm flag file +rm -f ${name}_*.log + +# start the unit test +run_time=$(( $TEST_TIMEOUT - 10 )) +echo "run_time: ${run_time}" + +if [[ ${WITH_COVERAGE} == "ON" ]]; then + PYTHON_EXEC="python3 -u -m coverage run --branch -p " +else + PYTHON_EXEC="python3 -u " +fi + +unset PYTHONPATH +timeout -s SIGKILL ${run_time} ${MPIRUN} ${PYTHON_EXEC} ${name}.py > ${name}_run.log 2>&1 +exit_code=$? +if [[ $exit_code -eq 0 ]]; then + exit 0 +fi + +echo "${name} faild with ${exit_code}" + +echo "after run ${name}" +ps -aux +netstat -anlp + +# paddle log +echo "${name} log" +for log in `ls ${name}_*.log` +do + printf "\ncat ${log}\n" + cat -n ${log} +done + +# check CUDA or ROCM env +GPU_SYS_INFO_CMD=nvidia-smi + +which ${GPU_SYS_INFO_CMD} +exit_code=$? +if [[ $exit_code -ne 0 ]]; then + GPU_SYS_INFO_CMD=rocm-smi +fi + +which ${GPU_SYS_INFO_CMD} +exit_code=$? +if [[ $exit_code -ne 0 ]]; then + echo "nvidia-smi or rocm-smi faild with ${exit_code}" + exit ${exit_code} +fi + +#display system context +for i in {1..2}; do + sleep 3 + ps -aux + netstat -anlp + + if hash "${GPU_SYS_INFO_CMD}" > /dev/null; then + ${GPU_SYS_INFO_CMD} + fi +done + +echo "dist space:" +df -h + +#display /tmp/files +echo "ls /tmp/paddle.*" +ls -l /tmp/paddle.* + +echo "ls -l ./" +ls -l ./ + +exit 1 diff --git a/python/paddle/fluid/tests/unittests/test_collective_multi_nodes.py b/python/paddle/fluid/tests/unittests/test_collective_multi_nodes.py new file mode 100644 index 0000000000..a5514d72f3 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_collective_multi_nodes.py @@ -0,0 +1,120 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import numpy as np +import unittest +import time +import argparse +import os +import sys +import subprocess +import traceback +import functools +import pickle +import tempfile +from contextlib import closing +import paddle +import paddle.fluid as fluid +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import socket + + +class TestCollectiveAPIRunnerBase(object): + + def check_pass(self, *args, **kwargs): + raise NotImplementedError( + "get model should be implemented by child class.") + + def run_trainer(self, *args, **kwargs): + self.check_pass(*args, **kwargs) + + +def runtime_main(test_class, col_type=None): + args = {} + model = test_class() + args["static_mode"] = 0 + model.run_trainer(**args) + + +class TestDistBase(unittest.TestCase): + + def setUp(self): + self._trainers = 4 + self._init_env() + + def _init_env(self): + self._python_interp = sys.executable + self.temp_dir = tempfile.TemporaryDirectory() + + def check_with_place(self, + model_file, + backend="nccl", + static_mode=False, + check_error_log=False, + need_envs={}, + eager_mode=True, + args=[], + kwargs={}): + required_envs = { + "FLAGS_fraction_of_gpu_memory_to_use": "0.15", + "FLAGS_eager_delete_tensor_gb": "0.0", + "PATH": os.getenv("PATH"), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), + "LD_PRELOAD": os.getenv("LD_PRELOAD", ""), + "FLAGS_call_stack_level": "2", + "GLOG_v": "0", + "NCCL_P2P_DISABLE": "1", + "PADDLE_WITH_GLOO": "0", + "BACKEND": backend, + "PADDLE_DISTRI_BACKEND": backend, + "PADDLE_USE_GPU": "1" + } + required_envs.update(need_envs) + if check_error_log: + required_envs["GLOG_v"] = "0" + required_envs["GLOG_logtostderr"] = "1" + required_envs["GLOO_LOG_LEVEL"] = "TRACE" + + if eager_mode: + required_envs["FLAGS_enable_eager_mode"] = "%d" % 1 + else: + required_envs["FLAGS_enable_eager_mode"] = "%d" % 0 + self._run_cluster(model_file, required_envs) + + def _run_cluster(self, model_file, envs): + run_cluster_process = f"{self._python_interp} -u -m paddle.distributed.launch --log_dir {self.temp_dir.name} {model_file}" + filted_envs = dict() + for k in envs.keys(): + if "PADDLE_" == k[:7] and k not in [ + "PADDLE_NNODES", "PADDLE_MASTER" + ]: + continue + filted_envs[k] = envs[k] + + launcher = subprocess.Popen(run_cluster_process.strip().split(), + stdout=sys.stderr, + stderr=sys.stdout, + env=filted_envs) + launcher.communicate(timeout=240) + + if launcher.poll() is None: + self.temp_dir.cleanup() + raise TimeoutError + elif launcher.poll() != 0: + self.temp_dir.cleanup() + raise RuntimeError("test failed!") + self.temp_dir.cleanup() diff --git a/python/paddle/fluid/tests/unittests/test_multinode_dygraph_hybrid_dp.py b/python/paddle/fluid/tests/unittests/test_multinode_dygraph_hybrid_dp.py new file mode 100755 index 0000000000..0557562dab --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_multinode_dygraph_hybrid_dp.py @@ -0,0 +1,40 @@ +#!/usr/bin/python3 + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import unittest +import numpy as np +import paddle + +from test_collective_multi_nodes import TestDistBase + +import os + + +class TestDYgraphDPMode(TestDistBase): + + def setUp(self): + self._trainers = 16 + self._init_env() + + def test_col_parallel_linear(self): + self.check_with_place("dygraph_hybrid_dp.py", + backend="nccl", + need_envs=os.environ) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_multinode_dygraph_hybrid_dpppmp.py b/python/paddle/fluid/tests/unittests/test_multinode_dygraph_hybrid_dpppmp.py new file mode 100755 index 0000000000..000793cc4c --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_multinode_dygraph_hybrid_dpppmp.py @@ -0,0 +1,50 @@ +#!/usr/bin/python3 + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import unittest +import numpy as np +import paddle + +from test_collective_multi_nodes import TestDistBase + +import os + + +class TestDYgraphHybrid(TestDistBase): + + def setUp(self): + self._trainers = 16 + self._init_env() + + def test_hybrid_dpppmp(self): + self.check_with_place("dygraph_hybrid_dpppmp.py", + backend="nccl", + need_envs=os.environ) + + def test_hybrid_recompute(self): + self.check_with_place("dygraph_hybrid_recompute.py", + backend="nccl", + need_envs=os.environ) + + def test_hybrid_fp16(self): + self.check_with_place("dygraph_hybrid_fp16.py", + backend="nccl", + need_envs=os.environ) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_multinode_dygraph_sharding.py b/python/paddle/fluid/tests/unittests/test_multinode_dygraph_sharding.py new file mode 100755 index 0000000000..ae9d8f6da5 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_multinode_dygraph_sharding.py @@ -0,0 +1,45 @@ +#!/usr/bin/python3 + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_multi_nodes import TestDistBase
+
+import os
+
+
+class TestDYgraphShardingDP(TestDistBase):
+
+    def setUp(self):
+        self._trainers = 16
+        self._init_env()
+
+    def test_hybrid_sharding_stage2(self):
+        self.check_with_place("mn_dygraph_sharding_stage2.py",
+                              backend="nccl",
+                              need_envs=os.environ)
+
+    def test_hybrid_sharding_stage3(self):
+        self.check_with_place("mn_dygraph_group_sharded_stage3.py",
+                              backend="nccl",
+                              need_envs=os.environ)
+
+
+if __name__ == '__main__':
+    unittest.main()
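A hedged sketch of how a reviewer might enable and launch the new multi-node suite follows; beyond the option, script, and environment-variable names this patch introduces, everything in it is an assumption. It assumes ctest drives the targets registered through bash_test_modules (the RUN_TYPE=EXCLUSIVE label is a ctest label), that PADDLE_NNODES and PADDLE_MASTER describe the cluster in the way test_collective_multi_nodes.py expects when it filters the environment, and that the node count, master address, and mpirun choice below are placeholders.

    # Build with the new switch enabled; add the usual GPU/CUDA flags for the machine.
    cmake .. -DWITH_TESTING=ON -DWITH_DISTRIBUTE=ON -DWITH_MULTINODE_TESTING=ON
    make -j"$(nproc)"

    # Placeholder cluster description: the launcher in test_collective_multi_nodes.py
    # forwards PADDLE_NNODES and PADDLE_MASTER, and multinode_dist_test.sh honours
    # EXE_MPIRUN when it is set.
    export PADDLE_NNODES=2
    export PADDLE_MASTER="10.0.0.1:12345"   # placeholder master address
    export EXE_MPIRUN="mpirun"              # optional, see multinode_dist_test.sh

    # Run everything picked up by the test_multinode_*.py glob.
    ctest -R "test_multinode_" --output-on-failure

Because the CMake hunk only registers these targets when both WITH_DISTRIBUTE and WITH_MULTINODE_TESTING are ON, a default build is unaffected.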