diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
index 18ed02a72275437fa6106e57c0383e17647d9700..723aafb171271ed248c93665a21089029a30a836 100644
--- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
+++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
@@ -29,7 +29,8 @@ __all__ = ['TestParallelExecutorBase']
 
 
 class TestParallelExecutorBase(unittest.TestCase):
-    def check_network_convergence(self,
+    @classmethod
+    def check_network_convergence(cls,
                                   method,
                                   use_cuda=True,
                                   memory_opt=True,
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
index 12d854fb54ac30ff2eeed97c16a78198d92387fd..92a5c58c11773e97ca0bb5ff2c21cbc8df612d58 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
@@ -29,7 +29,7 @@ import unittest
 import math
 import numpy as np
 from functools import partial
-
+os.environ['CPU_NUM'] = str(4)
 # FIXME(zcd): If the neural net has dropout_op, the output of ParallelExecutor
 # and Executor is different. Because, for ParallelExecutor, the dropout_op of
 # the neural net will be copied N copies(N is the number of device). This will
@@ -113,7 +113,6 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
     return fluid.layers.elementwise_add(x=short, y=scale, act='relu')
 
 
-batch_size = 12
 img_shape = [3, 224, 224]
 
 
@@ -181,43 +180,84 @@ def optimizer(learning_rate=0.01):
     return optimizer
 
 
+def _batch_size():
+    return 12
+
+
+def _iter(use_cuda):
+    if use_cuda:
+        return 10
+    return 2
+
+
+gpu_img, gpu_label = init_data(
+    batch_size=_batch_size(), img_shape=img_shape, label_range=999)
+cpu_img, cpu_label = init_data(
+    batch_size=_batch_size(), img_shape=img_shape, label_range=999)
+feed_dict_gpu = {"image": gpu_img, "label": gpu_label}
+feed_dict_cpu = {"image": cpu_img, "label": cpu_label}
+model = SE_ResNeXt50Small
+
+
+def _feed_dict(use_cuda):
+    if use_cuda:
+        return feed_dict_gpu
+    return feed_dict_cpu
+
+
+def _get_result_of_origin_model(use_cuda):
+    global remove_bn
+    global remove_dropout
+    remove_bn = True
+    remove_dropout = True
+    first_loss, last_loss = TestParallelExecutorBase.check_network_convergence(
+        model,
+        feed_dict=_feed_dict(use_cuda),
+        iter=_iter(use_cuda),
+        batch_size=_batch_size(),
+        use_cuda=use_cuda,
+        use_reduce=False,
+        optimizer=optimizer)
+
+    return first_loss, last_loss
+
+
+origin_cpu_first_loss, origin_cpu_last_loss = _get_result_of_origin_model(False)
+if core.is_compiled_with_cuda():
+    origin_gpu_first_loss, origin_gpu_last_loss = _get_result_of_origin_model(
+        True)
+
+
+def _get_origin_result(use_cuda):
+    if use_cuda:
+        assert core.is_compiled_with_cuda(), "Paddle is not compiled with CUDA."
+        return origin_gpu_first_loss, origin_gpu_last_loss
+    return origin_cpu_first_loss, origin_cpu_last_loss
+
+
 class TestResnet(TestParallelExecutorBase):
-    @classmethod
-    def setUpClass(cls):
-        os.environ['CPU_NUM'] = str(4)
-        global remove_dropout
-        global remove_bn
-        remove_dropout = False
-        remove_bn = False
-
-    def _compare_reduce_and_allreduce(self,
-                                      model,
-                                      use_cuda,
-                                      iter=20,
-                                      delta2=1e-5):
+    def _compare_reduce_and_allreduce(self, use_cuda, delta2=1e-5):
         if use_cuda and not core.is_compiled_with_cuda():
             return
 
         global remove_bn
+        global remove_dropout
         remove_bn = True
+        remove_dropout = True
 
-        img, label = init_data(
-            batch_size=batch_size, img_shape=img_shape, label_range=999)
         all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
             model,
-            feed_dict={"image": img,
-                       "label": label},
-            iter=iter,
-            batch_size=batch_size,
+            feed_dict=_feed_dict(use_cuda),
+            iter=_iter(use_cuda),
+            batch_size=_batch_size(),
             use_cuda=use_cuda,
             use_reduce=False,
             optimizer=optimizer)
         reduce_first_loss, reduce_last_loss = self.check_network_convergence(
             model,
-            feed_dict={"image": img,
-                       "label": label},
-            iter=iter,
-            batch_size=batch_size,
+            feed_dict=_feed_dict(use_cuda),
+            iter=_iter(use_cuda),
+            batch_size=_batch_size(),
             use_cuda=use_cuda,
             use_reduce=True,
             optimizer=optimizer)
@@ -232,10 +272,9 @@ class TestResnet(TestParallelExecutorBase):
 
         all_reduce_first_loss_seq, all_reduce_last_loss_seq = self.check_network_convergence(
             model,
-            feed_dict={"image": img,
-                       "label": label},
-            iter=iter,
-            batch_size=batch_size,
+            feed_dict=_feed_dict(use_cuda),
+            iter=_iter(use_cuda),
+            batch_size=_batch_size(),
             use_cuda=use_cuda,
             use_reduce=False,
             optimizer=optimizer,
@@ -243,10 +282,9 @@ class TestResnet(TestParallelExecutorBase):
 
         reduce_first_loss_seq, reduce_last_loss_seq = self.check_network_convergence(
             model,
-            feed_dict={"image": img,
-                       "label": label},
-            iter=iter,
-            batch_size=batch_size,
+            feed_dict=_feed_dict(use_cuda),
+            iter=_iter(use_cuda),
+            batch_size=_batch_size(),
             use_cuda=use_cuda,
             use_reduce=True,
             optimizer=optimizer,
@@ -267,37 +305,28 @@ class TestResnet(TestParallelExecutorBase):
         for loss in zip(all_reduce_last_loss_seq, reduce_last_loss_seq):
             self.assertAlmostEquals(loss[0], loss[1], delta=delta2)
 
-    def _check_resnet_convergence(self,
-                                  model,
-                                  check_func_1,
-                                  check_func_2,
-                                  use_cuda,
-                                  iter=20,
-                                  delta2=1e-5,
-                                  compare_seperately=True):
+    def _compare_result_with_origin_model(self,
+                                          get_origin_result,
+                                          check_func_2,
+                                          use_cuda,
+                                          delta2=1e-5,
+                                          compare_seperately=True,
+                                          rm_drop_out=False,
+                                          rm_bn=False):
         if use_cuda and not core.is_compiled_with_cuda():
             return
 
-        global remove_dropout
         global remove_bn
-        remove_dropout = True
-        remove_bn = True
+        global remove_dropout
+        remove_bn = rm_bn or use_cuda
+        remove_dropout = rm_drop_out
 
-        img, label = init_data(
-            batch_size=batch_size, img_shape=img_shape, label_range=999)
-        func_1_first_loss, func_1_last_loss = check_func_1(
-            model,
-            feed_dict={"image": img,
-                       "label": label},
-            iter=iter,
-            batch_size=batch_size,
-            use_cuda=use_cuda)
+        func_1_first_loss, func_1_last_loss = get_origin_result(use_cuda)
         func_2_first_loss, func_2_last_loss = check_func_2(
             model,
-            feed_dict={"image": img,
-                       "label": label},
-            iter=iter,
-            batch_size=batch_size,
+            feed_dict=_feed_dict(use_cuda),
+            iter=_iter(use_cuda),
+            batch_size=_batch_size(),
             use_cuda=use_cuda)
 
         if compare_seperately:
@@ -311,97 +340,55 @@ class TestResnet(TestParallelExecutorBase):
             self.assertAlmostEquals(
                 np.mean(func_1_last_loss), func_2_last_loss[0],
                 delta=delta2)
 
-    def _compare_with_fused_all_reduce(self,
-                                       model,
-                                       use_cuda,
-                                       iter=20,
-                                       delta2=1e-5):
-        if use_cuda and not core.is_compiled_with_cuda():
-            return
-
-        global remove_bn
-        remove_bn = True
-
-        img, label = init_data(
-            batch_size=batch_size, img_shape=img_shape, label_range=999)
-        all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
-            model,
-            feed_dict={"image": img,
-                       "label": label},
-            iter=iter,
-            batch_size=batch_size,
-            use_cuda=use_cuda,
-            fuse_all_reduce_ops=False,
-            optimizer=optimizer)
-        reduce_first_loss, reduce_last_loss = self.check_network_convergence(
-            model,
-            feed_dict={"image": img,
-                       "label": label},
-            iter=iter,
-            batch_size=batch_size,
-            use_cuda=use_cuda,
-            fuse_all_reduce_ops=True,
-            optimizer=optimizer)
-
-        for loss in zip(all_reduce_first_loss, reduce_first_loss):
-            self.assertAlmostEquals(loss[0], loss[1], delta=1e-5)
-        for loss in zip(all_reduce_last_loss, reduce_last_loss):
-            self.assertAlmostEquals(loss[0], loss[1], delta=delta2)
-
     def test_seresnext_with_reduce(self):
-        self._compare_reduce_and_allreduce(
-            model=SE_ResNeXt50Small, use_cuda=True, delta2=1e-2)
-        self._compare_reduce_and_allreduce(
-            model=SE_ResNeXt50Small, use_cuda=False, iter=5)
-
-    def test_seresnext_with_fused_all_reduce(self):
-        self._compare_with_fused_all_reduce(
-            model=SE_ResNeXt50Small, use_cuda=True, delta2=1e-3)
-        self._compare_with_fused_all_reduce(
-            model=SE_ResNeXt50Small, use_cuda=False, iter=2, delta2=1e-3)
+        self._compare_reduce_and_allreduce(use_cuda=False, delta2=1e-3)
+        self._compare_reduce_and_allreduce(use_cuda=True, delta2=1e-2)
 
     def test_seresnext_with_learning_rate_decay(self):
-        check_func_1 = partial(
-            self.check_network_convergence,
-            optimizer=optimizer,
-            use_parallel_executor=True)
+        # NOTE(zcd): This test compares the results of ParallelExecutor
+        # and Executor; the drop_out op and batch_norm op give different
+        # results in the two executors, so both ops are removed from the model.
+        check_func_1 = _get_origin_result
         check_func_2 = partial(
             self.check_network_convergence,
             optimizer=optimizer,
             use_parallel_executor=False)
-        self._check_resnet_convergence(
-            SE_ResNeXt50Small,
-            check_func_1,
-            check_func_2,
-            use_cuda=True,
-            compare_seperately=False)
-        self._check_resnet_convergence(
-            SE_ResNeXt50Small,
+        self._compare_result_with_origin_model(
             check_func_1,
             check_func_2,
             use_cuda=False,
+            rm_drop_out=True,
+            rm_bn=True,
             compare_seperately=False,
-            iter=2,
             delta2=1e-3)
+        self._compare_result_with_origin_model(
+            check_func_1,
+            check_func_2,
+            use_cuda=True,
+            rm_drop_out=True,
+            rm_bn=True,
+            compare_seperately=False)
 
-    def test_seresnext_with_fused_optimizer_ops(self):
-        check_func_1 = partial(
-            self.check_network_convergence, fuse_all_optimizer_ops=False)
+    def test_seresnext_with_fused_all_reduce(self):
+        # NOTE(zcd): In order to make the program run faster, this unit
+        # test removes drop_out and batch_norm.
+        check_func_1 = _get_origin_result
         check_func_2 = partial(
-            self.check_network_convergence, fuse_all_optimizer_ops=True)
-        # TODO(zcd): this test failed random, I will fix it in next PR.
-        # self._check_resnet_convergence(
-        #     SE_ResNeXt50Small,
-        #     check_func_1,
-        #     check_func_2,
-        #     use_cuda=True,
-        #     delta2=1e-3)
-        self._check_resnet_convergence(
-            SE_ResNeXt50Small,
+            self.check_network_convergence,
+            optimizer=optimizer,
+            fuse_all_reduce_ops=True)
+        self._compare_result_with_origin_model(
             check_func_1,
             check_func_2,
             use_cuda=False,
-            iter=2,
+            rm_drop_out=True,
+            rm_bn=True)
+        self._compare_result_with_origin_model(
+            check_func_1,
+            check_func_2,
+            use_cuda=True,
+            rm_drop_out=True,
+            rm_bn=True,
             delta2=1e-3)
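
Review note on the refactor above: check_network_convergence is promoted to a classmethod so it can be called at module import time, before any TestResnet instance exists. The baseline ("origin") losses are computed once when the module loads (origin_cpu_*_loss always, origin_gpu_*_loss only when CUDA is available) and each test then compares against those cached values instead of re-running the Executor baseline. Note also that remove_bn = rm_bn or use_cuda means batch_norm is always removed on GPU runs regardless of the rm_bn flag. Below is a minimal, self-contained sketch of this compute-once-at-import caching pattern; run_baseline and BASELINE are hypothetical names standing in for the patch's real helpers:

import unittest


def run_baseline(use_cuda):
    # Stand-in for TestParallelExecutorBase.check_network_convergence; the
    # real call trains SE_ResNeXt50Small and returns (first_loss, last_loss).
    return [0.9], [0.1]


# Computed once at module import, like origin_cpu_first_loss /
# origin_gpu_first_loss in the patch; every test reuses the cached values.
BASELINE = {False: run_baseline(use_cuda=False)}


class TestAgainstBaseline(unittest.TestCase):
    def test_matches_baseline(self):
        first_loss, last_loss = BASELINE[False]
        new_first, new_last = run_baseline(use_cuda=False)
        self.assertAlmostEqual(first_loss[0], new_first[0], delta=1e-5)
        self.assertAlmostEqual(last_loss[0], new_last[0], delta=1e-5)


if __name__ == '__main__':
    unittest.main()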
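
The tests also pre-bind keyword arguments with functools.partial, so that _compare_result_with_origin_model can drive every variant through one uniform callable. A small sketch of that idiom; check_network_convergence below is a stand-in with a simplified signature, not Paddle's real helper:

from functools import partial


def check_network_convergence(model, use_cuda=True, use_parallel_executor=True):
    # Stand-in that just echoes its arguments.
    return model, use_cuda, use_parallel_executor


# Pre-bind use_parallel_executor=False, as
# test_seresnext_with_learning_rate_decay does for check_func_2.
check_func_2 = partial(check_network_convergence, use_parallel_executor=False)

print(check_func_2('SE_ResNeXt50Small', use_cuda=False))
# prints: ('SE_ResNeXt50Small', False, False)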