From ceea195ee343c1b931b22596c6e59885404a229c Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 16 Oct 2018 18:37:06 +0800 Subject: [PATCH] fill constant add infervarshape, lookuptable clone lr var (#13830) * fill constant add infervarshape, lookuptable clone lr var * add lookuptable ut * bug fix in transpliler about async with lookup table * test=release/1.0.0 --- paddle/fluid/operators/fill_constant_op.cc | 9 ++- .../fluid/tests/unittests/dist_simnet_bow.py | 22 ++++-- .../tests/unittests/test_dist_simnet_bow.py | 78 ++++++++++++++++++- .../fluid/transpiler/distribute_transpiler.py | 24 +++--- 4 files changed, 113 insertions(+), 20 deletions(-) diff --git a/paddle/fluid/operators/fill_constant_op.cc b/paddle/fluid/operators/fill_constant_op.cc index 2826b82117..e04a68717b 100644 --- a/paddle/fluid/operators/fill_constant_op.cc +++ b/paddle/fluid/operators/fill_constant_op.cc @@ -70,6 +70,12 @@ class FillConstantOp : public framework::OperatorBase { } }; +class FillConstantOpVarTypeInference : public framework::VarTypeInference { + public: + void operator()(const framework::OpDesc &op_desc, + framework::BlockDesc *block) const override {} +}; + class FillConstantOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { @@ -102,4 +108,5 @@ Fill up a variable with specified constant value. namespace ops = paddle::operators; REGISTER_OPERATOR(fill_constant, ops::FillConstantOp, ops::FillConstantInferShape, ops::FillConstantOpMaker, - paddle::framework::EmptyGradOpMaker); + paddle::framework::EmptyGradOpMaker, + ops::FillConstantOpVarTypeInference); diff --git a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/dist_simnet_bow.py index 6456d1b53a..fac5e037a4 100644 --- a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py +++ b/python/paddle/fluid/tests/unittests/dist_simnet_bow.py @@ -81,7 +81,10 @@ def get_optimizer(): return optimizer -def train_network(batch_size, is_distributed=False, is_sparse=False): +def train_network(batch_size, + is_distributed=False, + is_sparse=False, + is_self_contained_lr=False): # query q = fluid.layers.data( name="query_ids", shape=[1], dtype="int64", lod_level=1) @@ -93,7 +96,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False): param_attr=fluid.ParamAttr( initializer=fluid.initializer.Constant(value=0.01), name="__emb__", - learning_rate=emb_lr), + learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__"), is_sparse=is_sparse) ## vsum q_sum = fluid.layers.sequence_pool(input=q_emb, pool_type='sum') @@ -119,7 +124,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False): param_attr=fluid.ParamAttr( initializer=fluid.initializer.Constant(value=0.01), name="__emb__", - learning_rate=emb_lr), + learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__"), is_sparse=is_sparse) ## vsum pt_sum = fluid.layers.sequence_pool(input=pt_emb, pool_type='sum') @@ -144,7 +151,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False): param_attr=fluid.ParamAttr( initializer=fluid.initializer.Constant(value=0.01), name="__emb__", - learning_rate=emb_lr), + learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01), + name="__emb__"), is_sparse=is_sparse) ## vsum nt_sum = fluid.layers.sequence_pool(input=nt_emb, pool_type='sum') @@ -220,7 +229,10 @@ class TestDistSimnetBow2x2(TestDistRunnerBase): def get_model(self, batch_size=2): # Train program avg_cost, acc, predict = \ - train_network(batch_size, bool(int(os.environ["IS_DISTRIBUTED"])), bool(int(os.environ["IS_SPARSE"]))) + train_network(batch_size, + bool(int(os.environ["IS_DISTRIBUTED"])), + bool(int(os.environ["IS_SPARSE"])), + bool(int(os.environ["IS_SELF_CONTAINED_LR"]))) inference_program = fluid.default_main_program().clone() diff --git a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py index e971f29db4..11095f2359 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py +++ b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py @@ -25,7 +25,11 @@ class TestDistSimnetBowDense2x2(TestDistBase): self._enforce_place = "CPU" def test_simnet_bow(self): - need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'} + need_envs = { + "IS_DISTRIBUTED": '0', + "IS_SPARSE": '0', + 'IS_SELF_CONTAINED_LR': '1' + } self.check_with_place( "dist_simnet_bow.py", delta=1e-5, @@ -39,7 +43,11 @@ class TestDistSimnetBow2x2DenseAsync(TestDistBase): self._enforce_place = "CPU" def test_simnet_bow(self): - need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'} + need_envs = { + "IS_DISTRIBUTED": '0', + "IS_SPARSE": '0', + 'IS_SELF_CONTAINED_LR': '1' + } self.check_with_place( "dist_simnet_bow.py", delta=100, @@ -53,7 +61,11 @@ class TestDistSimnetBowSparse2x2(TestDistBase): self._enforce_place = "CPU" def test_simnet_bow(self): - need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'} + need_envs = { + "IS_DISTRIBUTED": '0', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '1' + } self.check_with_place( "dist_simnet_bow.py", delta=1e-5, @@ -67,7 +79,11 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase): self._enforce_place = "CPU" def test_simnet_bow(self): - need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'} + need_envs = { + "IS_DISTRIBUTED": '0', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '1' + } self.check_with_place( "dist_simnet_bow.py", delta=100, @@ -75,5 +91,59 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase): need_envs=need_envs) +class TestDistSimnetBow2x2LookupTableSync(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._enforce_place = "CPU" + + def test_simnet_bow(self): + need_envs = { + "IS_DISTRIBUTED": '1', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '1' + } + self.check_with_place( + "dist_simnet_bow.py", + delta=1e-5, + check_error_log=False, + need_envs=need_envs) + + +class TestDistSimnetBow2x2LookupTableAsync(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._enforce_place = "CPU" + + def test_simnet_bow(self): + need_envs = { + "IS_DISTRIBUTED": '1', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '1' + } + self.check_with_place( + "dist_simnet_bow.py", + delta=100, + check_error_log=False, + need_envs=need_envs) + + +class TestDistSimnetBow2x2LookupTableNotContainLRSync(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._enforce_place = "CPU" + + def test_simnet_bow(self): + need_envs = { + "IS_DISTRIBUTED": '1', + "IS_SPARSE": '1', + 'IS_SELF_CONTAINED_LR': '0' + } + self.check_with_place( + "dist_simnet_bow.py", + delta=1e-5, + check_error_log=False, + need_envs=need_envs) + + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index ecdbe27f4d..0421c824a8 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -1118,6 +1118,7 @@ to transpile() call.") def _split_table_grad_and_add_send_vars(self, program, pserver_endpoints): # 2. add split_ids_op and send_op to send gradient to pservers + # there should only be one table_name all_ops = program.global_block().ops table_grad_name = grad_var_name(self.table_name) @@ -1142,7 +1143,7 @@ to transpile() call.") if self.sync_mode else [] }, attrs={ - "sync_mode": self.sync_mode, + "sync_mode": not self.sync_mode, "epmap": pserver_endpoints, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, OP_ROLE_VAR_ATTR_NAME: [ @@ -1188,7 +1189,15 @@ to transpile() call.") def _create_table_optimize_block(self, pserver_index, pserver_program, pre_block_idx, grad_to_block_id): # STEP: create table optimize block + table_opt_block = pserver_program._create_block(pre_block_idx) # create table param and grad var in pserver program + # create table optimize block in pserver program + table_opt_op = [ + op for op in self.optimize_ops + if 'Param' in op.input_names and op.input("Param")[0] == + self.table_name + ][0] + origin_param_var = self.origin_program.global_block().vars[ self.table_name] @@ -1204,19 +1213,16 @@ to transpile() call.") dtype=origin_param_var.dtype, type=core.VarDesc.VarType.SELECTED_ROWS, persistable=True) + # parameter must be selected rows param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS) grad_var = pserver_program.global_block()._clone_variable( self.origin_program.global_block().vars[grad_var_name( self.table_name)]) - # create table optimize block in pserver program - table_opt_op = [ - op for op in self.optimize_ops - if 'Param' in op.input_names and op.input("Param")[0] == - self.table_name - ][0] - table_opt_block = pserver_program._create_block(pre_block_idx) + lr_var = pserver_program.global_block()._clone_variable( + self.origin_program.global_block().vars[table_opt_op.input( + "LearningRate")[0]]) if self.sync_mode: # create grad vars in pserver program @@ -1248,8 +1254,6 @@ to transpile() call.") grad_var = pserver_program.global_block()._rename_var( origin_grad_name, splited_grad_name) - lr_var = pserver_program.global_block().vars[table_opt_op.input( - "LearningRate")[0]] inputs = { "Param": [param_var], "Grad": [grad_var], -- GitLab