提交 152beec5 编写于 作者: P phlrain

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fix_lod_reset

...@@ -393,9 +393,9 @@ paddle.fluid.contrib.MagnitudePruner.__init__ (ArgSpec(args=['self', 'threshold' ...@@ -393,9 +393,9 @@ paddle.fluid.contrib.MagnitudePruner.__init__ (ArgSpec(args=['self', 'threshold'
paddle.fluid.contrib.MagnitudePruner.prune (ArgSpec(args=['self', 'param', 'threshold'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.contrib.MagnitudePruner.prune (ArgSpec(args=['self', 'param', 'threshold'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.contrib.RatioPruner.__init__ (ArgSpec(args=['self', 'ratios'], varargs=None, keywords=None, defaults=(None,)), ('document', 'e7a81a325b296a9ca502ee5adb4fc85d')) paddle.fluid.contrib.RatioPruner.__init__ (ArgSpec(args=['self', 'ratios'], varargs=None, keywords=None, defaults=(None,)), ('document', 'e7a81a325b296a9ca502ee5adb4fc85d'))
paddle.fluid.contrib.RatioPruner.prune (ArgSpec(args=['self', 'param', 'ratio'], varargs=None, keywords=None, defaults=(None,)), ('document', '358cbf2978c91028fb96a195a9884645')) paddle.fluid.contrib.RatioPruner.prune (ArgSpec(args=['self', 'param', 'ratio'], varargs=None, keywords=None, defaults=(None,)), ('document', '358cbf2978c91028fb96a195a9884645'))
paddle.fluid.contrib.load_persistables_for_increment (ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var', 'lookup_table_var_path'], varargs=None, keywords=None, defaults=None), ('document', '11fbf7e8dd2289805de291b453a33ee7')) paddle.fluid.contrib.load_persistables_for_increment (ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var', 'lookup_table_var_path'], varargs=None, keywords=None, defaults=None), ('document', '2ab36d4f7a564f5f65e455807ad06c67'))
paddle.fluid.contrib.load_persistables_for_inference (ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var_name'], varargs=None, keywords=None, defaults=None), ('document', '5b5577bb3d24070da819674255d16196')) paddle.fluid.contrib.load_persistables_for_inference (ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var_name'], varargs=None, keywords=None, defaults=None), ('document', '59066bac9db0ac6ce414d05780b7333f'))
paddle.fluid.contrib.convert_dist_to_sparse_program (ArgSpec(args=['program'], varargs=None, keywords=None, defaults=None), ('document', '4efbd93876832d4d35497cdbc7a1e6d8')) paddle.fluid.contrib.convert_dist_to_sparse_program (ArgSpec(args=['program'], varargs=None, keywords=None, defaults=None), ('document', '74c39c595dc70d6be2f16d8e462d282b'))
paddle.fluid.contrib.HDFSClient.__init__ (ArgSpec(args=['self', 'hadoop_home', 'configs'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.contrib.HDFSClient.__init__ (ArgSpec(args=['self', 'hadoop_home', 'configs'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.contrib.HDFSClient.delete (ArgSpec(args=['self', 'hdfs_path'], varargs=None, keywords=None, defaults=None), ('document', 'c3721aa2d4d9ef5a857dd47b2681c03e')) paddle.fluid.contrib.HDFSClient.delete (ArgSpec(args=['self', 'hdfs_path'], varargs=None, keywords=None, defaults=None), ('document', 'c3721aa2d4d9ef5a857dd47b2681c03e'))
paddle.fluid.contrib.HDFSClient.download (ArgSpec(args=['self', 'hdfs_path', 'local_path', 'overwrite', 'unzip'], varargs=None, keywords=None, defaults=(False, False)), ('document', 'ca55bde92184d3fd0f9f5c963b25e634')) paddle.fluid.contrib.HDFSClient.download (ArgSpec(args=['self', 'hdfs_path', 'local_path', 'overwrite', 'unzip'], varargs=None, keywords=None, defaults=(False, False)), ('document', 'ca55bde92184d3fd0f9f5c963b25e634'))
......
...@@ -18,6 +18,7 @@ import os ...@@ -18,6 +18,7 @@ import os
import time import time
import logging import logging
import paddle
from paddle.fluid import core from paddle.fluid import core
from paddle.fluid import io from paddle.fluid import io
from paddle.fluid import Program from paddle.fluid import Program
...@@ -84,8 +85,9 @@ def convert_dist_to_sparse_program(program): ...@@ -84,8 +85,9 @@ def convert_dist_to_sparse_program(program):
when we train model with distributed lookup table but want to do the local inference, we can use when we train model with distributed lookup table but want to do the local inference, we can use
this function to convert the train program with distributed lookup table to sparse lookup table. this function to convert the train program with distributed lookup table to sparse lookup table.
:param program(Program): the program must be the trainer program, which will be get by the distribute transpiler. Args:
:return: program(Program): the program must be the trainer program, which will be get by the distribute transpiler.
Returns:
program: The `program` is a Program, it's the program replace distributed lookup table to sparse lookup table. program: The `program` is a Program, it's the program replace distributed lookup table to sparse lookup table.
""" """
if not program._distributed_lookup_table: if not program._distributed_lookup_table:
...@@ -128,68 +130,92 @@ def convert_dist_to_sparse_program(program): ...@@ -128,68 +130,92 @@ def convert_dist_to_sparse_program(program):
return program return program
def _load_persistable_vars(executor, dirname, program, lookup_table_vars): def load_persistables_for_increment(dirname, executor, program,
def _is_checkpoint_var(exclude_fluid_vars=None): lookup_table_var, lookup_table_var_path):
""" """
the checkpoint will not save or load all the variables. WARNING: this function will only be used for distributed training with distributed lookup table.
var type is FEED_MINIBATCH/FETCH_LIST/RAW or var name ends with @GRAD are discarded. for increment trainning, the pserver will not only load dense variables,
but also load the suitable lookup table var. Because of sliced lookup table
: param var(Variable) var with HASH, we must load the correct sliced var.
Args:
dirname(str): The directory path
executor(Executor): The executor to run for loading inference model.
program(Program): The parameter server program, which will run on Pserver.
lookup_table_var: the distributed lookup tables var name.
lookup_table_var_path: the the distributed lookup tables var location.
Returns:
None
""" """
if exclude_fluid_vars is None: def _load_persistable_vars(executor, dirname, need_load_vars):
exclude_fluid_vars = [] load_prog = Program()
load_block = load_prog.global_block()
def is_valid(var): need_delete_vars = []
if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
var.desc.type() == core.VarDesc.VarType.RAW:
return False
# @GRAD are named for gradient variables, checkpoint will not save it.
if "@GRAD" in var.name:
return False
# .trainer_ are named for distribute train variables, checkpoint will not save it.
if ".trainer_" in var.name:
return False
# .block is named for distribute train variables, checkpoint will not save it.
if ".block" in var.name:
return False
if "tmp_" in var.name:
return False
if var.name in exclude_fluid_vars:
return False
return var.persistable for param in need_load_vars:
origin_var = param.origin
slice_var = param.slice
is_slice = param.is_slice
offset = param.offset
if is_slice:
origin = load_block.create_var(
name="{}.load".format(origin_var.name),
type=origin_var.type,
shape=origin_var.shape,
dtype=origin_var.dtype,
persistable=True)
return is_valid load_block.append_op(
type='load',
inputs={},
outputs={'Out': [origin]},
attrs={
'file_path': os.path.join(dirname, origin_var.name)
})
io.load_vars( slice = load_block.create_var(
executor, name=slice_var.name,
dirname=dirname, type=slice_var.type,
main_program=program, shape=slice_var.shape,
predicate=_is_checkpoint_var(lookup_table_vars), dtype=slice_var.dtype,
filename=None) persistable=True)
dim1_flatten = reduce(lambda x, y: x * y, slice.shape[1:])
start = int(offset / dim1_flatten)
end = int(offset / dim1_flatten + slice.shape[0])
def load_persistables_for_increment(dirname, executor, program, load_block.append_op(
lookup_table_var, lookup_table_var_path): type="slice",
""" inputs={'Input': origin},
WARNING: this function will only be used for distributed training with distributed lookup table. outputs={'Out': slice},
for increment trainning, the pserver will not only load dense variables, attrs={'axes': [0],
but also load the suitable lookup table var. Because of slice lookup table 'starts': [start],
var with HASH, we must load the correct slice var. 'ends': [end]})
need_delete_vars.append(origin)
else:
origin = load_block.create_var(
name="{}".format(origin_var.name),
type=origin_var.type,
shape=origin_var.shape,
dtype=origin_var.dtype,
persistable=True)
load_block.append_op(
type='load',
inputs={},
outputs={'Out': [origin]},
attrs={
'file_path': os.path.join(dirname, origin_var.name)
})
load_block.append_op(
type='delete_var',
inputs={'X': need_delete_vars}, )
:param dirname(str): The directory path executor.run(load_prog)
:param executor(Executor): The executor to run for loading inference model.
:param program(Program): The parameter server program, which will run on Pserver.
:param lookup_table_var: the distributed lookup tables var name.
:param lookup_table_var_path: the the distributed lookup tables var location.
:return: None
"""
def __load_lookup_table_vars(executor, main_program, lookup_table_var, def __load_lookup_table_vars(executor, main_program, lookup_table_var,
lookup_table_var_path): lookup_table_var_path):
...@@ -217,7 +243,9 @@ def load_persistables_for_increment(dirname, executor, program, ...@@ -217,7 +243,9 @@ def load_persistables_for_increment(dirname, executor, program,
"Distributed Lookup Table Vars from {}, time = {}".format( "Distributed Lookup Table Vars from {}, time = {}".format(
dirname, time.ctime())) dirname, time.ctime()))
_load_persistable_vars(executor, dirname, program, [lookup_table_var]) need_load_vars = program._parameters_on_pservers.get_distributed_vars_by_ep(
program._ps_endpoint)
_load_persistable_vars(executor, dirname, need_load_vars)
__load_lookup_table_vars(executor, program, lookup_table_var, __load_lookup_table_vars(executor, program, lookup_table_var,
lookup_table_var_path) lookup_table_var_path)
...@@ -233,14 +261,61 @@ def load_persistables_for_inference(dirname, executor, program, ...@@ -233,14 +261,61 @@ def load_persistables_for_inference(dirname, executor, program,
Inference with distributed lookup table is a little funky, this function will load distributed Inference with distributed lookup table is a little funky, this function will load distributed
lookup table vars into sparse var, can be used in local inference mode. lookup table vars into sparse var, can be used in local inference mode.
:param dirname(str): The directory path Args:
:param executor(Executor): The executor to run for loading inference model. dirname(str): The directory path
:param program(Program): The parameter server program, which will run on Pserver. executor(Executor): The executor to run for loading inference model.
:param lookup_table_var_name: the distributed lookup tables var name. program(Program): The parameter server program, which will run on Pserver.
:return: None lookup_table_var_name: the distributed lookup tables var name.
Returns:
None
"""
def _load_persistable_vars(executor, dirname, program, lookup_table_vars):
def _is_checkpoint_var(exclude_fluid_vars=None):
""" """
the checkpoint will not save or load all the variables.
var type is FEED_MINIBATCH/FETCH_LIST/RAW or var name ends with @GRAD are discarded.
def __load_lookup_table_vars(executor, dirname, main_program, : param var(Variable)
"""
if exclude_fluid_vars is None:
exclude_fluid_vars = []
def is_valid(var):
if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
var.desc.type() == core.VarDesc.VarType.RAW:
return False
# @GRAD are named for gradient variables, checkpoint will not save it.
if "@GRAD" in var.name:
return False
# .trainer_ are named for distribute train variables, checkpoint will not save it.
if ".trainer_" in var.name:
return False
# .block is named for distribute train variables, checkpoint will not save it.
if ".block" in var.name:
return False
if "tmp_" in var.name:
return False
if var.name in exclude_fluid_vars:
return False
return var.persistable
return is_valid
io.load_vars(
executor,
dirname=dirname,
main_program=program,
predicate=_is_checkpoint_var(lookup_table_vars),
filename=None)
def _load_lookup_table_vars(executor, dirname, main_program,
lookup_table_vars): lookup_table_vars):
if not os.path.isdir(dirname): if not os.path.isdir(dirname):
raise ValueError("There is no directory named '%s'", dirname) raise ValueError("There is no directory named '%s'", dirname)
...@@ -313,11 +388,96 @@ def load_persistables_for_inference(dirname, executor, program, ...@@ -313,11 +388,96 @@ def load_persistables_for_inference(dirname, executor, program,
dirname, time.ctime())) dirname, time.ctime()))
_load_persistable_vars(executor, dirname, program, [lookup_table_var_name]) _load_persistable_vars(executor, dirname, program, [lookup_table_var_name])
__load_lookup_table_vars(executor, dirname, program, _load_lookup_table_vars(executor, dirname, program, [lookup_table_var_name])
[lookup_table_var_name])
_logger.info("Finish Load Sparse Program With " _logger.info("Finish Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}".format( "Distributed Lookup Table Vars from {}, time = {}".format(
dirname, time.ctime())) dirname, time.ctime()))
return program return program
def get_inference_model(main_program, feeded_var_names, target_vars):
"""
Prune the given `main_program` to build a new program especially for inference with distributed lookup table ,
and then add `feeded_vars` and `target_vars` in this program.
Args:
main_program(Program|None): The original program, which will be pruned to
build the inference model. If is setted None,
the default main program will be used.
Default: None.
feeded_var_names(list[str]): Names of variables that need to be feeded data
during inference.
target_vars(list[Variable]): Variables from which we can get inference
results.
Returns:
program(Program)
Raises:
ValueError: If `feed_var_names` is not a list of basestring.
ValueError: If `target_vars` is not a list of Variable.
"""
def prepend_feed_ops(inference_program,
feed_target_names,
feed_holder_name='feed'):
if len(feed_target_names) == 0:
return
global_block = inference_program.global_block()
feed_var = global_block.create_var(
name=feed_holder_name,
type=core.VarDesc.VarType.FEED_MINIBATCH,
persistable=True)
for i, name in enumerate(feed_target_names):
out = global_block.var(name)
global_block._prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
attrs={'col': i})
def append_fetch_ops(inference_program,
fetch_target_names,
fetch_holder_name='fetch'):
global_block = inference_program.global_block()
fetch_var = global_block.create_var(
name=fetch_holder_name,
type=core.VarDesc.VarType.FETCH_LIST,
persistable=True)
for i, name in enumerate(fetch_target_names):
global_block.append_op(
type='fetch',
inputs={'X': [name]},
outputs={'Out': [fetch_var]},
attrs={'col': i})
origin_program = main_program.clone()
main_program = main_program.clone()
global_block = main_program.global_block()
need_to_remove_op_index = []
for i, op in enumerate(global_block.ops):
op.desc.set_is_target(False)
if op.type == "feed" or op.type == "fetch":
need_to_remove_op_index.append(i)
for index in need_to_remove_op_index[::-1]:
global_block._remove_op(index)
main_program.desc.flush()
main_program = main_program._prune(targets=target_vars)
main_program = main_program._inference_optimize(prune_read_op=True)
fetch_var_names = [v.name for v in target_vars]
prepend_feed_ops(main_program, feeded_var_names)
append_fetch_ops(main_program, fetch_var_names)
return main_program
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册