diff --git a/python/paddle/fluid/clip.py b/python/paddle/fluid/clip.py index d280ec50354c6444562623366a20c9d32295e993..5f703aab25454bed4957bbe0d35d2477cda2e044 100644 --- a/python/paddle/fluid/clip.py +++ b/python/paddle/fluid/clip.py @@ -21,6 +21,7 @@ import functools from . import layers from . import framework from . import core +from . import name_scope __all__ = [ 'set_gradient_clip', @@ -63,10 +64,12 @@ class ErrorClipByValue(BaseErrorClipAttr): CLIP_MIN = -1e-6 prog = fluid.framework.Program() with fluid.program_guard(main_program=prog): - image = fluid.layers.data(name='x', shape=[784], dtype='float32') + image = fluid.layers.data( + name='x', shape=[784], dtype='float32') hidden1 = fluid.layers.fc(input=image, size=128, act='relu') hidden2 = fluid.layers.fc(input=hidden1, size=64, act='relu') - predict = fluid.layers.fc(input=hidden2, size=10, act='softmax') + predict = fluid.layers.fc( + input=hidden2, size=10, act='softmax') label = fluid.layers.data(name='y', shape=[1], dtype='int64') cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(cost) @@ -154,13 +157,15 @@ class GradientClipByValue(BaseGradientClipAttr): import paddle.fluid as fluid w_param_attrs = fluid.ParamAttr(name=None, - initializer=fluid.initializer.UniformInitializer(low=-1.0, high=1.0, seed=0), + initializer=fluid.initializer.UniformInitializer( + low=-1.0, high=1.0, seed=0), learning_rate=1.0, regularizer=fluid.regularizer.L1Decay(1.0), trainable=True, gradient_clip=fluid.clip.GradientClipByValue(-1.0, 1.0)) x = fluid.layers.data(name='x', shape=[10], dtype='float32') - y_predict = fluid.layers.fc(input=x, size=1, param_attr=w_param_attrs) + y_predict = fluid.layers.fc( + input=x, size=1, param_attr=w_param_attrs) """ def __init__(self, max, min=None): @@ -184,8 +189,8 @@ class GradientClipByValue(BaseGradientClipAttr): class GradientClipByNorm(BaseGradientClipAttr): - """ - Convert the input multidimensional Tensor :math:`X` to a multidimensional Tensor whose L2 norm does not exceed the given two-norm maximum ( :math:`clip\_norm` ). + """ + Convert the input multidimensional Tensor :math:`X` to a multidimensional Tensor whose L2 norm does not exceed the given two-norm maximum ( :math:`clip\_norm` ). The tensor is not passed through this class, but passed through the parametre of ``main_program`` in ``fluid.program_guard``. @@ -193,22 +198,22 @@ class GradientClipByNorm(BaseGradientClipAttr): .. math:: Out = - \\left \{ - \\begin{aligned} - & X & & if (norm(X) \\leq clip\_norm) \\\\ - & \\frac{clip\_norm*X}{norm(X)} & & if (norm(X) > clip\_norm) \\\\ - \\end{aligned} - \\right. + \\left \{ + \\begin{aligned} + & X & & if (norm(X) \\leq clip\_norm) \\\\ + & \\frac{clip\_norm*X}{norm(X)} & & if (norm(X) > clip\_norm) \\\\ + \\end{aligned} + \\right. where :math:`norm(X)` represents the L2 norm of :math:`X`. .. math:: - norm(X) = ( \\sum_{i=1}^{n}|x\_i|^2)^{ \\frac{1}{2}} + norm(X) = ( \\sum_{i=1}^{n}|x\_i|^2)^{ \\frac{1}{2}} Args: clip_norm(float): The maximum norm value - + Examples: .. code-block:: python @@ -220,11 +225,14 @@ class GradientClipByNorm(BaseGradientClipAttr): startup_program = fluid.framework.Program() with fluid.program_guard( main_program=prog, startup_program=startup_program): - image = fluid.data(name='x', shape=[None, 784], dtype='float32', lod_level=0) - label = fluid.data(name='y', shape=[None, 1], dtype='int64', lod_level=0) + image = fluid.data( + name='x', shape=[None, 784], dtype='float32', lod_level=0) + label = fluid.data( + name='y', shape=[None, 1], dtype='int64', lod_level=0) hidden1 = fluid.layers.fc(input=image, size=128, act='relu') hidden2 = fluid.layers.fc(input=hidden1, size=64, act='relu') - predict = fluid.layers.fc(input=hidden2, size=10, act='softmax') + predict = fluid.layers.fc( + input=hidden2, size=10, act='softmax') cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(cost) prog_clip = prog.clone() @@ -237,22 +245,23 @@ class GradientClipByNorm(BaseGradientClipAttr): p_g_clip = fluid.clip.append_gradient_clip_ops(p_g_clip) grad_list = [elem[1] for elem in p_g] grad_clip_list = [elem[1] for elem in p_g_clip] - train_reader = paddle.batch( + train_reader = paddle.batch( paddle.reader.shuffle( paddle.dataset.mnist.train(), buf_size=8192), batch_size=128) - + exe = fluid.Executor(place) feeder = fluid.DataFeeder(feed_list=[image, label], place=place) exe.run(startup_program) - + count = 0 for data in train_reader(): count += 1 print("count:%s" % count) if count > 5: break - out = exe.run(prog, feed=feeder.feed(data), fetch_list=grad_list) + out = exe.run(prog, feed=feeder.feed( + data), fetch_list=grad_list) out_clip = exe.run(prog_clip, feed=feeder.feed(data), fetch_list=grad_clip_list) @@ -279,8 +288,8 @@ class GradientClipByGlobalNorm(BaseGradientClipAttr): Given a list of tensors ``t_list`` , and a clipping ratio ``clip_norm``, this operation returns a instance of this class as first parameter of - ``set_gradient_clip`` method, second parameter of ``set_gradient_clip`` - is used to compute clipped tensors list ``list_clipped`` (default value + ``set_gradient_clip`` method, second parameter of ``set_gradient_clip`` + is used to compute clipped tensors list ``list_clipped`` (default value is ``None``, compute global norm ``global_norm`` based in all tensors). global norm (global_norm) of all tensors in t_list. @@ -315,11 +324,13 @@ class GradientClipByGlobalNorm(BaseGradientClipAttr): startup_program = fluid.framework.Program() with fluid.program_guard( main_program=prog, startup_program=startup_program): - image = fluid.layers.data(name='x', shape=[784], dtype='float32') + image = fluid.layers.data( + name='x', shape=[784], dtype='float32') label = fluid.layers.data(name='y', shape=[1], dtype='int64') hidden1 = fluid.layers.fc(input=image, size=128, act='relu') hidden2 = fluid.layers.fc(input=hidden1, size=64, act='relu') - predict = fluid.layers.fc(input=hidden2, size=10, act='softmax') + predict = fluid.layers.fc( + input=hidden2, size=10, act='softmax') cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(cost) @@ -352,7 +363,8 @@ class GradientClipByGlobalNorm(BaseGradientClipAttr): print("count:%s" % count) if count > 5: break - out = exe.run(prog, feed=feeder.feed(data), fetch_list=grad_list) + out = exe.run(prog, feed=feeder.feed( + data), fetch_list=grad_list) out_clip = exe.run(prog_clip, feed=feeder.feed(data), fetch_list=grad_clip_list) @@ -432,11 +444,12 @@ def set_gradient_clip(clip, param_list=None, program=None): Examples: .. code-block:: python - + import paddle.fluid as fluid def network(): - image = fluid.data(name='image', shape=[None, 28], dtype='float32') + image = fluid.data(name='image', shape=[ + None, 28], dtype='float32') param_attr1 = fluid.ParamAttr("fc1_param") fc1 = fluid.layers.fc(image, size=10, param_attr=param_attr1) param_attr2 = fluid.ParamAttr("fc2_param") @@ -498,7 +511,7 @@ def append_gradient_clip_ops(param_grads): if g is None: continue with p.block.program._optimized_guard( - [p, g]), framework.name_scope('append_clip'): + [p, g]), framework.name_scope('append_clip_@CLIP'): clip_attr = getattr(p, 'gradient_clip_attr', NullGradientClipAttr()) if clip_attr is None: clip_attr = NullGradientClipAttr() @@ -514,9 +527,25 @@ def append_gradient_clip_ops(param_grads): if g is None: continue with p.block.program._optimized_guard( - [p, g]), framework.name_scope('append_graident_clip'): + [p, g]), framework.name_scope('append_graident_clip_@CLIP'): res.append(clip_attr._create_operators(param=p, grad=g)) + # change wrong mapping relation between param & grad in clip op + clip_flag = '@CLIP' + for p, g in param_grads: + if g is None: + continue + for op in p.block.program.global_block().ops: + if 'op_namescope' in op.all_attrs() and clip_flag in op.attr( + "op_namescope"): + if op.attr('op_role_var'): + param_name = op.attr('op_role_var')[0] + index = 0 + for i in range(len(res)): + if res[i][0].name == param_name: + index = i + correct_p_g = [param_name, res[index][1].name] + op._set_attr('op_role_var', correct_p_g) return res diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index 3988c6900189553948cf2dbed4316df669bbb451..f75f65cade7f494a85873b7e03b226ea4016085b 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -73,8 +73,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): datas = [dnn_data, lr_data, label] # build dnn model - # add 12800 for test huge dense Variable - dnn_layer_dims = [128, 128000, 64, 32, 1] + dnn_layer_dims = [128, 128, 64, 32, 1] dnn_embedding = fluid.layers.embedding( is_distributed=False, input=dnn_data, diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py index 9227eb651faabaf068b64745877a8fa6073f80d2..de3e3be840f7cea789d0b295f1703b03cf6416c3 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py @@ -39,6 +39,8 @@ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import f from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +__all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main'] + RUN_STEP = 5 LEARNING_RATE = 0.01 DIST_UT_PORT = 0 @@ -80,6 +82,19 @@ class FleetDistRunnerBase(object): avg_cost = self.net() + use_grad_clip = int(os.getenv('GRAD_CLIP', 0)) + if use_grad_clip: + # 1: clip_by_value; 2: clip_by_norm; 3:clip_by_global_norm + if use_grad_clip == 1: + fluid.clip.set_gradient_clip( + clip=fluid.clip.GradientClipByValue(2.0)) + elif use_grad_clip == 2: + fluid.clip.set_gradient_clip( + clip=fluid.clip.GradientClipByNorm(2.0)) + elif use_grad_clip == 3: + fluid.clip.set_gradient_clip( + clip=fluid.clip.GradientClipByGlobalNorm(2.0)) + optimizer = fluid.optimizer.SGD(LEARNING_RATE) optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) @@ -102,6 +117,20 @@ class FleetDistRunnerBase(object): strategy = self.generate_strategy(args) avg_cost = self.net() + + use_grad_clip = int(os.getenv('GRAD_CLIP', 0)) + if use_grad_clip: + # 1: clip_by_value; 2: clip_by_norm; 3:clip_by_global_norm + if use_grad_clip == 1: + fluid.clip.set_gradient_clip( + clip=fluid.clip.GradientClipByValue(2.0)) + elif use_grad_clip == 2: + fluid.clip.set_gradient_clip( + clip=fluid.clip.GradientClipByNorm(2.0)) + elif use_grad_clip == 3: + fluid.clip.set_gradient_clip( + clip=fluid.clip.GradientClipByGlobalNorm(2.0)) + optimizer = fluid.optimizer.SGD(LEARNING_RATE) optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) @@ -180,6 +209,7 @@ class TestFleetBase(unittest.TestCase): self._python_interp = sys.executable self._geo_sgd_need_push_nums = 5 + self._grad_clip_mode = 0 self._setup_config() def _find_free_port(self): @@ -233,7 +263,7 @@ class TestFleetBase(unittest.TestCase): return tr0_proc, tr1_proc, tr0_pipe, tr1_pipe def _run_cluster(self, model, envs): - env = {'CPU_NUM': '1'} + env = {'CPU_NUM': '1', 'GRAD_CLIP': str(self._grad_clip_mode)} env.update(envs) python_path = self._python_interp diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py new file mode 100644 index 0000000000000000000000000000000000000000..34f4d8c542725a12f8e62f759f1ceb85a6744f7d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py @@ -0,0 +1,138 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import unittest +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet +from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig +from test_dist_fleet_base import TestFleetBase +from dist_simnet_bow import train_network + + +class TestDistGeoClipByGlobalNormTranspiler(unittest.TestCase): + def test_pserver(self): + role = role_maker.UserDefinedRoleMaker( + current_id=0, + role=role_maker.Role.SERVER, + worker_num=2, + server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"]) + + fleet.init(role) + + batch_size = 128 + is_sparse = True + is_distribute = False + + strategy = DistributeTranspilerConfig() + strategy.sync_mode = False + strategy.geo_sgd_mode = True + strategy.geo_sgd_need_push_nums = 5 + + avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse) + fluid.clip.set_gradient_clip( + clip=fluid.clip.GradientClipByGlobalNorm(2.0)) + + optimizer = fluid.optimizer.SGD(0.1) + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(avg_cost) + + pserver_startup_program = fleet.startup_program + pserver_mian_program = fleet.main_program + + +class TestDistGeoClipByGlobalNorm(TestFleetBase): + def _setup_config(self): + self._mode = "geo" + self._reader = "dataset" + self._geo_sgd_need_push_nums = 5 + self._grad_clip_mode = 3 + + def check_with_place(self, + model_file, + delta=1e-3, + check_error_log=False, + need_envs={}): + required_envs = { + "PATH": os.getenv("PATH", ""), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), + "FLAGS_rpc_deadline": "5000", # 5sec to fail fast + "http_proxy": "" + } + required_envs.update(need_envs) + + tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) + + def test_dist_train(self): + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=True) + + def _setup_config(self): + self._sync_mode = False + self._grad_clip_mode = 2 + + def check_with_place(self, + model_file, + delta=1e-3, + check_error_log=False, + need_envs={}): + required_envs = { + "PATH": os.getenv("PATH", ""), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), + "FLAGS_rpc_deadline": "5000", # 5sec to fail fast + "http_proxy": "" + } + required_envs.update(need_envs) + + tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) + + def test_dist_train(self): + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=True) + + +class TestDistASyncClipByGlobalNorm(TestFleetBase): + def _setup_config(self): + self._mode = "async" + self._reader = "dataset" + self._grad_clip_mode = 3 + + def check_with_place(self, + model_file, + delta=1e-3, + check_error_log=False, + need_envs={}): + required_envs = { + "PATH": os.getenv("PATH", ""), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), + "FLAGS_rpc_deadline": "5000", # 5sec to fail fast + "http_proxy": "" + } + required_envs.update(need_envs) + + tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) + + def test_dist_train(self): + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=True) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index dbc63d5cd675cbbd4b0d840f815f6ae0f3cb52c5..ebdfde88e2ef977cd8203a0d207cbc3d7977bfde 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -52,6 +52,8 @@ from . import collective LOOKUP_TABLE_TYPE = "lookup_table" LOOKUP_TABLE_GRAD_TYPE = "lookup_table_grad" +OP_NAME_SCOPE = "op_namescope" +CLIP_OP_NAME_SCOPE = "@CLIP" OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName() RPC_OP_ROLE_ATTR_NAME = op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName( ) @@ -2608,6 +2610,16 @@ class DistributeTranspiler(object): origin_var_dict = self.origin_program.global_block().vars for op in block.ops: if self._is_opt_role_op(op): + # Todo(chengmo): Whether clip related op belongs to Optimize guard should be discussed + # delete clip op from opt_ops when run in Parameter Server mode + if OP_NAME_SCOPE in op.all_attrs( + ) and CLIP_OP_NAME_SCOPE in op.attr( + OP_NAME_SCOPE + ) and self.config.mode != "nccl2" and self.config.mode != "collective": + op._set_attr( + "op_role", + int(core.op_proto_and_checker_maker.OpRole.Backward)) + continue opt_ops.append(op) if op.attr(OP_ROLE_VAR_ATTR_NAME): param_name = op.attr(OP_ROLE_VAR_ATTR_NAME)[0]