From e305771e366605fe64ea7b875e839eedeaf9607c Mon Sep 17 00:00:00 2001
From: kangguangli
Date: Tue, 14 Feb 2023 15:11:36 +0800
Subject: [PATCH] [with_data_parallel][part1] remove with_data_parallel in
 unit test (#50351)

* process unit test matched test_p*

* fix ci bug

* fix codestyle

* remove all tests about pe and restore some unrelated tests

* delete test_parallel_executor_test_while_train.py
---
 .../fluid/tests/unittests/CMakeLists.txt      |  22 --
 .../unittests/test_eager_deletion_mnist.py    |  12 -
 .../test_parallel_executor_dry_run.py         |  96 ------
 ..._parallel_executor_feed_persistable_var.py |  96 ------
 .../test_parallel_executor_fetch_feed.py      | 200 ------------
 ...st_parallel_executor_fetch_isolated_var.py | 130 --------
 ...test_parallel_executor_fix_op_run_order.py | 100 ------
 ...el_executor_inference_feed_partial_data.py | 242 --------------
 .../unittests/test_parallel_executor_mnist.py | 302 ------------------
 .../test_parallel_executor_run_cinn.py        |   2 +-
 ...test_parallel_executor_test_while_train.py | 113 -------
 .../tests/unittests/test_pass_builder.py      |  10 +-
 .../fluid/tests/unittests/test_print_op.py    |   4 +-
 .../fluid/tests/unittests/test_profiler.py    |   8 +-
 .../fluid/tests/unittests/test_prune.py       |  95 +-----
 .../fluid/tests/unittests/test_py_func_op.py  |   1 -
 tools/parallel_UT_rule.py                     |  13 -
 tools/static_mode_white_list.py               |   6 -
 18 files changed, 9 insertions(+), 1443 deletions(-)
 delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py
 delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py
 delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
 delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_isolated_var.py
 delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor_fix_op_run_order.py
 delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor_inference_feed_partial_data.py
 delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
 delete mode 100644 python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py

diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index a55cbaca0fc..84b4f2ee5c6 100755
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -427,7 +427,6 @@ list(REMOVE_ITEM TEST_OPS test_fetch_lod_tensor_array)
 list(REMOVE_ITEM TEST_OPS test_warpctc_op)
 list(REMOVE_ITEM TEST_OPS test_parallel_executor_profiler)
 list(REMOVE_ITEM TEST_OPS test_data_norm_op)
-list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed)
 list(REMOVE_ITEM TEST_OPS test_parallel_executor_transformer)
 list(REMOVE_ITEM TEST_OPS test_parallel_executor_transformer_auto_growth)
 list(REMOVE_ITEM TEST_OPS test_bilinear_interp_op)
@@ -828,10 +827,6 @@ if(NOT WIN32)
       FLAGS_cudnn_deterministic=1)
   py_test_modules(test_ir_memory_optimize_transformer MODULES
                   test_ir_memory_optimize_transformer)
-  # FIXME(zcd): temporally disable test_parallel_executor_fetch_feed in Windows CI because of the random failure.
-  py_test_modules(test_parallel_executor_fetch_feed MODULES
-                  test_parallel_executor_fetch_feed)
-  set_tests_properties(test_parallel_executor_fetch_feed PROPERTIES TIMEOUT 450)
 endif()
 
 add_subdirectory(sequence)
@@ -889,22 +884,11 @@ add_subdirectory(ir)
 
 add_subdirectory(standalone_executor)
 
