diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 58be61362cabf22a3543af364f1b0bd180df826a..5ce04cf1301fc8abb0cadf3c043cf0575dda39af 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -218,7 +218,10 @@ void ParallelExecutor::BCastParamsToDevs(
 
         auto local_scope = member_->local_scopes_[i];
         auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
-        if (member_->use_all_reduce_ || member_->use_cuda_) {
+
+        // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
+        if (member_->use_all_reduce_ || member_->use_cuda_ ||
+            var == "@LR_DECAY_COUNTER@") {
           t->Resize(dims);
           t->mutable_data(cpu, main_tensor.type());
           paddle::framework::TensorCopy(main_tensor, cpu, t);
diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
index f5c93319de02249a22981b50733da05bb8658e3a..fcf86cc5839113b75855ce97459b2ee4881238cd 100644
--- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
+++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
@@ -35,7 +35,8 @@ class TestParallelExecutorBase(unittest.TestCase):
                                   feed_dict=None,
                                   seed=None,
                                   use_parallel_executor=True,
-                                  use_reduce=False):
+                                  use_reduce=False,
+                                  optimizer=fluid.optimizer.Adam):
         def run_executor(exe, feed, fetch_list, program=None):
             if isinstance(exe, fluid.ParallelExecutor):
                 res = exe.run(fetch_list=fetch_list, feed=feed)
@@ -57,8 +58,8 @@ class TestParallelExecutorBase(unittest.TestCase):
                 main.random_seed = seed
 
             loss = method(use_feed=feed_dict is not None)
-            adam = fluid.optimizer.Adam()
-            adam.minimize(loss)
+
+            optimizer().minimize(loss)
 
             if memory_opt:
                 fluid.memory_optimize(main)
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
index 57ae36dbdd401afd34d06f460ae613db18240a2e..4d39505b66abf44249e0ea160b82aaf7be0638cb 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
@@ -13,8 +13,12 @@
 # limitations under the License.
 
 import paddle.fluid as fluid
+import paddle.fluid.layers.ops as ops
+from paddle.fluid.initializer import init_on_cpu
+from paddle.fluid.layers.learning_rate_scheduler import _decay_step_counter
 from parallel_executor_test_base import TestParallelExecutorBase
 import unittest
+import math
 import os
 
 
@@ -131,27 +135,71 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False):
 
 
 class TestResnet(TestParallelExecutorBase):
-    def check_resnet_convergence(self, use_cuda, use_reduce=False, iter=20):
+    def check_resnet_convergence_with_learning_rate_decay(self,
+                                                          use_cuda=True,
+                                                          use_reduce=False,
+                                                          iter=20):
+
         os.environ['CPU_NUM'] = str(4)
 
+        def _cosine_decay(learning_rate, step_each_epoch, epochs=120):
+            """
+            Applies cosine decay to the learning rate.
+            lr = 0.05 * (math.cos(epoch * (math.pi / 120)) + 1)
+            """
+            global_step = _decay_step_counter()
+
+            with init_on_cpu():
+                epoch = ops.floor(global_step / step_each_epoch)
+                decayed_lr = learning_rate * \
+                    (ops.cos(epoch * (math.pi / epochs)) + 1)/2
+            return decayed_lr
+
+        def _optimizer(learning_rate=0.01):
+            optimizer = fluid.optimizer.Momentum(
+                learning_rate=_cosine_decay(
+                    learning_rate=learning_rate, step_each_epoch=2, epochs=1),
+                momentum=0.9,
+                regularization=fluid.regularizer.L2Decay(1e-4))
+            return optimizer
+
         import functools
+
         batch_size = 2
-        self.check_network_convergence(
+
+        single_first_loss, single_last_loss = self.check_network_convergence(
             functools.partial(
                 SE_ResNeXt50Small, batch_size=batch_size),
             iter=iter,
             batch_size=batch_size,
             use_cuda=use_cuda,
-            use_reduce=use_reduce)
-
-    def test_resnet(self):
-        self.check_resnet_convergence(True)
-        self.check_resnet_convergence(False, iter=5)
+            use_reduce=use_reduce,
+            optimizer=_optimizer,
+            use_parallel_executor=False)
 
-    def test_resnet_with_new_strategy(self):
-        # use_cuda, use_reduce
-        self.check_resnet_convergence(True, True)
-        self.check_resnet_convergence(False, True, iter=5)
+        parallel_first_loss, parallel_last_loss = self.check_network_convergence(
+            functools.partial(
+                SE_ResNeXt50Small, batch_size=batch_size),
+            iter=iter,
+            batch_size=batch_size,
+            use_cuda=use_cuda,
+            use_reduce=use_reduce,
+            optimizer=_optimizer)
+
+        for p_f in parallel_first_loss:
+            self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
+        for p_l in parallel_last_loss:
+            self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
+
+    def test_seresnext_with_learning_rate_decay(self):
+        self.check_resnet_convergence_with_learning_rate_decay(True, False)
+        self.check_resnet_convergence_with_learning_rate_decay(
+            False, False, iter=5)
+
+    def test_seresnext_with_new_strategy_with_learning_rate_decay(self):
+        self.check_resnet_convergence_with_learning_rate_decay(True, True)
+        self.check_resnet_convergence_with_learning_rate_decay(
+            False, True, iter=5)
 
 
 if __name__ == '__main__':
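
For reference (not part of the diff): the new `_cosine_decay` helper builds the schedule symbolically out of `fluid` ops; the plain-Python sketch below, with an illustrative function name and sample values, traces the same arithmetic using the docstring's base rate of 0.1 over 120 epochs.

    import math

    def cosine_decay(learning_rate, step_each_epoch, epochs, global_step):
        # Same arithmetic as the fluid expression in the test:
        #   epoch = floor(global_step / step_each_epoch)
        #   decayed_lr = learning_rate * (cos(epoch * pi / epochs) + 1) / 2
        epoch = math.floor(global_step / step_each_epoch)
        return learning_rate * (math.cos(epoch * (math.pi / epochs)) + 1) / 2

    for step in (0, 60, 119):
        print(step, cosine_decay(0.1, step_each_epoch=1, epochs=120, global_step=step))
    # step 0   -> 0.1      (cos(0) = 1, no decay yet)
    # step 60  -> ~0.05    (halfway through the schedule)
    # step 119 -> ~1.7e-05 (rate has almost reached zero)

The unit test calls `_cosine_decay` with `step_each_epoch=2, epochs=1`, so the learning rate collapses toward zero after only a couple of steps; that keeps the comparison between the single-executor and ParallelExecutor losses deterministic within the `1e-6` tolerance.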