From 8ea4218ce132a8ea4428efb753fe179aadd01aa8 Mon Sep 17 00:00:00 2001
From: tangwei12
Date: Mon, 18 Mar 2019 17:25:16 +0800
Subject: [PATCH] update load persistables for increment, test=develop (#15576)

* update load persistables for increment, test=develop

* update load persistables for increment, test=develop

* update API Spec, test=develop

* update API Spec, test=develop

* add doc, test=develop

* add doc, test=develop

* Update lookup_table_utils.py

* Update API.spec

* Update lookup_table_utils.py

test=develop

* Update API.spec

test=develop

* fix api spec

* Update lookup_table_utils.py

test=develop
---
 paddle/fluid/API.spec                         |   6 +-
 .../fluid/contrib/utils/lookup_table_utils.py | 294 ++++++++++++++----
 2 files changed, 230 insertions(+), 70 deletions(-)

diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec
index 056463205ab..66fc323e6b9 100644
--- a/paddle/fluid/API.spec
+++ b/paddle/fluid/API.spec
@@ -393,9 +393,9 @@ paddle.fluid.contrib.MagnitudePruner.__init__ (ArgSpec(args=['self', 'threshold'
 paddle.fluid.contrib.MagnitudePruner.prune (ArgSpec(args=['self', 'param', 'threshold'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
 paddle.fluid.contrib.RatioPruner.__init__ (ArgSpec(args=['self', 'ratios'], varargs=None, keywords=None, defaults=(None,)), ('document', 'e7a81a325b296a9ca502ee5adb4fc85d'))
 paddle.fluid.contrib.RatioPruner.prune (ArgSpec(args=['self', 'param', 'ratio'], varargs=None, keywords=None, defaults=(None,)), ('document', '358cbf2978c91028fb96a195a9884645'))
-paddle.fluid.contrib.load_persistables_for_increment (ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var', 'lookup_table_var_path'], varargs=None, keywords=None, defaults=None), ('document', '11fbf7e8dd2289805de291b453a33ee7'))
-paddle.fluid.contrib.load_persistables_for_inference (ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var_name'], varargs=None, keywords=None, defaults=None), ('document', '5b5577bb3d24070da819674255d16196'))
-paddle.fluid.contrib.convert_dist_to_sparse_program (ArgSpec(args=['program'], varargs=None, keywords=None, defaults=None), ('document', '4efbd93876832d4d35497cdbc7a1e6d8'))
+paddle.fluid.contrib.load_persistables_for_increment (ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var', 'lookup_table_var_path'], varargs=None, keywords=None, defaults=None), ('document', '2ab36d4f7a564f5f65e455807ad06c67'))
+paddle.fluid.contrib.load_persistables_for_inference (ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var_name'], varargs=None, keywords=None, defaults=None), ('document', '59066bac9db0ac6ce414d05780b7333f'))
+paddle.fluid.contrib.convert_dist_to_sparse_program (ArgSpec(args=['program'], varargs=None, keywords=None, defaults=None), ('document', '74c39c595dc70d6be2f16d8e462d282b'))
 paddle.fluid.contrib.HDFSClient.__init__ (ArgSpec(args=['self', 'hadoop_home', 'configs'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
 paddle.fluid.contrib.HDFSClient.delete (ArgSpec(args=['self', 'hdfs_path'], varargs=None, keywords=None, defaults=None), ('document', 'c3721aa2d4d9ef5a857dd47b2681c03e'))
 paddle.fluid.contrib.HDFSClient.download (ArgSpec(args=['self', 'hdfs_path', 'local_path', 'overwrite', 'unzip'], varargs=None, keywords=None, defaults=(False, False)), ('document', 'ca55bde92184d3fd0f9f5c963b25e634'))
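The three `document` hashes above change only because the docstrings of these three contrib APIs are rewritten later in this patch. As a rough sketch of why a doc-only edit shows up in API.spec (assuming the md5-of-docstring convention used by Paddle's API-spec tooling; the exact normalization may differ):

    import hashlib

    def api_doc_hash(docstring):
        # API.spec records ('document', <hash>) per API; hashing the docstring
        # means any doc change, with no signature change, yields a new value.
        return hashlib.md5(docstring.encode('utf-8')).hexdigest()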
diff --git a/python/paddle/fluid/contrib/utils/lookup_table_utils.py b/python/paddle/fluid/contrib/utils/lookup_table_utils.py
index 20e6328d81c..a127f5b11b7 100644
--- a/python/paddle/fluid/contrib/utils/lookup_table_utils.py
+++ b/python/paddle/fluid/contrib/utils/lookup_table_utils.py
@@ -18,6 +18,7 @@ import os
 import time
 import logging
 
+import paddle
 from paddle.fluid import core
 from paddle.fluid import io
 from paddle.fluid import Program
@@ -84,8 +85,9 @@ def convert_dist_to_sparse_program(program):
     when we train model with distributed lookup table but want to do the local inference, we can use
     this function to convert the train program with distributed lookup table to sparse lookup table.
 
-    :param program(Program): the program must be the trainer program, which will be get by the distribute transpiler.
-    :return:
+    Args:
+        program(Program): the program must be the trainer program, which can be obtained from the distribute transpiler.
+    Returns:
         program(Program): the converted program, in which the distributed lookup table has been replaced by a sparse lookup table.
     """
     if not program._distributed_lookup_table:
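To make the docstring above concrete, here is a hedged usage sketch of `convert_dist_to_sparse_program`; the transpiler setup is an assumption for illustration and not part of this patch:

    import paddle.fluid as fluid

    def build_local_inference_program(transpiler):
        # `transpiler` is assumed to be a fluid.DistributeTranspiler that was
        # transpiled with a distributed lookup table; its trainer program is
        # the input convert_dist_to_sparse_program expects.
        trainer_program = transpiler.get_trainer_program()
        return fluid.contrib.convert_dist_to_sparse_program(trainer_program)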
@@ -128,68 +130,92 @@
     return program
 
 
-def _load_persistable_vars(executor, dirname, program, lookup_table_vars):
-    def _is_checkpoint_var(exclude_fluid_vars=None):
-        """
-        the checkpoint will not save or load all the variables.
-        var type is FEED_MINIBATCH/FETCH_LIST/RAW or var name ends with @GRAD are discarded.
-
-        : param var(Variable)
-        """
-
-        if exclude_fluid_vars is None:
-            exclude_fluid_vars = []
-
-        def is_valid(var):
-            if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
-                    var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
-                    var.desc.type() == core.VarDesc.VarType.RAW:
-                return False
-            # @GRAD are named for gradient variables, checkpoint will not save it.
-            if "@GRAD" in var.name:
-                return False
-            # .trainer_ are named for distribute train variables, checkpoint will not save it.
-            if ".trainer_" in var.name:
-                return False
-
-            # .block is named for distribute train variables, checkpoint will not save it.
-            if ".block" in var.name:
-                return False
-
-            if "tmp_" in var.name:
-                return False
-
-            if var.name in exclude_fluid_vars:
-                return False
-
-            return var.persistable
-
-        return is_valid
-
-    io.load_vars(
-        executor,
-        dirname=dirname,
-        main_program=program,
-        predicate=_is_checkpoint_var(lookup_table_vars),
-        filename=None)
-
-
 def load_persistables_for_increment(dirname, executor, program,
                                     lookup_table_var, lookup_table_var_path):
     """
     WARNING: this function will only be used for distributed training with
     distributed lookup table.
     for incremental training, the pserver will not only load dense variables,
-    but also load the suitable lookup table var. Because of slice lookup table
-    var with HASH, we must load the correct slice var.
+    but also load the suitable lookup table var. Because the lookup table var
+    is sliced with HASH, we must load the correct sliced var.
+
+    Args:
+        dirname(str): The directory path.
+        executor(Executor): The executor to run for loading the variables.
+        program(Program): The parameter server program, which will run on Pserver.
+        lookup_table_var: the distributed lookup table var name.
+        lookup_table_var_path: the distributed lookup table var location.
+
+    Returns:
+        None
+    """
+
+    def _load_persistable_vars(executor, dirname, need_load_vars):
+        load_prog = Program()
+        load_block = load_prog.global_block()
+        need_delete_vars = []
+
+        for param in need_load_vars:
+            origin_var = param.origin
+            slice_var = param.slice
+            is_slice = param.is_slice
+            offset = param.offset
+
+            if is_slice:
+                origin = load_block.create_var(
+                    name="{}.load".format(origin_var.name),
+                    type=origin_var.type,
+                    shape=origin_var.shape,
+                    dtype=origin_var.dtype,
+                    persistable=True)
+
+                load_block.append_op(
+                    type='load',
+                    inputs={},
+                    outputs={'Out': [origin]},
+                    attrs={
+                        'file_path': os.path.join(dirname, origin_var.name)
+                    })
+
+                slice = load_block.create_var(
+                    name=slice_var.name,
+                    type=slice_var.type,
+                    shape=slice_var.shape,
+                    dtype=slice_var.dtype,
+                    persistable=True)
+
+                # Each pserver holds a contiguous block of rows; the row range
+                # is recovered from the flat element offset of this slice.
+                dim1_flatten = reduce(lambda x, y: x * y, slice.shape[1:])
+                start = int(offset / dim1_flatten)
+                end = int(offset / dim1_flatten + slice.shape[0])
+
+                load_block.append_op(
+                    type="slice",
+                    inputs={'Input': origin},
+                    outputs={'Out': slice},
+                    attrs={'axes': [0],
+                           'starts': [start],
+                           'ends': [end]})
+
+                need_delete_vars.append(origin)
+            else:
+                origin = load_block.create_var(
+                    name="{}".format(origin_var.name),
+                    type=origin_var.type,
+                    shape=origin_var.shape,
+                    dtype=origin_var.dtype,
+                    persistable=True)
+                load_block.append_op(
+                    type='load',
+                    inputs={},
+                    outputs={'Out': [origin]},
+                    attrs={
+                        'file_path': os.path.join(dirname, origin_var.name)
+                    })
 
-    :param dirname(str): The directory path
-    :param executor(Executor): The executor to run for loading inference model.
-    :param program(Program): The parameter server program, which will run on Pserver.
-    :param lookup_table_var: the distributed lookup tables var name.
-    :param lookup_table_var_path: the the distributed lookup tables var location.
-    :return: None
-    """
+        load_block.append_op(
+            type='delete_var',
+            inputs={'X': need_delete_vars}, )
+
+        executor.run(load_prog)
 
     def __load_lookup_table_vars(executor, main_program, lookup_table_var,
                                  lookup_table_var_path):
@@ -217,7 +243,9 @@ def load_persistables_for_increment(dirname, executor, program,
         "Distributed Lookup Table Vars from {}, time = {}".format(
             dirname, time.ctime()))
 
-    _load_persistable_vars(executor, dirname, program, [lookup_table_var])
+    need_load_vars = program._parameters_on_pservers.get_distributed_vars_by_ep(
+        program._ps_endpoint)
+    _load_persistable_vars(executor, dirname, need_load_vars)
     __load_lookup_table_vars(executor, program, lookup_table_var,
                              lookup_table_var_path)
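The row-range arithmetic behind the `slice` op above can be checked in isolation: dividing a shard's flat element offset by the flattened size of one row gives the first row this pserver owns. A small self-contained sketch, with shapes and offsets assumed only for illustration:

    from functools import reduce

    def slice_row_bounds(offset, slice_shape):
        # Elements per row of the sliced var (product of the trailing dims).
        dim1_flatten = reduce(lambda x, y: x * y, slice_shape[1:], 1)
        start = offset // dim1_flatten   # first row held by this pserver
        end = start + slice_shape[0]     # one past the last row
        return start, end

    # Assumed example: a 1000x8 embedding split in two; the second shard has
    # flat offset 4000 and slice shape (500, 8), so it covers rows [500, 1000).
    assert slice_row_bounds(4000, (500, 8)) == (500, 1000)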
@@ -233,15 +261,62 @@ def load_persistables_for_inference(dirname, executor, program,
     Inference with distributed lookup table is a little funky: this function
     will load the distributed lookup table vars into a sparse var, and can be
     used in local inference mode.
 
-    :param dirname(str): The directory path
-    :param executor(Executor): The executor to run for loading inference model.
-    :param program(Program): The parameter server program, which will run on Pserver.
-    :param lookup_table_var_name: the distributed lookup tables var name.
-    :return: None
+    Args:
+        dirname(str): The directory path.
+        executor(Executor): The executor to run for loading the inference model.
+        program(Program): The parameter server program, which will run on Pserver.
+        lookup_table_var_name: the distributed lookup table var name.
+    Returns:
+        None
     """
 
-    def __load_lookup_table_vars(executor, dirname, main_program,
-                                 lookup_table_vars):
+    def _load_persistable_vars(executor, dirname, program, lookup_table_vars):
+        def _is_checkpoint_var(exclude_fluid_vars=None):
+            """
+            The checkpoint will not save or load all the variables: vars whose
+            type is FEED_MINIBATCH/FETCH_LIST/RAW, or whose name contains
+            @GRAD, are discarded.
+
+            :param var(Variable)
+            """
+
+            if exclude_fluid_vars is None:
+                exclude_fluid_vars = []
+
+            def is_valid(var):
+                if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
+                        var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
+                        var.desc.type() == core.VarDesc.VarType.RAW:
+                    return False
+                # @GRAD vars are gradients; the checkpoint will not save them.
+                if "@GRAD" in var.name:
+                    return False
+                # .trainer_ vars belong to distributed training; the checkpoint will not save them.
+                if ".trainer_" in var.name:
+                    return False
+
+                # .block vars belong to distributed training; the checkpoint will not save them.
+                if ".block" in var.name:
+                    return False
+
+                if "tmp_" in var.name:
+                    return False
+
+                if var.name in exclude_fluid_vars:
+                    return False
+
+                return var.persistable
+
+            return is_valid
+
+        io.load_vars(
+            executor,
+            dirname=dirname,
+            main_program=program,
+            predicate=_is_checkpoint_var(lookup_table_vars),
+            filename=None)
+
+    def _load_lookup_table_vars(executor, dirname, main_program,
+                                lookup_table_vars):
         if not os.path.isdir(dirname):
             raise ValueError("There is no directory named '%s'", dirname)
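For orientation, a hedged sketch of calling the inference-side loader; the directory, program, and table name are assumptions for illustration (note that the function returns the program it populated):

    import paddle.fluid as fluid

    def load_for_local_inference(dirname, inference_program):
        # Assumed setup: persistables were saved under `dirname` and the
        # distributed lookup table var was named 'emb' during training.
        exe = fluid.Executor(fluid.CPUPlace())
        return fluid.contrib.load_persistables_for_inference(
            dirname=dirname,
            executor=exe,
            program=inference_program,
            lookup_table_var_name='emb')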
@@ -313,11 +388,96 @@ def load_persistables_for_inference(dirname, executor, program,
                 dirname, time.ctime()))
 
     _load_persistable_vars(executor, dirname, program, [lookup_table_var_name])
-    __load_lookup_table_vars(executor, dirname, program,
-                             [lookup_table_var_name])
+    _load_lookup_table_vars(executor, dirname, program, [lookup_table_var_name])
 
     _logger.info("Finish Load Sparse Program With "
                  "Distributed Lookup Table Vars from {}, time = {}".format(
                      dirname, time.ctime()))
 
     return program
+
+
+def get_inference_model(main_program, feeded_var_names, target_vars):
+    """
+    Prune the given `main_program` to build a new program, especially for
+    inference with a distributed lookup table, and then add `feeded_vars` and
+    `target_vars` to this program.
+
+    Args:
+        main_program(Program): The original program, which will be pruned to
+                               build the inference model.
+        feeded_var_names(list[str]): Names of variables that need to be fed
+                                     data during inference.
+        target_vars(list[Variable]): Variables from which we can get inference
+                                     results.
+    Returns:
+        program(Program)
+
+    Raises:
+        ValueError: If `feeded_var_names` is not a list of basestring.
+        ValueError: If `target_vars` is not a list of Variable.
+
+    """
+
+    def prepend_feed_ops(inference_program,
+                         feed_target_names,
+                         feed_holder_name='feed'):
+        if len(feed_target_names) == 0:
+            return
+
+        global_block = inference_program.global_block()
+
+        feed_var = global_block.create_var(
+            name=feed_holder_name,
+            type=core.VarDesc.VarType.FEED_MINIBATCH,
+            persistable=True)
+
+        for i, name in enumerate(feed_target_names):
+            out = global_block.var(name)
+            global_block._prepend_op(
+                type='feed',
+                inputs={'X': [feed_var]},
+                outputs={'Out': [out]},
+                attrs={'col': i})
+
+    def append_fetch_ops(inference_program,
+                         fetch_target_names,
+                         fetch_holder_name='fetch'):
+        global_block = inference_program.global_block()
+        fetch_var = global_block.create_var(
+            name=fetch_holder_name,
+            type=core.VarDesc.VarType.FETCH_LIST,
+            persistable=True)
+
+        for i, name in enumerate(fetch_target_names):
+            global_block.append_op(
+                type='fetch',
+                inputs={'X': [name]},
+                outputs={'Out': [fetch_var]},
+                attrs={'col': i})
+
+    origin_program = main_program.clone()
+    main_program = main_program.clone()
+    global_block = main_program.global_block()
+
+    need_to_remove_op_index = []
+    for i, op in enumerate(global_block.ops):
+        op.desc.set_is_target(False)
+        if op.type == "feed" or op.type == "fetch":
+            need_to_remove_op_index.append(i)
+
+    for index in need_to_remove_op_index[::-1]:
+        global_block._remove_op(index)
+
+    main_program.desc.flush()
+
+    main_program = main_program._prune(targets=target_vars)
+    main_program = main_program._inference_optimize(prune_read_op=True)
+
+    fetch_var_names = [v.name for v in target_vars]
+
+    prepend_feed_ops(main_program, feeded_var_names)
+    append_fetch_ops(main_program, fetch_var_names)
+
+    return main_program
--
GitLab
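Since this patch does not register `get_inference_model` in API.spec, it is reached through the module path. A hedged usage sketch, with the program and variables assumed to come from the caller's training script:

    from paddle.fluid.contrib.utils import lookup_table_utils

    def build_inference_program(train_program, feed_names, fetch_vars):
        # Prunes `train_program` down to the ops needed to compute
        # `fetch_vars` from `feed_names`, attaching the feed/fetch ops
        # exactly as prepend_feed_ops/append_fetch_ops above do.
        return lookup_table_utils.get_inference_model(
            main_program=train_program,
            feeded_var_names=feed_names,
            target_vars=fetch_vars)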