-if(WITH_TESTING)
-  set_property(TEST test_parallel_executor_mnist
-               PROPERTY ENVIRONMENT GLOG_vmodule=all_reduce_deps_pass=10)
-  set_property(TEST test_parallel_executor_fix_op_run_order
-               PROPERTY ENVIRONMENT GLOG_vmodule=fix_op_run_order_pass=10)
-endif()
-
 set_tests_properties(
-  test_parallel_executor_test_while_train
-  test_parallel_executor_mnist
-  test_parallel_executor_feed_persistable_var
   test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
   test_data_norm_op
   test_dataloader_keep_order
   test_dataloader_unkeep_order
-  test_parallel_executor_inference_feed_partial_data
   test_parallel_ssa_graph_inference_feed_partial_data
   test_fetch_unmerged
   test_buffer_shared_memory_reuse_pass
@@ -918,7 +902,6 @@ set_tests_properties(
   test_distributed_fused_lamb_op_with_clip
   test_distributed_fused_lamb_op_without_clip
   test_distributed_fused_lamb_op_with_gradient_merge
-  test_parallel_executor_fetch_isolated_var
   PROPERTIES LABELS "RUN_TYPE=DIST")
 
 if(NOT WIN32 AND NOT APPLE)
@@ -1068,7 +1051,6 @@ set_tests_properties(test_weight_decay PROPERTIES TIMEOUT 120)
 set_tests_properties(test_imperative_ptb_rnn_sorted_gradient PROPERTIES TIMEOUT
                                                              120)
 set_tests_properties(test_crop_tensor_op PROPERTIES TIMEOUT 120)
-set_tests_properties(test_parallel_executor_mnist PROPERTIES TIMEOUT 120)
 set_tests_properties(test_imperative_ptb_rnn PROPERTIES TIMEOUT 120)
 set_tests_properties(test_imperative_save_load_v2 PROPERTIES TIMEOUT 120)
 set_tests_properties(test_conv2d_transpose_op PROPERTIES TIMEOUT 120)
@@ -1138,8 +1120,6 @@ set_tests_properties(test_matmul_v2_op PROPERTIES TIMEOUT 120)
 set_tests_properties(test_slice_op PROPERTIES TIMEOUT 120)
 set_tests_properties(test_strided_slice_op PROPERTIES TIMEOUT 120)
 set_tests_properties(test_translated_layer PROPERTIES TIMEOUT 120)
-set_tests_properties(test_parallel_executor_inference_feed_partial_data
-                     PROPERTIES TIMEOUT 120)
 set_tests_properties(test_pad3d_op PROPERTIES TIMEOUT 120)
 set_tests_properties(test_dataloader_keep_order PROPERTIES TIMEOUT 120)
 set_tests_properties(test_mean_op PROPERTIES TIMEOUT 120)
@@ -1244,8 +1224,6 @@ if(WITH_CINN AND WITH_TESTING)
 endif()
 
 # ExecutionStrategy is deprecated in standalone executor
-set_tests_properties(test_parallel_executor_dry_run
-                     PROPERTIES ENVIRONMENT "FLAGS_USE_STANDALONE_EXECUTOR=0")
 set_tests_properties(test_parallel_executor_drop_scope
                      PROPERTIES ENVIRONMENT "FLAGS_USE_STANDALONE_EXECUTOR=0")
 
diff --git a/python/paddle/fluid/tests/unittests/test_eager_deletion_mnist.py b/python/paddle/fluid/tests/unittests/test_eager_deletion_mnist.py
index f9344e9f823..6ab2ec455e9 100644
--- a/python/paddle/fluid/tests/unittests/test_eager_deletion_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_eager_deletion_mnist.py
@@ -18,17 +18,5 @@ import paddle.fluid as fluid
 
 fluid.core._set_eager_deletion_mode(0.0, 1.0, True)
 
-# FIXME(zjl): It seems that this unittest fails randomly
-# when comparing all reduce last loss and reduce last loss
-# e.g.: AssertionError: 1.0357145 != 1.0673475 within 0.01 delta
-# Disable it temporarily.
-'''
-from test_parallel_executor_mnist import TestMNIST
-
-
-class EagerDeletionTestMNIST(TestMNIST):
-    pass
-'''
-
 if __name__ == '__main__':
     unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py
deleted file mode 100644
index 0d750ddcbe4..00000000000
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-import unittest
-
-import paddle
-import paddle.fluid as fluid
-from paddle.fluid import compiler
-
-os.environ['CPU_NUM'] = str(4)
-
-
-class TestBase(unittest.TestCase):
-    def main(
-        self,
-        network_func,
-        iter=10,
-        iter_per_pe=10,
-        use_gpu=True,
-        use_experimental_executor=False,
-    ):
-        if use_gpu and not fluid.core.is_compiled_with_cuda():
-            logging.warning(
-                "Paddle is not compiled with CUDA, skip GPU unittests"
-            )
-            return
-
-        main_prog = fluid.Program()
-        startup_prog = fluid.Program()
-        scope = fluid.Scope()
-        with fluid.program_guard(main_prog, startup_prog):
-            with fluid.scope_guard(scope):
-                loss = network_func()
-                exe = fluid.Executor(
-                    fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
-                )
-                exe.run(startup_prog)
-
-                exe_strategy = fluid.ExecutionStrategy()
-                exe_strategy._dry_run = True
-                exe_strategy.use_experimental_executor = (
-                    use_experimental_executor
-                )
-                train_cp = compiler.CompiledProgram(
-                    main_prog
-                ).with_data_parallel(
-                    loss_name=loss.name, exec_strategy=exe_strategy
-                )
-                for _ in range(iter):
-                    for _ in range(iter_per_pe):
-                        exe.run(train_cp)
-
-
-class TestMNISTDryRun(TestBase):
-    def test_mnist_dry_run(self):
-        for use_gpu in (False, True):
-            for use_experimental_executor in (False, True):
-                self.main(
-                    network_func=TestMNISTDryRun.network_func,
-                    use_gpu=use_gpu,
-                    use_experimental_executor=use_experimental_executor,
-                )
-
-    @staticmethod
-    def network_func():
-        img = paddle.static.data(name='img', shape=[-1, 784], dtype='float32')
-        label = paddle.static.data(name='label', shape=[-1, 1], dtype='int64')
-        hidden = img
-        for _ in range(10):
-            hidden = paddle.static.nn.fc(x=img, size=200, activation='tanh')
-        prediction = paddle.static.nn.fc(
-            x=hidden, size=10, activation='softmax'
-        )
-        loss = paddle.nn.functional.cross_entropy(
-            input=prediction, label=label, reduction='none', use_softmax=False
-        )
-        avg_loss = paddle.mean(loss)
-        fluid.optimizer.Adam().minimize(avg_loss)
-        return avg_loss
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py
deleted file mode 100644
index 3d525422ccf..00000000000
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import unittest
-from functools import partial
-
-import numpy
-from simple_nets import init_data, simple_fc_net
-
-import paddle
-import paddle.fluid as fluid
-import paddle.fluid.core as core
-
-
-class TestFeedPersistableVar(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
-        os.environ['CPU_NUM'] = str(4)
-        batch_size = 4
-        cls.img, cls.label = init_data(
-            batch_size, img_shape=[784], label_range=9
-        )
-        cls.feed_dict = {
-            'image': cls.img,
-            'label': cls.label,
-            'learning_rate': numpy.array([1.0]).astype("float32"),
-        }
-
-    def optimizer(self):
-        learning_rate = paddle.static.create_global_var(
-            name="learning_rate",
-            shape=[1],
-            value=1.0,
-            dtype='float32',
-            persistable=True,
-        )
-        optimizer = fluid.optimizer.SGD(learning_rate=learning_rate)
-        return optimizer
-
-    def check_feed_persistable_var(self, feed_dict, use_cuda=False):
-        if use_cuda and not core.is_compiled_with_cuda():
-            return
-        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-        exe = fluid.Executor(place)
-
-        main = fluid.Program()
-        startup = fluid.Program()
-        with fluid.program_guard(main, startup):
-            loss = simple_fc_net()
-
-            optimizer = self.optimizer()
-            optimizer.minimize(loss)
-
-        exe.run(program=startup)
-        compiled_prog = fluid.compiler.CompiledProgram(
-            main
-        ).with_data_parallel(loss_name=loss.name)
-
-        exe.run(program=compiled_prog, feed=feed_dict)
-
-    def test_feed_persistable_var(self):
-        self.check_feed_persistable_var(self.feed_dict)
-        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
-
-        self.feed_dict['learning_rate'] = numpy.array([1.0, 1.0]).astype(
-            "float32"
-        )
-        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
-
-        self.feed_dict['learning_rate'] = numpy.array([1.0, 1.0]).astype(
-            "float32"
-        )
-        run = partial(self.check_feed_persistable_var, self.feed_dict)
-        self.assertRaises(RuntimeError, run)
-
-        self.feed_dict['image'] = self.img[0, :]
-        self.feed_dict['label'] = self.label[0, :]
-        run = partial(self.check_feed_persistable_var, self.feed_dict)
-        self.assertRaises(RuntimeError, run)
-
-
-if __name__ == '__main__':
-    paddle.enable_static()
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
deleted file mode 100644
index 5aa87bc7b06..00000000000
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
+++ /dev/null
@@ -1,200 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import math
-import os
-import unittest
-
-import numpy as np
-
-import paddle
-import paddle.fluid as fluid
-import paddle.fluid.core as core
-from paddle.fluid import compiler
-
-
-def Lenet(data, class_dim):
-    conv1 = paddle.static.nn.conv2d(data, 4, 5, 1, act=None)
-    bn1 = paddle.static.nn.batch_norm(conv1, act='relu')
-    pool1 = paddle.nn.functional.max_pool2d(bn1, 2, 2)
-    conv2 = paddle.static.nn.conv2d(pool1, 16, 5, 1, act=None)
-    bn2 = paddle.static.nn.batch_norm(conv2, act='relu')
-    pool2 = paddle.nn.functional.max_pool2d(bn2, 2, 2)
-
-    fc1 = paddle.static.nn.fc(pool2, size=50, activation='relu')
-    fc2 = paddle.static.nn.fc(fc1, size=class_dim, activation='softmax')
-
-    return fc2
-
-
-class TestFetchAndFeed(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
-        os.environ['CPU_NUM'] = str(4)
-
-    def parallel_exe(
-        self,
-        use_cuda,
-        run_parallel_exe,
-        use_faster_executor=False,
-        num_threads=4,
-        seed=1,
-    ):
-        main_program = fluid.Program()
-        startup = fluid.Program()
-        startup.random_seed = seed
-        with fluid.program_guard(main_program, startup):
-            data = paddle.static.data(
-                name='image', shape=[-1, 3, 224, 224], dtype='float32'
-            )
-            label = paddle.static.data(
-                name='label', shape=[-1, 1], dtype='int64'
-            )
-            out = Lenet(data, class_dim=102)
-            loss = paddle.nn.functional.cross_entropy(
-                input=out, label=label, reduction='none', use_softmax=False
-            )
-            loss = paddle.mean(loss)
-            opt = fluid.optimizer.Momentum(
-                learning_rate=0.1,
-                momentum=0.9,
-                regularization=fluid.regularizer.L2Decay(1e-4),
-            )
-            opt.minimize(loss)
-
-        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-        exe = fluid.Executor(place)
-        exe.run(startup)
-
-        # FIXME force disable enable_inplace and memory_optimize to pass the unittest
-        build_strategy = fluid.BuildStrategy()
-        build_strategy.enable_inplace = False
-        build_strategy.memory_optimize = False
-        exec_strategy = fluid.ExecutionStrategy()
-        exec_strategy.use_experimental_executor = use_faster_executor
-        exec_strategy.num_threads = num_threads
-        train_cp = compiler.CompiledProgram(main_program).with_data_parallel(
-            loss_name=loss.name,
-            build_strategy=build_strategy,
-            exec_strategy=exec_strategy,
-        )
-
-        run_parallel_exe(train_cp, exe, use_cuda, data, label, loss)
-
-    def run_parallel_exe_with_fetch(
-        self, compiled_program, exe, use_cuda, data, label, loss
-    ):
-        def get_data(batch_size=8):
-            np.random.seed(5)
-            while True:
-                img = np.random.random(size=[batch_size, 3, 224, 224]).astype(
-                    np.float32
-                )
-                l = (np.random.random(size=[batch_size, 1]) * 10).astype(
-                    np.int64
-                )
-                yield img, l
-
-        fetch_list = []
-        all_vars = compiled_program._program.global_block().vars
-
-        for k, v in all_vars.items():
-            if (
-                ('tmp' not in k)
-                and (k[0] != '_' or v.persistable)
-                and v.type == core.VarDesc.VarType.LOD_TENSOR
-            ):
-                fetch_list.append(k)
-
-        for batch_id, img_label in enumerate(get_data()):
-            img, l = img_label
-            train_inputs = {data.name: img, label.name: l}
-            ret = exe.run(
-                compiled_program,
-                fetch_list=fetch_list,
-                feed=train_inputs,
-                return_numpy=True,
-            )
-            for i in range(len(fetch_list)):
-                assert not math.isnan(np.sum(ret[i])) and not math.isinf(
-                    np.sum(ret[i])
-                )
-            if batch_id == 2:
-                break
-
-    def run_parallel_exe_with_feed(
-        self, compiled_program, exe, use_cuda, data, label, loss
-    ):
-        def get_data(batch_size=8):
-            np.random.seed(5)
-            while True:
-                train_data = []
-                for _ in range(batch_size):
-                    img = np.random.random(size=[1, 3, 224, 224]).astype(
-                        np.float32
-                    )
-                    label = (np.random.random(size=[1, 1]) * 10).astype(
-                        np.int64
-                    )
-                    train_data.append([img, label])
-                yield train_data
-
-        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-        feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
-        reader = feeder.decorate_reader(get_data, multi_devices=True)
-
-        for batch_id, data in enumerate(reader()):
-            loss_np = exe.run(
-                compiled_program, feed=data, fetch_list=[loss.name]
-            )[0]
-            print(batch_id, loss_np)
-            if batch_id == 2:
-                break
-
-    def check_executor(self, use_faster_executor=False, num_threads=4):
-        if core.is_compiled_with_cuda():
-            self.parallel_exe(
-                use_cuda=True,
-                run_parallel_exe=self.run_parallel_exe_with_fetch,
-                use_faster_executor=use_faster_executor,
-                num_threads=num_threads,
-            )
-        self.parallel_exe(
-            use_cuda=False,
-            run_parallel_exe=self.run_parallel_exe_with_fetch,
-            use_faster_executor=use_faster_executor,
-            num_threads=num_threads,
-        )
-
-    def test_fetch(self):
-        for use_faster_executor in {True, False}:
-            self.check_executor(
-                use_faster_executor=use_faster_executor, num_threads=4
-            )
-            self.check_executor(
-                use_faster_executor=use_faster_executor, num_threads=1
-            )
-
-    def test_feed(self):
-        if core.is_compiled_with_cuda():
-            self.parallel_exe(
-                use_cuda=True, run_parallel_exe=self.run_parallel_exe_with_feed
-            )
-        self.parallel_exe(
-            use_cuda=False, run_parallel_exe=self.run_parallel_exe_with_feed
-        )
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_isolated_var.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_isolated_var.py
deleted file mode 100644
index d8e7cbe1303..00000000000
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_isolated_var.py
+++ /dev/null
@@ -1,130 +0,0 @@
-# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import unittest
-
-import numpy as np
-
-import paddle
-import paddle.fluid as fluid
-
-
-def enable_parallel_ssa_executor(enabled=True):
-    if fluid.is_compiled_with_cuda():
-        fluid.core.globals()['FLAGS_enable_parallel_graph'] = enabled
-
-
-class TestParallelExecutorFetchIsolatedVarBase(unittest.TestCase):
-    def build_network(self, is_training):
-        x = fluid.data(name='x', shape=[-1, 10], dtype='float32')
-        y = fluid.data(name='y', shape=[-1, 10], dtype='float32')
-        fc = paddle.static.nn.fc(x, size=30, bias_attr=False)
-        loss = paddle.mean(fc)
-        if is_training:
-            adam = fluid.optimizer.Adam(learning_rate=1e-3)
-            adam.minimize(loss)
-
-        return loss, y
-
-    def exec_strategy(self, use_experimental_executor):
-        strategy = fluid.ExecutionStrategy()
-        strategy.use_experimental_executor = use_experimental_executor
-        return strategy
-
-    def places(self, use_gpu, dev_cnt):
-        if use_gpu:
-            return fluid.cuda_places(list(range(dev_cnt)))
-        else:
-            return fluid.cpu_places(dev_cnt)
-
-    def test_main(self):
-        for use_gpu in [False, True]:
-            for dev_cnt in [1, 2]:
-                for is_training in [False, True]:
-                    for use_experimental_executor in [False, True]:
-                        for use_parallel_ssa_executor in [False, True]:
-                            func = lambda: self.run_impl(
-                                use_gpu,
-                                dev_cnt,
-                                is_training,
-                                use_experimental_executor,
-                                use_parallel_ssa_executor,
-                            )
-                            self.run_func_with_guard(func)
-
-    def run_impl(
-        self,
-        use_gpu,
-        dev_cnt,
-        is_training,
-        use_experimental_executor,
-        use_parallel_ssa_executor,
-    ):
-        paddle.enable_static()
-        enable_parallel_ssa_executor(use_parallel_ssa_executor)
-
-        if fluid.is_compiled_with_cuda():
-            if (
-                fluid.core.globals()['FLAGS_enable_parallel_graph']
-                and not use_gpu
-            ):
-                return
-            # windows has only 1 GPU
-            if use_gpu and dev_cnt > 1 and os.name == "nt":
-                return
-        else:
-            if use_gpu:
-                return
-
-        loss, isolated_var = self.build_network(is_training)
-        loss_name = loss.name if is_training else None
-
-        places = self.places(use_gpu, dev_cnt)
-        exe = fluid.Executor(places[0])
-
-        exe.run(fluid.default_startup_program())
-
-        prog = fluid.CompiledProgram(
-            fluid.default_main_program()
-        ).with_data_parallel(
-            loss_name=loss_name,
-            exec_strategy=self.exec_strategy(use_experimental_executor),
-            places=places,
-        )
-
-        BATCH_SIZE = 8 * dev_cnt
-        for _ in range(10):
-            x_np = np.random.random(size=[BATCH_SIZE, 10]).astype('float32')
-            y_np = np.random.random(size=[BATCH_SIZE, 10]).astype('float32')
-
-            _, y_np_fetch = exe.run(
-                prog,
-                feed={'x': x_np, 'y': y_np},
-                fetch_list=[loss, isolated_var],
-            )
-
-            np.testing.assert_array_equal(y_np, y_np_fetch)
-
-        enable_parallel_ssa_executor(False)
-
-    def run_func_with_guard(self, func):
-        with fluid.program_guard(fluid.Program(), fluid.Program()):
-            with fluid.unique_name.guard():
-                with fluid.scope_guard(fluid.Scope()):
-                    func()
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fix_op_run_order.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fix_op_run_order.py
deleted file mode 100644
index 8c047878ac9..00000000000
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fix_op_run_order.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-import numpy as np
-
-import paddle
-import paddle.fluid as fluid
-from paddle.nn import CrossEntropyLoss
-from paddle.vision.models import resnet18
-
-
-class TestFixOpRunOrder(unittest.TestCase):
-    def setUp(self):
-        paddle.enable_static()
-        paddle.seed(1)
-        paddle.framework.random._manual_program_seed(1)
-        if paddle.is_compiled_with_cuda():
-            fluid.set_flags({'FLAGS_cudnn_deterministic': 1})
-
-    def get_place(self):
-        return (
-            paddle.CUDAPlace(0)
-            if paddle.is_compiled_with_cuda()
-            else paddle.CPUPlace()
-        )
-
-    def get_feed(self):
-        batch_size = 4
-        image = np.random.random([batch_size, 3, 224, 224]).astype('float32')
-        label = np.random.randint(0, 1000, [batch_size, 1]).astype('int64')
-        return {"image": image, "label": label}
-
-    def create_model(self, fix_op_run_order):
-        main_prog = paddle.static.Program()
-        startup_prog = paddle.static.Program()
-        scope = paddle.static.Scope()
-        with paddle.static.program_guard(main_prog, startup_prog):
-            image = paddle.static.data(
-                name="image", shape=[None, 3, 224, 224], dtype="float32"
-            )
-            label = paddle.static.data(
-                name="label", shape=[None, 1], dtype="int64"
-            )
-            model = resnet18()
-            pred = model(image)
-            loss_fn = CrossEntropyLoss()
-            loss = loss_fn(pred, label)
-            optimizer = paddle.optimizer.SGD(learning_rate=1e-3)
-            optimizer.minimize(loss)
-
-        build_strategy = paddle.static.BuildStrategy()
-        build_strategy.fix_op_run_order = fix_op_run_order
-        build_strategy.fuse_bn_act_ops = True
-        build_strategy.fuse_bn_add_act_ops = True
-        main_prog = paddle.static.CompiledProgram(main_prog).with_data_parallel(
-            loss_name=loss.name,
-            build_strategy=build_strategy,
-            places=[self.get_place()],
-        )
-
-        exe = paddle.static.Executor(self.get_place())
-        with paddle.static.scope_guard(scope):
-            exe.run(startup_prog)
-
-        return main_prog, scope, loss
-
-    def run_and_fetch_loss(self, main_prog, scope, loss, feed):
-        with paddle.static.scope_guard(scope):
-            exe = paddle.static.Executor(self.get_place())
-            loss_value = exe.run(main_prog, feed=feed, fetch_list=[loss])[0]
-            return loss_value
-
-    def test_main(self):
-        if not paddle.is_compiled_with_cuda():
-            return
-
-        main1, scope1, loss1 = self.create_model(True)
-        main2, scope2, loss2 = self.create_model(False)
-        for i in range(10):
-            feed = self.get_feed()
-            loss_val1 = self.run_and_fetch_loss(main1, scope1, loss1, feed)
-            loss_val2 = self.run_and_fetch_loss(main2, scope2, loss2, feed)
-            self.assertEqual(loss_val1, loss_val2)
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_inference_feed_partial_data.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_inference_feed_partial_data.py
deleted file mode 100644
index bd5b2c77983..00000000000
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_inference_feed_partial_data.py
+++ /dev/null
@@ -1,242 +0,0 @@
-# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-import numpy as np
-
-import paddle
-import paddle.fluid as fluid
-import paddle.nn.functional as F
-
-
-class TestInferencePartialFeed(unittest.TestCase):
-    def setUp(self):
-        self.iterations = 10
-        self.size = 10
-
-    def run_network(self, places, use_split, has_persistable):
-        startup_prog = fluid.Program()
-        main_prog = fluid.Program()
-
-        with fluid.program_guard(main_prog, startup_prog):
-            x = fluid.data(name='x', shape=[None, self.size], dtype='float32')
-            y = fluid.data(name='y', shape=[None, self.size], dtype='float32')
-            if has_persistable:
-                lr = fluid.data(name='lr', shape=[1], dtype='float32')
-                lr.persistable = True
-            else:
-                lr = fluid.data(name='lr', shape=[None], dtype='float32')
-
-            relu_x = F.relu(x)
-            relu_y = F.relu(y)
-            relu_lr = F.relu(lr)
-
-        exe = fluid.Executor(places[0])
-        exe.run(startup_prog)
-
-        prog = fluid.CompiledProgram(main_prog).with_data_parallel(
-            places=places
-        )
-
-        gen_random = lambda shape: np.random.uniform(
-            low=-1.0, high=1.0, size=shape
-        ).astype('float32')
-        assert_result = lambda feed, result: np.testing.assert_array_equal(
-            np.maximum(0, feed), result
-        )
-
-        def assert_merged_unmerged(merged, unmerged):
-            unmerged = np.concatenate(unmerged, axis=0)
-            np.testing.assert_array_equal(merged, unmerged)
-
-        def feed_split_test():
-            for place_num in range(1, len(places) * 3):
-                x_np = gen_random([place_num, self.size])
-                y_np = gen_random([place_num, self.size])
-                if not lr.persistable or place_num <= len(places):
-                    lr_np = gen_random([place_num])
-                else:
-                    lr_np = gen_random([1])
-
-                feed = {x.name: x_np, y.name: y_np, lr.name: lr_np}
-                fetch_list = [relu_x, relu_y, relu_lr]
-
-                relu_x_np, relu_y_np, relu_lr_np = exe.run(
-                    prog, feed=feed, fetch_list=fetch_list, return_merged=True
-                )
-
-                (
-                    relu_x_np_unmerged,
-                    relu_y_np_unmerged,
-                    relu_lr_np_unmerged,
-                ) = exe.run(
-                    prog, feed=feed, fetch_list=fetch_list, return_merged=False
-                )
-
-                assert_merged_unmerged(relu_x_np, relu_x_np_unmerged)
-                assert_merged_unmerged(relu_y_np, relu_y_np_unmerged)
-                assert_merged_unmerged(relu_lr_np, relu_lr_np_unmerged)
-
-                assert_result(x_np, relu_x_np)
-                assert_result(y_np, relu_y_np)
-                if not lr.persistable or place_num <= len(places):
-                    assert_result(lr_np, relu_lr_np)
-                else:
-                    expected_relu_lr_np = max(lr_np[0], 0)
-                    self.assertTrue(np.all(expected_relu_lr_np == relu_lr_np))
-
-        def feed_list_test():
-            for place_num in range(1, len(places) + 1):
-                x_np_list = []
-                y_np_list = []
-                lr_np_list = []
-                feed_list = []
-                for _ in range(place_num):
-                    x_np = gen_random([1, self.size])
-                    y_np = gen_random([1, self.size])
-                    lr_np = gen_random([1])
-                    x_np_list.append(x_np)
-                    y_np_list.append(y_np)
-                    lr_np_list.append(lr_np)
-
-                    feed_list.append(
-                        {x.name: x_np, y.name: y_np, lr.name: lr_np}
-                    )
-
-                fetch_list = [relu_x, relu_y, relu_lr]
-                relu_x_np, relu_y_np, relu_lr_np = exe.run(
-                    prog,
-                    feed=feed_list,
-                    fetch_list=fetch_list,
-                    return_merged=True,
-                )
-
-                (
-                    relu_x_np_unmerged,
-                    relu_y_np_unmerged,
-                    relu_lr_np_unmerged,
-                ) = exe.run(
-                    prog,
-                    feed=feed_list,
-                    fetch_list=fetch_list,
-                    return_merged=False,
-                )
-
-                assert_merged_unmerged(relu_x_np, relu_x_np_unmerged)
-                assert_merged_unmerged(relu_y_np, relu_y_np_unmerged)
-                assert_merged_unmerged(relu_lr_np, relu_lr_np_unmerged)
-
-                x_np = np.concatenate(x_np_list)
-                y_np = np.concatenate(y_np_list)
-                lr_np = np.concatenate(lr_np_list)
-
-                assert_result(x_np, relu_x_np)
-                assert_result(y_np, relu_y_np)
-                assert_result(lr_np, relu_lr_np)
-
-        for _ in range(self.iterations):
-            if use_split:
-                feed_split_test()
-            else:
-                feed_list_test()
-
-    def test_main(self):
-        places = [fluid.cpu_places(4)]
-        if fluid.is_compiled_with_cuda():
-            places.append(fluid.cuda_places())
-
-        for p in places:
-            for has_persistable in [False, True]:
-                for use_split in [False, True]:
-                    self.run_network(
-                        p, use_split=use_split, has_persistable=has_persistable
-                    )
-
-
-class TestInferencePartialFeedUsingDataLoader(unittest.TestCase):
-    def setUp(self):
-        self.epoch_num = 3
-        self.batch_num = 101  # a prime number
-        self.batch_size = 32
-
-    def create_reader(self):
-        def __impl__():
-            for _ in range(self.batch_num):
-                yield np.random.random([self.batch_size, 1]).astype('float32'),
-
-        return __impl__
-
-    def run_network(self, iterable, use_cuda, drop_last):
-        x = fluid.data(shape=[None, 1], name='x', dtype='float32')
-        places = fluid.cuda_places() if use_cuda else fluid.cpu_places(4)
-        loader = fluid.io.DataLoader.from_generator(
-            feed_list=[x], capacity=16, iterable=iterable, drop_last=drop_last
-        )
-        y = paddle.static.nn.fc(x, size=10)
-        loss = paddle.mean(y)
-
-        exe = fluid.Executor(places[0])
-        exe.run(fluid.default_startup_program())
-
-        prog = fluid.CompiledProgram(
-            fluid.default_main_program()
-        ).with_data_parallel(places=places, loss_name=loss.name)
-
-        loader.set_batch_generator(
-            self.create_reader(), places=places if iterable else None
-        )
-
-        for _ in range(self.epoch_num):
-            actual_batch_num = 0
-            if loader.iterable:
-                for feed_data in loader():
-                    (x_data,) = exe.run(prog, feed=feed_data, fetch_list=[x])
-                    self.assertEqual(x_data.shape[0] % self.batch_size, 0)
-                    self.assertTrue(x_data.shape[0] != 0)
-                    actual_batch_num += int(x_data.shape[0] / self.batch_size)
-            else:
-                loader.start()
-                try:
-                    while True:
-                        (x_data,) = exe.run(prog, fetch_list=[x])
-                        self.assertEqual(x_data.shape[0] % self.batch_size, 0)
-                        self.assertTrue(x_data.shape[0] != 0)
-                        actual_batch_num += int(
-                            x_data.shape[0] / self.batch_size
-                        )
-                except fluid.core.EOFException:
-                    loader.reset()
-
-            if not drop_last or len(places) == 1:
-                self.assertEqual(self.batch_num, actual_batch_num)
-            else:
-                self.assertGreater(self.batch_num, actual_batch_num)
-
-    def test_main(self):
-        use_cuda_list = (
-            [False, True] if fluid.is_compiled_with_cuda() else [False]
-        )
-        iterable_list = [False, True]
-        drop_last_list = [False, True]
-        for iterable in iterable_list:
-            for use_cuda in use_cuda_list:
-                for drop_last in drop_last_list:
-                    with fluid.program_guard(fluid.Program(), fluid.Program()):
-                        with fluid.scope_guard(fluid.Scope()):
-                            self.run_network(iterable, use_cuda, drop_last)
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
deleted file mode 100644
index 1f6429620f6..00000000000
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
+++ /dev/null
@@ -1,302 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import unittest
-
-import numpy as np
-from parallel_executor_test_base import DeviceType, TestParallelExecutorBase
-
-import paddle
-import paddle.fluid as fluid
-import paddle.fluid.core as core
-
-
-def simple_fc_net(use_feed):
-    img = paddle.static.data(name='image', shape=[-1, 784], dtype='float32')
-    label = paddle.static.data(name='label', shape=[-1, 1], dtype='int64')
-    hidden = img
-    for _ in range(4):
-        hidden = paddle.static.nn.fc(
-            hidden,
-            size=200,
-            activation='tanh',
-            bias_attr=fluid.ParamAttr(
-                initializer=paddle.nn.initializer.Constant(value=1.0)
-            ),
-        )
-    prediction = paddle.static.nn.fc(hidden, size=10, activation='softmax')
-    loss = paddle.nn.functional.cross_entropy(
-        input=prediction, label=label, reduction='none', use_softmax=False
-    )
-    loss = paddle.mean(loss)
-    return loss
-
-
-def fc_with_batchnorm(use_feed):
-    img = paddle.static.data(name='image', shape=[-1, 784], dtype='float32')
-    label = paddle.static.data(name='label', shape=[-1, 1], dtype='int64')
-
-    hidden = img
-    for _ in range(1):
-        with fluid.name_scope("hidden"):
-            hidden = paddle.static.nn.fc(
-                hidden,
-                size=200,
-                activation='tanh',
-                bias_attr=fluid.ParamAttr(
-                    initializer=paddle.nn.initializer.Constant(value=1.0)
-                ),
-            )
-
-            hidden = paddle.static.nn.batch_norm(input=hidden)
-    with fluid.name_scope("fc_layer"):
-        prediction = paddle.static.nn.fc(hidden, size=10, activation='softmax')
-    with fluid.name_scope("loss"):
-        loss = paddle.nn.functional.cross_entropy(
-            input=prediction, label=label, reduction='none', use_softmax=False
-        )
-        loss = paddle.mean(loss)
-    return loss
-
-
-def init_data():
-    np.random.seed(5)
-    img = np.random.random(size=[32, 784]).astype(np.float32)
-    label = np.ones(shape=[32, 1], dtype='int64')
-    return img, label
-
-
-class TestMNIST(TestParallelExecutorBase):
-    @classmethod
-    def setUpClass(cls):
-        os.environ['CPU_NUM'] = str(4)
-
-    def _compare_reduce_and_allreduce(
-        self, model, use_device, delta1=1e-6, delta2=1e-4
-    ):
-        if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
-            return
-
-        if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
-            return
-
-        img, label = init_data()
-
-        (
-            all_reduce_first_loss,
-            all_reduce_last_loss,
-            _,
-        ) = self.check_network_convergence(
-            model,
-            feed_dict={"image": img, "label": label},
-            use_device=use_device,
-            use_reduce=False,
-        )
-
-        reduce_first_loss, reduce_last_loss, _ = self.check_network_convergence(
-            model,
-            feed_dict={"image": img, "label": label},
-            use_device=use_device,
-            use_reduce=True,
-        )
-
-        for loss in zip(all_reduce_first_loss, reduce_first_loss):
-            self.assertAlmostEqual(loss[0], loss[1], delta=delta1)
-        for loss in zip(all_reduce_last_loss, reduce_last_loss):
-            self.assertAlmostEqual(loss[0], loss[1], delta=delta2)
-
-    # simple_fc
-    def check_simple_fc_convergence(self, use_device, use_reduce=False):
-        if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
-            return
-
-        if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
-            return
-
-        img, label = init_data()
-
-        self.check_network_convergence(
-            simple_fc_net,
-            feed_dict={"image": img, "label": label},
-            use_device=use_device,
-            use_reduce=use_reduce,
-        )
-
-    def test_simple_fc(self):
-        # use_device
-        self.check_simple_fc_convergence(DeviceType.CUDA)
-        self.check_simple_fc_convergence(DeviceType.CPU)
-        self.check_simple_fc_convergence(DeviceType.XPU)
-
-    def test_simple_fc_with_new_strategy(self):
-        # use_device, use_reduce
-        # NOTE: the computation result of nccl_reduce is non-deterministic,
-        # related issue: https://github.com/NVIDIA/nccl/issues/157
-        self._compare_reduce_and_allreduce(
-            simple_fc_net, DeviceType.CUDA, 1e-5, 1e-2
-        )
-        self._compare_reduce_and_allreduce(
-            simple_fc_net, DeviceType.CPU, 1e-5, 1e-2
-        )
-
-    def check_simple_fc_parallel_accuracy(self, use_device):
-        if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
-            return
-
-        img, label = init_data()
-
-        single_first_loss, single_last_loss, _ = self.check_network_convergence(
-            method=simple_fc_net,
-            feed_dict={"image": img, "label": label},
-            use_device=use_device,
-            use_parallel_executor=False,
-        )
-        (
-            parallel_first_loss,
-            parallel_last_loss,
-            _,
-        ) = self.check_network_convergence(
-            method=simple_fc_net,
-            feed_dict={"image": img, "label": label},
-            use_device=use_device,
-            use_parallel_executor=True,
-        )
-
-        self.assertAlmostEqual(
-            np.mean(parallel_first_loss),
-            single_first_loss,
-            delta=1e-6,
-        )
-        self.assertAlmostEqual(
-            np.mean(parallel_last_loss), single_last_loss, delta=1e-6
-        )
-
-    def test_simple_fc_parallel_accuracy(self):
-        self.check_simple_fc_parallel_accuracy(DeviceType.CUDA)
-        self.check_simple_fc_parallel_accuracy(DeviceType.CPU)
-
-    def check_batchnorm_fc_convergence(self, use_device, use_fast_executor):
-        if use_device == DeviceType.CUDA and not core.is_compiled_with_cuda():
-            return
-        if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
-            return
-        img, label = init_data()
-
-        self.check_network_convergence(
-            fc_with_batchnorm,
-            feed_dict={"image": img, "label": label},
-            use_device=use_device,
-            use_fast_executor=use_fast_executor,
-        )
-
-    def test_batchnorm_fc(self):
-        for use_device in (DeviceType.CPU, DeviceType.CUDA):
-            for use_fast_executor in (False, True):
-                self.check_batchnorm_fc_convergence(
-                    use_device, use_fast_executor
-                )
-
-    def test_batchnorm_fc_with_new_strategy(self):
-        # NOTE: the computation result of nccl_reduce is non-deterministic,
-        # related issue: https://github.com/NVIDIA/nccl/issues/157
-        self._compare_reduce_and_allreduce(
-            fc_with_batchnorm, DeviceType.CUDA, 1e-5, 1e-2
-        )
-        self._compare_reduce_and_allreduce(
-            fc_with_batchnorm, DeviceType.CPU, 1e-5, 1e-2
-        )
-
-
-class TestMNISTNoReduce(unittest.TestCase):
-    def run_program(self, device_type):
-        if device_type == DeviceType.CUDA:
-            if not paddle.is_compiled_with_cuda():
-                return
-            places = paddle.static.cuda_places()
-        else:
-            self.assertEqual(device_type, DeviceType.CPU)
-            places = paddle.static.cpu_places(4)
-
-        paddle.seed(10)
-        with paddle.fluid.unique_name.guard():
-            main = paddle.static.Program()
-            startup = paddle.static.Program()
-            with paddle.static.program_guard(main, startup):
-                loss = simple_fc_net(use_feed=True)
-                optimizer = paddle.optimizer.SGD(learning_rate=0.0)
-                optimizer.minimize(loss)
-
-            grads = [p.name + '@GRAD' for p in main.all_parameters()]
-            no_reduce = paddle.static.BuildStrategy.ReduceStrategy._NoReduce
-
-            build_strategy = paddle.static.BuildStrategy()
-            build_strategy.reduce_strategy = no_reduce
-            main_multi_place = paddle.static.CompiledProgram(
-                main
-            ).with_data_parallel(
-                loss_name=loss.name, build_strategy=build_strategy, places=places
-            )
-
-            build_strategy = paddle.static.BuildStrategy()
-            build_strategy.reduce_strategy = no_reduce
-            main_single_place = paddle.static.CompiledProgram(
-                main.clone()
-            ).with_data_parallel(
-                loss_name=loss.name, build_strategy=build_strategy, places=places[0]
-            )
-
-            image, label = init_data()
-            feed = {'image': image, 'label': label}
-            exe = paddle.static.Executor(places[0])
-            scope = paddle.static.Scope()
-            with paddle.static.scope_guard(scope):
-                exe.run(startup)
-                grads_multi_place = exe.run(
-                    main_multi_place, feed=feed, fetch_list=[grads]
-                )
-
-                feeds = self.split_feed(feed, len(places))
-                grads_single_place = [list() for _ in range(len(grads))]
-                for f in feeds:
-                    gs = exe.run(main_single_place, feed=f, fetch_list=[grads])
-                    for i, g in enumerate(gs):
-                        grads_single_place[i].append(g)
-
-                for i in range(len(grads)):
-                    grads_single_place[i] = np.concatenate(
-                        grads_single_place[i], axis=0
-                    ) / len(places)
-
-                self.assertEqual(len(grads_multi_place), len(grads_single_place))
-                for g1, g2 in zip(grads_multi_place, grads_single_place):
-                    np.testing.assert_allclose(g1, g2, rtol=1e-05)
-
-    def split_feed(self, feed, n):
-        image = feed['image']
-        label = feed['label']
-        self.assertEqual(image.shape[0] % n, 0)
-        self.assertEqual(label.shape[0] % n, 0)
-        images = np.split(image, n)
-        labels = np.split(label, n)
-        return [{'image': images[i], 'label': labels[i]} for i in range(n)]
-
-    def test_main(self):
-        self.run_program(DeviceType.CUDA)
-        self.run_program(DeviceType.CPU)
-
-
-if __name__ == '__main__':
-    paddle.enable_static()
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_run_cinn.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_run_cinn.py
index 135944f145b..79cbabee262 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_run_cinn.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_run_cinn.py
@@ -103,7 +103,7 @@ def train(dot_save_dir, prefix, seed=1234):
     build_strategy.debug_graphviz_path = os.path.join(dot_save_dir, prefix)
     compiled_program = paddle.static.CompiledProgram(
         main_program, build_strategy
-    ).with_data_parallel(loss_name=loss.name)
+    )
 
     iters = 100
     feed = rand_data(img.name, label.name, iters)
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py
deleted file mode 100644
index 9d5d884c27b..00000000000
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import math
-import os
-import sys
-import unittest
-
-import numpy as np
-from simple_nets import simple_fc_net
-
-import paddle.fluid as fluid
-import paddle.fluid.core as core
-from paddle.fluid import compiler
-
-
-class ParallelExecutorTestingDuringTraining(unittest.TestCase):
-    def check_network_convergence(self, use_cuda, build_strategy=None):
-        os.environ['CPU_NUM'] = str(4)
-        main = fluid.Program()
-        startup = fluid.Program()
-        with fluid.program_guard(main, startup):
-            loss = simple_fc_net()
-            test_program = main.clone(for_test=True)
-
-            opt = fluid.optimizer.SGD(learning_rate=0.001)
-            opt.minimize(loss)
-
-            batch_size = 32
-            image = np.random.normal(size=(batch_size, 784)).astype('float32')
-            label = np.random.randint(0, 10, (batch_size, 1), dtype="int64")
-
-            place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-            exe = fluid.Executor(place)
-            exe.run(startup)
-            feed_dict = {'image': image, 'label': label}
-
-            train_cp = compiler.CompiledProgram(main).with_data_parallel(
-                loss_name=loss.name, build_strategy=build_strategy
-            )
-            test_cp = compiler.CompiledProgram(test_program).with_data_parallel(
-                loss_name=loss.name,
-                build_strategy=build_strategy,
-                share_vars_from=train_cp,
-            )
-
-            for i in range(5):
-                exe.run(train_cp, feed=feed_dict, fetch_list=[loss.name])
-                (test_loss,) = exe.run(
-                    test_cp, feed=feed_dict, fetch_list=[loss.name]
-                )
-                (train_loss,) = exe.run(
-                    train_cp, feed=feed_dict, fetch_list=[loss.name]
-                )
-
-                avg_test_loss_val = np.array(test_loss).mean()
-                if math.isnan(float(avg_test_loss_val)):
-                    sys.exit("got NaN loss, testing failed.")
-
-                avg_train_loss_val = np.array(train_loss).mean()
-                if math.isnan(float(avg_train_loss_val)):
-                    sys.exit("got NaN loss, training failed.")
-
-                np.testing.assert_allclose(
-                    train_loss, test_loss, rtol=1e-05, atol=0.01
-                )
-
-    def test_parallel_testing(self):
-        build_strategy = fluid.BuildStrategy()
-        build_strategy.reduce_strategy = (
-            fluid.BuildStrategy.ReduceStrategy.AllReduce
-        )
-        if core.is_compiled_with_cuda():
-            self.check_network_convergence(
-                use_cuda=True, build_strategy=build_strategy
-            )
-        self.check_network_convergence(
-            use_cuda=False, build_strategy=build_strategy
-        )
-
-    def test_parallel_testing_with_new_strategy_gpu(self):
-        build_strategy = fluid.BuildStrategy()
-        build_strategy.reduce_strategy = (
-            fluid.BuildStrategy.ReduceStrategy.Reduce
-        )
-        if core.is_compiled_with_cuda():
-            self.check_network_convergence(
-                use_cuda=True, build_strategy=build_strategy
-            )
-
-    def test_parallel_testing_with_new_strategy_cpu(self):
-        build_strategy = fluid.BuildStrategy()
-        build_strategy.reduce_strategy = (
-            fluid.BuildStrategy.ReduceStrategy.Reduce
-        )
-        self.check_network_convergence(
-            use_cuda=False, build_strategy=build_strategy
-        )
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_pass_builder.py b/python/paddle/fluid/tests/unittests/test_pass_builder.py
index dfdcaac934e..67ef3fc39ec 100644
--- a/python/paddle/fluid/tests/unittests/test_pass_builder.py
+++ b/python/paddle/fluid/tests/unittests/test_pass_builder.py
@@ -47,13 +47,11 @@ class TestPassBuilder(unittest.TestCase):
         exe.run(startup)
         feed_dict = {'image': image, 'label': label}
 
-        train_cp = compiler.CompiledProgram(main).with_data_parallel(
-            loss_name=loss.name, build_strategy=build_strategy
+        train_cp = compiler.CompiledProgram(
+            main, build_strategy=build_strategy
         )
-        test_cp = compiler.CompiledProgram(test_program).with_data_parallel(
-            loss_name=loss.name,
-            build_strategy=build_strategy,
-            share_vars_from=train_cp,
+        test_cp = compiler.CompiledProgram(
+            test_program, build_strategy=build_strategy
         )
 
         for i in range(5):
diff --git a/python/paddle/fluid/tests/unittests/test_print_op.py b/python/paddle/fluid/tests/unittests/test_print_op.py
index 19be1c75267..dc970ac3bbf 100755
--- a/python/paddle/fluid/tests/unittests/test_print_op.py
+++ b/python/paddle/fluid/tests/unittests/test_print_op.py
@@ -131,9 +131,7 @@ class TestPrintOpBackward(unittest.TestCase):
         exe = paddle.static.Executor(place)
         exe.run(startup)
 
-        binary = paddle.static.CompiledProgram(main).with_data_parallel(
-            loss_name=loss.name
-        )
+        binary = paddle.static.CompiledProgram(main)
 
         img, label = init_data()
         feed_dict = {"image": img, "label": label}
diff --git a/python/paddle/fluid/tests/unittests/test_profiler.py b/python/paddle/fluid/tests/unittests/test_profiler.py
index e22deb5df86..c176af4ad9e 100644
--- a/python/paddle/fluid/tests/unittests/test_profiler.py
+++ b/python/paddle/fluid/tests/unittests/test_profiler.py
@@ -80,13 +80,7 @@ class TestProfiler(unittest.TestCase):
         if compile_program:
             # TODO(luotao): profiler tool may have bug with multi-thread parallel executor.
            # https://github.com/PaddlePaddle/Paddle/pull/25200#issuecomment-650483092
-            exec_strategy = fluid.ExecutionStrategy()
-            exec_strategy.num_threads = 1
-            train_program = fluid.compiler.CompiledProgram(
-                main_program
-            ).with_data_parallel(
-                loss_name=avg_cost.name, exec_strategy=exec_strategy
-            )
+            train_program = fluid.compiler.CompiledProgram(main_program)
         else:
             train_program = main_program
         return train_program, startup_program, avg_cost, batch_size, batch_acc
diff --git a/python/paddle/fluid/tests/unittests/test_prune.py b/python/paddle/fluid/tests/unittests/test_prune.py
index a93516da417..7a2b9986748 100644
--- a/python/paddle/fluid/tests/unittests/test_prune.py
+++ b/python/paddle/fluid/tests/unittests/test_prune.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import contextlib
-import os
 import unittest
 
 import numpy as np
@@ -334,11 +333,7 @@ class TestExecutorRunAutoPrune(unittest.TestCase):
                 sgd_optimizer.minimize(loss1)
                 exe = fluid.Executor(fluid.CPUPlace())
                 exe.run(startup_program)
-                compiled_prog = fluid.CompiledProgram(
-                    program
-                ).with_data_parallel(
-                    loss_name=loss1.name, places=fluid.CPUPlace()
-                )
+                compiled_prog = fluid.CompiledProgram(program)
                 weight_init = np.array(
                     scope.find_var(w_param_attrs.name).get_tensor()
                 )
@@ -543,11 +538,7 @@ class TestExecutorRunAutoPrune(unittest.TestCase):
                 label_np = np.random.randint(1, size=(10, 1)).astype(
                     'int64'
                 )
-                compiled_prog = fluid.CompiledProgram(
-                    program
-                ).with_data_parallel(
-                    loss_name=loss1.name, places=fluid.CPUPlace()
-                )
+                compiled_prog = fluid.CompiledProgram(program)
                 for i in range(10):
                     res = exe.run(
                         compiled_prog,
@@ -621,88 +612,6 @@ class TestExecutorRunAutoPrune(unittest.TestCase):
         np.testing.assert_array_equal(weight_with_prune, weight_expected)
         self.assertFalse(np.array_equal(weight_without_prune, weight_expected))
 
-    def test_prune_with_multi_devices(self):
-        '''
-        When training model with multi_devices, the pruned CompiledProgram should share same local scopes.
-        This test the correctness.
-        '''
-        exe = fluid.Executor(fluid.CPUPlace())
-        program = framework.Program()
-        startup_program = framework.Program()
-        scope = fluid.Scope()
-        os.environ['CPU_NUM'] = str(2)
-        # do not use_prune
-        with fluid.scope_guard(scope):
-            with fluid.program_guard(program, startup_program):
-                (
-                    x1,
-                    x2,
-                    y1,
-                    y2,
-                    label,
-                    loss1,
-                    loss2,
-                    w1_param_attrs,
-                    w2_param_attrs,
-                ) = self.net2()
-                adam_optimizer1 = fluid.optimizer.AdamOptimizer(
-                    learning_rate=0.5
-                )
-                train1 = adam_optimizer1.minimize(loss1)
-                cloned_program = program.clone()
-                adam_optimizer2 = fluid.optimizer.AdamOptimizer(
-                    learning_rate=0.5
-                )
-                train2 = adam_optimizer2.minimize(loss2)
-                exe.run(startup_program)
-                x_np = np.random.random(size=(10, 2)).astype('float32')
-                label_np = np.random.randint(1, size=(10, 1)).astype('int64')
-                compiled_prog1 = fluid.CompiledProgram(
-                    program
-                ).with_data_parallel(
-                    loss_name=loss1.name, places=[fluid.CPUPlace()] * 2
-                )
-                compiled_prog2 = fluid.CompiledProgram(
-                    program
-                ).with_data_parallel(
-                    loss_name=loss2.name, places=[fluid.CPUPlace()] * 2
-                )
-                for i in range(10):
-                    if i % 2 == 1:
-                        res = exe.run(
-                            compiled_prog1,
-                            feed=[
-                                {'x1': x_np[0:5, :], 'label': label_np[0:5, :]},
-                                {'x1': x_np[5:, :], 'label': label_np[5:, :]},
-                            ],
-                            fetch_list=[loss1.name, train1],
-                            use_prune=True,
-                        )
-                    else:
-                        res = exe.run(
-                            compiled_prog2,
-                            feed={'x2': x_np, 'label': label_np},
-                            fetch_list=[loss2.name, train2],
-                            use_prune=True,
-                        )
-                weight1 = np.array(
-                    scope.find_var(w1_param_attrs.name).get_tensor()
-                )
-        # expected
-        scope = fluid.Scope()
-        with fluid.scope_guard(scope):
-            exe.run(startup_program)
-            for i in range(10):
-                if i % 2 == 1:
-                    exe.run(
-                        cloned_program,
-                        feed={'x1': x_np, 'x2': x_np, 'label': label_np},
-                        fetch_list=[loss1.name],
-                        use_prune=False,
-                    )
-        weight2 = np.array(scope.find_var(w1_param_attrs.name).get_tensor())
-        np.testing.assert_allclose(weight1, weight2, rtol=1e-05)
-
     def test_prune_program_with_tupe_in_fetch_list(self):
         '''
         If there are multiple optimizers in the program, we can run specific one by
diff --git a/python/paddle/fluid/tests/unittests/test_py_func_op.py b/python/paddle/fluid/tests/unittests/test_py_func_op.py
index 526e08e9d59..f6d9eb9b0e8 100644
--- a/python/paddle/fluid/tests/unittests/test_py_func_op.py
+++ b/python/paddle/fluid/tests/unittests/test_py_func_op.py
@@ -202,7 +202,6 @@ def test_main(use_cuda, use_py_func_op, use_parallel_executor):
             train_cp = compiler.CompiledProgram(
                 fluid.default_main_program()
             )
-            train_cp = train_cp.with_data_parallel(loss_name=loss.name)
             fetch_list = [loss.name]
         else:
             fetch_list = [loss]
diff --git a/tools/parallel_UT_rule.py b/tools/parallel_UT_rule.py
index 8d22fd6c249..3b6d7b8c050 100755
--- a/tools/parallel_UT_rule.py
+++ b/tools/parallel_UT_rule.py
@@ -513,7 +513,6 @@ HIGH_PARALLEL_JOB_NEW = [
     'test_new_group_api',
     'test_dist_fleet_heter_base',
     'test_collective_split_col_linear',
-    'test_parallel_executor_mnist',
     'test_dist_fleet_ctr2',
     'test_dist_fleet_heter_program',
     'test_dist_fleet_ctr',
@@ -928,7 +927,6 @@ FOURTH_HIGH_PARALLEL_JOB_NEW = [
     'test_one_hot_op',
     'test_adaptive_max_pool1d',
     'test_label_smooth_op',
-    'test_parallel_executor_fetch_feed',
     'test_cast',
     'test_parallel_dygraph_sync_batch_norm',
     'test_collect_fpn_proposals_op',
@@ -1443,7 +1441,6 @@
     'test_nan_inf',
     'test_fuse_bn_add_act_pass',
     'test_unpool_op',
-    'test_parallel_executor_dry_run',
     'test_layer_norm_op_v2',
    'test_embedding_id_stop_gradient',
     'test_mkldnn_fc_act_fuse_pass',
@@ -1510,11 +1507,7 @@
     'test_bilateral_slice_op',
     'test_inplace_abn_op',
     'test_fetch_unmerged',
-    'test_parallel_executor_feed_persistable_var',
-    'test_parallel_executor_fetch_isolated_var',
-    'test_parallel_executor_inference_feed_partial_data',
     'test_parallel_executor_seresnext_base_gpu',
-    'test_parallel_executor_test_while_train',
     'test_parallel_executor_seresnext_with_fuse_all_reduce_gpu',
     'test_parallel_ssa_graph_inference_feed_partial_data',
     'test_parallel_executor_seresnext_with_reduce_gpu',
@@ -2341,7 +2334,6 @@ TETRAD_PARALLEL_JOB = [
     'test_trt_conv3d_op',
     'test_parallel_executor_drop_scope',
     'test_tensorrt_engine',
-    'test_parallel_executor_mnist',
     'test_load_state_dict_from_old_format',
     'test_fuse_elewise_add_act_pass',
     'test_fetch_unmerged',
@@ -2349,7 +2341,6 @@
     'test_standalone_controlflow',
     'test_standalone_multiply_write',
     'test_reshape_op',
-    'test_parallel_executor_fetch_isolated_var',
     'test_inplace_abn_op',
     'test_fused_transformer_encoder_layer',
     'test_eager_deletion_while_op',
@@ -2702,7 +2693,6 @@ TWO_PARALLEL_JOB = [
     'test_sequence_expand',
     'test_pool2d_bf16_mkldnn_op',
     'test_bilinear_api',
-    'test_parallel_executor_inference_feed_partial_data',
     'test_initializer_nn',
     'test_modified_huber_loss_op',
     'test_lookup_table_op',
@@ -2961,7 +2951,6 @@
     'test_is_empty_op',
     'test_py_reader_pin_memory',
     'test_train_recognize_digits',
-    'test_parallel_executor_feed_persistable_var',
     'test_update_loss_scaling_op',
     'test_rnn_cell_api',
     'test_imperative_load_static_param',
@@ -3020,7 +3009,6 @@
     'test_profiler',
     'test_ir_memory_optimize_pass',
     'test_callback_reduce_lr_on_plateau',
-    'test_parallel_executor_dry_run',
     'test_paddle_save_load',
     'test_stack_op',
     'test_overlap_add_op',
@@ -3231,7 +3219,6 @@
     'test_bert',
     'test_simple_rnn_op',
     'trt_resnext_test',
-    'test_parallel_executor_fix_op_run_order',
     'test_imperative_double_grad',
     'test_cycle_gan',
     'test_pretrained_model',
diff --git a/tools/static_mode_white_list.py b/tools/static_mode_white_list.py
index 9771bf6f195..6524c2a6e8c 100755
--- a/tools/static_mode_white_list.py
+++ b/tools/static_mode_white_list.py
@@ -370,12 +370,7 @@ STATIC_MODE_TESTING_LIST = [
     'test_pad_op',
     'test_pairwise_distance',
     'test_parallel_executor_drop_scope',
-    'test_parallel_executor_dry_run',
-    'test_parallel_executor_feed_persistable_var',
-    'test_parallel_executor_inference_feed_partial_data',
-    'test_parallel_executor_mnist',
     'test_parallel_executor_run_load_infer_program',
-    'test_parallel_executor_test_while_train',
     'test_parallel_ssa_graph_inference_feed_partial_data',
     'test_parameter',
     'test_partial_concat_op',
@@ -546,7 +541,6 @@
     'test_parallel_executor_seresnext_with_reduce_cpu',
     'test_parallel_executor_seresnext_with_fuse_all_reduce_cpu',
     'test_layers',
-    'test_parallel_executor_fetch_feed',
     'test_sequence_concat',
     'test_sequence_conv',
     'test_sequence_enumerate_op',
-- 
GitLab