From e87ddb28c2ef7c2fff084b1c2db901b3417a6771 Mon Sep 17 00:00:00 2001 From: xujiaqi01 <173596896@qq.com> Date: Wed, 5 Feb 2020 09:52:28 +0800 Subject: [PATCH] cherry pick 1.7 , fix copy table, add hdfs ls retry, fix save inference (#22447) * fix copy table bug (#22432) * fix copy table bug of lost some feasign * test=develop * add hdfs ls retry time and sleep time, fix save inference (#22433) * add hdfs ls retry time and sleep time, fix save inference * test=develop --- paddle/fluid/framework/downpour_worker.cc | 38 ++++++++++++------- .../fluid/incubate/fleet/utils/fleet_util.py | 4 +- .../paddle/fluid/incubate/fleet/utils/hdfs.py | 6 ++- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc index dd56b58c10..ebe62c7d05 100644 --- a/paddle/fluid/framework/downpour_worker.cc +++ b/paddle/fluid/framework/downpour_worker.cc @@ -577,13 +577,6 @@ void DownpourWorker::TrainFilesWithProfiler() { timeline.Start(); if (copy_table_config_.need_copy()) { VLOG(3) << "copy_sparse_tables_.size " << copy_sparse_tables_.size(); - if (copy_table_config_.sparse_copy_by_feasign()) { - for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) { - uint64_t tid = copy_sparse_tables_[i].first; - feasign_set_[tid].insert(sparse_push_keys_[tid].begin(), - sparse_push_keys_[tid].end()); - } - } if (batch_cnt % copy_table_config_.batch_num() == 0) { CopySparseTable(); CopyDenseTable(); @@ -696,6 +689,18 @@ void DownpourWorker::TrainFilesWithProfiler() { } } +#ifdef PADDLE_WITH_PSLIB + if (copy_table_config_.need_copy()) { + if (copy_table_config_.sparse_copy_by_feasign()) { + for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) { + uint64_t tid = copy_sparse_tables_[i].first; + feasign_set_[tid].insert(sparse_push_keys_[tid].begin(), + sparse_push_keys_[tid].end()); + } + } + } +#endif + if (need_to_push_dense_) { timeline.Start(); for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); @@ -828,13 +833,6 @@ void DownpourWorker::TrainFiles() { int cur_batch; while ((cur_batch = device_reader_->Next()) > 0) { if (copy_table_config_.need_copy()) { - if (copy_table_config_.sparse_copy_by_feasign()) { - for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) { - uint64_t tid = copy_sparse_tables_[i].first; - feasign_set_[tid].insert(sparse_push_keys_[tid].begin(), - sparse_push_keys_[tid].end()); - } - } if (batch_cnt % copy_table_config_.batch_num() == 0) { CopySparseTable(); CopyDenseTable(); @@ -918,6 +916,18 @@ void DownpourWorker::TrainFiles() { } } +#ifdef PADDLE_WITH_PSLIB + if (copy_table_config_.need_copy()) { + if (copy_table_config_.sparse_copy_by_feasign()) { + for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) { + uint64_t tid = copy_sparse_tables_[i].first; + feasign_set_[tid].insert(sparse_push_keys_[tid].begin(), + sparse_push_keys_[tid].end()); + } + } + } +#endif + if (need_to_push_dense_) { for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); ++i) { diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py index 305043c58b..247828de5a 100644 --- a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py @@ -920,7 +920,7 @@ class FleetUtil(object): feeded_var_names=feeded_var_names, target_vars=target_vars, executor=executor, - main_program=program, + main_program=program.clone(), params_filename="params") else: fluid.io.save_inference_model( @@ -928,7 +928,7 @@ class FleetUtil(object): feeded_var_names=feeded_var_names, target_vars=target_vars, executor=executor, - main_program=program) + main_program=program.clone()) configs = { "fs.default.name": hadoop_fs_name, diff --git a/python/paddle/fluid/incubate/fleet/utils/hdfs.py b/python/paddle/fluid/incubate/fleet/utils/hdfs.py index 7474d41891..23a22531a4 100644 --- a/python/paddle/fluid/incubate/fleet/utils/hdfs.py +++ b/python/paddle/fluid/incubate/fleet/utils/hdfs.py @@ -22,7 +22,7 @@ from datetime import datetime import re import copy import errno - +import time import logging __all__ = ["HDFSClient"] @@ -83,6 +83,7 @@ class HDFSClient(object): ret_code = 0 ret_out = None ret_err = None + retry_sleep_second = 3 whole_commands = " ".join(whole_commands) for x in range(retry_times + 1): proc = subprocess.Popen( @@ -99,6 +100,7 @@ class HDFSClient(object): if ret_code == 0: break + time.sleep(retry_sleep_second) return ret_code, ret_out, ret_err @@ -329,7 +331,7 @@ class HDFSClient(object): ls_commands = ['-ls', hdfs_path] returncode, output, errors = self.__run_hdfs_cmd( - ls_commands, retry_times=1) + ls_commands, retry_times=10) if returncode: _logger.error("HDFS list path: {} failed".format(hdfs_path)) -- GitLab