diff --git a/python/paddle/fluid/tests/unittests/dist_mnist.py b/python/paddle/fluid/tests/unittests/dist_mnist.py
index 722b3e159abf4737b2bb43c7b84e23a3618cda12..85a96c0b53f6bc08687965048d6251265055a6fe 100644
--- a/python/paddle/fluid/tests/unittests/dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/dist_mnist.py
@@ -46,7 +46,8 @@ def cnn_model(data):
         pool_size=2,
         pool_stride=2,
         act="relu",
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant()))
+        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
+            value=0.3)))
     conv_pool_2 = fluid.nets.simple_img_conv_pool(
         input=conv_pool_1,
         filter_size=5,
@@ -54,7 +55,8 @@ def cnn_model(data):
         pool_size=2,
         pool_stride=2,
         act="relu",
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant()))
+        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
+            value=0.2)))
 
     SIZE = 10
     input_shape = conv_pool_2.shape
@@ -66,8 +68,7 @@ def cnn_model(data):
         size=SIZE,
         act="softmax",
         param_attr=fluid.param_attr.ParamAttr(
-            initializer=fluid.initializer.NormalInitializer(
-                loc=0.0, scale=scale, seed=1)))
+            initializer=fluid.initializer.Constant(value=0.1)))
     return predict
 
 
diff --git a/python/paddle/fluid/tests/unittests/dist_se_resnext.py b/python/paddle/fluid/tests/unittests/dist_se_resnext.py
index 1307ba4e4ad11ef01094c44068d916ff2d442f78..0387e911880256ea6b8efb6f2311bbf4c4f8c0f2 100644
--- a/python/paddle/fluid/tests/unittests/dist_se_resnext.py
+++ b/python/paddle/fluid/tests/unittests/dist_se_resnext.py
@@ -129,7 +129,12 @@ class SE_ResNeXt():
             input=conv, pool_size=7, pool_type='avg', global_pooling=True)
         drop = fluid.layers.dropout(x=pool, dropout_prob=0.2)
         stdv = 1.0 / math.sqrt(drop.shape[1] * 1.0)
-        out = fluid.layers.fc(input=drop, size=class_dim, act='softmax')
+        out = fluid.layers.fc(
+            input=drop,
+            size=class_dim,
+            act='softmax',
+            param_attr=fluid.ParamAttr(
+                initializer=fluid.initializer.Constant(value=0.2)))
         return out
 
     def shortcut(self, input, ch_out, stride):
@@ -179,7 +184,7 @@ class SE_ResNeXt():
             act=None,
             # avoid pserver CPU init differs from GPU
             param_attr=fluid.ParamAttr(
-                initializer=fluid.initializer.Constant()),
+                initializer=fluid.initializer.Constant(value=0.2)),
             bias_attr=False)
         return fluid.layers.batch_norm(input=conv, act=act)
 
@@ -228,10 +233,8 @@ class DistSeResneXt2x2(TestDistRunnerBase):
         lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
 
         optimizer = fluid.optimizer.Momentum(
-            # FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed.
-            #learning_rate=fluid.layers.piecewise_decay(
-            #    boundaries=bd, values=lr),
-            learning_rate=base_lr,
+            learning_rate=fluid.layers.piecewise_decay(
+                boundaries=bd, values=lr),
             momentum=0.9,
             regularization=fluid.regularizer.L2Decay(1e-4))
         optimizer.minimize(avg_cost)
diff --git a/python/paddle/fluid/tests/unittests/dist_transformer.py b/python/paddle/fluid/tests/unittests/dist_transformer.py
index ab4c5c3f368333ac42781aebd495579b5c26f388..239adcb9d5900d4073a6c07cb189ab7503aea86e 100644
--- a/python/paddle/fluid/tests/unittests/dist_transformer.py
+++ b/python/paddle/fluid/tests/unittests/dist_transformer.py
@@ -265,9 +265,9 @@ def main(role="pserver",
 
 
 if __name__ == "__main__":
-    if len(sys.argv) != 7:
+    if len(sys.argv) != 8:
         print(
-            "Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
+            "Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
         )
     role = sys.argv[1]
     endpoints = sys.argv[2]
@@ -275,6 +275,8 @@ if __name__ == "__main__":
     current_endpoint = sys.argv[4]
     trainers = int(sys.argv[5])
     is_dist = True if sys.argv[6] == "TRUE" else False
+    # FIXME(typhoonzero): refine this test.
+    is_async = True if sys.argv[7] == "TRUE" else False
     main(
         role=role,
         endpoints=endpoints,
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 4c71181d0d736bd1a8796b2d38ed1667557e3db8..0e815c91446b285ba2c2c5aa9ad18d97f51eae65 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -30,7 +30,7 @@ class TestDistRunnerBase(object):
             "get_model should be implemented by child classes.")
 
     def get_transpiler(self, trainer_id, main_program, pserver_endpoints,
-                       trainers):
+                       trainers, sync_mode):
         # NOTE: import fluid until runtime, or else forking processes will cause error.
         import paddle
         import paddle.fluid as fluid
@@ -39,17 +39,22 @@ class TestDistRunnerBase(object):
             trainer_id=trainer_id,
             program=main_program,
             pservers=pserver_endpoints,
-            trainers=trainers)
+            trainers=trainers,
+            sync_mode=sync_mode)
         return t
 
-    def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
-                    trainer_id):
+    def run_pserver(self,
+                    pserver_endpoints,
+                    trainers,
+                    current_endpoint,
+                    trainer_id,
+                    sync_mode=True):
         import paddle
         import paddle.fluid as fluid
         self.get_model(batch_size=2)
         t = self.get_transpiler(trainer_id,
                                 fluid.default_main_program(), pserver_endpoints,
-                                trainers)
+                                trainers, sync_mode)
         pserver_prog = t.get_pserver_program(current_endpoint)
         startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
         place = fluid.CPUPlace()
@@ -57,7 +62,13 @@ class TestDistRunnerBase(object):
         exe.run(startup_prog)
         exe.run(pserver_prog)
 
-    def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True):
+    def run_trainer(self,
+                    place,
+                    endpoints,
+                    trainer_id,
+                    trainers,
+                    is_dist=True,
+                    sync_mode=True):
         import paddle
         import paddle.fluid as fluid
         test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
@@ -65,7 +76,7 @@ class TestDistRunnerBase(object):
         if is_dist:
             t = self.get_transpiler(trainer_id,
                                     fluid.default_main_program(), endpoints,
-                                    trainers)
+                                    trainers, sync_mode)
             trainer_prog = t.get_trainer_program()
         else:
             trainer_prog = fluid.default_main_program()
@@ -106,9 +117,9 @@ def runtime_main(test_class):
     import paddle.fluid as fluid
     import paddle.fluid.core as core
 
-    if len(sys.argv) != 7:
+    if len(sys.argv) != 8:
         print(
-            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
+            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
         )
     role = sys.argv[1]
     endpoints = sys.argv[2]
@@ -116,34 +127,43 @@ def runtime_main(test_class):
     current_endpoint = sys.argv[4]
     trainers = int(sys.argv[5])
     is_dist = True if sys.argv[6] == "TRUE" else False
+    sync_mode = True if sys.argv[7] == "TRUE" else False
 
     model = test_class()
     if role == "pserver":
-        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id)
+        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id,
+                          sync_mode)
     else:
         p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
         ) else fluid.CPUPlace()
-        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist)
+        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist,
+                          sync_mode)
 
 
 import paddle.compat as cpt
 
 
 class TestDistBase(unittest.TestCase):
+    def _setup_config(self):
+        raise NotImplementedError("tests should have _setup_config implemented")
+
     def setUp(self):
         self._trainers = 2
         self._pservers = 2
         self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124"
         self._python_interp = "python"
+        self._sync_mode = True
+        self._setup_config()
 
     def start_pserver(self, model_file, check_error_log):
+        sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        ps0_cmd = "%s %s pserver %s 0 %s %d TRUE" % \
+        ps0_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
             (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-             self._trainers)
-        ps1_cmd = "%s %s pserver %s 0 %s %d TRUE" % \
+             self._trainers, sync_mode_str)
+        ps1_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
             (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-             self._trainers)
+             self._trainers, sync_mode_str)
 
         ps0_pipe = subprocess.PIPE
         ps1_pipe = subprocess.PIPE
@@ -195,9 +215,10 @@ class TestDistBase(unittest.TestCase):
         # Run local to get a base line
         env_local = {"CUDA_VISIBLE_DEVICES": "0"}
         env_local.update(required_envs)
-        local_cmd = "%s %s trainer %s 0 %s %d FLASE" % \
+        sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
+        local_cmd = "%s %s trainer %s 0 %s %d FALSE %s" % \
             (self._python_interp, model_file,
-             "127.0.0.1:1234", "127.0.0.1:1234", 1)
+             "127.0.0.1:1234", "127.0.0.1:1234", 1, sync_mode_str)
         if not check_error_log:
             local_proc = subprocess.Popen(
                 local_cmd.split(" "),
@@ -226,12 +247,12 @@ class TestDistBase(unittest.TestCase):
         self._wait_ps_ready(ps1.pid)
 
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        tr0_cmd = "%s %s trainer %s 0 %s %d TRUE" % \
+        tr0_cmd = "%s %s trainer %s 0 %s %d TRUE %s" % \
             (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-             self._trainers)
-        tr1_cmd = "%s %s trainer %s 1 %s %d TRUE" % \
+             self._trainers, sync_mode_str)
+        tr1_cmd = "%s %s trainer %s 1 %s %d TRUE %s" % \
             (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-             self._trainers)
+             self._trainers, sync_mode_str)
 
         env0 = {"CUDA_VISIBLE_DEVICES": "0"}
         env1 = {"CUDA_VISIBLE_DEVICES": "1"}
diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
index 4ec68d411b0f0e9ae89b107914e8fd844a19228b..36bab6f04603b7ad3218603489eead859bfcb5b6 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -17,10 +17,21 @@ import unittest
 from test_dist_base import TestDistBase
 
 
-class TestDistSeResneXt2x2(TestDistBase):
+class TestDistMnist2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_se_resnext(self):
         self.check_with_place("dist_mnist.py", delta=1e-7)
 
 
+class TestDistMnistAsync(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_mnist.py", delta=200)
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
index 16525f6fdb60a90a44a628fb0648f4130218c102..c0e9fa38e7d1eadd89eff9a8ba4442f888b8120e 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
@@ -18,9 +18,20 @@ from test_dist_base import TestDistBase
 
 
 class TestDistSeResneXt2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_se_resnext(self):
         self.check_with_place("dist_se_resnext.py", delta=1e-7)
 
 
+class TestDistSeResneXt2x2Async(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_se_resnext.py", delta=100)
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_transformer.py b/python/paddle/fluid/tests/unittests/test_dist_transformer.py
index 313207ff9ce054f81322224cb6ceafaaf25bbedf..62fcf5953f93637a20beed649de21476a8673419 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_transformer.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_transformer.py
@@ -19,6 +19,9 @@ from test_dist_base import TestDistBase
 
 
 class TestDistTransformer2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_transformer(self):
         # TODO(paddle-dev): check if the delta is OK.
         # Usually start around ~8000 and converge to ~5000
diff --git a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
index e43992c488d35d1b3f670e13650d420b0498eeec..38af149ad336fcb818c3cbc9c686bcbdf00238be 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
@@ -18,9 +18,20 @@ from test_dist_base import TestDistBase
 
 
 class TestDistSeResneXt2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_se_resnext(self):
         self.check_with_place("dist_word2vec.py", delta=1e-7)
 
 
+class TestDistSeResneXt2x2Async(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_word2vec.py", delta=1)
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index 57bc2e8a0ba173bb1273a5183340d0b618f0d73c..112a6839a27aad61f708f1d51b68fde03e716134 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -1209,8 +1209,8 @@ class DistributeTranspiler(object):
         elif op_type == "momentum":
             if varkey == "Velocity":
                 return param_shape
-        elif op_type == "":
-            if varkey == "Moment":
+        elif op_type == "rmsprop":
+            if varkey in ["Moment", "MeanSquare"]:
                 return param_shape
         elif op_type == "sgd":
             pass
@@ -1289,8 +1289,6 @@ class DistributeTranspiler(object):
         pserver_block = program.global_block()
         new_inputs = collections.OrderedDict()
 
-        # update param/grad shape first, then other inputs like
-        # moment can use the updated shape
         def _get_param_block(opt_op):
             # param is already created on global program
             param_block = None
@@ -1303,22 +1301,6 @@ class DistributeTranspiler(object):
         for key in opt_op.input_names:
             if key == "Grad":
                 new_inputs[key] = merged_var
-            # For RMSProp optimizer
-            elif key == "Moment" or key == "MeanSquare":
-                param_block = _get_param_block(opt_op)
-                if not param_block:
-                    return
-                moment_var = origin_program.global_block().vars[opt_op.input(
-                    key)[0]]
-                tmpvar = pserver_block.create_var(
-                    name=moment_var.name,
-                    persistable=moment_var.persistable,
-                    dtype=moment_var.dtype,
-                    # change to use same shape as param
-                    # TODO(typhoonzero): didn't append .block in the var name,
-                    # may affect checkpoint saving? Need to verify.
-                    shape=param_block.shape)
-                new_inputs[key] = tmpvar
             elif key == "Param":
                 param_block = _get_param_block(opt_op)
                 if not param_block:
@@ -1346,7 +1328,7 @@ class DistributeTranspiler(object):
 
         for key in opt_op.input_names:
             new_shape = None
-            if key in ["Param", "Grad", "LearningRate", "Moment", "MeanSquare"]:
+            if key in ["Param", "Grad", "LearningRate"]:
                 continue
             var = self.origin_program.global_block().vars[opt_op.input(key)[0]]
             # update accumulator variable shape
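Usage note: with this change every TestDistBase subclass must override _setup_config(), which setUp() now calls after the defaults (including self._sync_mode = True) are set, and check_with_place() forwards the resulting sync mode to the pserver and trainer command lines. Below is a minimal sketch, not taken from the diff above, of how a further async case would plug into the harness; the class name, the target script "dist_example.py" and the delta are illustrative assumptions only:

    import unittest
    from test_dist_base import TestDistBase


    class TestDistExampleAsync(TestDistBase):
        def _setup_config(self):
            # sync_mode=False makes the harness append FALSE to the command
            # line, so runtime_main() transpiles with sync_mode=False.
            self._sync_mode = False

        def test_dist_train(self):
            # Async updates are non-deterministic, so the tolerance is far
            # looser than the 1e-7 used by the sync cases in the patch.
            # "dist_example.py" is a hypothetical model script.
            self.check_with_place("dist_example.py", delta=100)


    if __name__ == "__main__":
        unittest.main()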