Unverified commit 80f1c507, authored by gongweibao, committed by GitHub

Fix typo in interface. (#24779)

Parent 7b7e6051
@@ -21,7 +21,7 @@ namespace framework {
std::shared_ptr<FILE> shell_fopen(const std::string& path,
                                  const std::string& mode) {
#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
  return nullptr;
#else
  if (shell_verbose()) {
@@ -48,7 +48,7 @@ std::shared_ptr<FILE> shell_fopen(const std::string& path,
// The implementation is async signal safe
// Mostly copy from CPython code
static int close_open_fds_internal() {
#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
  return 0;
#else
  struct linux_dirent {
@@ -103,8 +103,9 @@ static int close_open_fds_internal() {
}

static int shell_popen_fork_internal(const char* real_cmd, bool do_read,
                                     int parent_end, int child_end,
                                     bool redirect_stderr = false) {
#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
  return 0;
#else
  int child_pid = -1;
@@ -125,18 +126,41 @@ static int shell_popen_fork_internal(const char* real_cmd, bool do_read,
  if (child_end != child_std_end) {
    PCHECK(dup2(child_end, child_std_end) == child_std_end);
    if (redirect_stderr && do_read) {
      PCHECK(dup2(child_end, 2) == 2);
    }
    close(child_end);
  }
  close_open_fds_internal();
  PCHECK(execl("/bin/bash", "bash", "-c", real_cmd, NULL) >= 0);
-  exit(127);
  // Note: just for compilation. the child don't run this line.
  _exit(0);
#endif
}
static int read_from_pipe(FILE* fp, std::string* output) {
  char buf[4096];
  while (1) {
    int n = fread(buf, 1, 4096, fp);
    if (n <= 0) {
      break;
    }
    output->append(buf, n);
  }

  if (!feof(fp)) {
    return -1;
  }
  return 0;
}
std::shared_ptr<FILE> shell_popen(const std::string& cmd,
                                  const std::string& mode, int* err_no,
                                  int* status, bool redirect_stderr) {
#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
  return nullptr;
#else
  bool do_read = mode == "r";
@@ -146,9 +170,7 @@ std::shared_ptr<FILE> shell_popen(const std::string& cmd,
    return NULL;
  }

-  if (shell_verbose()) {
-    LOG(INFO) << "Opening pipe[" << cmd << "] with mode[" << mode << "]";
-  }
  VLOG(3) << "Opening pipe[" << cmd << "] with mode[" << mode << "]";

  std::string real_cmd = "set -o pipefail; " + cmd;
@@ -168,43 +190,54 @@ std::shared_ptr<FILE> shell_popen(const std::string& cmd,
    child_end = pipe_fds[0];
  }

  sighandler_t old_handler;
  old_handler = signal(SIGCHLD, SIG_DFL);

  fcntl(parent_end, F_SETFD, FD_CLOEXEC);

  int child_pid = shell_popen_fork_internal(
      real_cmd.c_str(), do_read, parent_end, child_end, redirect_stderr);
  close(child_end);

  FILE* fp = NULL;
  if ((fp = fdopen(parent_end, mode.c_str())) == NULL) {
    *err_no = -1;
    signal(SIGCHLD, old_handler);
    return NULL;
  }

-  return {fp, [child_pid, cmd, err_no](FILE* fp) {
-            if (shell_verbose()) {
-              LOG(INFO) << "Closing pipe[" << cmd << "]";
-            }
-            if (fclose(fp) != 0) {
  return {fp, [cmd, child_pid, old_handler, err_no, status](FILE* fp) {
            VLOG(3) << "Closing pipe[" << cmd << "]";
            if (fclose(fp)) {
              *err_no = -1;
            }
            int wstatus = -1;
            // don't do this before parent read data from child pipe
            // or when get the large data, it will hang!
            waitpid(child_pid, &wstatus, 0);

-            if (wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 ||
-                (wstatus == -1 && errno == ECHILD)) {
            if (status) {
              *status = wstatus;
            }
            if (WIFEXITED(wstatus) || wstatus == (128 + SIGPIPE) * 256) {
            } else {
              PADDLE_ENFORCE_NE(
                  errno, ECHILD,
                  platform::errors::Fatal("Must not be ECHILD errno here!"));
              *err_no = -1;
-              LOG(WARNING) << "status[" << wstatus << "], cmd[" << cmd << "]"
-                           << ", err_no[" << *err_no << "]";
-            }
-            if (wstatus == -1 && errno == ECHILD) {
-              // temporarily remove this warning
-              // LOG(WARNING) << "errno is ECHILD";
            }
            signal(SIGCHLD, old_handler);
          }};
#endif
}

static int shell_p2open_fork_internal(const char* real_cmd, int pipein_fds[2],
                                      int pipeout_fds[2]) {
#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
  return 0;
#else
  int child_pid = -1;
@@ -243,7 +276,7 @@ static int shell_p2open_fork_internal(const char* real_cmd, int pipein_fds[2],
std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(
    const std::string& cmd) {
#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
  return {};
#else
  if (shell_verbose()) {
@@ -301,51 +334,102 @@ std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(
#endif
}
-std::string shell_get_command_output(const std::string& cmd, int time_out,
-                                     int sleep_inter, bool print_cmd) {
#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
#else
static int _get_err_no(int err_no, int status) {
  if (err_no == 0) {
    if (WIFEXITED(status)) {
      return WEXITSTATUS(status);
    }
    return -1;
  }
  return err_no;
}
#endif

static int _shell_execute_cmd(const std::string& cmd, std::string* output,
                              int time_out, int sleep_inter,
                              bool redirect_stderr = false) {
#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
  PADDLE_THROW(platform::errors::Unimplemented(
      "This function(shell_get_command_output) is not implemented under _WIN32 "
      "or __APPLE__."));
#else
  int err_no = 0;
  int status = 0;
  int cmd_status = 0;
  platform::Timer timer;
  do {
-    if (print_cmd) {
-      LOG(INFO) << "exec cmd:[" << cmd << "]";
-    }
    VLOG(3) << "exec cmd:[" << cmd << "]";

    err_no = 0;
-    std::shared_ptr<FILE> pipe = shell_popen(cmd, "r", &err_no);
-    string::LineFileReader reader;
-    char* buf = reader.getdelim(&*pipe, 0);
    status = 0;
    *output = "";
    auto pipe = shell_popen(cmd, "r", &err_no, &status, redirect_stderr);

    if (err_no == 0) {
-      if (buf) {
-        return reader.get();
-      }
-      return "";
      // read file
      err_no = read_from_pipe(&*pipe, output);
      if (err_no) {
        LOG(WARNING) << "status[" << status << "], cmd[" << cmd << "]"
                     << ", err_no[" << err_no << "]";
      }
    }

    // close file and etc.
    pipe = nullptr;
    if (err_no) {
      LOG(WARNING) << "status[" << status << "], cmd[" << cmd << "]"
                   << ", err_no[" << err_no << "]";
    }

    cmd_status = _get_err_no(err_no, status);

    // cmd run ok!
    if (cmd_status == 0) {
      return cmd_status;
    }

    // time out
    timer.Pause();
    if ((time_out > 0 && timer.ElapsedMS() >= time_out) || time_out == 0) {
-      PADDLE_THROW(paddle::platform::errors::ExecutionTimeout(
-          "shell_get_command_output execute error errno:%d and try until "
-          "timeout.",
-          errno));
-      return "";
      break;
    }
    timer.Resume();

    if (sleep_inter > 0) {
      usleep(sleep_inter * 1000);
    }
  } while (cmd_status);

  // log when check timeout!
  if (time_out != 0) {
    *output += string::Sprintf(
        " _shell_execute_cmd execute cmd:%s ElapsedMS:%d, err_no:%d status:%d",
        cmd, timer.ElapsedMS(), err_no, cmd_status);
    LOG(WARNING) << *output;
  }

  return cmd_status;
#endif
}

std::string shell_get_command_output(const std::string& cmd, int time_out,
                                     int sleep_inter) {
  std::string output;
  _shell_execute_cmd(cmd, &output, time_out, sleep_inter);
  return output;
}

std::vector<std::string> shell_execute_cmd(const std::string& cmd, int time_out,
                                           int sleep_inter,
                                           bool redirect_stderr) {
  std::string output;
  int ret =
      _shell_execute_cmd(cmd, &output, time_out, sleep_inter, redirect_stderr);
  return std::vector<std::string>({string::Sprintf("%d", ret), output});
}

}  // end namespace framework
}  // end namespace paddle
@@ -28,6 +28,7 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/string/string_helper.h"
@@ -51,8 +52,10 @@ inline void shell_set_verbose(bool x) { shell_verbose_internal() = x; }
extern std::shared_ptr<FILE> shell_fopen(const std::string& path,
                                         const std::string& mode);

-extern std::shared_ptr<FILE> shell_popen(const std::string& cmd,
-                                         const std::string& mode, int* err_no);
std::shared_ptr<FILE> shell_popen(const std::string& cmd,
                                  const std::string& mode, int* err_no,
                                  int* status = NULL,
                                  bool redirect_stderr = false);

extern std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(
    const std::string& cmd);
@@ -65,12 +68,17 @@ inline void shell_execute(const std::string& cmd) {
  } while (err_no == -1);
}

// time_out:ms, default value:-1 means forever.
// sleep_inter:ms, default -1 means not sleep.
extern std::string shell_get_command_output(const std::string& cmd,
-                                            int time_out = -1,
-                                            int sleep_inter = -1,
-                                            bool print_cmd = false);
                                            int time_out = 10 * 60 * 1000,
                                            int sleep_inter = 1000);

// time_out:ms, default -1 means forever.
// sleep_inter:ms, default -1 means not sleep.
extern std::vector<std::string> shell_execute_cmd(const std::string& cmd,
                                                  int time_out = 0,
                                                  int sleep_inter = 0,
                                                  bool redirect_stderr = false);

}  // namespace framework
}  // namespace paddle
@@ -1563,6 +1563,15 @@ All parameter, weight, gradient are variables in Paddle.
              sleep_inter);
        },
        py::arg("cmd"), py::arg("time_out") = -1, py::arg("sleep_inter") = -1);
  m.def("shell_execute_cmd",
        [](const std::string &cmd, int time_out = 0, int sleep_inter = 0,
           bool redirect_stderr = false) -> std::vector<std::string> {
          return paddle::framework::shell_execute_cmd(
              cmd, time_out, sleep_inter, redirect_stderr);
        },
        py::arg("cmd"), py::arg("time_out") = 0, py::arg("sleep_inter") = 0,
        py::arg("redirect_stderr") = false);

#ifdef PADDLE_WITH_CUDA
  m.def("is_float16_supported", [](const platform::CUDAPlace &place) -> bool {
    // Only GPUs with Compute Capability >= 53 support float16
......
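For reference, a minimal usage sketch of the shell_execute_cmd binding added above, as exposed through paddle.fluid.core in a build that includes this commit; the echoed command is only an illustration.

# Minimal sketch, assuming a Paddle build that contains this commit.
# shell_execute_cmd(cmd, time_out, sleep_inter, redirect_stderr) returns
# [return_code_as_string, captured_output].
import paddle.fluid as fluid

ret, output = fluid.core.shell_execute_cmd("echo hello", 0, 0, False)
print("return code:", int(ret))
print("output:", output)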
@@ -11,6 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from . import fs_wrapper
-
-__all__ = fs_wrapper.__all__
@@ -143,7 +143,7 @@ class Trainer(object):
            return False

        if self.endpoint != t.endpoint or \
                self.rank != t.rank:
            return False

        for a, b in zip(self.gpus, t.gpus):
......
@@ -26,7 +26,7 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Mode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid import compiler
-from paddle.distributed.fs_wrapper import LocalFS, BDFS
from paddle.fluid.incubate.fleet.utils.fs import LocalFS

import os
import sys
@@ -70,7 +70,7 @@ class Collective(Fleet):
        self._origin_program = None
        self._transpiled_program = None
        self.main_program = None
        self._checkpoint_prefix = "__paddle_fleet_checkpoint__"
        self._param_file_name = "_paddle_fleet_param__"

    def init_worker(self):
@@ -186,8 +186,8 @@ class Collective(Fleet):
        max_no = -1
        d = {}
        dirs = fs.list_dirs(root_path)
        for d in dirs:
            g = d.split(".")
            if len(g) != 2:
                continue
@@ -203,10 +203,10 @@ class Collective(Fleet):
        return max_no

    def clean_redundant_checkpoints(self,
                                    root_path,
                                    fs=LocalFS(),
                                    checkpoint_num=1):
        max_no = self._get_last_checkpoint_no(root_path, fs)
        if max_no < 0:
            return
@@ -215,32 +215,32 @@ class Collective(Fleet):
            checkpoint_num = 1

        dirs = fs.list_dirs(root_path)
        for d in dirs:
            g = d.split(".")
            if len(g) != 2:
                continue

            if g[0] != self._checkpoint_prefix:
                continue

            try:
                n = int(g[1])
                if n <= max_no - checkpoint_num:
                    path = "{}/{}.{}".format(root_path, self._checkpoint_prefix,
                                             n)
                    fs.delete(path)
            except Exception as e:
                print(e)
                continue
    def save_checkpoint(self,
                        executor,
                        path,
                        train_status,
                        main_program=None,
                        fs=LocalFS(),
                        local_cache_path=".cache",
                        remain_all_checkpoint=True):
        """
        This function save persistables and current epoch num to path.
        """
@@ -248,14 +248,16 @@ class Collective(Fleet):
        if main_program == None:
            main_program = self._transpiled_program

        if not fs.is_exist(path):
            fs.mkdirs(path)
        else:
            assert fs.is_dir(path), "path:%s must be a directory".format(path)

        max_no = self._get_last_checkpoint_no(path, fs=fs)
        if max_no < 0:
            max_no = -1

        real_path = "{}/{}.{}".format(path, self._checkpoint_prefix, max_no + 1)
        tmp_path = "{}.tmp".format(real_path)
        saved_path = tmp_path
@@ -264,9 +266,14 @@ class Collective(Fleet):
        cache_path = None
        if fs.need_upload_download():
            cache_path = "{}/{}.{}.saved_cache".format(
                local_cache_path, self._checkpoint_prefix, max_no + 1)
            if not local_fs.is_exist(cache_path):
                local_fs.mkdirs(cache_path)
            else:
                assert fs.is_dir(
                    path), "cache path:{} must be a directory".format(
                        cache_path)
            saved_path = cache_path

        self.save_persistables(
@@ -282,16 +289,16 @@ class Collective(Fleet):
        fs.mv(tmp_path, real_path)

        if not remain_all_checkpoint:
            self.clean_redundant_checkpoints(path)

    def load_checkpoint(self,
                        executor,
                        path,
                        trainer_id,
                        main_program=None,
                        fs=LocalFS(),
                        local_cache_path=".cache",
                        ignore_empty=True):
        """
        This function load persistables and current epoch num from path.
        """
@@ -306,11 +313,13 @@ class Collective(Fleet):
        local_fs = LocalFS()
        if fs.need_upload_download():
            cache_path = "{}/{}.{}.load_cache.{}".format(
                local_cache_path, self._checkpoint_prefix, max_no, trainer_id)

            if not local_fs.is_exist(local_cache_path):
                local_fs.mkdirs(local_cache_path)
            if local_fs.is_exist(cache_path):
                local_fs.delete(cache_path)

        real_path = "{}/{}.{}".format(path, self._checkpoint_prefix, max_no)
        load_path = real_path
        if fs.need_upload_download():
            fs.download(real_path, cache_path)
......
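A short sketch of how the renamed checkpoint API above (save_checkpoint, load_checkpoint, clean_redundant_checkpoints) is driven from user code; it mirrors the unit test added later in this commit, and the executor, program setup, and paths are placeholders rather than a complete training job.

# Hedged sketch of the renamed fleet checkpoint API; exe, dir_path and the
# missing fleet/program initialization are placeholders.
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import fleet, TrainStatus
from paddle.fluid.incubate.fleet.utils.fs import LocalFS

fs = LocalFS()
exe = fluid.Executor(fluid.CPUPlace())
dir_path = "./checkpoint_demo"

status = TrainStatus(2)  # e.g. training finished epoch 2
fleet.save_checkpoint(exe, dir_path, train_status=status, fs=fs)

loaded = fleet.load_checkpoint(exe, dir_path, trainer_id=0, fs=fs)
assert loaded == status

# keep only the newest checkpoint directory under dir_path
fleet.clean_redundant_checkpoints(dir_path, fs=fs)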
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,215 +11,154 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import paddle.fluid as fluid
import sys import sys
import subprocess
import multiprocessing
from datetime import datetime
import re
import copy
import errno
import time
import logging
import abc import abc
import os from pathlib import PurePosixPath, Path
from pathlib import PurePosixPath
import shutil import shutil
__all__ = ['FS', 'LocalFS', 'BDFS'] __all__ = ['FS', 'LocalFS']
class ExecuteError(Exception):
pass
class FSFileExistsError(Exception):
pass
class FSFileNotExistsError(Exception):
pass
class FSTimeOut(Exception):
pass
class FS(object): class FS(object):
@abc.abstractmethod @abc.abstractmethod
def list_dirs(self, fs_path): def ls_dir(self, fs_path):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def ls_dir(self, fs_path): def is_file(self, fs_path):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def stat(self, fs_path): def is_dir(self, fs_path):
pass raise NotImplementedError
@abc.abstractmethod
def is_exist(self, fs_path):
raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def upload(self, local_path, fs_path): def upload(self, local_path, fs_path):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def download(self, fs_path, local_path): def download(self, fs_path, local_path):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def mkdir(self, fs_path): def mkdirs(self, fs_path):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def mv(self, fs_src_path, fs_dst_path): def delete(self, fs_path):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def rmr(self, fs_path): def need_upload_download(self):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def rm(self, fs_path): def rename(self, fs_src_path, fs_dst_path):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def delete(self, fs_path): def mv(self, fs_src_path, fs_dst_path):
pass raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def need_upload_download(self): def upload_dir(self, local_dir, dest_dir):
pass raise NotImplementedError
class LocalFS(FS): @abc.abstractmethod
def list_dirs(self, fs_path): def list_dirs(self, fs_path):
if not self.stat(fs_path): raise NotImplementedError
return []
return [
f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f)
]
class LocalFS(FS):
def ls_dir(self, fs_path): def ls_dir(self, fs_path):
return [f for f in os.listdir(fs_path)] return [f for f in os.listdir(fs_path)]
def stat(self, fs_path): def mkdirs(self, fs_path):
return os.path.exists(fs_path)
def mkdir(self, fs_path):
assert not os.path.isfile(fs_path), "{} is already a file".format( assert not os.path.isfile(fs_path), "{} is already a file".format(
fs_path) fs_path)
os.system("mkdir -p {}".format(fs_path)) os.system("mkdir -p {}".format(fs_path))
def mv(self, fs_src_path, fs_dst_path): def rename(self, fs_src_path, fs_dst_path):
os.rename(fs_src_path, fs_dst_path) os.rename(fs_src_path, fs_dst_path)
def rmr(self, fs_path): def _rmr(self, fs_path):
shutil.rmtree(fs_path) shutil.rmtree(fs_path)
def rm(self, fs_path): def _rm(self, fs_path):
os.remove(fs_path) os.remove(fs_path)
def delete(self, fs_path): def delete(self, fs_path):
if not self.stat(fs_path): if not self.is_exist(fs_path):
return return
if os.path.isfile(fs_path): if os.path.isfile(fs_path):
return self.rm(fs_path) return self._rm(fs_path)
return self.rmr(fs_path) return self._rmr(fs_path)
def need_upload_download(self): def need_upload_download(self):
return False return False
def is_file(self, fs_path):
class BDFS(FS): return os.path.isfile(fs_path)
def __init__(self,
hdfs_name,
hdfs_ugi,
time_out=20 * 60 * 1000,
sleep_inter=1000):
self._base_cmd = "hadoop fs -Dfs.default.name=\"{}\" -Dhadoop.job.ugi=\"{}\"".format(
hdfs_name, hdfs_ugi)
self._time_out = time_out
self._sleep_inter = sleep_inter
def _run_cmd(self, cmd):
ret = fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
if len(ret) <= 0:
return []
lines = ret.splitlines()
return lines
def list_dirs(self, fs_path):
if not self.stat(fs_path):
return []
dirs, _ = self.ls_dir(fs_path)
return dirs
def ls_dir(self, fs_path):
"""
list directory under fs_path, and only give the pure name, not include the fs_path
"""
cmd = "{} -ls {}".format(self._base_cmd, fs_path)
lines = self._run_cmd(cmd)
dirs = []
files = []
for line in lines:
arr = line.split()
if len(arr) != 8:
continue
if fs_path not in arr[7]:
continue
p = PurePosixPath(arr[7])
if arr[0][0] == 'd':
dirs.append(p.name)
else:
files.append(p.name)
return dirs, files
def is_dir(self, fs_path): def is_dir(self, fs_path):
cmd = "{} -test -d {} ; echo $?".format(self._base_cmd, fs_path) return os.path.isdir(fs_path)
test = self._run_cmd(cmd)
if test[0].strip() == "0":
return True
return False
def stat(self, fs_path):
cmd = "{} -test -e {} ; echo $?".format(self._base_cmd, fs_path)
test = self._run_cmd(cmd)
if test[0].strip() == "0":
return True
return False
def upload(self, local_path, fs_path):
cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path)
fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def download(self, fs_path, local_path):
cmd = "{} -get {} {}/".format(self._base_cmd, fs_path, local_path)
fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def mkdir(self, fs_path):
if not self.stat(fs_path):
cmd = "{} -mkdir {}".format(self._base_cmd, fs_path)
fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def mv(self, fs_src_path, fs_dst_path): def is_exist(self, fs_path):
cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path) return os.path.exists(fs_path)
fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def rmr(self, fs_path): def touch(self, fs_path):
if not self.stat(fs_path): return Path(fs_path).touch()
return
cmd = "{} -rmr {}".format(self._base_cmd, fs_path) def mv(self, src_path, dst_path):
return fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) if not self.is_exist(src_path):
raise FSFileNotExistsError
def rm(self, fs_path): if self.is_exist(dst_path):
if not self.stat(fs_path): raise FSFileExistsError
return
cmd = "{} -rm {}".format(self._base_cmd, fs_path) return self.rename(src_path, dst_path)
return fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def delete(self, fs_path): def list_dirs(self, fs_path):
if not self.stat(fs_path): """
return list directory under fs_path, and only give the pure name, not include the fs_path
"""
is_dir = self.is_dir(fs_path) if not self.is_exist(fs_path):
if is_dir: return []
return self.rmr(fs_path)
return self.rm(fs_path) dirs = [
f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f)
]
def need_upload_download(self): return dirs
return True
...@@ -24,596 +24,221 @@ import copy ...@@ -24,596 +24,221 @@ import copy
import errno import errno
import time import time
import logging import logging
import six
from . import fs
from .fs import FS, LocalFS, FSFileExistsError, FSFileNotExistsError, ExecuteError, FSTimeOut
import paddle.fluid as fluid
import functools
__all__ = ["HDFSClient"] from pathlib import PurePosixPath, Path
import shutil
def get_logger(name, level, fmt):
logger = logging.getLogger(name)
logger.setLevel(level)
handler = logging.FileHandler('hdfs.log', mode='w')
formatter = logging.Formatter(fmt=fmt)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
class HDFSClient(object): __all__ = ["HDFSClient"]
"""
A tool of HDFS
Args:
hadoop_home (string): hadoop_home
configs (dict): hadoop config, it is a dict, please contain \
key "fs.default.name" and "hadoop.job.ugi"
Can be a float value
Examples:
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.ls("/user/com/train-25") def _handle_errors(f):
files = client.lsr("/user/com/train-25/models") def handler(*args, **kwargs):
""" start = time.time()
while True:
try:
return f(*args, **kwargs)
except ExecuteError as e:
o = args[0]
time_out = float(o._time_out) / 1000.0
inter = float(o._sleep_inter) / 1000.0
if time.time() - start >= time_out:
raise FSTimeOut
time.sleep(inter)
return functools.wraps(f)(handler)
class HDFSClient(FS):
def __init__(
self,
hadoop_home,
configs,
time_out=5 * 60 * 1000, #ms
sleep_inter=1000): #ms
# Raise exception if JAVA_HOME not exists.
java_home = os.environ["JAVA_HOME"]
def __init__(self, hadoop_home, configs):
self.pre_commands = [] self.pre_commands = []
hadoop_bin = '%s/bin/hadoop' % hadoop_home hadoop_bin = '%s/bin/hadoop' % hadoop_home
self.pre_commands.append(hadoop_bin) self.pre_commands.append(hadoop_bin)
dfs = 'fs' dfs = 'fs'
self.pre_commands.append(dfs) self.pre_commands.append(dfs)
for k, v in configs.iteritems(): if configs:
config_command = '-D%s=%s' % (k, v) for k, v in six.iteritems(configs):
self.pre_commands.append(config_command) config_command = '-D%s=%s' % (k, v)
def __run_hdfs_cmd(self, commands, retry_times=5):
whole_commands = copy.deepcopy(self.pre_commands)
whole_commands.extend(commands)
ret_code = 0
ret_out = None
ret_err = None
retry_sleep_second = 3
whole_commands = " ".join(whole_commands)
for x in range(retry_times + 1):
proc = subprocess.Popen(
whole_commands,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
(output, errors) = proc.communicate()
ret_code, ret_out, ret_err = proc.returncode, output, errors
_logger.info(
'Times: %d, Running command: %s. Return code: %d, Msg: %s' %
(x, whole_commands, proc.returncode, errors))
if ret_code == 0:
break
time.sleep(retry_sleep_second)
return ret_code, ret_out, ret_err
def cat(self, hdfs_path=None):
"""
cat hdfs file
Args:
hdfs_path(str): the hdfs file path
Returns:
file content
"""
if self.is_file(hdfs_path):
exist_cmd = ['-cat', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
exist_cmd, retry_times=1)
if returncode != 0:
_logger.error("HDFS cat HDFS path: {} failed".format(hdfs_path))
return ""
else:
_logger.info("HDFS cat HDFS path: {} succeed".format(hdfs_path))
return output.strip()
else: self._time_out = time_out
return "" self._sleep_inter = sleep_inter
self._base_cmd = " ".join(self.pre_commands)
self._bd_err_re = re.compile(
r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:')
def is_exist(self, hdfs_path=None): def _run_cmd(self, cmd, redirect_stderr=False):
""" ret, output = fluid.core.shell_execute_cmd(cmd, 0, 0, redirect_stderr)
whether the remote HDFS path exists return int(ret), output.splitlines()
Args:
hdfs_path(str): the hdfs file path
Returns:
True or False
"""
exist_cmd = ['-test', '-e', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
exist_cmd, retry_times=1)
if returncode:
_logger.error("HDFS is_exist HDFS path: {} failed".format(
hdfs_path))
return False
else:
_logger.info("HDFS is_exist HDFS path: {} successfully".format(
hdfs_path))
return True
def is_dir(self, hdfs_path=None): def list_dirs(self, fs_path):
""" if not self.is_exist(fs_path):
whether the remote HDFS path is directory return []
Args: dirs, _ = self.ls_dir(fs_path)
hdfs_path(str): the hdfs file path return dirs
Returns: @_handle_errors
True or False def ls_dir(self, fs_path):
"""
list directory under fs_path, and only give the pure name, not include the fs_path
""" """
if not self.is_exist(fs_path):
return [], []
if not self.is_exist(hdfs_path): cmd = "{} -ls {}".format(self._base_cmd, fs_path)
return False ret, lines = self._run_cmd(cmd)
dir_cmd = ['-test', '-d', hdfs_path] if ret != 0:
returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1) raise ExecuteError
if returncode: dirs = []
_logger.error("HDFS path: {} failed is not a directory".format( files = []
hdfs_path)) for line in lines:
return False arr = line.split()
else: if len(arr) != 8:
_logger.info("HDFS path: {} successfully is a directory".format( continue
hdfs_path))
return True
def is_file(self, hdfs_path=None): if fs_path not in arr[7]:
""" continue
whether the remote HDFS path is file
Args: p = PurePosixPath(arr[7])
hdfs_path(str): the hdfs file path if arr[0][0] == 'd':
dirs.append(p.name)
else:
files.append(p.name)
Returns: return dirs, files
True or False
"""
if not self.is_exist(hdfs_path): def _test_match(self, lines):
return False for l in lines:
m = self._bd_err_re.match(l)
if m != None:
return m
dir_cmd = ['-test', '-d', hdfs_path] return None
returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1)
if returncode == 0: @_handle_errors
_logger.error("HDFS path: {} failed is not a file".format( def is_dir(self, fs_path):
hdfs_path)) if not self.is_exist(fs_path):
return False return False
else:
_logger.info("HDFS path: {} successfully is a file".format(
hdfs_path))
return True
def delete(self, hdfs_path):
"""
Remove a file or directory from HDFS.
whether the remote HDFS path exists
Args:
hdfs_path(str): HDFS path.
Returns:
True or False
This function returns `True` if the deletion was successful and `False` if
no file or directory previously existed at `hdfs_path`.
"""
_logger.info('Deleting %r.', hdfs_path)
if not self.is_exist(hdfs_path):
_logger.warn("HDFS path: {} do not exist".format(hdfs_path))
return True
if self.is_dir(hdfs_path):
del_cmd = ['-rmr', hdfs_path]
else:
del_cmd = ['-rm', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(del_cmd, retry_times=0) cmd = "{} -test -d {}".format(
self._base_cmd, fs_path, redirect_stderr=True)
ret, lines = self._run_cmd(cmd)
if ret:
# other error
if self._test_match(lines) != None:
raise ExecuteError
if returncode:
_logger.error("HDFS path: {} delete files failure".format(
hdfs_path))
return False return False
else:
_logger.info("HDFS path: {} delete files successfully".format(
hdfs_path))
return True
def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False):
"""
Move a file or folder on HDFS.
Args:
hdfs_src_path(str): HDFS path
hdfs_dst_path(str): HDFS path
overwrite(bool|False): If the path already exists and overwrite is
False, will return False.
Returns:
True or False
"""
assert hdfs_src_path is not None
assert hdfs_dst_path is not None
if not self.is_exist(hdfs_src_path):
_logger.info("HDFS path do not exist: {}".format(hdfs_src_path))
if self.is_exist(hdfs_dst_path) and not overwrite:
_logger.error("HDFS path is exist: {} and overwrite=False".format(
hdfs_dst_path))
rename_command = ['-mv', hdfs_src_path, hdfs_dst_path]
returncode, output, errors = self.__run_hdfs_cmd(
rename_command, retry_times=1)
if returncode:
_logger.error("HDFS rename path: {} to {} failed".format(
hdfs_src_path, hdfs_dst_path))
return False
else:
_logger.info("HDFS rename path: {} to {} successfully".format(
hdfs_src_path, hdfs_dst_path))
return True
@staticmethod
def make_local_dirs(local_path):
"""
create a directory local, is same to mkdir
Args:
local_path(str): local path that wants to create a directory.
"""
try:
os.makedirs(local_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def makedirs(self, hdfs_path):
"""
Create a remote directory, recursively if necessary.
Args:
hdfs_path(str): Remote path. Intermediate directories will be
created appropriately.
Returns: return True
True or False
"""
_logger.info('Creating directories to %r.', hdfs_path)
assert hdfs_path is not None
if self.is_exist(hdfs_path):
_logger.error("HDFS path is exist: {}".format(hdfs_path))
return
mkdirs_commands = ['-mkdir', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
mkdirs_commands, retry_times=1)
if returncode: def is_file(self, fs_path):
_logger.error("HDFS mkdir path: {} failed".format(hdfs_path)) if not self.is_exist(fs_path):
return False return False
else:
_logger.info("HDFS mkdir path: {} successfully".format(hdfs_path))
return True
def ls(self, hdfs_path):
"""
ls directory contents about HDFS hdfs_path
Args:
hdfs_path(str): Remote HDFS path will be ls.
Returns:
List: a contents list about hdfs_path.
"""
assert hdfs_path is not None
if not self.is_exist(hdfs_path):
return []
ls_commands = ['-ls', hdfs_path] return not self.is_dir(fs_path)
returncode, output, errors = self.__run_hdfs_cmd(
ls_commands, retry_times=10)
if returncode: @_handle_errors
_logger.error("HDFS list path: {} failed".format(hdfs_path)) def is_exist(self, fs_path):
return [] cmd = "{} -ls {} ".format(self._base_cmd, fs_path)
else: ret, out = self._run_cmd(cmd, redirect_stderr=True)
_logger.info("HDFS list path: {} successfully".format(hdfs_path)) if ret != 0:
for l in out:
ret_lines = [] if "No such file or directory" in l:
regex = re.compile('\s+')
out_lines = output.strip().split("\n")
for line in out_lines:
re_line = regex.split(line)
if len(re_line) == 8:
ret_lines.append(re_line[7])
return ret_lines
def lsr(self, hdfs_path, excludes=[]):
"""
list directory contents about HDFS hdfs_path recursively
Args:
hdfs_path(str): Remote HDFS path.
excludes(list): excludes
Returns:
List: a contents list about hdfs_path.
"""
assert hdfs_path is not None
if not self.is_exist(hdfs_path):
return []
ls_commands = ['-lsr', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
ls_commands, retry_times=1)
if returncode:
_logger.error("HDFS list all files: {} failed".format(hdfs_path))
return []
else:
_logger.info("HDFS list all files: {} successfully".format(
hdfs_path))
lines = []
regex = re.compile('\s+')
out_lines = output.strip().split("\n")
for line_id, line in enumerate(out_lines):
re_line = regex.split(line)
if len(re_line) == 8:
if re_line[0][0] == "d":
continue
if re_line[7] in excludes:
continue
else:
lines.append((re_line[7], re_line[5] + " " + re_line[6],
line_id))
lines = sorted(lines, key=lambda line: line[2])
ret_lines = [ret[0] for ret in lines]
return ret_lines
@staticmethod
def split_files(files, trainer_id, trainers):
"""
split file list
Args:
files(list): file list
trainer_id(int): trainer mpi rank id
trainers(int): all trainers num
Returns:
fileist(list): file list of current trainer
"""
remainder = len(files) % trainers
blocksize = len(files) / trainers
blocks = [blocksize] * trainers
for i in range(remainder):
blocks[i] += 1
trainer_files = [[]] * trainers
begin = 0
for i in range(trainers):
trainer_files[i] = files[begin:begin + blocks[i]]
begin += blocks[i]
return trainer_files[trainer_id]
def download(self,
hdfs_path,
local_path,
multi_processes=5,
overwrite=False,
retry_times=5):
"""
Download files from HDFS using multi process.
Args:
hdfs_path(str): path on hdfs
local_path(str): path on local
multi_processes(int|5): the download data process at the same time, default=5
overwrite(bool): is overwrite
retry_times(int): retry times
Returns:
List:
Download files in local folder.
"""
def __subprocess_download(local_path, datas):
"""
download file from HDFS
Args:
hdfs_path(str): the hdfs file path
local_path(str): the local file path
overwrite(bool|None): will overwrite the file on HDFS or not
retry_times(int|5): retry times
Returns:
True or False
"""
for data in datas:
download_commands = ["-get", data, local_path]
returncode, output, errors = self.__run_hdfs_cmd(
download_commands, retry_times=retry_times)
if returncode:
_logger.error(
"Get local path: {} from HDFS path: {} failed".format(
local_path, hdfs_path))
return False return False
return True raise ExecuteError
self.make_local_dirs(local_path)
all_files = self.ls(hdfs_path)
procs = []
for i in range(multi_processes):
process_datas = HDFSClient.split_files(all_files, i,
multi_processes)
p = multiprocessing.Process(
target=__subprocess_download,
args=(
local_path,
process_datas, ))
procs.append(p)
p.start()
# complete the processes
for proc in procs:
proc.join()
_logger.info("Finish {} multi process to download datas".format(
multi_processes))
local_downloads = []
for dirname, folder, files in os.walk(local_path):
for i in files:
t = os.path.join(dirname, i)
local_downloads.append(t)
return local_downloads
def upload(self,
hdfs_path,
local_path,
multi_processes=5,
overwrite=False,
retry_times=5):
"""
Upload files to HDFS using multi process.
Args:
hdfs_path(str): path on hdfs
local_path(str): path on local
multi_processes(int|5): the upload data process at the same time, default=5
overwrite(bool|False): will overwrite file on HDFS or not
retry_times(int): upload file max retry time.
Returns:
None
"""
def __subprocess_upload(hdfs_path_single, datas): return True
for data in datas:
put_commands = ["-put", data, hdfs_path_single]
returncode, output, errors = self.__run_hdfs_cmd(put_commands,
retry_times)
if returncode: @_handle_errors
_logger.error("Put local path: {} to HDFS path: {} failed". def upload(self, local_path, fs_path):
format(data, hdfs_path_single)) if self.is_exist(fs_path):
return False raise FSFileExistsError
return True
def get_local_files(path): local = LocalFS()
""" if not local.is_exist(local_path):
get local files raise FSFileNotExistsError
Args: cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path)
path(str): local path ret, lines = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError
Returns: @_handle_errors
list of local files def download(self, fs_path, local_path):
""" if self.is_exist(local_path):
rlist = [] raise FSFileExistsError
if not os.path.exists(path): if not self.is_exist(fs_path):
return rlist raise FSFileNotExistsError
if os.path.isdir(path): cmd = "{} -get {} {}".format(self._base_cmd, fs_path, local_path)
for file in os.listdir(path): ret, lines = self._run_cmd(cmd)
t = os.path.join(path, file) if ret != 0:
rlist.append(t) raise ExecuteError
else:
rlist.append(path)
return rlist
all_files = get_local_files(local_path) @_handle_errors
if not all_files: def mkdirs(self, fs_path):
_logger.info("there are nothing need to upload, exit") if self.is_exist(fs_path):
return return
if self.is_exist(hdfs_path) and overwrite: cmd = "{} -mkdir {}".format(self._base_cmd, fs_path)
self.delete(hdfs_path) ret, lines = self._run_cmd(cmd)
self.makedirs(hdfs_path) if ret != 0:
raise ExecuteError
procs = []
for i in range(multi_processes): @_handle_errors
process_datas = HDFSClient.split_files(all_files, i, def mv(self, fs_src_path, fs_dst_path, test_exists=True):
multi_processes) if test_exists:
p = multiprocessing.Process( if not self.is_exist(fs_src_path):
target=__subprocess_upload, args=( raise FSFileNotExistsError
hdfs_path,
process_datas, )) if self.is_exist(fs_dst_path):
procs.append(p) raise FSFileExistsError
p.start()
cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path)
# complete the processes ret, _ = self._run_cmd(cmd)
for proc in procs: if ret != 0:
proc.join() raise ExecuteError
_logger.info("Finish upload datas from {} to {}".format(local_path, @_handle_errors
hdfs_path)) def _rmr(self, fs_path):
cmd = "{} -rmr {}".format(self._base_cmd, fs_path)
def upload_dir(self, dest_dir, local_dir, overwrite=False): ret, _ = self._run_cmd(cmd)
""" if ret != 0:
upload dir to hdfs raise ExecuteError
Args:
dest_dir(str): hdfs dest dir @_handle_errors
local_dir(str): hdfs local dir def _rm(self, fs_path):
overwrite(bool): is overwrite cmd = "{} -rm {}".format(self._base_cmd, fs_path)
Returns: ret, _ = self._run_cmd(cmd)
return code if ret != 0:
""" raise ExecuteError
local_dir = local_dir.rstrip("/")
dest_dir = dest_dir.rstrip("/") def delete(self, fs_path):
local_basename = os.path.basename(local_dir) if not self.is_exist(fs_path):
if self.is_exist(dest_dir + "/" + local_basename) and overwrite: return
self.delete(dest_dir + "/" + local_basename)
if not self.is_exist(dest_dir):
self.makedirs(dest_dir)
put_command = ["-put", local_dir, dest_dir]
returncode, output, errors = self.__run_hdfs_cmd(put_command)
if returncode != 0:
_logger.error("Put local dir: {} to HDFS dir: {} failed".format(
local_dir, dest_dir))
return False
return True
if __name__ == "__main__":
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = { is_dir = self.is_dir(fs_path)
"fs.default.name": "hdfs://xxx.hadoop.com:54310", if is_dir:
"hadoop.job.ugi": "hello,hello123" return self._rmr(fs_path)
}
client = HDFSClient(hadoop_home, configs) return self._rm(fs_path)
client.ls("/user/com/train-25") def need_upload_download(self):
files = client.lsr("/user/com/train-25/models") return True
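A hedged usage sketch of the rewritten HDFSClient above; the hadoop install path and the config dict follow the unit tests added by this commit and are environment-specific assumptions, and upload/delete raise the FS exceptions defined in fs.py on conflicting paths.

# Hedged sketch; /usr/local/hadoop-2.7.7, the config dict and the demo paths
# are assumptions taken from the tests below. Requires JAVA_HOME and a
# working hadoop client on the machine.
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient

config = {"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"}
client = HDFSClient("/usr/local/hadoop-2.7.7", config, time_out=5 * 60 * 1000)

if not client.is_exist("/user/demo"):
    client.mkdirs("/user/demo")

# local_file.txt is a placeholder for an existing local file
client.upload("./local_file.txt", "/user/demo/local_file.txt")
dirs, files = client.ls_dir("/user/demo")
print(dirs, files)
client.delete("/user/demo/local_file.txt")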
@@ -65,6 +65,11 @@ if(WIN32)
LIST(REMOVE_ITEM TEST_OPS test_math_op_patch_var_base)
endif()
if(APPLE OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_hdfs)
LIST(REMOVE_ITEM TEST_OPS test_fs_interface)
endif()
if (NOT ${WITH_GPU})
LIST(REMOVE_ITEM TEST_OPS test_conv2d_fusion_op)
LIST(REMOVE_ITEM TEST_OPS test_rank_attention_op) # TODO(shenliang03): rank_attention_op support CPU device in future
@@ -351,7 +356,7 @@ if(WITH_DISTRIBUTE)
endif()
bash_test_modules(test_launch_ps MODULES test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})

-set(dist_ut_port 1000)
set(dist_ut_port 20001)
foreach(TEST_OP ${DIST_TEST_OPS})
    bash_test_modules(${TEST_OP} MODULES dist_test.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}")
    MATH(EXPR dist_ut_port "${dist_ut_port}+50")
......
@@ -17,11 +17,14 @@ import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus
import os
-from paddle.distributed.fs_wrapper import LocalFS, BDFS
import sys
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient


class FleetTest(unittest.TestCase):
    def _test_checkpoint(self, fs, dir_path):
        file_name = "persistables"

        os.environ["TRAINING_ROLE"] = "TRAINER"
@@ -47,30 +50,60 @@ class FleetTest(unittest.TestCase):
        exe.run(fluid.default_startup_program())

        status = TrainStatus(2)
        fleet.save_checkpoint(exe, dir_path, train_status=status, fs=fs)
        n1 = fleet._get_last_checkpoint_no(dir_path, fs=fs)

        status2 = fleet.load_checkpoint(exe, dir_path, trainer_id=0, fs=fs)
        self.assertEqual(status2, status)

        fleet.save_checkpoint(exe, dir_path, train_status=status, fs=fs)
        n2 = fleet._get_last_checkpoint_no(dir_path, fs=fs)
        self.assertEqual(n2, n1 + 1)

        fleet.clean_redundant_checkpoints(dir_path, fs=fs)

        # unnormal
        # test remain_all_checkpoint
        fleet.save_checkpoint(
            exe,
            dir_path,
            train_status=status,
            fs=fs,
            remain_all_checkpoint=False)

        # can't save under a file
        fs = LocalFS()
        cache_path = "./.load_cache"
        fs.touch(cache_path)
        try:
            fleet.save_checkpoint(
                exe,
                dir_path,
                train_status=status,
                fs=fs,
                cache_path=cache_path)
            self.assertFalse(True)
        except:
            pass

        # can't load under a file
        try:
            status2 = fleet.load_checkpoint(
                exe, dir_path, trainer_id=0, fs=fs, cache_path=cache_path)
            self.assertFalse(True)
        except:
            pass
        fs.delete(cache_path)

-    def test_hdfs_check_point(self):
-        try:
-            fs = BDFS("xxxx", "xxxx", 1 * 1000, 1 * 1000)
-            dir_path = "/user/Paddle_Data/gongweibao/edl_test/my_paddle_model"
-            self._test_check_point(fs, dir_path)
-        except Exception as e:
-            print(e)
    def test_hdfs_checkpoint(self):
        fs = HDFSClient("/usr/local/hadoop-2.7.7", None)
        dir_path = "./checkpoint_test_hdfs"
        self._test_checkpoint(fs, os.path.abspath(dir_path))

    def test_local_checkpoint(self):
        fs = LocalFS()
        dir_path = "./checkpoint_test_local"
        self._test_checkpoint(fs, dir_path)


if __name__ == '__main__':
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus
import os
import sys
import inspect
from paddle.fluid.incubate.fleet.utils.fs import LocalFS, FS
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
from paddle.fluid.incubate.fleet.utils.hdfs import FSTimeOut, FSFileExistsError, FSFileNotExistsError
class FSTest(unittest.TestCase):
def _test_method(self, func):
if sys.version_info[0] <= 2:
args = inspect.getargspec(func).args
else:
args = inspect.getfullargspec(func).args
a = None
try:
if len(args) == 1:
func()
elif len(args) == 2:
func(a)
elif len(args) == 3:
func(a, a)
print("args:", args, len(args), "func:", func)
self.assertFalse(True)
except NotImplementedError as e:
pass
def test(self):
fs = FS()
for name, func in inspect.getmembers(fs, predicate=inspect.ismethod):
self._test_method(func)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus
import os
import sys
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
from paddle.fluid.incubate.fleet.utils.hdfs import FSTimeOut, FSFileExistsError, FSFileNotExistsError
java_home = os.environ["JAVA_HOME"]
class FSTest(unittest.TestCase):
def _test_dirs(self, fs):
dir_path = os.path.abspath("./test_dir")
fs.delete(dir_path)
self.assertTrue(not fs.is_exist(dir_path))
fs.mkdirs(dir_path)
self.assertTrue(fs.is_exist(dir_path))
self.assertTrue(not fs.is_file(dir_path))
self.assertTrue(fs.is_dir(dir_path))
new_dir_path = os.path.abspath("./new_test_dir")
fs.delete(new_dir_path)
try:
fs.mv(new_dir_path, dir_path)
self.assertFalse(True)
except FSFileNotExistsError as e:
pass
fs.mv(dir_path, new_dir_path)
self.assertTrue(fs.is_exist(new_dir_path))
fs.mv(new_dir_path, dir_path)
self.assertTrue(fs.is_exist(dir_path))
try:
fs.mv(dir_path, dir_path)
self.assertFalse(True)
except FSFileExistsError as e:
pass
fs.delete(dir_path)
self.assertTrue(not fs.is_exist(dir_path))
def _test_touch_file(self, fs):
file_path = os.path.abspath("./test_file")
fs.delete(file_path)
self.assertTrue(not fs.is_exist(file_path))
fs.touch(file_path)
self.assertTrue(fs.is_exist(file_path))
self.assertTrue(not fs.is_dir(file_path) and fs.is_file(file_path))
new_file_path = os.path.abspath("./new_test_file")
fs.mv(file_path, new_file_path)
self.assertTrue(fs.is_exist(new_file_path))
fs.mv(new_file_path, file_path)
self.assertTrue(fs.is_exist(file_path))
fs.delete(file_path)
self.assertTrue(not fs.is_exist(file_path))
def _test_upload(self, fs):
src_file = os.path.abspath("./test_upload.src")
dst_file = os.path.abspath("./test_uolpad.dst")
try:
fs.upload(src_file, dst_file)
self.assertFalse(True)
except FSFileNotExistsError as e:
pass
local = LocalFS()
local.touch(src_file)
fs.delete(dst_file)
assert fs.need_upload_download()
fs.upload(src_file, dst_file)
try:
fs.upload(src_file, dst_file)
self.assertFalse(True)
except FSFileExistsError as e:
pass
self.assertTrue(fs.is_exist(dst_file))
fs.delete(dst_file)
fs.delete(src_file)
def _test_download(self, fs):
src_file = os.path.abspath("./test_download.src")
dst_file = os.path.abspath("./test_download.dst")
fs.delete(dst_file)
fs.delete(src_file)
try:
fs.download(src_file, dst_file)
self.assertFalse(True)
except FSFileNotExistsError as e:
pass
local = LocalFS()
local.touch(src_file)
fs.delete(dst_file)
assert fs.need_upload_download()
fs.download(src_file, dst_file)
try:
fs.download(src_file, dst_file)
self.assertFalse(True)
except FSFileExistsError as e:
pass
self.assertTrue(fs.is_exist(dst_file))
fs.delete(dst_file)
fs.delete(src_file)
def _test_mkdirs(self, fs):
dir_name = "./test_mkdir"
fs.mkdirs(dir_name)
fs.mkdirs(dir_name)
def test_exists(self):
fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000)
self.assertFalse(fs.is_exist(os.path.abspath("./xxxx")))
self.assertFalse(fs.is_dir(os.path.abspath("./xxxx")))
self.assertTrue(fs.is_dir(os.path.abspath("./xxx/..")))
dirs, files = fs.ls_dir(os.path.abspath("./test_hdfs.py"))
self.assertTrue(dirs == [])
self.assertTrue(len(files) == 1)
dirs, files = fs.ls_dir(os.path.abspath("./xxx/.."))
def test_hdfs(self):
fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000)
self._test_dirs(fs)
self._test_upload(fs)
self._test_download(fs)
self._test_mkdirs(fs)
self._test_list_dir(fs)
def test_local(self):
fs = LocalFS()
self._test_dirs(fs)
self._test_touch_file(fs)
self._test_mkdirs(fs)
self._test_list_dir(fs)
def test_timeout(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=6 * 1000,
sleep_inter=2000)
src = "hdfs_test_timeout"
dst = "new_hdfs_test_timeout"
fs.delete(dst)
fs.mkdirs(src)
fs.mkdirs(dst)
fs.mkdirs(dst + "/" + src)
output = ""
try:
fs.mv(src, dst, test_exists=False)
self.assertFalse(1, "can't execute cmd:{} output:{}".format(cmd,
output))
except FSTimeOut as e:
print("execute mv {} to {} timeout".format(src, dst))
cmd = "{} -mv {} {}".format(fs._base_cmd, src, dst)
ret, output = fluid.core.shell_execute_cmd(cmd, 6 * 1000, 2 * 1000)
self.assertNotEqual(ret, 0)
print("second mv ret:{} output:{}".format(ret, output))
def test_is_dir(self):
fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000)
self.assertFalse(fs.is_dir("./test_hdfs.py"))
s = """
java.io.IOException: Input/output error
responseErrorMsg : failed to getFileStatus, errorCode: 3, path: /user/PUBLIC_KM_Data/wangxi16/data/serving_model, lparam: d868f6bb6822c621, errorMessage: inner error
at org.apache.hadoop.util.FileSystemUtil.throwException(FileSystemUtil.java:164)
at org.apache.hadoop.util.FileSystemUtil.dealWithResponse(FileSystemUtil.java:118)
at org.apache.hadoop.lite.client.LiteClientImpl.getFileStatus(LiteClientImpl.java:696)
at org.apache.hadoop.fs.LibDFileSystemImpl.getFileStatus(LibDFileSystemImpl.java:297)
at org.apache.hadoop.fs.LiteFileSystem.getFileStatus(LiteFileSystem.java:514)
at org.apache.hadoop.fs.FsShell.test(FsShell.java:1092)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:2285)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.fs.FsShell.main(FsShell.java:2353)
"""
print("split lines:", s.splitlines())
self.assertTrue(fs._test_match(s.splitlines()) != None)
def test_config(self):
config = {"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"}
fs = HDFSClient("/usr/local/hadoop-2.7.7/", config, time_out=15 * 1000)
def _test_list_dir(self, fs):
fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000)
fs.ls_dir("test_not_exists")
if __name__ == '__main__':
unittest.main()
@@ -100,10 +100,13 @@ WORKDIR /home/setuptools-40.6.2
RUN python setup.py build
RUN python setup.py install
WORKDIR /home
-RUN wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz
-RUN tar -zxvf pip-18.0.tar.gz
RUN wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz && tar -zxvf pip-18.0.tar.gz
WORKDIR pip-18.0
RUN python setup.py install && \
    python3.7 setup.py install && \
    python3.6 setup.py install && \
    python3 setup.py install
WORKDIR /home
RUN rm Python-$version.tgz setuptools-40.6.2.zip pip-18.0.tar.gz && \
@@ -225,6 +228,9 @@ RUN wget https://paddle-ci.gz.bcebos.com/ccache-3.7.9.tar.gz && \
    make -j8 && make install && \
    ln -s /usr/local/ccache-3.7.9/bin/ccache /usr/local/bin/ccache

RUN wget --no-check-certificate -q https://paddle-edl.bj.bcebos.com/hadoop-2.7.7.tar.gz && \
    tar -xzf hadoop-2.7.7.tar.gz && mv hadoop-2.7.7 /usr/local/

# Configure OpenSSH server. c.f. https://docs.docker.com/engine/examples/running_ssh_service
RUN mkdir /var/run/sshd
RUN echo 'root:root' | chpasswd
......
@@ -107,10 +107,13 @@ WORKDIR /home/setuptools-40.6.2
RUN python setup.py build
RUN python setup.py install
WORKDIR /home
-RUN wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz
-RUN tar -zxvf pip-18.0.tar.gz
RUN wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz && tar -zxvf pip-18.0.tar.gz
WORKDIR pip-18.0
RUN python setup.py install && \
    python3.7 setup.py install && \
    python3.6 setup.py install && \
    python3 setup.py install
WORKDIR /home
RUN rm Python-$version.tgz setuptools-40.6.2.zip pip-18.0.tar.gz && \
@@ -228,6 +231,9 @@ RUN wget https://paddle-ci.gz.bcebos.com/ccache-3.7.9.tar.gz && \
    make -j8 && make install && \
    ln -s /usr/local/ccache-3.7.9/bin/ccache /usr/local/bin/ccache

RUN wget --no-check-certificate -q https://paddle-edl.bj.bcebos.com/hadoop-2.7.7.tar.gz && \
    tar -xzf hadoop-2.7.7.tar.gz && mv hadoop-2.7.7 /usr/local/

# Configure OpenSSH server. c.f. https://docs.docker.com/engine/examples/running_ssh_service
RUN mkdir /var/run/sshd
RUN echo 'root:root' | chpasswd
......