Unverified commit 23876de5, authored by xujiaqi01 and committed by GitHub

fix cache table bug, add save_paddle_inference_model, fix hdfs util bug (#21052)

* fix cache table bug
* add save_paddle_inference_model
* fix hdfs util bug
* test=develop
Parent eec9c9cb
@@ -549,12 +549,12 @@ void FleetWrapper::SaveModel(const std::string& path, const int mode) {
 #endif
 }

-double FleetWrapper::GetCacheThreshold() {
+double FleetWrapper::GetCacheThreshold(int table_id) {
 #ifdef PADDLE_WITH_PSLIB
   double cache_threshold = 0.0;
   auto ret = pslib_ptr_->_worker_ptr->flush();
   ret.wait();
-  ret = pslib_ptr_->_worker_ptr->get_cache_threshold(0, cache_threshold);
+  ret = pslib_ptr_->_worker_ptr->get_cache_threshold(table_id, cache_threshold);
   ret.wait();
   if (cache_threshold < 0) {
     LOG(ERROR) << "get cache threshold failed";
@@ -588,7 +588,8 @@ void FleetWrapper::CacheShuffle(int table_id, const std::string& path,
 int32_t FleetWrapper::SaveCache(int table_id, const std::string& path,
                                 const int mode) {
 #ifdef PADDLE_WITH_PSLIB
-  auto ret = pslib_ptr_->_worker_ptr->save_cache(0, path, std::to_string(mode));
+  auto ret =
+      pslib_ptr_->_worker_ptr->save_cache(table_id, path, std::to_string(mode));
   ret.wait();
   int32_t feasign_cnt = ret.get();
   if (feasign_cnt == -1) {
...
@@ -176,7 +176,7 @@ class FleetWrapper {
   // mode = 1, save delta feature, which means save diff
   void SaveModel(const std::string& path, const int mode);
   // get save cache threshold
-  double GetCacheThreshold();
+  double GetCacheThreshold(int table_id);
   // shuffle cache model between servers
   void CacheShuffle(int table_id, const std::string& path, const int mode,
                     const double cache_threshold);
...
@@ -274,6 +274,7 @@ class PSLib(Fleet):
             kwargs: use define property, current support following
                 mode(int): define for feature extension in the future,
                     currently no use, will pass a default value 0
+                table_id(int): which table to save cache, default is 0

         Returns:
             feasign_num(int): cache feasign num
@@ -285,23 +286,25 @@ class PSLib(Fleet):
         """
         mode = kwargs.get("mode", 0)
+        table_id = kwargs.get("table_id", 0)
         self._fleet_ptr.client_flush()
         self._role_maker._barrier_worker()
         cache_threshold = 0.0
         if self._role_maker.is_first_worker():
-            cache_threshold = self._fleet_ptr.get_cache_threshold()
+            cache_threshold = self._fleet_ptr.get_cache_threshold(table_id)
         #check cache threshold right or not
         self._role_maker._barrier_worker()
         if self._role_maker.is_first_worker():
-            self._fleet_ptr.cache_shuffle(0, dirname, mode, cache_threshold)
+            self._fleet_ptr.cache_shuffle(table_id, dirname, mode,
+                                          cache_threshold)
         self._role_maker._barrier_worker()
         feasign_num = -1
         if self._role_maker.is_first_worker():
-            feasign_num = self._fleet_ptr.save_cache(0, dirname, mode)
+            feasign_num = self._fleet_ptr.save_cache(table_id, dirname, mode)
         self._role_maker._barrier_worker()
         return feasign_num
...
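For readers of this change, a minimal usage sketch of the new table_id keyword in PSLib.save_cache_model (the import path matches this module; the output path, mode, and table id below are purely illustrative):

    # Illustrative only: save the cache of sparse table 1 instead of table 0.
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    feasign_num = fleet.save_cache_model(
        None, "hdfs:/my/output/path/20190727/delta-6", mode=0, table_id=1)
    print("cached feasign num: %d" % feasign_num)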
@@ -79,7 +79,8 @@ class DownpourServer(Server):
             'sparse_weight_bounds', 'sparse_embedx_dim', 'sparse_embedx_threshold', 'sparse_nonclk_coeff', \
             'sparse_click_coeff', 'sparse_base_threshold', 'sparse_delta_threshold', 'sparse_delta_keep_days', \
             'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \
-            'sparse_converter', 'sparse_deconverter']
+            'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \
+            'sparse_cache_file_num']

         for key in strategy:
             if key not in support_sparse_key_list:
@@ -98,6 +99,12 @@ class DownpourServer(Server):
             table.table_class = table_class
             if table_class == 'DownpourSparseTable':
+                table.enable_sparse_table_cache = strategy.get(
+                    'sparse_enable_cache', True)
+                table.sparse_table_cache_rate = strategy.get('sparse_cache_rate',
+                                                             0.00055)
+                table.sparse_table_cache_file_num = strategy.get(
+                    'sparse_cache_file_num', 16)
                 table.compress_in_save = strategy.get('sparse_compress_in_save',
                                                       True)
                 table.shard_num = strategy.get('sparse_shard_num', 1000)
...
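A hedged sketch of how the three new sparse-cache keys could be set in a DownpourSparseTable strategy dict; the defaults shown are the ones read above, and the other keys are taken from the surrounding context:

    # Illustrative strategy fragment; only keys whitelisted in
    # support_sparse_key_list are accepted by DownpourServer.
    sparse_strategy = {
        "sparse_compress_in_save": True,
        "sparse_shard_num": 1000,
        "sparse_enable_cache": True,    # -> table.enable_sparse_table_cache
        "sparse_cache_rate": 0.00055,   # -> table.sparse_table_cache_rate
        "sparse_cache_file_num": 16,    # -> table.sparse_table_cache_file_num
    }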
@@ -349,6 +349,15 @@ class DistributedAdam(DistributedOptimizerImplBase):
                 tp = ps_param.trainer_param.add()
                 tp.CopyFrom(prog_id_to_worker[k].get_desc())

+        ps_param.fs_client_param.uri = \
+            strategy.get("fs_uri", "hdfs://your_hdfs_uri")
+        ps_param.fs_client_param.user = \
+            strategy.get("fs_user", "your_hdfs_user")
+        ps_param.fs_client_param.passwd = \
+            strategy.get("fs_passwd", "your_hdfs_passwd")
+        ps_param.fs_client_param.hadoop_bin = \
+            strategy.get("fs_hadoop_bin", "$HADOOP_HOME/bin/hadoop")
+
         opt_info = {}
         opt_info["program_id_to_worker"] = prog_id_to_worker
         opt_info["program_configs"] = program_configs
...
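A sketch of passing the new fs_* keys through a strategy dict; exactly how the dict reaches DistributedAdam (here via fleet.distributed_optimizer) is an assumption, and every HDFS value is a placeholder:

    # Illustrative only: the fs_* entries feed ps_param.fs_client_param above.
    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    strategy = {
        "fs_uri": "hdfs://your_hdfs_uri",
        "fs_user": "your_hdfs_user",
        "fs_passwd": "your_hdfs_passwd",
        "fs_hadoop_bin": "$HADOOP_HOME/bin/hadoop",
    }
    optimizer = fleet.distributed_optimizer(
        fluid.optimizer.Adam(learning_rate=0.001), strategy)
    # optimizer.minimize(loss) then embeds the fs client config into ps_param.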
@@ -559,7 +559,8 @@ class FleetUtil(object):
                              hadoop_fs_name,
                              hadoop_fs_ugi,
                              hadoop_home="$HADOOP_HOME",
-                             donefile_name="sparse_cache.meta"):
+                             donefile_name="sparse_cache.meta",
+                             **kwargs):
         """
         write cache donefile
@@ -572,6 +573,9 @@ class FleetUtil(object):
             hadoop_fs_ugi(str): hdfs/afs fs ugi
             hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
             donefile_name(str): donefile name, default is "sparse_cache.meta"
+            kwargs(dict): user defined properties
+                file_num(int): cache file num
+                table_id(int): cache table id

         Examples:
             .. code-block:: python
@@ -591,12 +595,14 @@ class FleetUtil(object):
         day = str(day)
         pass_id = str(pass_id)
         key_num = int(key_num)
+        file_num = kwargs.get("file_num", 16)
+        table_id = kwargs.get("table_id", 0)
         if pass_id != "-1":
-            suffix_name = "/%s/delta-%s/000_cache" % (day, pass_id)
+            suffix_name = "/%s/delta-%s/%03d_cache" % (day, pass_id, table_id)
             model_path = output_path.rstrip("/") + suffix_name
         else:
-            suffix_name = "/%s/base/000_cache" % day
+            suffix_name = "/%s/base/%03d_cache" % (day, table_id)
             model_path = output_path.rstrip("/") + suffix_name

         if fleet.worker_index() == 0:
@@ -610,8 +616,8 @@ class FleetUtil(object):
                 self.rank0_error( \
                     "not write because %s already exists" % donefile_path)
             else:
-                meta_str = \
-                    "file_prefix:part\npart_num:16\nkey_num:%d\n" % key_num
+                meta_str = "file_prefix:part\npart_num:%s\nkey_num:%d\n" \
+                           % (file_num, key_num)
                 with open(donefile_name, "w") as f:
                     f.write(meta_str)
                 client.upload(
@@ -743,7 +749,7 @@ class FleetUtil(object):
         fleet.save_persistables(None, model_path, mode=2)
         self.rank0_print("save_xbox_base_model done")

-    def save_cache_model(self, output_path, day, pass_id, mode=1):
+    def save_cache_model(self, output_path, day, pass_id, mode=1, **kwargs):
         """
         save cache model
@@ -752,6 +758,8 @@ class FleetUtil(object):
             day(str|int): training day
             pass_id(str|int): training pass id
             mode(str|int): save mode
+            kwargs(dict): user defined properties
+                table_id(int): table id to save cache

         Returns:
             key_num(int): cache key num
@@ -767,14 +775,16 @@ class FleetUtil(object):
         day = str(day)
         pass_id = str(pass_id)
         mode = int(mode)
+        table_id = kwargs.get("table_id", 0)
         suffix_name = "/%s/delta-%s" % (day, pass_id)
         model_path = output_path.rstrip("/") + suffix_name
         self.rank0_print("going to save_cache_model %s" % model_path)
-        key_num = fleet.save_cache_model(None, model_path, mode=mode)
+        key_num = fleet.save_cache_model(
+            None, model_path, mode=mode, table_id=table_id)
         self.rank0_print("save_cache_model done")
         return key_num

-    def save_cache_base_model(self, output_path, day):
+    def save_cache_base_model(self, output_path, day, **kwargs):
         """
         save cache model
@@ -782,6 +792,8 @@ class FleetUtil(object):
             output_path(str): output path
             day(str|int): training day
             pass_id(str|int): training pass id
+            kwargs(dict): user defined properties
+                table_id(int): table id to save cache

         Returns:
             key_num(int): cache key num
@@ -795,10 +807,12 @@ class FleetUtil(object):
         """
         day = str(day)
+        table_id = kwargs.get("table_id", 0)
         suffix_name = "/%s/base" % day
         model_path = output_path.rstrip("/") + suffix_name
         self.rank0_print("going to save_cache_base_model %s" % model_path)
-        key_num = fleet.save_cache_model(None, model_path, mode=2)
+        key_num = fleet.save_cache_model(
+            None, model_path, mode=2, table_id=table_id)
         self.rank0_print("save_cache_base_model done")
         return key_num
@@ -845,6 +859,95 @@ class FleetUtil(object):
                     int(table.table_id), var_name_list)
         fleet._role_maker._barrier_worker()

+    def save_paddle_inference_model(self,
+                                    executor,
+                                    scope,
+                                    program,
+                                    feeded_vars,
+                                    target_vars,
+                                    output_path,
+                                    day,
+                                    pass_id,
+                                    hadoop_fs_name,
+                                    hadoop_fs_ugi,
+                                    hadoop_home="$HADOOP_HOME",
+                                    save_combine=True):
+        """
+        save paddle inference model, and upload to hdfs dnn_plugin path
+
+        Args:
+            executor(Executor): fluid Executor
+            scope(Scope): fluid Scope
+            program(Program): fluid Program
+            feeded_vars(list[Variable]): feed vars
+            target_vars(list[Variable]): fetch vars
+            output_path(str): hdfs/afs output path
+            day(str|int): training day
+            pass_id(str|int): training pass
+            hadoop_fs_name(str): hadoop fs name
+            hadoop_fs_ugi(str): hadoop fs ugi
+            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
+            save_combine(bool): whether to save in one file or separate files,
+                                default is True
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              fleet_util.save_paddle_inference_model(exe,
+                                                     join_scope,
+                                                     join_program,
+                                                     feeded_vars,
+                                                     target_vars,
+                                                     "hdfs:/my/output/path/",
+                                                     day=20190727,
+                                                     pass_id=6,
+                                                     hadoop_fs_name="xxx",
+                                                     hadoop_fs_ugi="xxx,xxx")
+
+        """
+        day = str(day)
+        pass_id = str(pass_id)
+        feeded_var_names = [i.name for i in feeded_vars]
+        model_name = "inference_model"
+        # pull dense before save
+        self.pull_all_dense_params(scope, program)
+        if fleet.worker_index() == 0:
+            with fluid.scope_guard(scope):
+                if save_combine:
+                    fluid.io.save_inference_model(
+                        dirname=model_name,
+                        feeded_var_names=feeded_var_names,
+                        target_vars=target_vars,
+                        executor=executor,
+                        main_program=program,
+                        params_filename="params")
+                else:
+                    fluid.io.save_inference_model(
+                        dirname=model_name,
+                        feeded_var_names=feeded_var_names,
+                        target_vars=target_vars,
+                        executor=executor,
+                        main_program=program)
+
+            configs = {
+                "fs.default.name": hadoop_fs_name,
+                "hadoop.job.ugi": hadoop_fs_ugi
+            }
+            client = HDFSClient(hadoop_home, configs)
+
+            if pass_id == "-1":
+                dest = "%s/%s/base/dnn_plugin/" % (output_path, day)
+            else:
+                dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day,
+                                                       pass_id)
+            if not client.is_exist(dest):
+                client.makedirs(dest)
+
+            client.upload(dest, model_name)
+
+        fleet._role_maker._barrier_worker()
+
     def save_paddle_params(self,
                            executor,
                            scope,
...
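Putting the FleetUtil changes together, a hedged usage sketch of save_cache_model and write_cache_donefile with the new table_id/file_num keywords (paths, day, pass id, and HDFS credentials are placeholders):

    # Illustrative only: save the cache of table 1 and write its donefile.
    from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil

    fleet_util = FleetUtil()
    key_num = fleet_util.save_cache_model(
        "hdfs:/my/output/path/", day=20190727, pass_id=6, mode=1, table_id=1)
    fleet_util.write_cache_donefile(
        output_path="hdfs:/my/output/path/", day=20190727, pass_id=6,
        key_num=key_num, hadoop_fs_name="xxx", hadoop_fs_ugi="xxx,xxx",
        file_num=16, table_id=1)
    # With table_id=1 the cache dir becomes .../20190727/delta-6/001_cache.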
@@ -595,8 +595,7 @@ class HDFSClient(object):
         if not self.is_exist(dest_dir):
             self.makedirs(dest_dir)
         put_command = ["-put", local_dir, dest_dir]
-        returncode, output, errors = self.__run_hdfs_cmd(put_command,
-                                                         retry_times)
+        returncode, output, errors = self.__run_hdfs_cmd(put_command)
         if returncode != 0:
             _logger.error("Put local dir: {} to HDFS dir: {} failed".format(
                 local_dir, dest_dir))
...
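For reference, a minimal sketch of the HDFSClient upload path exercised by the fix above (the import path, hadoop home, ugi, and directories are placeholders/assumptions):

    # Illustrative only: upload a local inference_model dir to HDFS.
    from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient

    configs = {
        "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        "hadoop.job.ugi": "hello,hello123",
    }
    client = HDFSClient("$HADOOP_HOME", configs)
    dest = "/my/output/path/20190727/delta-6/dnn_plugin/"
    if not client.is_exist(dest):
        client.makedirs(dest)
    client.upload(dest, "inference_model")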
@@ -354,6 +354,7 @@ class TestDataset(unittest.TestCase):
         dataset.dataset.merge_by_lineid()
         fleet_ptr = fluid.core.Fleet()
         fleet_ptr.set_client2client_config(1, 1, 1)
+        fleet_ptr.get_cache_threshold(0)
         os.remove("./test_in_memory_dataset_run_a.txt")
         os.remove("./test_in_memory_dataset_run_b.txt")
...