From 23876de55b68d292d6917c566ad569414f2ba6c3 Mon Sep 17 00:00:00 2001 From: xujiaqi01 <173596896@qq.com> Date: Fri, 15 Nov 2019 16:42:15 +0800 Subject: [PATCH] fix cache table bug, add save_paddle_inference_model, fix hdfs util bug (#21052) * fix cache table bug * add save_paddle_inference_model * fix hdfs util bug * test=develop --- paddle/fluid/framework/fleet/fleet_wrapper.cc | 7 +- paddle/fluid/framework/fleet/fleet_wrapper.h | 2 +- .../fleet/parameter_server/pslib/__init__.py | 11 +- .../fleet/parameter_server/pslib/node.py | 9 +- .../pslib/optimizer_factory.py | 9 ++ .../fluid/incubate/fleet/utils/fleet_util.py | 121 ++++++++++++++++-- .../paddle/fluid/incubate/fleet/utils/hdfs.py | 3 +- .../fluid/tests/unittests/test_dataset.py | 1 + 8 files changed, 143 insertions(+), 20 deletions(-) diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 7aa1a6fc938..bef9c85e6d2 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -549,12 +549,12 @@ void FleetWrapper::SaveModel(const std::string& path, const int mode) { #endif } -double FleetWrapper::GetCacheThreshold() { +double FleetWrapper::GetCacheThreshold(int table_id) { #ifdef PADDLE_WITH_PSLIB double cache_threshold = 0.0; auto ret = pslib_ptr_->_worker_ptr->flush(); ret.wait(); - ret = pslib_ptr_->_worker_ptr->get_cache_threshold(0, cache_threshold); + ret = pslib_ptr_->_worker_ptr->get_cache_threshold(table_id, cache_threshold); ret.wait(); if (cache_threshold < 0) { LOG(ERROR) << "get cache threshold failed"; @@ -588,7 +588,8 @@ void FleetWrapper::CacheShuffle(int table_id, const std::string& path, int32_t FleetWrapper::SaveCache(int table_id, const std::string& path, const int mode) { #ifdef PADDLE_WITH_PSLIB - auto ret = pslib_ptr_->_worker_ptr->save_cache(0, path, std::to_string(mode)); + auto ret = + pslib_ptr_->_worker_ptr->save_cache(table_id, path, std::to_string(mode)); ret.wait(); 
int32_t feasign_cnt = ret.get(); if (feasign_cnt == -1) { diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index 8ad860cbd22..73247748b0b 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -176,7 +176,7 @@ class FleetWrapper { // mode = 1, save delta feature, which means save diff void SaveModel(const std::string& path, const int mode); // get save cache threshold - double GetCacheThreshold(); + double GetCacheThreshold(int table_id); // shuffle cache model between servers void CacheShuffle(int table_id, const std::string& path, const int mode, const double cache_threshold); diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 6785823f08f..c6d62b1d027 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -273,7 +273,8 @@ class PSLib(Fleet): main_program(Program): fluid program, default None kwargs: use define property, current support following mode(int): define for feature extension in the future, - currently no use, will pass a default value 0 + currently no use, will pass a default value 0 + table_id(int): which table to save cache, default is 0 Returns: feasign_num(int): cache feasign num @@ -285,23 +286,25 @@ class PSLib(Fleet): """ mode = kwargs.get("mode", 0) + table_id = kwargs.get("table_id", 0) self._fleet_ptr.client_flush() self._role_maker._barrier_worker() cache_threshold = 0.0 if self._role_maker.is_first_worker(): - cache_threshold = self._fleet_ptr.get_cache_threshold() + cache_threshold = self._fleet_ptr.get_cache_threshold(table_id) #check cache threshold right or not self._role_maker._barrier_worker() if self._role_maker.is_first_worker(): - self._fleet_ptr.cache_shuffle(0, dirname, mode, cache_threshold) + 
self._fleet_ptr.cache_shuffle(table_id, dirname, mode, + cache_threshold) self._role_maker._barrier_worker() feasign_num = -1 if self._role_maker.is_first_worker(): - feasign_num = self._fleet_ptr.save_cache(0, dirname, mode) + feasign_num = self._fleet_ptr.save_cache(table_id, dirname, mode) self._role_maker._barrier_worker() return feasign_num diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py index 902f515d0bb..a283c018533 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py @@ -79,7 +79,8 @@ class DownpourServer(Server): 'sparse_weight_bounds', 'sparse_embedx_dim', 'sparse_embedx_threshold', 'sparse_nonclk_coeff', \ 'sparse_click_coeff', 'sparse_base_threshold', 'sparse_delta_threshold', 'sparse_delta_keep_days', \ 'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \ - 'sparse_converter', 'sparse_deconverter'] + 'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \ + 'sparse_cache_file_num'] for key in strategy: if key not in support_sparse_key_list: @@ -98,6 +99,12 @@ class DownpourServer(Server): table.table_class = table_class if table_class == 'DownpourSparseTable': + table.enable_sparse_table_cache = strategy.get( + 'sparse_enable_cache', True) + table.sparse_table_cache_rate = strategy.get('sparse_cache_rate', + 0.00055) + table.sparse_table_cache_file_num = strategy.get( + 'sparse_cache_file_num', 16) table.compress_in_save = strategy.get('sparse_compress_in_save', True) table.shard_num = strategy.get('sparse_shard_num', 1000) diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py index 5affa9b59c2..89225c37feb 100644 --- 
a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py @@ -349,6 +349,15 @@ class DistributedAdam(DistributedOptimizerImplBase): tp = ps_param.trainer_param.add() tp.CopyFrom(prog_id_to_worker[k].get_desc()) + ps_param.fs_client_param.uri = \ + strategy.get("fs_uri", "hdfs://your_hdfs_uri") + ps_param.fs_client_param.user = \ + strategy.get("fs_user", "your_hdfs_user") + ps_param.fs_client_param.passwd = \ + strategy.get("fs_passwd", "your_hdfs_passwd") + ps_param.fs_client_param.hadoop_bin = \ + strategy.get("fs_hadoop_bin", "$HADOOP_HOME/bin/hadoop") + opt_info = {} opt_info["program_id_to_worker"] = prog_id_to_worker opt_info["program_configs"] = program_configs diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py index 0f106a75253..30752b9e442 100644 --- a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py @@ -559,7 +559,8 @@ class FleetUtil(object): hadoop_fs_name, hadoop_fs_ugi, hadoop_home="$HADOOP_HOME", - donefile_name="sparse_cache.meta"): + donefile_name="sparse_cache.meta", + **kwargs): """ write cache donefile @@ -572,6 +573,9 @@ class FleetUtil(object): hadoop_fs_ugi(str): hdfs/afs fs ugi hadoop_home(str): hadoop home, default is "$HADOOP_HOME" donefile_name(str): donefile name, default is "sparse_cache.meta" + kwargs(dict): user defined properties + file_num(int): cache file num + table_id(int): cache table id Examples: .. 
code-block:: python @@ -591,12 +595,14 @@ class FleetUtil(object): day = str(day) pass_id = str(pass_id) key_num = int(key_num) + file_num = kwargs.get("file_num", 16) + table_id = kwargs.get("table_id", 0) if pass_id != "-1": - suffix_name = "/%s/delta-%s/000_cache" % (day, pass_id) + suffix_name = "/%s/delta-%s/%03d_cache" % (day, pass_id, table_id) model_path = output_path.rstrip("/") + suffix_name else: - suffix_name = "/%s/base/000_cache" % day + suffix_name = "/%s/base/%03d_cache" % (day, table_id) model_path = output_path.rstrip("/") + suffix_name if fleet.worker_index() == 0: @@ -610,8 +616,8 @@ class FleetUtil(object): self.rank0_error( \ "not write because %s already exists" % donefile_path) else: - meta_str = \ - "file_prefix:part\npart_num:16\nkey_num:%d\n" % key_num + meta_str = "file_prefix:part\npart_num:%s\nkey_num:%d\n" \ + % (file_num, key_num) with open(donefile_name, "w") as f: f.write(meta_str) client.upload( @@ -743,7 +749,7 @@ class FleetUtil(object): fleet.save_persistables(None, model_path, mode=2) self.rank0_print("save_xbox_base_model done") - def save_cache_model(self, output_path, day, pass_id, mode=1): + def save_cache_model(self, output_path, day, pass_id, mode=1, **kwargs): """ save cache model @@ -752,6 +758,8 @@ class FleetUtil(object): day(str|int): training day pass_id(str|int): training pass id mode(str|int): save mode + kwargs(dict): user defined properties + table_id(int): table id to save cache Returns: key_num(int): cache key num @@ -767,14 +775,16 @@ class FleetUtil(object): day = str(day) pass_id = str(pass_id) mode = int(mode) + table_id = kwargs.get("table_id", 0) suffix_name = "/%s/delta-%s" % (day, pass_id) model_path = output_path.rstrip("/") + suffix_name self.rank0_print("going to save_cache_model %s" % model_path) - key_num = fleet.save_cache_model(None, model_path, mode=mode) + key_num = fleet.save_cache_model( + None, model_path, mode=mode, table_id=table_id) self.rank0_print("save_cache_model done") return 
key_num - def save_cache_base_model(self, output_path, day): + def save_cache_base_model(self, output_path, day, **kwargs): """ save cache model @@ -782,6 +792,8 @@ class FleetUtil(object): output_path(str): output path day(str|int): training day pass_id(str|int): training pass id + kwargs(dict): user defined properties + table_id(int): table id to save cache Returns: key_num(int): cache key num @@ -795,10 +807,12 @@ class FleetUtil(object): """ day = str(day) + table_id = kwargs.get("table_id", 0) suffix_name = "/%s/base" % day model_path = output_path.rstrip("/") + suffix_name self.rank0_print("going to save_cache_base_model %s" % model_path) - key_num = fleet.save_cache_model(None, model_path, mode=2) + key_num = fleet.save_cache_model( + None, model_path, mode=2, table_id=table_id) self.rank0_print("save_cache_base_model done") return key_num @@ -845,6 +859,95 @@ class FleetUtil(object): int(table.table_id), var_name_list) fleet._role_maker._barrier_worker() + def save_paddle_inference_model(self, + executor, + scope, + program, + feeded_vars, + target_vars, + output_path, + day, + pass_id, + hadoop_fs_name, + hadoop_fs_ugi, + hadoop_home="$HADOOP_HOME", + save_combine=True): + """ + save paddle inference model, and upload to hdfs dnn_plugin path + + Args: + executor(Executor): fluid Executor + scope(Scope): fluid Scope + program(Program): fluid Program + feeded_vars(list[Variable]): feed vars + target_vars(list[variable]): fetch vars + output_path(str): hdfs/afs output path + day(str|int): training day + pass_id(str|int): training pass + hadoop_fs_name(str): hadoop fs name + hadoop_fs_ugi(str): hadoop fs ugi + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + save_combine(bool): whether to save in a file or separate files, + default is True + + Examples: + ..
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.save_paddle_inference_model(exe, + join_scope, + join_program, + feeded_vars, + target_vars, + "hdfs:/my/output/path/", + day=20190727, + pass_id=6, + hadoop_fs_name="xxx", + hadoop_fs_ugi="xxx,xxx") + """ + day = str(day) + pass_id = str(pass_id) + feeded_var_names = [i.name for i in feeded_vars] + model_name = "inference_model" + # pull dense before save + self.pull_all_dense_params(scope, program) + if fleet.worker_index() == 0: + with fluid.scope_guard(scope): + if save_combine: + fluid.io.save_inference_model( + dirname=model_name, + feeded_var_names=feeded_var_names, + target_vars=target_vars, + executor=executor, + main_program=program, + params_filename="params") + else: + fluid.io.save_inference_model( + dirname=model_name, + feeded_var_names=feeded_var_names, + target_vars=target_vars, + executor=executor, + main_program=program) + + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + + if pass_id == "-1": + dest = "%s/%s/base/dnn_plugin/" % (output_path, day) + else: + dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day, + pass_id) + if not client.is_exist(dest): + client.makedirs(dest) + + client.upload(dest, model_name) + + fleet._role_maker._barrier_worker() + def save_paddle_params(self, executor, scope, diff --git a/python/paddle/fluid/incubate/fleet/utils/hdfs.py b/python/paddle/fluid/incubate/fleet/utils/hdfs.py index 53fb92748d8..7474d418911 100644 --- a/python/paddle/fluid/incubate/fleet/utils/hdfs.py +++ b/python/paddle/fluid/incubate/fleet/utils/hdfs.py @@ -595,8 +595,7 @@ class HDFSClient(object): if not self.is_exist(dest_dir): self.makedirs(dest_dir) put_command = ["-put", local_dir, dest_dir] - returncode, output, errors = self.__run_hdfs_cmd(put_command) + returncode, output, errors = self.__run_hdfs_cmd(put_command) if
returncode != 0: _logger.error("Put local dir: {} to HDFS dir: {} failed".format( local_dir, dest_dir)) diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 3c507d36d80..58623adc91c 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -354,6 +354,7 @@ class TestDataset(unittest.TestCase): dataset.dataset.merge_by_lineid() fleet_ptr = fluid.core.Fleet() fleet_ptr.set_client2client_config(1, 1, 1) + fleet_ptr.get_cache_threshold(0) os.remove("./test_in_memory_dataset_run_a.txt") os.remove("./test_in_memory_dataset_run_b.txt") -- GitLab