diff --git a/paddle/fluid/operators/fill_constant_op.cc b/paddle/fluid/operators/fill_constant_op.cc index 2826b82117db113d4d8c10095e89f610ca895775..e04a68717b351ddb0be5a7e70aa9297e5eb0125f 100644 --- a/paddle/fluid/operators/fill_constant_op.cc +++ b/paddle/fluid/operators/fill_constant_op.cc @@ -70,6 +70,12 @@ class FillConstantOp : public framework::OperatorBase { } }; +class FillConstantOpVarTypeInference : public framework::VarTypeInference { + public: + void operator()(const framework::OpDesc &op_desc, + framework::BlockDesc *block) const override {} +}; + class FillConstantOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { @@ -102,4 +108,5 @@ Fill up a variable with specified constant value. namespace ops = paddle::operators; REGISTER_OPERATOR(fill_constant, ops::FillConstantOp, ops::FillConstantInferShape, ops::FillConstantOpMaker, - paddle::framework::EmptyGradOpMaker); + paddle::framework::EmptyGradOpMaker, + ops::FillConstantOpVarTypeInference); diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 5f3111f363ccc14de4dd3f067097a19eabb83662..b07d0131a32c3f2744854a17b180ae714d532f80 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -1522,13 +1522,17 @@ class Program(object): >>> with program.lr_schedule_guard(): >>> lr = lr * decay """ + + tmp_role = self._current_role + tmp_var = self._op_role_var + OpRole = core.op_proto_and_checker_maker.OpRole self._current_role = OpRole.LRSched # TODO(typhoonzero): how to set target learning rate var self._op_role_var = [] yield - self._op_role_var = [] - self._current_role = OpRole.Forward + self._op_role_var = tmp_var + self._current_role = tmp_role def __str__(self): """ diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 73e3a43af4b98ee8e83a208c397d59e9025b8420..fb6770f025a5ed51856d7cf5f2c24c8c3f682edb 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -15,7 +15,7 @@ from __future__ import print_function import re from collections import defaultdict -from paddle.fluid.framework import Program, Variable, name_scope +from paddle.fluid.framework import Program, Variable, name_scope, default_main_program from . import framework from . import layers from .backward import append_backward @@ -111,7 +111,8 @@ class Optimizer(object): if param_lr == 1.0: return self._global_learning_rate() else: - return self._global_learning_rate() * param_lr + with default_main_program()._lr_schedule_guard(): + return self._global_learning_rate() * param_lr def _create_accumulators(self, block, parameters): """Create all accumulators needed by the parameters diff --git a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/dist_simnet_bow.py index 6456d1b53a129db04ace7ff4413a3d76e922ccde..fac5e037a46715d146e354825f09ee8ccc4f3d70 100644 --- a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py +++ b/python/paddle/fluid/tests/unittests/dist_simnet_bow.py @@ -81,7 +81,10 @@ def get_optimizer(): return optimizer -def train_network(batch_size, is_distributed=False, is_sparse=False): +def train_network(batch_size, + is_distributed=False, + is_sparse=False, + is_self_contained_lr=False): # query q = fluid.layers.data( name="query_ids", shape=[1], dtype="int64", lod_level=1) @@ -93,7 +96,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False): param_attr=fluid.ParamAttr( initializer=fluid.initializer.Constant(value=0.01), name="__emb__", - learning_rate=emb_lr), + learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__"), is_sparse=is_sparse) ## vsum q_sum = fluid.layers.sequence_pool(input=q_emb, pool_type='sum') @@ -119,7 +124,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False): param_attr=fluid.ParamAttr( initializer=fluid.initializer.Constant(value=0.01), name="__emb__", - learning_rate=emb_lr), + learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__"), is_sparse=is_sparse) ## vsum pt_sum = fluid.layers.sequence_pool(input=pt_emb, pool_type='sum') @@ -144,7 +151,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False): param_attr=fluid.ParamAttr( initializer=fluid.initializer.Constant(value=0.01), name="__emb__", - learning_rate=emb_lr), + learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__"), is_sparse=is_sparse) ## vsum nt_sum = fluid.layers.sequence_pool(input=nt_emb, pool_type='sum') @@ -220,7 +229,10 @@ class TestDistSimnetBow2x2(TestDistRunnerBase): def get_model(self, batch_size=2): # Train program avg_cost, acc, predict = \ - train_network(batch_size, bool(int(os.environ["IS_DISTRIBUTED"])), bool(int(os.environ["IS_SPARSE"]))) + train_network(batch_size, + bool(int(os.environ["IS_DISTRIBUTED"])), + bool(int(os.environ["IS_SPARSE"])), + bool(int(os.environ["IS_SELF_CONTAINED_LR"]))) inference_program = fluid.default_main_program().clone() diff --git a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py index e971f29db42a7c1a2394505a8ece3d2fd6b347e9..11095f23591edc41a82962149a52096fa17cfb93 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py +++ b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py @@ -25,7 +25,11 @@ class TestDistSimnetBowDense2x2(TestDistBase): self._enforce_place = "CPU" def test_simnet_bow(self): - need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'} + need_envs = { + "IS_DISTRIBUTED": '0', + "IS_SPARSE": '0', + 'IS_SELF_CONTAINED_LR': '1' + } self.check_with_place( "dist_simnet_bow.py", delta=1e-5, @@ -39,7 +43,11 @@ class TestDistSimnetBow2x2DenseAsync(TestDistBase): self._enforce_place = "CPU" def test_simnet_bow(self): - need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'} + need_envs = { + "IS_DISTRIBUTED": '0', + "IS_SPARSE": '0', + 'IS_SELF_CONTAINED_LR': '1' + } self.check_with_place( "dist_simnet_bow.py", delta=100, @@ -53,7 +61,11 @@ class TestDistSimnetBowSparse2x2(TestDistBase): self._enforce_place = "CPU" def test_simnet_bow(self): - need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'} + need_envs = { + "IS_DISTRIBUTED": '0', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '1' + } self.check_with_place( "dist_simnet_bow.py", delta=1e-5, @@ -67,7 +79,11 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase): self._enforce_place = "CPU" def test_simnet_bow(self): - need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'} + need_envs = { + "IS_DISTRIBUTED": '0', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '1' + } self.check_with_place( "dist_simnet_bow.py", delta=100, @@ -75,5 +91,59 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase): need_envs=need_envs) +class TestDistSimnetBow2x2LookupTableSync(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._enforce_place = "CPU" + + def test_simnet_bow(self): + need_envs = { + "IS_DISTRIBUTED": '1', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '1' + } + self.check_with_place( + "dist_simnet_bow.py", + delta=1e-5, + check_error_log=False, + need_envs=need_envs) + + +class TestDistSimnetBow2x2LookupTableAsync(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._enforce_place = "CPU" + + def test_simnet_bow(self): + need_envs = { + "IS_DISTRIBUTED": '1', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '1' + } + self.check_with_place( + "dist_simnet_bow.py", + delta=100, + check_error_log=False, + need_envs=need_envs) + + +class TestDistSimnetBow2x2LookupTableNotContainLRSync(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._enforce_place = "CPU" + + def test_simnet_bow(self): + need_envs = { + "IS_DISTRIBUTED": '1', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '0' + } + self.check_with_place( + "dist_simnet_bow.py", + delta=1e-5, + check_error_log=False, + need_envs=need_envs) + + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index ecdbe27f4d90268d755a712e25289cfaf4715f29..0421c824a8642a743c14d81c0238ef97832e58b4 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -1118,6 +1118,7 @@ to transpile() call.") def _split_table_grad_and_add_send_vars(self, program, pserver_endpoints): # 2. add split_ids_op and send_op to send gradient to pservers + # there should only be one table_name all_ops = program.global_block().ops table_grad_name = grad_var_name(self.table_name) @@ -1142,7 +1143,7 @@ to transpile() call.") if self.sync_mode else [] }, attrs={ - "sync_mode": self.sync_mode, + "sync_mode": not self.sync_mode, "epmap": pserver_endpoints, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, OP_ROLE_VAR_ATTR_NAME: [ @@ -1188,7 +1189,15 @@ to transpile() call.") def _create_table_optimize_block(self, pserver_index, pserver_program, pre_block_idx, grad_to_block_id): # STEP: create table optimize block + table_opt_block = pserver_program._create_block(pre_block_idx) # create table param and grad var in pserver program + # create table optimize block in pserver program + table_opt_op = [ + op for op in self.optimize_ops + if 'Param' in op.input_names and op.input("Param")[0] == + self.table_name + ][0] + origin_param_var = self.origin_program.global_block().vars[ self.table_name] @@ -1204,19 +1213,16 @@ to transpile() call.") dtype=origin_param_var.dtype, type=core.VarDesc.VarType.SELECTED_ROWS, persistable=True) + # parameter must be selected rows param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS) grad_var = pserver_program.global_block()._clone_variable( self.origin_program.global_block().vars[grad_var_name( self.table_name)]) - # create table optimize block in pserver program - table_opt_op = [ - op for op in self.optimize_ops - if 'Param' in op.input_names and op.input("Param")[0] == - self.table_name - ][0] - table_opt_block = pserver_program._create_block(pre_block_idx) + lr_var = pserver_program.global_block()._clone_variable( + self.origin_program.global_block().vars[table_opt_op.input( + "LearningRate")[0]]) if self.sync_mode: # create grad vars in pserver program @@ -1248,8 +1254,6 @@ to transpile() call.") grad_var = pserver_program.global_block()._rename_var( origin_grad_name, splited_grad_name) - lr_var = pserver_program.global_block().vars[table_opt_op.input( - "LearningRate")[0]] inputs = { "Param": [param_var], "Grad": [grad_var],