未验证 提交 418abc92 编写于 作者: C Chengmo 提交者: GitHub

Update pyramid related OP (#21372)

* add special way to add distribute vars, Update Pyramid hash op
上级 3b84584e
......@@ -59,7 +59,12 @@ class PyramidHashOpMaker : public framework::OpProtoAndCheckerMaker {
.EqualGreaterThan(0);
AddAttr<int>("seed", "seed").SetDefault(0).EqualGreaterThan(0);
AddAttr<float>("lr", "learning rate").SetDefault(0.0).EqualGreaterThan(0.0);
AddAttr<std::string>(
"distribute_update_vars",
"['PyramidHash_emb_0','Filter']"
"Decided which params should be updated in distribute training. "
"Used in Distribute Transpiler to create a trainer/server program.")
.SetDefault("");
AddOutput("Out", "Out (Tensor, default Tensor<float>) Output variable");
AddOutput("DropPos", "Out (Tensor, Tensor<int>) Output variable");
AddOutput("X_Temp_Out", "Out (Tensor, Tensor<int>) Output variable")
......
......@@ -120,14 +120,14 @@ def var_conv_2d(input,
padding. Besides, input.dims[1] should be 1.
.. code-block:: text
If input_channel is 2 and given row lodTensor and col lodTensor as follows:
row.lod = [[5, 4]]
col.lod = [[6, 7]]
input is a lodTensor:
input.lod = [[60, 56]] # where 60 = input_channel * 5 * 6
input.dims = [116, 1] # where 116 = 60 + 56
If set output_channel is 3, filter_size is [3, 3], stride is [1, 1]:
output.lod = [[90, 84]] # where 90 = output_channel * [(5-1)/stride + 1] * [(6-1)/stride + 1]
output.dims = [174, 1] # where 174 = 90 + 84
......@@ -380,7 +380,7 @@ def tree_conv(nodes_vector,
name=None):
"""
${comment}
Args:
nodes_vector(${nodes_vector_type}): ${nodes_vector_comment}
edge_set(${edge_set_type}): ${edge_set_comment}
......@@ -513,7 +513,7 @@ def multiclass_nms2(bboxes,
name=None):
"""
**Multiclass NMS2**
This operator is to do multi-class non maximum suppression (NMS) on
boxes and scores.
In the NMS step, this operator greedily selects a subset of detection bounding
......@@ -646,6 +646,7 @@ def search_pyramid_hash(input,
param_attr_wl=None,
param_attr_bl=None,
name=None,
distribute_update_vars=None,
dtype='float32'):
"""
**Pyramid hash embedding**
......@@ -672,6 +673,8 @@ def search_pyramid_hash(input,
default weight parameter property is used. See usage for details in :ref:`api_fluid_ParamAttr` .
param_attr_wl(ParamAttr): Specified parameters of white filter.
param_attr_bl(ParamAttr): Specified parameters of black filter.
distribute_update_vars(list[ParamAttr.name]): Decided which params should be updated in distribute training.
Used in Distribute Transpiler to create a trainer/server program.
name(str, optional): The default value is None. Normally there is no need for user to set this property.
For more information, please refer to :ref:`api_guide_Name` .
dtype(str): The data type of output variable, float32.
......@@ -700,6 +703,22 @@ def search_pyramid_hash(input,
black_list.stop_gradient = True
input_vars['BlackList'] = black_list
distribute_update_vars_str = ""
if distribute_update_vars:
assert isinstance(distribute_update_vars, list)
special_name_list = []
if param_attr:
special_name_list.append(param_attr.name)
if param_attr_wl:
special_name_list.append(param_attr_wl.name)
if param_attr_bl:
special_name_list.append(param_attr_bl.name)
for param in distribute_update_vars:
if param not in special_name_list:
raise ValueError(
"Pyramid Hash layer didn't have parameter {}".format(param))
distribute_update_vars_str = ",".join(distribute_update_vars)
res = helper.create_variable_for_type_inference(dtype)
drop_pos = helper.create_variable_for_type_inference(dtype)
x_temp_out = helper.create_variable_for_type_inference(dtype)
......@@ -721,6 +740,7 @@ def search_pyramid_hash(input,
'black_list_len': black_list_len,
'seed': seed,
'lr': lr,
'distribute_update_vars': distribute_update_vars_str
})
return res
......
......@@ -78,6 +78,7 @@ if(NOT WITH_MKL OR NOT WITH_AVX)
endif()
if(WITH_COVERAGE OR NOT WITH_AVX OR WIN32)
list(REMOVE_ITEM TEST_OPS test_pyramid_hash_op)
list(REMOVE_ITEM TEST_OPS test_fleet_pyramid_hash)
endif()
if(WITH_GPU OR NOT WITH_MKLML)
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
class TestPyramidHashOpApi(unittest.TestCase):
def test_dist_geo_server_transpiler(self):
num_voc = 128
embed_dim = 64
x_shape, x_lod = [16, 10], [[3, 5, 2, 6]]
x = fluid.data(name='x', shape=x_shape, dtype='int32', lod_level=1)
hash_embd = fluid.contrib.layers.search_pyramid_hash(
input=x,
num_emb=embed_dim,
space_len=num_voc * embed_dim,
pyramid_layer=4,
rand_len=16,
drop_out_percent=0.5,
is_training=True,
use_filter=False,
white_list_len=6400,
black_list_len=2800,
seed=3,
lr=0.002,
param_attr=fluid.ParamAttr(
name="PyramidHash_emb_0",
learning_rate=0, ),
param_attr_wl=fluid.ParamAttr(
name="Filter",
learning_rate=0, ),
param_attr_bl=None,
distribute_update_vars=["PyramidHash_emb_0"],
name=None)
cost = fluid.layers.reduce_sum(hash_embd)
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.SERVER,
worker_num=2,
server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
fleet.init(role)
strategy = DistributeTranspilerConfig()
strategy.sync_mode = False
strategy.geo_sgd_mode = True
strategy.geo_sgd_need_push_nums = 5
optimizer = fluid.optimizer.SGD(0.1)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)
pserver_startup_program = fleet.startup_program
pserver_mian_program = fleet.main_program
if __name__ == "__main__":
unittest.main()
......@@ -15,6 +15,9 @@
import unittest
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
class TestPyramidHashOpApi(unittest.TestCase):
......@@ -43,6 +46,7 @@ class TestPyramidHashOpApi(unittest.TestCase):
name="Filter",
learning_rate=0, ),
param_attr_bl=None,
distribute_update_vars=["PyramidHash_emb_0"],
name=None, )
place = fluid.CPUPlace()
......
......@@ -2633,4 +2633,35 @@ class DistributeTranspiler(object):
])
else:
pass
# designed for special situation
special_distribute_update_vars = self._get_distribute_update_vars()
if special_distribute_update_vars:
params_grads = params_grads + special_distribute_update_vars
return opt_ops, params_grads
def _get_distribute_update_vars(self):
#TODO(chengmo): find more powerful and simple way to deal with these special situation
"""
This Function is used for a special model, like PyramidDnn which has pyramid hash op.
Some Parameters don't use optimizing op to update its value, but updated in its BP process.
In these cases, Transpilse can't find these special vars by optimizing op information.
So we add this function and add attr "distribute_update_vars" to tell transpiler these Parameter
need to be updated in distribute training.
We assume these special var send and receive the same var_name.
"""
block = self.origin_program.global_block()
origin_var_dict = self.origin_program.global_block().vars
params = []
for op in block.ops:
special_attr = "distribute_update_vars"
if special_attr in op.all_attrs():
if op.attr(special_attr):
for param_name in op.attr(special_attr).split(","):
params.append(origin_var_dict[param_name])
unique_params = list(set(params))
params_grads = []
for var in unique_params:
params_grads.append([var, var])
return params_grads
......@@ -137,15 +137,22 @@ class GeoSgdTranspiler(DistributeTranspiler):
# send sparse id to communicator
self.sparse_var = []
self.sparse_tables = []
unique_sparse_var = {}
for op in self.origin_program.global_block().ops:
if op.type == "lookup_table":
op._set_attr('remote_prefetch', False)
if "is_sparse" in op.all_attrs():
if op.type == "lookup_table":
op._set_attr('remote_prefetch', False)
for input_var_name, sparse_var_name in zip(
op.input("Ids"), op.input("W")):
if sparse_var_name in self.sparse_var_list:
if input_var_name in unique_sparse_var:
if unique_sparse_var[
input_var_name] == sparse_var_name:
continue
input_var = program.global_block().var(input_var_name)
self.sparse_var.append(input_var)
self.sparse_tables.append(sparse_var_name)
unique_sparse_var[input_var_name] = sparse_var_name
# batch training loop end flag
dummy_output = program.global_block().create_var(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册