Unverified commit 7970ab96, authored by Qiao Longfei, committed by GitHub

Merge pull request #12544 from jacquesqiao/dist-lookup-table-only-support-sgd

dist lookup table only support sgd
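Summary of the change: instead of asserting that the distributed lookup table's optimizer op is `sgd`, the transpiler now rewrites whatever optimizer was configured (Adam in the test network below) into a plain `sgd` op on the parameter server and logs a warning. A minimal sketch of the new behavior; the function name and argument plumbing are simplified from the patch, not taken from it verbatim:

import logging

def append_table_optimize_op(table_opt_block, table_opt_op, inputs, outputs):
    # Sketch: whatever optimizer the user configured for the distributed
    # lookup table, emit a plain "sgd" op for the table rows on the pserver
    # and warn about the substitution instead of asserting.
    if table_opt_op.type != "sgd":
        logging.warn(
            "distribute lookup table only supports the sgd optimizer, "
            "changing its optimizer to sgd instead of " + table_opt_op.type)
    table_opt_block.append_op(type="sgd", inputs=inputs, outputs=outputs)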
@@ -359,5 +359,110 @@ class TestL2DecayWithPiecewise(TranspilerTest):
                         ["sum", "scale", "scale", "elementwise_add", "momentum"])

class TestDistLookupTableBase(TranspilerTest):
    def network_with_table(self, is_sparse, is_distributed):
        def emb_pool(ids):
            table_size = 1000
            emb_size = 64
            emb = fluid.layers.embedding(
                input=ids,
                size=[table_size, emb_size],
                dtype='float32',
                param_attr='shared_w',  # share parameter
                is_sparse=is_sparse,
                is_distributed=is_distributed)
            pool = fluid.layers.sequence_pool(input=emb, pool_type='average')
            return pool

        title_ids = fluid.layers.data(
            name='title_ids', shape=[1], dtype='int64', lod_level=1)
        brand_ids = fluid.layers.data(
            name='brand_ids', shape=[1], dtype='int64', lod_level=1)
        title_emb = emb_pool(title_ids)
        brand_emb = emb_pool(brand_ids)
        fc0 = fluid.layers.concat(input=[title_emb, brand_emb], axis=1)
        predict = fluid.layers.fc(input=fc0,
                                  size=2,
                                  act=None,
                                  param_attr=fluid.ParamAttr(name='fc_w'),
                                  bias_attr=fluid.ParamAttr(name='fc_b'))
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
        cost = fluid.layers.cross_entropy(input=predict, label=label)
        avg_cost = fluid.layers.mean(cost)
        optimizer = fluid.optimizer.Adam(learning_rate=0.003)
        optimizer.minimize(avg_cost)

class TestLocalLookupTable(TestDistLookupTableBase):
    def net_conf(self):
        self.network_with_table(is_sparse=True, is_distributed=False)

    def transpiler_test_impl(self):
        pserver1, startup1 = self.get_pserver(self.pserver1_ep)

        self.assertEqual(len(pserver1.blocks), 3)
        # block 0: listen_and_serv
        # block 1: optimize for fc_w or fc_b (adam)
        self.assertEqual([op.type for op in pserver1.blocks[1].ops],
                         ["sum", "scale", "adam", "scale", "scale"])
        # block 2: optimize for the embedding table (adam)
        # NOTE: if the param is not SelectedRows, the grad will be scaled
        # to grad / trainer_num
        self.assertEqual([op.type for op in pserver1.blocks[2].ops],
                         ["sum", "adam", "scale", "scale"])

        trainer = self.get_trainer()
        self.assertEqual(len(trainer.blocks), 1)
        ops = [
            'lookup_table', 'sequence_pool', 'lookup_table', 'sequence_pool',
            'concat', 'mul', 'elementwise_add', 'cross_entropy', 'mean',
            'fill_constant', 'mean_grad', 'cross_entropy_grad',
            'elementwise_add_grad', 'send', 'mul_grad', 'send', 'concat_grad',
            'sequence_pool_grad', 'lookup_table_grad', 'sequence_pool_grad',
            'lookup_table_grad', 'sum', 'split_selected_rows', 'send',
            'send_barrier', 'recv', 'recv', 'recv', 'fetch_barrier', 'concat'
        ]
        self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)

class TestDistLookupTable(TestDistLookupTableBase):
    def net_conf(self):
        self.network_with_table(is_sparse=True, is_distributed=True)

    def transpiler_test_impl(self):
        pserver1, startup1 = self.get_pserver(self.pserver1_ep)

        self.assertEqual(len(pserver1.blocks), 6)
        # block 0: listen_and_serv
        # block 1: optimize for fc_w or fc_b (adam)
        self.assertEqual([op.type for op in pserver1.blocks[1].ops],
                         ["sum", "scale", "adam", "scale", "scale"])
        # block 2: optimize for the distributed table (sgd)
        self.assertEqual([op.type for op in pserver1.blocks[2].ops],
                         ["sum", "sgd"])
        # block 3: prefetch -> lookup_sparse_table for data0
        self.assertEqual([op.type for op in pserver1.blocks[3].ops],
                         ["lookup_sparse_table"])
        # block 4: prefetch -> lookup_sparse_table for data1
        self.assertEqual([op.type for op in pserver1.blocks[4].ops],
                         ["lookup_sparse_table"])
        # block 5: save table
        self.assertEqual([op.type for op in pserver1.blocks[5].ops], ["save"])

        trainer = self.get_trainer()
        self.assertEqual(len(trainer.blocks), 1)
        ops = [
            'split_ids', 'prefetch', 'merge_ids', 'sequence_pool', 'split_ids',
            'prefetch', 'merge_ids', 'sequence_pool', 'concat', 'mul',
            'elementwise_add', 'cross_entropy', 'mean', 'fill_constant',
            'mean_grad', 'cross_entropy_grad', 'elementwise_add_grad', 'send',
            'mul_grad', 'send', 'concat_grad', 'sequence_pool_grad',
            'lookup_table_grad', 'sequence_pool_grad', 'lookup_table_grad',
            'sum', 'split_ids', 'send', 'send_barrier', 'recv', 'recv',
            'fetch_barrier'
        ]
        self.assertEqual([op.type for op in trainer.blocks[0].ops], ops)

if __name__ == "__main__":
    unittest.main()
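For context, the `get_pserver`/`get_trainer` helpers used above come from the `TranspilerTest` base class; they presumably wrap the public `DistributeTranspiler` API roughly as follows. This is a sketch under assumed harness conventions; the endpoints and trainer count are illustrative, not taken from this PR:

import paddle.fluid as fluid

def transpile_programs(endpoint="127.0.0.1:6174",
                       pservers="127.0.0.1:6174,127.0.0.1:6175",
                       trainers=2):
    # Split the single-process program into trainer and pserver programs.
    t = fluid.DistributeTranspiler()
    t.transpile(trainer_id=0, pservers=pservers, trainers=trainers)
    pserver_prog = t.get_pserver_program(endpoint)
    startup_prog = t.get_startup_program(endpoint, pserver_prog)
    trainer_prog = t.get_trainer_program()
    return pserver_prog, startup_prog, trainer_prog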
@@ -896,8 +896,6 @@ class DistributeTranspiler(object):
                 self.table_name
             ][0]
         table_opt_block = pserver_program.create_block(pre_block_idx)
-        # only support sgd now
-        assert table_opt_op.type == "sgd"

         if self.sync_mode:
             # create grad vars in pserver program
@@ -937,11 +935,12 @@ class DistributeTranspiler(object):
             "LearningRate": [lr_var]
         }
         outputs = {"ParamOut": [param_var]}
-        table_opt_block.append_op(
-            type=table_opt_op.type,
-            inputs=inputs,
-            outputs=outputs,
-            attrs=table_opt_op.attrs)
+        # only support sgd now
+        import logging
+        logging.warn(
+            "distribute lookup table only supports the sgd optimizer, "
+            "changing its optimizer to sgd instead of " + table_opt_op.type)
+        table_opt_block.append_op(type="sgd", inputs=inputs, outputs=outputs)

         # add table parameter gradient and its block id to grad_to_block_id
         grad_to_block_id.append(grad_var.name + ":" + str(table_opt_block.idx))
...
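Design note: with the assert gone, a model that pairs a distributed lookup table with a non-SGD optimizer no longer fails at transpile time; the dense parameters keep their configured optimizer while the table update is downgraded to SGD with a warning. That split is exactly what `TestDistLookupTable` checks above: `["sum", "scale", "adam", "scale", "scale"]` for the `fc_w`/`fc_b` block versus `["sum", "sgd"]` for the table block.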