From 418abc92f445c8ef04578a1f3231799cb3db9bc7 Mon Sep 17 00:00:00 2001
From: Chengmo
Date: Tue, 7 Jan 2020 16:10:19 +0800
Subject: [PATCH] Update pyramid related OP (#21372)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* add special way to add distribute vars, Update Pyramid hash op
---
 paddle/fluid/operators/pyramid_hash_op.cc     |  7 +-
 python/paddle/fluid/contrib/layers/nn.py      | 28 ++++++-
 .../fluid/tests/unittests/CMakeLists.txt      |  1 +
 .../unittests/test_fleet_pyramid_hash.py      | 76 +++++++++++++++++++
 .../tests/unittests/test_pyramid_hash_op.py   |  4 +
 .../fluid/transpiler/distribute_transpiler.py | 31 ++++++++
 .../fluid/transpiler/geo_sgd_transpiler.py    | 11 ++-
 7 files changed, 151 insertions(+), 7 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_pyramid_hash.py

diff --git a/paddle/fluid/operators/pyramid_hash_op.cc b/paddle/fluid/operators/pyramid_hash_op.cc
index 62d33b2583a..fc6aab07faf 100644
--- a/paddle/fluid/operators/pyramid_hash_op.cc
+++ b/paddle/fluid/operators/pyramid_hash_op.cc
@@ -59,7 +59,12 @@ class PyramidHashOpMaker : public framework::OpProtoAndCheckerMaker {
       .EqualGreaterThan(0);
   AddAttr<int>("seed", "seed").SetDefault(0).EqualGreaterThan(0);
   AddAttr<float>("lr", "learning rate").SetDefault(0.0).EqualGreaterThan(0.0);
-
+  AddAttr<std::string>(
+      "distribute_update_vars",
+      "['PyramidHash_emb_0','Filter']"
+      "Decides which parameters should be updated in distributed training. "
+      "Used by the DistributeTranspiler to create trainer/server programs.")
+      .SetDefault("");
   AddOutput("Out", "Out (Tensor, default Tensor) Output variable");
   AddOutput("DropPos", "Out (Tensor, Tensor) Output variable");
   AddOutput("X_Temp_Out", "Out (Tensor, Tensor) Output variable")
diff --git a/python/paddle/fluid/contrib/layers/nn.py b/python/paddle/fluid/contrib/layers/nn.py
index 7074140a804..2687690978d 100644
--- a/python/paddle/fluid/contrib/layers/nn.py
+++ b/python/paddle/fluid/contrib/layers/nn.py
@@ -120,14 +120,14 @@ def var_conv_2d(input,
         padding. Besides, input.dims[1] should be 1.

         .. code-block:: text
-
+
            If input_channel is 2 and given row lodTensor and col lodTensor as follows:
                row.lod = [[5, 4]]
                col.lod = [[6, 7]]
            input is a lodTensor:
                input.lod = [[60, 56]]  # where 60 = input_channel * 5 * 6
                input.dims = [116, 1]   # where 116 = 60 + 56
-
+
            If set output_channel is 3, filter_size is [3, 3], stride is [1, 1]:
                output.lod = [[90, 84]] # where 90 = output_channel * [(5-1)/stride + 1] * [(6-1)/stride + 1]
                output.dims = [174, 1]  # where 174 = 90 + 84
@@ -380,7 +380,7 @@ def tree_conv(nodes_vector,
               name=None):
     """
     ${comment}
-
+
     Args:
         nodes_vector(${nodes_vector_type}): ${nodes_vector_comment}
         edge_set(${edge_set_type}): ${edge_set_comment}
@@ -513,7 +513,7 @@ def multiclass_nms2(bboxes,
                     name=None):
     """
     **Multiclass NMS2**
-
+
     This operator is to do multi-class non maximum suppression (NMS) on
     boxes and scores.
     In the NMS step, this operator greedily selects a subset of detection bounding
@@ -646,6 +646,7 @@ def search_pyramid_hash(input,
                         param_attr_wl=None,
                         param_attr_bl=None,
                         name=None,
+                        distribute_update_vars=None,
                         dtype='float32'):
     """
     **Pyramid hash embedding**
@@ -672,6 +673,8 @@ def search_pyramid_hash(input,
            default weight parameter property is used. See usage for details in :ref:`api_fluid_ParamAttr` .
        param_attr_wl(ParamAttr): Specified parameters of white filter.
        param_attr_bl(ParamAttr): Specified parameters of black filter.
+       distribute_update_vars(list[ParamAttr.name]): Decides which parameters should be updated in distributed training.
+           Used by the DistributeTranspiler to create trainer/server programs.
       name(str, optional): The default value is None. Normally there is no need for user to set this property.
           For more information, please refer to :ref:`api_guide_Name` .
       dtype(str): The data type of output variable, float32.
@@ -700,6 +703,22 @@ def search_pyramid_hash(input,
         black_list.stop_gradient = True
         input_vars['BlackList'] = black_list

+    distribute_update_vars_str = ""
+    if distribute_update_vars:
+        assert isinstance(distribute_update_vars, list)
+        special_name_list = []
+        if param_attr:
+            special_name_list.append(param_attr.name)
+        if param_attr_wl:
+            special_name_list.append(param_attr_wl.name)
+        if param_attr_bl:
+            special_name_list.append(param_attr_bl.name)
+        for param in distribute_update_vars:
+            if param not in special_name_list:
+                raise ValueError(
+                    "Pyramid Hash layer does not have parameter {}".format(param))
+        distribute_update_vars_str = ",".join(distribute_update_vars)
+
     res = helper.create_variable_for_type_inference(dtype)
     drop_pos = helper.create_variable_for_type_inference(dtype)
     x_temp_out = helper.create_variable_for_type_inference(dtype)
@@ -721,6 +740,7 @@ def search_pyramid_hash(input,
             'black_list_len': black_list_len,
             'seed': seed,
             'lr': lr,
+            'distribute_update_vars': distribute_update_vars_str
         })

     return res
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 1236553d671..7ce08b58c1d 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -78,6 +78,7 @@ if(NOT WITH_MKL OR NOT WITH_AVX)
 endif()
 if(WITH_COVERAGE OR NOT WITH_AVX OR WIN32)
     list(REMOVE_ITEM TEST_OPS test_pyramid_hash_op)
+    list(REMOVE_ITEM TEST_OPS test_fleet_pyramid_hash)
 endif()

 if(WITH_GPU OR NOT WITH_MKLML)
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_pyramid_hash.py b/python/paddle/fluid/tests/unittests/test_fleet_pyramid_hash.py
new file mode 100644
index 00000000000..fb1c6988e15
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_pyramid_hash.py
@@ -0,0 +1,76 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import numpy as np
+import paddle.fluid as fluid
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
+
+
+class TestPyramidHashOpApi(unittest.TestCase):
+    def test_dist_geo_server_transpiler(self):
+        num_voc = 128
+        embed_dim = 64
+        x_shape, x_lod = [16, 10], [[3, 5, 2, 6]]
+        x = fluid.data(name='x', shape=x_shape, dtype='int32', lod_level=1)
+        hash_embd = fluid.contrib.layers.search_pyramid_hash(
+            input=x,
+            num_emb=embed_dim,
+            space_len=num_voc * embed_dim,
+            pyramid_layer=4,
+            rand_len=16,
+            drop_out_percent=0.5,
+            is_training=True,
+            use_filter=False,
+            white_list_len=6400,
+            black_list_len=2800,
+            seed=3,
+            lr=0.002,
+            param_attr=fluid.ParamAttr(
+                name="PyramidHash_emb_0",
+                learning_rate=0, ),
+            param_attr_wl=fluid.ParamAttr(
+                name="Filter",
+                learning_rate=0, ),
+            param_attr_bl=None,
+            distribute_update_vars=["PyramidHash_emb_0"],
+            name=None)
+
+        cost = fluid.layers.reduce_sum(hash_embd)
+
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.SERVER,
+            worker_num=2,
+            server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
+
+        fleet.init(role)
+
+        strategy = DistributeTranspilerConfig()
+        strategy.sync_mode = False
+        strategy.geo_sgd_mode = True
+        strategy.geo_sgd_need_push_nums = 5
+
+        optimizer = fluid.optimizer.SGD(0.1)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(cost)
+
+        pserver_startup_program = fleet.startup_program
+        pserver_main_program = fleet.main_program
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_pyramid_hash_op.py b/python/paddle/fluid/tests/unittests/test_pyramid_hash_op.py
index c1435d8781d..e06ee69d679 100644
--- a/python/paddle/fluid/tests/unittests/test_pyramid_hash_op.py
+++ b/python/paddle/fluid/tests/unittests/test_pyramid_hash_op.py
@@ -15,6 +15,9 @@
 import unittest
 import numpy as np
 import paddle.fluid as fluid
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig


 class TestPyramidHashOpApi(unittest.TestCase):
@@ -43,6 +46,7 @@ class TestPyramidHashOpApi(unittest.TestCase):
                 name="Filter",
                 learning_rate=0, ),
             param_attr_bl=None,
+            distribute_update_vars=["PyramidHash_emb_0"],
             name=None, )

         place = fluid.CPUPlace()
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index ebdfde88e2e..4e643e77ef1 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -2633,4 +2633,35 @@ class DistributeTranspiler(object):
                 ])
             else:
                 pass
+
+        # designed for special situations
+        special_distribute_update_vars = self._get_distribute_update_vars()
+        if special_distribute_update_vars:
+            params_grads = params_grads + special_distribute_update_vars
+
         return opt_ops, params_grads
+
+    def _get_distribute_update_vars(self):
+        # TODO(chengmo): find a more powerful and simpler way to handle this special situation
+        """
+        This function is used for special models such as PyramidDNN, which contain the pyramid hash op.
+        Some parameters are not updated by an optimizer op; they are updated during the backward pass instead.
+        In these cases, the transpiler cannot find these special vars from the optimizer op information.
+        So we add this function and the attr "distribute_update_vars" to tell the transpiler which parameters
+        need to be updated in distributed training.
+        We assume these special vars are sent and received under the same var_name.
+        """
+        block = self.origin_program.global_block()
+        origin_var_dict = self.origin_program.global_block().vars
+        params = []
+        for op in block.ops:
+            special_attr = "distribute_update_vars"
+            if special_attr in op.all_attrs():
+                if op.attr(special_attr):
+                    for param_name in op.attr(special_attr).split(","):
+                        params.append(origin_var_dict[param_name])
+        unique_params = list(set(params))
+        params_grads = []
+        for var in unique_params:
+            params_grads.append([var, var])
+        return params_grads
diff --git a/python/paddle/fluid/transpiler/geo_sgd_transpiler.py b/python/paddle/fluid/transpiler/geo_sgd_transpiler.py
index 4bd1aa2d15d..b0146538cec 100644
--- a/python/paddle/fluid/transpiler/geo_sgd_transpiler.py
+++ b/python/paddle/fluid/transpiler/geo_sgd_transpiler.py
@@ -137,15 +137,22 @@ class GeoSgdTranspiler(DistributeTranspiler):
         # send sparse id to communicator
         self.sparse_var = []
         self.sparse_tables = []
+        unique_sparse_var = {}
         for op in self.origin_program.global_block().ops:
-            if op.type == "lookup_table":
-                op._set_attr('remote_prefetch', False)
+            if "is_sparse" in op.all_attrs():
+                if op.type == "lookup_table":
+                    op._set_attr('remote_prefetch', False)
                 for input_var_name, sparse_var_name in zip(
                         op.input("Ids"), op.input("W")):
                     if sparse_var_name in self.sparse_var_list:
+                        if input_var_name in unique_sparse_var:
+                            if unique_sparse_var[
+                                    input_var_name] == sparse_var_name:
+                                continue
                         input_var = program.global_block().var(input_var_name)
                         self.sparse_var.append(input_var)
                         self.sparse_tables.append(sparse_var_name)
+                        unique_sparse_var[input_var_name] = sparse_var_name

         # batch training loop end flag
         dummy_output = program.global_block().create_var(
--
GitLab
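
Usage sketch (not part of the patch): a minimal example of the new distribute_update_vars argument, distilled from the unit test added above. The parameter names "PyramidHash_emb_0" and "Filter" and the hyperparameter values simply mirror that test and are illustrative, not required values; only names of parameters created by this layer (via param_attr, param_attr_wl, param_attr_bl) may be listed, otherwise the layer raises a ValueError.

import paddle.fluid as fluid

x = fluid.data(name='x', shape=[16, 10], dtype='int32', lod_level=1)
# "PyramidHash_emb_0" names the embedding created through param_attr, so it is a valid
# entry for distribute_update_vars; the transpiler will then treat it as a parameter
# that is updated during the backward pass in distributed (e.g. GEO-SGD) training.
hash_embd = fluid.contrib.layers.search_pyramid_hash(
    input=x,
    num_emb=64,
    space_len=128 * 64,
    pyramid_layer=4,
    rand_len=16,
    drop_out_percent=0.5,
    is_training=True,
    use_filter=False,
    white_list_len=6400,
    black_list_len=2800,
    seed=3,
    lr=0.002,
    param_attr=fluid.ParamAttr(name="PyramidHash_emb_0", learning_rate=0),
    param_attr_wl=fluid.ParamAttr(name="Filter", learning_rate=0),
    param_attr_bl=None,
    distribute_update_vars=["PyramidHash_emb_0"],
    name=None)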