Unverified commit fa2ab334, authored by tangwei12, committed by GitHub

fill constant add infervarshape, lookuptable clone lr var (#13830)

* fill constant add infervarshape, lookuptable clone lr var

* test=develop

* add lookuptable ut, test=develop

* bug fix in transpiler about async with lookup table

* test=develop
Parent 7a751b83
@@ -70,6 +70,12 @@ class FillConstantOp : public framework::OperatorBase {
}
};
class FillConstantOpVarTypeInference : public framework::VarTypeInference {
public:
void operator()(const framework::OpDesc &op_desc,
framework::BlockDesc *block) const override {}
};
class FillConstantOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
@@ -102,4 +108,5 @@ Fill up a variable with specified constant value.
namespace ops = paddle::operators;
REGISTER_OPERATOR(fill_constant, ops::FillConstantOp,
ops::FillConstantInferShape, ops::FillConstantOpMaker,
paddle::framework::EmptyGradOpMaker);
paddle::framework::EmptyGradOpMaker,
ops::FillConstantOpVarTypeInference);
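
The hunk above registers a VarTypeInference for fill_constant (with an intentionally empty body), so passes that run variable-type inference over a block containing this op no longer reject it. This presumably matters for the lookup-table change later in this commit, since a learning-rate variable is typically materialized by a fill_constant op. A minimal Python-side sketch, not part of the patch, assuming the paddle.fluid API of this release line:

import paddle.fluid as fluid

# Illustrative sketch (not part of the commit): a scalar variable produced by
# a fill_constant op, the same kind of op that usually initializes a
# learning-rate variable.  With FillConstantOpVarTypeInference registered,
# var-type inference can walk a block containing this op without failing.
lr = fluid.layers.fill_constant(shape=[1], dtype='float32', value=0.01)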
@@ -81,7 +81,10 @@ def get_optimizer():
return optimizer
def train_network(batch_size, is_distributed=False, is_sparse=False):
def train_network(batch_size,
is_distributed=False,
is_sparse=False,
is_self_contained_lr=False):
# query
q = fluid.layers.data(
name="query_ids", shape=[1], dtype="int64", lod_level=1)
@@ -93,7 +96,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False):
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr),
learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__"),
is_sparse=is_sparse)
## vsum
q_sum = fluid.layers.sequence_pool(input=q_emb, pool_type='sum')
@@ -119,7 +124,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False):
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr),
learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__"),
is_sparse=is_sparse)
## vsum
pt_sum = fluid.layers.sequence_pool(input=pt_emb, pool_type='sum')
@@ -144,7 +151,9 @@ def train_network(batch_size, is_distributed=False, is_sparse=False):
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__",
learning_rate=emb_lr),
learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01),
name="__emb__"),
is_sparse=is_sparse)
## vsum
nt_sum = fluid.layers.sequence_pool(input=nt_emb, pool_type='sum')
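
The three embedding hunks above thread the new is_self_contained_lr flag through train_network, repeating the same conditional ParamAttr expression for the query, positive-title and negative-title embeddings. As a sketch only (this helper is hypothetical and not part of the patch), the shared attribute could be built once:

def _emb_param_attr(is_self_contained_lr, emb_lr):
    # Hypothetical helper, not in the patch: build the shared "__emb__"
    # ParamAttr once, attaching a per-parameter learning rate only when the
    # embedding should carry its own LR.
    if is_self_contained_lr:
        return fluid.ParamAttr(
            initializer=fluid.initializer.Constant(value=0.01),
            name="__emb__",
            learning_rate=emb_lr)
    return fluid.ParamAttr(
        initializer=fluid.initializer.Constant(value=0.01), name="__emb__")

Each of the three fluid.layers.embedding calls would then pass param_attr=_emb_param_attr(is_self_contained_lr, emb_lr).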
@@ -220,7 +229,10 @@ class TestDistSimnetBow2x2(TestDistRunnerBase):
def get_model(self, batch_size=2):
# Train program
avg_cost, acc, predict = \
train_network(batch_size, bool(int(os.environ["IS_DISTRIBUTED"])), bool(int(os.environ["IS_SPARSE"])))
train_network(batch_size,
bool(int(os.environ["IS_DISTRIBUTED"])),
bool(int(os.environ["IS_SPARSE"])),
bool(int(os.environ["IS_SELF_CONTAINED_LR"])))
inference_program = fluid.default_main_program().clone()
......
@@ -25,7 +25,11 @@ class TestDistSimnetBowDense2x2(TestDistBase):
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'}
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '0',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=1e-5,
@@ -39,7 +43,11 @@ class TestDistSimnetBow2x2DenseAsync(TestDistBase):
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '0'}
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '0',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=100,
@@ -53,7 +61,11 @@ class TestDistSimnetBowSparse2x2(TestDistBase):
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'}
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=1e-5,
@@ -67,7 +79,11 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase):
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {"IS_DISTRIBUTED": '0', "IS_SPARSE": '1'}
need_envs = {
"IS_DISTRIBUTED": '0',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=100,
@@ -75,5 +91,59 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase):
need_envs=need_envs)
class TestDistSimnetBow2x2LookupTableSync(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '1',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=1e-5,
check_error_log=False,
need_envs=need_envs)
class TestDistSimnetBow2x2LookupTableAsync(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '1',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '1'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=100,
check_error_log=False,
need_envs=need_envs)
class TestDistSimnetBow2x2LookupTableNotContainLRSync(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._enforce_place = "CPU"
def test_simnet_bow(self):
need_envs = {
"IS_DISTRIBUTED": '1',
"IS_SPARSE": '1',
'IS_SELF_CONTAINED_LR': '0'
}
self.check_with_place(
"dist_simnet_bow.py",
delta=1e-5,
check_error_log=False,
need_envs=need_envs)
if __name__ == "__main__":
unittest.main()
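
The three new test classes above are the "lookuptable ut" from the commit message: they set IS_DISTRIBUTED so the sparse embedding goes through the distributed lookup table, in sync and async mode, and toggle IS_SELF_CONTAINED_LR to cover both an embedding that carries its own learning rate and one that falls back to the global one. A quick way to run one case in isolation, assuming the test keeps the usual filename test_dist_simnet_bow.py under the unittests directory:

import unittest

# Assumed module and class names, taken from the diff above; adjust the
# import path to wherever test_dist_simnet_bow.py lives in your checkout.
from test_dist_simnet_bow import TestDistSimnetBow2x2LookupTableSync

suite = unittest.TestLoader().loadTestsFromTestCase(
    TestDistSimnetBow2x2LookupTableSync)
unittest.TextTestRunner(verbosity=2).run(suite)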
@@ -1119,6 +1119,7 @@ to transpile() call.")
def _split_table_grad_and_add_send_vars(self, program, pserver_endpoints):
# 2. add split_ids_op and send_op to send gradient to pservers
# there should only be one table_name
all_ops = program.global_block().ops
table_grad_name = grad_var_name(self.table_name)
@@ -1143,7 +1144,7 @@ to transpile() call.")
if self.sync_mode else []
},
attrs={
"sync_mode": self.sync_mode,
"sync_mode": not self.sync_mode,
"epmap": pserver_endpoints,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
OP_ROLE_VAR_ATTR_NAME: [
@@ -1189,7 +1190,15 @@ to transpile() call.")
def _create_table_optimize_block(self, pserver_index, pserver_program,
pre_block_idx, grad_to_block_id):
# STEP: create table optimize block
table_opt_block = pserver_program._create_block(pre_block_idx)
# create table param and grad var in pserver program
# create table optimize block in pserver program
table_opt_op = [
op for op in self.optimize_ops
if 'Param' in op.input_names and op.input("Param")[0] ==
self.table_name
][0]
origin_param_var = self.origin_program.global_block().vars[
self.table_name]
@@ -1205,19 +1214,16 @@ to transpile() call.")
dtype=origin_param_var.dtype,
type=core.VarDesc.VarType.SELECTED_ROWS,
persistable=True)
# parameter must be selected rows
param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
grad_var = pserver_program.global_block()._clone_variable(
self.origin_program.global_block().vars[grad_var_name(
self.table_name)])
# create table optimize block in pserver program
table_opt_op = [
op for op in self.optimize_ops
if 'Param' in op.input_names and op.input("Param")[0] ==
self.table_name
][0]
table_opt_block = pserver_program._create_block(pre_block_idx)
lr_var = pserver_program.global_block()._clone_variable(
self.origin_program.global_block().vars[table_opt_op.input(
"LearningRate")[0]])
if self.sync_mode:
# create grad vars in pserver program
@@ -1249,8 +1255,6 @@ to transpile() call.")
grad_var = pserver_program.global_block()._rename_var(
origin_grad_name, splited_grad_name)
lr_var = pserver_program.global_block().vars[table_opt_op.input(
"LearningRate")[0]]
inputs = {
"Param": [param_var],
"Grad": [grad_var],
......
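
The transpiler hunks above implement the "clone lr var" part of the commit: _create_table_optimize_block now locates the table's optimizer op first and clones its LearningRate input from the origin program into the pserver program with _clone_variable, instead of assuming the variable already exists in the pserver program's global block (the removed pserver_program.global_block().vars[...] lookup). The flipped sync_mode attribute on the table-gradient send op is presumably the "async with lookup table" fix called out in the commit message. A condensed sketch of the patched flow, paraphrasing the hunks above rather than introducing new API, and omitting the param/grad handling of the full method:

# Condensed sketch of the patched _create_table_optimize_block (not a
# complete method): set up the optimize block, find the optimizer op that
# updates the lookup table, then clone its LearningRate input from the
# origin program so the pserver program gets its own copy even when no
# other parameter on this pserver references that variable.
table_opt_block = pserver_program._create_block(pre_block_idx)
table_opt_op = [
    op for op in self.optimize_ops
    if 'Param' in op.input_names and op.input("Param")[0] == self.table_name
][0]
lr_var = pserver_program.global_block()._clone_variable(
    self.origin_program.global_block().vars[table_opt_op.input(
        "LearningRate")[0]])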