Unverified commit e87ddb28, authored by xujiaqi01, committed via GitHub

cherry pick 1.7 , fix copy table, add hdfs ls retry, fix save inference (#22447)

* fix copy table bug (#22432)

* fix copy table bug of lost some feasign
* test=develop

* add hdfs ls retry time and sleep time, fix save inference (#22433)

* add hdfs ls retry time and sleep time, fix save inference
* test=develop
Parent commit: 8b8530c5
...@@ -577,13 +577,6 @@ void DownpourWorker::TrainFilesWithProfiler() { ...@@ -577,13 +577,6 @@ void DownpourWorker::TrainFilesWithProfiler() {
timeline.Start(); timeline.Start();
if (copy_table_config_.need_copy()) { if (copy_table_config_.need_copy()) {
VLOG(3) << "copy_sparse_tables_.size " << copy_sparse_tables_.size(); VLOG(3) << "copy_sparse_tables_.size " << copy_sparse_tables_.size();
if (copy_table_config_.sparse_copy_by_feasign()) {
for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
uint64_t tid = copy_sparse_tables_[i].first;
feasign_set_[tid].insert(sparse_push_keys_[tid].begin(),
sparse_push_keys_[tid].end());
}
}
if (batch_cnt % copy_table_config_.batch_num() == 0) { if (batch_cnt % copy_table_config_.batch_num() == 0) {
CopySparseTable(); CopySparseTable();
CopyDenseTable(); CopyDenseTable();
...@@ -696,6 +689,18 @@ void DownpourWorker::TrainFilesWithProfiler() { ...@@ -696,6 +689,18 @@ void DownpourWorker::TrainFilesWithProfiler() {
} }
} }
#ifdef PADDLE_WITH_PSLIB
if (copy_table_config_.need_copy()) {
if (copy_table_config_.sparse_copy_by_feasign()) {
for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
uint64_t tid = copy_sparse_tables_[i].first;
feasign_set_[tid].insert(sparse_push_keys_[tid].begin(),
sparse_push_keys_[tid].end());
}
}
}
#endif
if (need_to_push_dense_) { if (need_to_push_dense_) {
timeline.Start(); timeline.Start();
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
...@@ -828,13 +833,6 @@ void DownpourWorker::TrainFiles() { ...@@ -828,13 +833,6 @@ void DownpourWorker::TrainFiles() {
int cur_batch; int cur_batch;
while ((cur_batch = device_reader_->Next()) > 0) { while ((cur_batch = device_reader_->Next()) > 0) {
if (copy_table_config_.need_copy()) { if (copy_table_config_.need_copy()) {
if (copy_table_config_.sparse_copy_by_feasign()) {
for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
uint64_t tid = copy_sparse_tables_[i].first;
feasign_set_[tid].insert(sparse_push_keys_[tid].begin(),
sparse_push_keys_[tid].end());
}
}
if (batch_cnt % copy_table_config_.batch_num() == 0) { if (batch_cnt % copy_table_config_.batch_num() == 0) {
CopySparseTable(); CopySparseTable();
CopyDenseTable(); CopyDenseTable();
...@@ -918,6 +916,18 @@ void DownpourWorker::TrainFiles() { ...@@ -918,6 +916,18 @@ void DownpourWorker::TrainFiles() {
} }
} }
#ifdef PADDLE_WITH_PSLIB
if (copy_table_config_.need_copy()) {
if (copy_table_config_.sparse_copy_by_feasign()) {
for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
uint64_t tid = copy_sparse_tables_[i].first;
feasign_set_[tid].insert(sparse_push_keys_[tid].begin(),
sparse_push_keys_[tid].end());
}
}
}
#endif
if (need_to_push_dense_) { if (need_to_push_dense_) {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) { ++i) {
......
...@@ -920,7 +920,7 @@ class FleetUtil(object): ...@@ -920,7 +920,7 @@ class FleetUtil(object):
feeded_var_names=feeded_var_names, feeded_var_names=feeded_var_names,
target_vars=target_vars, target_vars=target_vars,
executor=executor, executor=executor,
main_program=program, main_program=program.clone(),
params_filename="params") params_filename="params")
else: else:
fluid.io.save_inference_model( fluid.io.save_inference_model(
...@@ -928,7 +928,7 @@ class FleetUtil(object): ...@@ -928,7 +928,7 @@ class FleetUtil(object):
feeded_var_names=feeded_var_names, feeded_var_names=feeded_var_names,
target_vars=target_vars, target_vars=target_vars,
executor=executor, executor=executor,
main_program=program) main_program=program.clone())
configs = { configs = {
"fs.default.name": hadoop_fs_name, "fs.default.name": hadoop_fs_name,
......
...@@ -22,7 +22,7 @@ from datetime import datetime ...@@ -22,7 +22,7 @@ from datetime import datetime
import re import re
import copy import copy
import errno import errno
import time
import logging import logging
__all__ = ["HDFSClient"] __all__ = ["HDFSClient"]
...@@ -83,6 +83,7 @@ class HDFSClient(object): ...@@ -83,6 +83,7 @@ class HDFSClient(object):
ret_code = 0 ret_code = 0
ret_out = None ret_out = None
ret_err = None ret_err = None
retry_sleep_second = 3
whole_commands = " ".join(whole_commands) whole_commands = " ".join(whole_commands)
for x in range(retry_times + 1): for x in range(retry_times + 1):
proc = subprocess.Popen( proc = subprocess.Popen(
...@@ -99,6 +100,7 @@ class HDFSClient(object): ...@@ -99,6 +100,7 @@ class HDFSClient(object):
if ret_code == 0: if ret_code == 0:
break break
time.sleep(retry_sleep_second)
return ret_code, ret_out, ret_err return ret_code, ret_out, ret_err
...@@ -329,7 +331,7 @@ class HDFSClient(object): ...@@ -329,7 +331,7 @@ class HDFSClient(object):
ls_commands = ['-ls', hdfs_path] ls_commands = ['-ls', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd( returncode, output, errors = self.__run_hdfs_cmd(
ls_commands, retry_times=1) ls_commands, retry_times=10)
if returncode: if returncode:
_logger.error("HDFS list path: {} failed".format(hdfs_path)) _logger.error("HDFS list path: {} failed".format(hdfs_path))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register.