diff --git a/paddle/fluid/framework/io/shell.cc b/paddle/fluid/framework/io/shell.cc
index 995d19387ba6b367628198a13930b10c7385f318..937f053bf848cc29261fbac4708e636653803eb4 100644
--- a/paddle/fluid/framework/io/shell.cc
+++ b/paddle/fluid/framework/io/shell.cc
@@ -21,7 +21,7 @@ namespace framework {
 
 std::shared_ptr<FILE> shell_fopen(const std::string& path,
                                   const std::string& mode) {
-#if defined _WIN32 || defined __APPLE__ || defined PADDLE_ARM
+#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
   return nullptr;
 #else
   if (shell_verbose()) {
@@ -48,7 +48,7 @@ std::shared_ptr<FILE> shell_fopen(const std::string& path,
 // The implementation is async signal safe
 // Mostly copy from CPython code
 static int close_open_fds_internal() {
-#if defined _WIN32 || defined __APPLE__ || defined PADDLE_ARM
+#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
   return 0;
 #else
   struct linux_dirent {
@@ -103,8 +103,9 @@ static int close_open_fds_internal() {
 }
 
 static int shell_popen_fork_internal(const char* real_cmd, bool do_read,
-                                     int parent_end, int child_end) {
-#if defined _WIN32 || defined __APPLE__
+                                     int parent_end, int child_end,
+                                     bool redirect_stderr = false) {
+#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
   return 0;
 #else
   int child_pid = -1;
@@ -125,18 +126,41 @@ static int shell_popen_fork_internal(const char* real_cmd, bool do_read,
 
   if (child_end != child_std_end) {
     PCHECK(dup2(child_end, child_std_end) == child_std_end);
+    if (redirect_stderr && do_read) {
+      PCHECK(dup2(child_end, 2) == 2);
+    }
     close(child_end);
   }
 
   close_open_fds_internal();
   PCHECK(execl("/bin/bash", "bash", "-c", real_cmd, NULL) >= 0);
-  exit(127);
+  // Note: just for compilation. the child don't run this line.
+  _exit(0);
 #endif
 }
 
+static int read_from_pipe(FILE* fp, std::string* output) {
+  char buf[4096];
+  while (1) {
+    int n = fread(buf, 1, 4096, fp);
+    if (n <= 0) {
+      break;
+    }
+
+    output->append(buf, n);
+  }
+
+  if (!feof(fp)) {
+    return -1;
+  }
+
+  return 0;
+}
+
 std::shared_ptr<FILE> shell_popen(const std::string& cmd,
-                                  const std::string& mode, int* err_no) {
-#if defined _WIN32 || defined __APPLE__
+                                  const std::string& mode, int* err_no,
+                                  int* status, bool redirect_stderr) {
+#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
   return nullptr;
 #else
   bool do_read = mode == "r";
@@ -146,9 +170,7 @@ std::shared_ptr<FILE> shell_popen(const std::string& cmd,
     return NULL;
   }
 
-  if (shell_verbose()) {
-    LOG(INFO) << "Opening pipe[" << cmd << "] with mode[" << mode << "]";
-  }
+  VLOG(3) << "Opening pipe[" << cmd << "] with mode[" << mode << "]";
 
   std::string real_cmd = "set -o pipefail; " + cmd;
 
@@ -168,43 +190,54 @@ std::shared_ptr<FILE> shell_popen(const std::string& cmd,
     child_end = pipe_fds[0];
   }
 
-  int child_pid = shell_popen_fork_internal(real_cmd.c_str(), do_read,
-                                            parent_end, child_end);
-  close(child_end);
+  sighandler_t old_handler;
+  old_handler = signal(SIGCHLD, SIG_DFL);
+
   fcntl(parent_end, F_SETFD, FD_CLOEXEC);
-  FILE* fp;
+
+  int child_pid = shell_popen_fork_internal(
+      real_cmd.c_str(), do_read, parent_end, child_end, redirect_stderr);
+
+  close(child_end);
+
+  FILE* fp = NULL;
   if ((fp = fdopen(parent_end, mode.c_str())) == NULL) {
     *err_no = -1;
+    signal(SIGCHLD, old_handler);
     return NULL;
   }
-  return {fp, [child_pid, cmd, err_no](FILE* fp) {
-            if (shell_verbose()) {
-              LOG(INFO) << "Closing pipe[" << cmd << "]";
-            }
-            if (fclose(fp) != 0) {
+
+  return {fp, [cmd, child_pid, old_handler, err_no, status](FILE* fp) {
+            VLOG(3) << "Closing pipe[" << cmd << "]";
+            if (fclose(fp)) {
              *err_no = -1;
             }
+
            int wstatus = -1;
+            // don't do this before parent read data from child pipe
+            // or when get the large data, it will hang!
            waitpid(child_pid, &wstatus, 0);
-            if (wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 ||
-                (wstatus == -1 && errno == ECHILD)) {
+
+            if (status) {
+              *status = wstatus;
+            }
+
+            if (WIFEXITED(wstatus) || wstatus == (128 + SIGPIPE) * 256) {
             } else {
+              PADDLE_ENFORCE_NE(
+                  errno, ECHILD,
+                  platform::errors::Fatal("Must not be ECHILD errno here!"));
              *err_no = -1;
-              LOG(WARNING) << "status[" << wstatus << "], cmd[" << cmd << "]"
-                           << ", err_no[" << *err_no << "]";
-            }
-            if (wstatus == -1 && errno == ECHILD) {
-              // temporarily remove this warning
-              // LOG(WARNING) << "errno is ECHILD";
             }
+
+            signal(SIGCHLD, old_handler);
           }};
 #endif
 }
 
 static int shell_p2open_fork_internal(const char* real_cmd, int pipein_fds[2],
                                       int pipeout_fds[2]) {
-#if defined _WIN32 || defined __APPLE__
+#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
   return 0;
 #else
   int child_pid = -1;
@@ -243,7 +276,7 @@ static int shell_p2open_fork_internal(const char* real_cmd, int pipein_fds[2],
 
 std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(
     const std::string& cmd) {
-#if defined _WIN32 || defined __APPLE__
+#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
   return {};
 #else
   if (shell_verbose()) {
@@ -301,51 +334,102 @@ std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(
 #endif
 }
 
-std::string shell_get_command_output(const std::string& cmd, int time_out,
-                                     int sleep_inter, bool print_cmd) {
-#if defined _WIN32 || defined __APPLE__
+#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
+#else
+static int _get_err_no(int err_no, int status) {
+  if (err_no == 0) {
+    if (WIFEXITED(status)) {
+      return WEXITSTATUS(status);
+    }
+    return -1;
+  }
+
+  return err_no;
+}
+#endif
+
+static int _shell_execute_cmd(const std::string& cmd, std::string* output,
+                              int time_out, int sleep_inter,
+                              bool redirect_stderr = false) {
+#if defined(_WIN32) || defined(__APPLE__) || defined(PADDLE_ARM)
   PADDLE_THROW(platform::errors::Unimplemented(
       "This function(shell_get_command_output) is not implemented under _WIN32 "
       "or __APPLE__."));
 #else
   int err_no = 0;
+  int status = 0;
+  int cmd_status = 0;
   platform::Timer timer;
   do {
-    if (print_cmd) {
-      LOG(INFO) << "exec cmd:[" << cmd << "]";
-    }
+    VLOG(3) << "exec cmd:[" << cmd << "]";
+
     err_no = 0;
-    std::shared_ptr<FILE> pipe = shell_popen(cmd, "r", &err_no);
-    string::LineFileReader reader;
+    status = 0;
+    *output = "";
+    auto pipe = shell_popen(cmd, "r", &err_no, &status, redirect_stderr);
 
-    char* buf = reader.getdelim(&*pipe, 0);
     if (err_no == 0) {
-      if (buf) {
-        return reader.get();
+      // read file
+      err_no = read_from_pipe(&*pipe, output);
+      if (err_no) {
+        LOG(WARNING) << "status[" << status << "], cmd[" << cmd << "]"
+                     << ", err_no[" << err_no << "]";
      }
-      return "";
    }
 
-    if (sleep_inter > 0) {
-      usleep(sleep_inter);
+    // close file and etc.
+    pipe = nullptr;
+    if (err_no) {
+      LOG(WARNING) << "status[" << status << "], cmd[" << cmd << "]"
+                   << ", err_no[" << err_no << "]";
+    }
+
+    cmd_status = _get_err_no(err_no, status);
+    // cmd run ok!
+    if (cmd_status == 0) {
+      return cmd_status;
+    }
+
+    // time out
     timer.Pause();
-    if (time_out > 0 && timer.ElapsedMS() >= time_out) {
-      PADDLE_THROW(paddle::platform::errors::ExecutionTimeout(
-          "shell_get_command_output execute error errno:%d and try until "
-          "timeout.",
-          errno));
-      return "";
+    if ((time_out > 0 && timer.ElapsedMS() >= time_out) || time_out == 0) {
+      break;
     }
     timer.Resume();
 
-    pipe = nullptr;
-  } while (err_no);
+    if (sleep_inter > 0) {
+      usleep(sleep_inter * 1000);
+    }
+  } while (cmd_status);
+
+  // log when check timeout!
+  if (time_out != 0) {
+    *output += string::Sprintf(
+        " _shell_execute_cmd execute cmd:%s ElapsedMS:%d, err_no:%d status:%d",
+        cmd, timer.ElapsedMS(), err_no, cmd_status);
+    LOG(WARNING) << *output;
+  }
+
+  return cmd_status;
 
-  return "";
 #endif
 }
 
+std::string shell_get_command_output(const std::string& cmd, int time_out,
+                                     int sleep_inter) {
+  std::string output;
+  _shell_execute_cmd(cmd, &output, time_out, sleep_inter);
+  return output;
+}
+
+std::vector<std::string> shell_execute_cmd(const std::string& cmd, int time_out,
+                                           int sleep_inter,
+                                           bool redirect_stderr) {
+  std::string output;
+  int ret =
+      _shell_execute_cmd(cmd, &output, time_out, sleep_inter, redirect_stderr);
+  return std::vector<std::string>({string::Sprintf("%d", ret), output});
+}
+
 }  // end namespace framework
 }  // end namespace paddle
diff --git a/paddle/fluid/framework/io/shell.h b/paddle/fluid/framework/io/shell.h
index 194b1c0edafc328a43fe6f733af1b76986cea38c..5b3e9a4df1d11b957d656181844f17a06574556f 100644
--- a/paddle/fluid/framework/io/shell.h
+++ b/paddle/fluid/framework/io/shell.h
@@ -28,6 +28,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <vector>
 #include "paddle/fluid/platform/port.h"
 #include "paddle/fluid/string/string_helper.h"
 
@@ -51,8 +52,10 @@ inline void shell_set_verbose(bool x) { shell_verbose_internal() = x; }
 extern std::shared_ptr<FILE> shell_fopen(const std::string& path,
                                          const std::string& mode);
 
-extern std::shared_ptr<FILE> shell_popen(const std::string& cmd,
-                                         const std::string& mode, int* err_no);
+std::shared_ptr<FILE> shell_popen(const std::string& cmd,
+                                  const std::string& mode, int* err_no,
+                                  int* status = NULL,
+                                  bool redirect_stderr = false);
 
 extern std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(
     const std::string& cmd);
@@ -65,12 +68,17 @@ inline void shell_execute(const std::string& cmd) {
   } while (err_no == -1);
 }
 
-// timeout:ms, default -1 means forever.
+// time_out:ms, default value:-1 means forever.
 // sleep_inter:ms, default -1 means not sleep.
 extern std::string shell_get_command_output(const std::string& cmd,
-                                            int time_out = -1,
-                                            int sleep_inter = -1,
-                                            bool print_cmd = false);
+                                            int time_out = 10 * 60 * 1000,
+                                            int sleep_inter = 1000);
+// time_out:ms, default -1 means forever.
+// sleep_inter:ms, default -1 means not sleep.
+extern std::vector<std::string> shell_execute_cmd(const std::string& cmd,
+                                                  int time_out = 0,
+                                                  int sleep_inter = 0,
+                                                  bool redirect_stderr = false);
 
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index ce9b143db1dd286bcff87b0afff1572e285d6c3f..79ee871ee882d864fd41363c733b2bc09d4cebf9 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1563,6 +1563,15 @@ All parameter, weight, gradient are variables in Paddle.
sleep_inter); }, py::arg("cmd"), py::arg("time_out") = -1, py::arg("sleep_inter") = -1); + m.def("shell_execute_cmd", + [](const std::string &cmd, int time_out = 0, int sleep_inter = 0, + bool redirect_stderr = false) -> std::vector { + return paddle::framework::shell_execute_cmd( + cmd, time_out, sleep_inter, redirect_stderr); + }, + py::arg("cmd"), py::arg("time_out") = 0, py::arg("sleep_inter") = 0, + py::arg("redirect_stderr") = false); + #ifdef PADDLE_WITH_CUDA m.def("is_float16_supported", [](const platform::CUDAPlace &place) -> bool { // Only GPUs with Compute Capability >= 53 support float16 diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index 1e71b316b06779ef861c9fb9612e30a62f810f7d..d0c32e26092f6ea25771279418582a24ea449ab2 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -11,6 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from . import fs_wrapper - -__all__ = fs_wrapper.__all__ diff --git a/python/paddle/distributed/fs_wrapper.py b/python/paddle/distributed/fs_wrapper.py deleted file mode 100644 index d73d144e1c47544a214d73ac677f12e71230d058..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/fs_wrapper.py +++ /dev/null @@ -1,225 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import paddle.fluid as fluid -import sys -import abc -import os -from pathlib import PurePosixPath -import shutil - -__all__ = ['FS', 'LocalFS', 'BDFS'] - - -class FS(object): - @abc.abstractmethod - def list_dirs(self, fs_path): - pass - - @abc.abstractmethod - def ls_dir(self, fs_path): - pass - - @abc.abstractmethod - def stat(self, fs_path): - pass - - @abc.abstractmethod - def upload(self, local_path, fs_path): - pass - - @abc.abstractmethod - def download(self, fs_path, local_path): - pass - - @abc.abstractmethod - def mkdir(self, fs_path): - pass - - @abc.abstractmethod - def mv(self, fs_src_path, fs_dst_path): - pass - - @abc.abstractmethod - def rmr(self, fs_path): - pass - - @abc.abstractmethod - def rm(self, fs_path): - pass - - @abc.abstractmethod - def delete(self, fs_path): - pass - - @abc.abstractmethod - def need_upload_download(self): - pass - - -class LocalFS(FS): - def list_dirs(self, fs_path): - if not self.stat(fs_path): - return [] - - return [ - f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f) - ] - - def ls_dir(self, fs_path): - return [f for f in os.listdir(fs_path)] - - def stat(self, fs_path): - return os.path.exists(fs_path) - - def mkdir(self, fs_path): - assert not os.path.isfile(fs_path), "{} is already a file".format( - fs_path) - os.system("mkdir -p {}".format(fs_path)) - - def mv(self, fs_src_path, fs_dst_path): - os.rename(fs_src_path, fs_dst_path) - - def rmr(self, fs_path): - shutil.rmtree(fs_path) - - def rm(self, fs_path): - os.remove(fs_path) - - def delete(self, fs_path): - if not self.stat(fs_path): - return - - if os.path.isfile(fs_path): - return self.rm(fs_path) - - return self.rmr(fs_path) - - def need_upload_download(self): - return False - - -class BDFS(FS): - def __init__(self, - hdfs_name, - hdfs_ugi, - time_out=20 * 60 * 1000, - sleep_inter=1000): - self._base_cmd = "hadoop fs -Dfs.default.name=\"{}\" -Dhadoop.job.ugi=\"{}\"".format( - hdfs_name, hdfs_ugi) - self._time_out = time_out - self._sleep_inter = sleep_inter - - def _run_cmd(self, cmd): - ret = fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) - if len(ret) <= 0: - return [] - - lines = ret.splitlines() - return lines - - def list_dirs(self, fs_path): - if not self.stat(fs_path): - return [] - - dirs, _ = self.ls_dir(fs_path) - return dirs - - def ls_dir(self, fs_path): - """ - list directory under fs_path, and only give the pure name, not include the fs_path - """ - cmd = "{} -ls {}".format(self._base_cmd, fs_path) - lines = self._run_cmd(cmd) - - dirs = [] - files = [] - for line in lines: - arr = line.split() - if len(arr) != 8: - continue - - if fs_path not in arr[7]: - continue - - p = PurePosixPath(arr[7]) - if arr[0][0] == 'd': - dirs.append(p.name) - else: - files.append(p.name) - - return dirs, files - - def is_dir(self, fs_path): - cmd = "{} -test -d {} ; echo $?".format(self._base_cmd, fs_path) - - test = self._run_cmd(cmd) - if test[0].strip() == "0": - return True - - return False - - def stat(self, fs_path): - cmd = "{} -test -e {} ; echo $?".format(self._base_cmd, fs_path) - - test = self._run_cmd(cmd) - if test[0].strip() == "0": - return True - - return False - - def upload(self, local_path, fs_path): - cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path) - fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) - - def download(self, fs_path, local_path): - cmd = "{} -get {} {}/".format(self._base_cmd, fs_path, local_path) - fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) - - def mkdir(self, fs_path): - - 
if not self.stat(fs_path): - cmd = "{} -mkdir {}".format(self._base_cmd, fs_path) - fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) - - def mv(self, fs_src_path, fs_dst_path): - cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path) - fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) - - def rmr(self, fs_path): - if not self.stat(fs_path): - return - - cmd = "{} -rmr {}".format(self._base_cmd, fs_path) - return fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) - - def rm(self, fs_path): - if not self.stat(fs_path): - return - - cmd = "{} -rm {}".format(self._base_cmd, fs_path) - return fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) - - def delete(self, fs_path): - if not self.stat(fs_path): - return - - is_dir = self.is_dir(fs_path) - if is_dir: - return self.rmr(fs_path) - - return self.rm(fs_path) - - def need_upload_download(self): - return True diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils.py index 30a928cc9f86a842695bb90a7838d68ac291d9d8..0bfd75b4994402359651be3bd6247847a6427ffb 100644 --- a/python/paddle/distributed/utils.py +++ b/python/paddle/distributed/utils.py @@ -143,7 +143,7 @@ class Trainer(object): return False if self.endpoint != t.endpoint or \ - self.rank != t.rank : + self.rank != t.rank: return False for a, b in zip(self.gpus, t.gpus): diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py index e6304473b6507de22805f0a9bd5088dbe5a29ecd..4bcd5196a3b3cc5a11a93d27d4dc3dffe61e8644 100644 --- a/python/paddle/fluid/incubate/fleet/collective/__init__.py +++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py @@ -26,7 +26,7 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Mode from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer from paddle.fluid import compiler -from paddle.distributed.fs_wrapper import LocalFS, BDFS +from paddle.fluid.incubate.fleet.utils.fs import LocalFS import os import sys @@ -70,7 +70,7 @@ class Collective(Fleet): self._origin_program = None self._transpiled_program = None self.main_program = None - self._checkoint_prefix = "__paddle_fleet_checkpoint__" + self._checkpoint_prefix = "__paddle_fleet_checkpoint__" self._param_file_name = "_paddle_fleet_param__" def init_worker(self): @@ -186,8 +186,8 @@ class Collective(Fleet): max_no = -1 d = {} dirs = fs.list_dirs(root_path) - for dir in dirs: - g = dir.split(".") + for d in dirs: + g = d.split(".") if len(g) != 2: continue @@ -203,10 +203,10 @@ class Collective(Fleet): return max_no - def clean_redundant_check_points(self, - root_path, - fs=LocalFS(), - checkpoint_num=1): + def clean_redundant_checkpoints(self, + root_path, + fs=LocalFS(), + checkpoint_num=1): max_no = self._get_last_checkpoint_no(root_path, fs) if max_no < 0: return @@ -215,32 +215,32 @@ class Collective(Fleet): checkpoint_num = 1 dirs = fs.list_dirs(root_path) - for dir in dirs: - g = dir.split(".") + for d in dirs: + g = d.split(".") if len(g) != 2: continue - if g[0] != self._checkoint_prefix: + if g[0] != self._checkpoint_prefix: continue try: n = int(g[1]) if n <= max_no - checkpoint_num: - path = "{}/{}.{}".format(root_path, self._checkoint_prefix, + path = "{}/{}.{}".format(root_path, self._checkpoint_prefix, n) - fs.rmr(path) + fs.delete(path) except Exception as e: print(e) continue - def save_check_point(self, - executor, - path, - train_status, - main_program=None, - fs=LocalFS(), - local_cache_path=".cache", - 
remain_all_checkpoint=True): + def save_checkpoint(self, + executor, + path, + train_status, + main_program=None, + fs=LocalFS(), + local_cache_path=".cache", + remain_all_checkpoint=True): """ This function save persistables and current epoch num to path. """ @@ -248,14 +248,16 @@ class Collective(Fleet): if main_program == None: main_program = self._transpiled_program - if not fs.stat(path): - fs.mkdir(path) + if not fs.is_exist(path): + fs.mkdirs(path) + else: + assert fs.is_dir(path), "path:%s must be a directory".format(path) max_no = self._get_last_checkpoint_no(path, fs=fs) if max_no < 0: max_no = -1 - real_path = "{}/{}.{}".format(path, self._checkoint_prefix, max_no + 1) + real_path = "{}/{}.{}".format(path, self._checkpoint_prefix, max_no + 1) tmp_path = "{}.tmp".format(real_path) saved_path = tmp_path @@ -264,9 +266,14 @@ class Collective(Fleet): cache_path = None if fs.need_upload_download(): cache_path = "{}/{}.{}.saved_cache".format( - local_cache_path, self._checkoint_prefix, max_no + 1) - if not local_fs.stat(cache_path): - local_fs.mkdir(cache_path) + local_cache_path, self._checkpoint_prefix, max_no + 1) + if not local_fs.is_exist(cache_path): + local_fs.mkdirs(cache_path) + else: + assert fs.is_dir( + path), "cache path:{} must be a directory".format( + cache_path) + saved_path = cache_path self.save_persistables( @@ -282,16 +289,16 @@ class Collective(Fleet): fs.mv(tmp_path, real_path) if not remain_all_checkpoint: - self.clean_redundant_check_points(path) - - def load_check_point(self, - executor, - path, - trainer_id, - main_program=None, - fs=LocalFS(), - local_cache_path=".cache", - ignore_empty=True): + self.clean_redundant_checkpoints(path) + + def load_checkpoint(self, + executor, + path, + trainer_id, + main_program=None, + fs=LocalFS(), + local_cache_path=".cache", + ignore_empty=True): """ This function load persistables and current epoch num from path. """ @@ -306,11 +313,13 @@ class Collective(Fleet): local_fs = LocalFS() if fs.need_upload_download(): cache_path = "{}/{}.{}.load_cache.{}".format( - local_cache_path, self._checkoint_prefix, max_no, trainer_id) - if local_fs.stat(cache_path): + local_cache_path, self._checkpoint_prefix, max_no, trainer_id) + if not local_fs.is_exist(local_cache_path): + local_fs.mkdirs(local_cache_path) + if local_fs.is_exist(cache_path): local_fs.delete(cache_path) - real_path = "{}/{}.{}".format(path, self._checkoint_prefix, max_no) + real_path = "{}/{}.{}".format(path, self._checkpoint_prefix, max_no) load_path = real_path if fs.need_upload_download(): fs.download(real_path, cache_path) diff --git a/python/paddle/fluid/incubate/fleet/utils/fs.py b/python/paddle/fluid/incubate/fleet/utils/fs.py new file mode 100644 index 0000000000000000000000000000000000000000..4782c2f8d90c84027f9fd1d11022425a9ed6c84b --- /dev/null +++ b/python/paddle/fluid/incubate/fleet/utils/fs.py @@ -0,0 +1,164 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import os +import sys +import subprocess +import multiprocessing +from datetime import datetime + +import re +import copy +import errno +import time +import logging +import abc +from pathlib import PurePosixPath, Path +import shutil + +__all__ = ['FS', 'LocalFS'] + + +class ExecuteError(Exception): + pass + + +class FSFileExistsError(Exception): + pass + + +class FSFileNotExistsError(Exception): + pass + + +class FSTimeOut(Exception): + pass + + +class FS(object): + @abc.abstractmethod + def ls_dir(self, fs_path): + raise NotImplementedError + + @abc.abstractmethod + def is_file(self, fs_path): + raise NotImplementedError + + @abc.abstractmethod + def is_dir(self, fs_path): + raise NotImplementedError + + @abc.abstractmethod + def is_exist(self, fs_path): + raise NotImplementedError + + @abc.abstractmethod + def upload(self, local_path, fs_path): + raise NotImplementedError + + @abc.abstractmethod + def download(self, fs_path, local_path): + raise NotImplementedError + + @abc.abstractmethod + def mkdirs(self, fs_path): + raise NotImplementedError + + @abc.abstractmethod + def delete(self, fs_path): + raise NotImplementedError + + @abc.abstractmethod + def need_upload_download(self): + raise NotImplementedError + + @abc.abstractmethod + def rename(self, fs_src_path, fs_dst_path): + raise NotImplementedError + + @abc.abstractmethod + def mv(self, fs_src_path, fs_dst_path): + raise NotImplementedError + + @abc.abstractmethod + def upload_dir(self, local_dir, dest_dir): + raise NotImplementedError + + @abc.abstractmethod + def list_dirs(self, fs_path): + raise NotImplementedError + + +class LocalFS(FS): + def ls_dir(self, fs_path): + return [f for f in os.listdir(fs_path)] + + def mkdirs(self, fs_path): + assert not os.path.isfile(fs_path), "{} is already a file".format( + fs_path) + os.system("mkdir -p {}".format(fs_path)) + + def rename(self, fs_src_path, fs_dst_path): + os.rename(fs_src_path, fs_dst_path) + + def _rmr(self, fs_path): + shutil.rmtree(fs_path) + + def _rm(self, fs_path): + os.remove(fs_path) + + def delete(self, fs_path): + if not self.is_exist(fs_path): + return + + if os.path.isfile(fs_path): + return self._rm(fs_path) + + return self._rmr(fs_path) + + def need_upload_download(self): + return False + + def is_file(self, fs_path): + return os.path.isfile(fs_path) + + def is_dir(self, fs_path): + return os.path.isdir(fs_path) + + def is_exist(self, fs_path): + return os.path.exists(fs_path) + + def touch(self, fs_path): + return Path(fs_path).touch() + + def mv(self, src_path, dst_path): + if not self.is_exist(src_path): + raise FSFileNotExistsError + + if self.is_exist(dst_path): + raise FSFileExistsError + + return self.rename(src_path, dst_path) + + def list_dirs(self, fs_path): + """ + list directory under fs_path, and only give the pure name, not include the fs_path + """ + if not self.is_exist(fs_path): + return [] + + dirs = [ + f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f) + ] + + return dirs diff --git a/python/paddle/fluid/incubate/fleet/utils/hdfs.py b/python/paddle/fluid/incubate/fleet/utils/hdfs.py index c16d7e3cc458f3f2052507497a3ba7ebaec3d042..27f68076f27feb32f823977274d2b33bccf3eb1c 100644 --- a/python/paddle/fluid/incubate/fleet/utils/hdfs.py +++ b/python/paddle/fluid/incubate/fleet/utils/hdfs.py @@ -24,596 +24,221 @@ import copy import errno import time import logging +import six +from . 
import fs +from .fs import FS, LocalFS, FSFileExistsError, FSFileNotExistsError, ExecuteError, FSTimeOut +import paddle.fluid as fluid +import functools -__all__ = ["HDFSClient"] - - -def get_logger(name, level, fmt): - logger = logging.getLogger(name) - logger.setLevel(level) - handler = logging.FileHandler('hdfs.log', mode='w') - formatter = logging.Formatter(fmt=fmt) - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger - - -_logger = get_logger( - __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s') - +from pathlib import PurePosixPath, Path +import shutil -class HDFSClient(object): - """ - A tool of HDFS - - Args: - hadoop_home (string): hadoop_home - configs (dict): hadoop config, it is a dict, please contain \ - key "fs.default.name" and "hadoop.job.ugi" - Can be a float value - Examples: - hadoop_home = "/home/client/hadoop-client/hadoop/" - - configs = { - "fs.default.name": "hdfs://xxx.hadoop.com:54310", - "hadoop.job.ugi": "hello,hello123" - } +__all__ = ["HDFSClient"] - client = HDFSClient(hadoop_home, configs) - client.ls("/user/com/train-25") - files = client.lsr("/user/com/train-25/models") - """ +def _handle_errors(f): + def handler(*args, **kwargs): + start = time.time() + while True: + try: + return f(*args, **kwargs) + except ExecuteError as e: + o = args[0] + time_out = float(o._time_out) / 1000.0 + inter = float(o._sleep_inter) / 1000.0 + if time.time() - start >= time_out: + raise FSTimeOut + time.sleep(inter) + + return functools.wraps(f)(handler) + + +class HDFSClient(FS): + def __init__( + self, + hadoop_home, + configs, + time_out=5 * 60 * 1000, #ms + sleep_inter=1000): #ms + # Raise exception if JAVA_HOME not exists. + java_home = os.environ["JAVA_HOME"] - def __init__(self, hadoop_home, configs): self.pre_commands = [] hadoop_bin = '%s/bin/hadoop' % hadoop_home self.pre_commands.append(hadoop_bin) dfs = 'fs' self.pre_commands.append(dfs) - for k, v in configs.iteritems(): - config_command = '-D%s=%s' % (k, v) - self.pre_commands.append(config_command) - - def __run_hdfs_cmd(self, commands, retry_times=5): - whole_commands = copy.deepcopy(self.pre_commands) - whole_commands.extend(commands) - - ret_code = 0 - ret_out = None - ret_err = None - retry_sleep_second = 3 - whole_commands = " ".join(whole_commands) - for x in range(retry_times + 1): - proc = subprocess.Popen( - whole_commands, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True) - (output, errors) = proc.communicate() - ret_code, ret_out, ret_err = proc.returncode, output, errors - - _logger.info( - 'Times: %d, Running command: %s. 
Return code: %d, Msg: %s' % - (x, whole_commands, proc.returncode, errors)) - - if ret_code == 0: - break - time.sleep(retry_sleep_second) - - return ret_code, ret_out, ret_err - - def cat(self, hdfs_path=None): - """ - cat hdfs file - Args: - hdfs_path(str): the hdfs file path - Returns: - file content - """ - if self.is_file(hdfs_path): - exist_cmd = ['-cat', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - exist_cmd, retry_times=1) - if returncode != 0: - _logger.error("HDFS cat HDFS path: {} failed".format(hdfs_path)) - return "" - else: - _logger.info("HDFS cat HDFS path: {} succeed".format(hdfs_path)) - return output.strip() + if configs: + for k, v in six.iteritems(configs): + config_command = '-D%s=%s' % (k, v) - else: - return "" + self._time_out = time_out + self._sleep_inter = sleep_inter + self._base_cmd = " ".join(self.pre_commands) + self._bd_err_re = re.compile( + r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:') - def is_exist(self, hdfs_path=None): - """ - whether the remote HDFS path exists - - Args: - hdfs_path(str): the hdfs file path - - Returns: - True or False - """ - exist_cmd = ['-test', '-e', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - exist_cmd, retry_times=1) - - if returncode: - _logger.error("HDFS is_exist HDFS path: {} failed".format( - hdfs_path)) - return False - else: - _logger.info("HDFS is_exist HDFS path: {} successfully".format( - hdfs_path)) - return True + def _run_cmd(self, cmd, redirect_stderr=False): + ret, output = fluid.core.shell_execute_cmd(cmd, 0, 0, redirect_stderr) + return int(ret), output.splitlines() - def is_dir(self, hdfs_path=None): - """ - whether the remote HDFS path is directory + def list_dirs(self, fs_path): + if not self.is_exist(fs_path): + return [] - Args: - hdfs_path(str): the hdfs file path + dirs, _ = self.ls_dir(fs_path) + return dirs - Returns: - True or False + @_handle_errors + def ls_dir(self, fs_path): + """ + list directory under fs_path, and only give the pure name, not include the fs_path """ + if not self.is_exist(fs_path): + return [], [] - if not self.is_exist(hdfs_path): - return False + cmd = "{} -ls {}".format(self._base_cmd, fs_path) + ret, lines = self._run_cmd(cmd) - dir_cmd = ['-test', '-d', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1) + if ret != 0: + raise ExecuteError - if returncode: - _logger.error("HDFS path: {} failed is not a directory".format( - hdfs_path)) - return False - else: - _logger.info("HDFS path: {} successfully is a directory".format( - hdfs_path)) - return True + dirs = [] + files = [] + for line in lines: + arr = line.split() + if len(arr) != 8: + continue - def is_file(self, hdfs_path=None): - """ - whether the remote HDFS path is file + if fs_path not in arr[7]: + continue - Args: - hdfs_path(str): the hdfs file path + p = PurePosixPath(arr[7]) + if arr[0][0] == 'd': + dirs.append(p.name) + else: + files.append(p.name) - Returns: - True or False - """ + return dirs, files - if not self.is_exist(hdfs_path): - return False + def _test_match(self, lines): + for l in lines: + m = self._bd_err_re.match(l) + if m != None: + return m - dir_cmd = ['-test', '-d', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1) + return None - if returncode == 0: - _logger.error("HDFS path: {} failed is not a file".format( - hdfs_path)) + @_handle_errors + def is_dir(self, fs_path): + if not self.is_exist(fs_path): return False - else: - _logger.info("HDFS path: {} successfully is a 
file".format( - hdfs_path)) - return True - - def delete(self, hdfs_path): - """ - Remove a file or directory from HDFS. - - whether the remote HDFS path exists - - Args: - hdfs_path(str): HDFS path. - - Returns: - True or False - This function returns `True` if the deletion was successful and `False` if - no file or directory previously existed at `hdfs_path`. - """ - _logger.info('Deleting %r.', hdfs_path) - - if not self.is_exist(hdfs_path): - _logger.warn("HDFS path: {} do not exist".format(hdfs_path)) - return True - - if self.is_dir(hdfs_path): - del_cmd = ['-rmr', hdfs_path] - else: - del_cmd = ['-rm', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd(del_cmd, retry_times=0) + cmd = "{} -test -d {}".format( + self._base_cmd, fs_path, redirect_stderr=True) + ret, lines = self._run_cmd(cmd) + if ret: + # other error + if self._test_match(lines) != None: + raise ExecuteError - if returncode: - _logger.error("HDFS path: {} delete files failure".format( - hdfs_path)) return False - else: - _logger.info("HDFS path: {} delete files successfully".format( - hdfs_path)) - return True - - def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False): - """ - Move a file or folder on HDFS. - - Args: - hdfs_src_path(str): HDFS path - hdfs_dst_path(str): HDFS path - overwrite(bool|False): If the path already exists and overwrite is - False, will return False. - Returns: - True or False - """ - assert hdfs_src_path is not None - assert hdfs_dst_path is not None - - if not self.is_exist(hdfs_src_path): - _logger.info("HDFS path do not exist: {}".format(hdfs_src_path)) - if self.is_exist(hdfs_dst_path) and not overwrite: - _logger.error("HDFS path is exist: {} and overwrite=False".format( - hdfs_dst_path)) - - rename_command = ['-mv', hdfs_src_path, hdfs_dst_path] - returncode, output, errors = self.__run_hdfs_cmd( - rename_command, retry_times=1) - - if returncode: - _logger.error("HDFS rename path: {} to {} failed".format( - hdfs_src_path, hdfs_dst_path)) - return False - else: - _logger.info("HDFS rename path: {} to {} successfully".format( - hdfs_src_path, hdfs_dst_path)) - return True - - @staticmethod - def make_local_dirs(local_path): - """ - create a directory local, is same to mkdir - - Args: - local_path(str): local path that wants to create a directory. - """ - try: - os.makedirs(local_path) - except OSError as e: - if e.errno != errno.EEXIST: - raise - - def makedirs(self, hdfs_path): - """ - Create a remote directory, recursively if necessary. - - Args: - hdfs_path(str): Remote path. Intermediate directories will be - created appropriately. - Returns: - True or False - """ - _logger.info('Creating directories to %r.', hdfs_path) - assert hdfs_path is not None - - if self.is_exist(hdfs_path): - _logger.error("HDFS path is exist: {}".format(hdfs_path)) - return - - mkdirs_commands = ['-mkdir', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - mkdirs_commands, retry_times=1) + return True - if returncode: - _logger.error("HDFS mkdir path: {} failed".format(hdfs_path)) + def is_file(self, fs_path): + if not self.is_exist(fs_path): return False - else: - _logger.info("HDFS mkdir path: {} successfully".format(hdfs_path)) - return True - - def ls(self, hdfs_path): - """ - ls directory contents about HDFS hdfs_path - - Args: - hdfs_path(str): Remote HDFS path will be ls. - - Returns: - List: a contents list about hdfs_path. 
- """ - assert hdfs_path is not None - - if not self.is_exist(hdfs_path): - return [] - ls_commands = ['-ls', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - ls_commands, retry_times=10) + return not self.is_dir(fs_path) - if returncode: - _logger.error("HDFS list path: {} failed".format(hdfs_path)) - return [] - else: - _logger.info("HDFS list path: {} successfully".format(hdfs_path)) - - ret_lines = [] - regex = re.compile('\s+') - out_lines = output.strip().split("\n") - for line in out_lines: - re_line = regex.split(line) - if len(re_line) == 8: - ret_lines.append(re_line[7]) - return ret_lines - - def lsr(self, hdfs_path, excludes=[]): - """ - list directory contents about HDFS hdfs_path recursively - - Args: - hdfs_path(str): Remote HDFS path. - excludes(list): excludes - - Returns: - List: a contents list about hdfs_path. - """ - - assert hdfs_path is not None - - if not self.is_exist(hdfs_path): - return [] - - ls_commands = ['-lsr', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - ls_commands, retry_times=1) - - if returncode: - _logger.error("HDFS list all files: {} failed".format(hdfs_path)) - return [] - else: - _logger.info("HDFS list all files: {} successfully".format( - hdfs_path)) - lines = [] - regex = re.compile('\s+') - out_lines = output.strip().split("\n") - for line_id, line in enumerate(out_lines): - re_line = regex.split(line) - if len(re_line) == 8: - if re_line[0][0] == "d": - continue - if re_line[7] in excludes: - continue - else: - lines.append((re_line[7], re_line[5] + " " + re_line[6], - line_id)) - lines = sorted(lines, key=lambda line: line[2]) - ret_lines = [ret[0] for ret in lines] - return ret_lines - - @staticmethod - def split_files(files, trainer_id, trainers): - """ - split file list - - Args: - files(list): file list - trainer_id(int): trainer mpi rank id - trainers(int): all trainers num - - Returns: - fileist(list): file list of current trainer - """ - remainder = len(files) % trainers - blocksize = len(files) / trainers - - blocks = [blocksize] * trainers - for i in range(remainder): - blocks[i] += 1 - - trainer_files = [[]] * trainers - begin = 0 - for i in range(trainers): - trainer_files[i] = files[begin:begin + blocks[i]] - begin += blocks[i] - - return trainer_files[trainer_id] - - def download(self, - hdfs_path, - local_path, - multi_processes=5, - overwrite=False, - retry_times=5): - """ - Download files from HDFS using multi process. - - Args: - hdfs_path(str): path on hdfs - local_path(str): path on local - multi_processes(int|5): the download data process at the same time, default=5 - overwrite(bool): is overwrite - retry_times(int): retry times - - Returns: - List: - Download files in local folder. 
- """ - - def __subprocess_download(local_path, datas): - """ - download file from HDFS - - Args: - hdfs_path(str): the hdfs file path - local_path(str): the local file path - overwrite(bool|None): will overwrite the file on HDFS or not - retry_times(int|5): retry times - - Returns: - True or False - """ - for data in datas: - download_commands = ["-get", data, local_path] - - returncode, output, errors = self.__run_hdfs_cmd( - download_commands, retry_times=retry_times) - - if returncode: - _logger.error( - "Get local path: {} from HDFS path: {} failed".format( - local_path, hdfs_path)) + @_handle_errors + def is_exist(self, fs_path): + cmd = "{} -ls {} ".format(self._base_cmd, fs_path) + ret, out = self._run_cmd(cmd, redirect_stderr=True) + if ret != 0: + for l in out: + if "No such file or directory" in l: return False - return True - - self.make_local_dirs(local_path) - - all_files = self.ls(hdfs_path) - - procs = [] - for i in range(multi_processes): - process_datas = HDFSClient.split_files(all_files, i, - multi_processes) - p = multiprocessing.Process( - target=__subprocess_download, - args=( - local_path, - process_datas, )) - procs.append(p) - p.start() - - # complete the processes - for proc in procs: - proc.join() - - _logger.info("Finish {} multi process to download datas".format( - multi_processes)) - - local_downloads = [] - for dirname, folder, files in os.walk(local_path): - for i in files: - t = os.path.join(dirname, i) - local_downloads.append(t) - return local_downloads - - def upload(self, - hdfs_path, - local_path, - multi_processes=5, - overwrite=False, - retry_times=5): - """ - Upload files to HDFS using multi process. - - Args: - hdfs_path(str): path on hdfs - local_path(str): path on local - multi_processes(int|5): the upload data process at the same time, default=5 - overwrite(bool|False): will overwrite file on HDFS or not - retry_times(int): upload file max retry time. - - Returns: - None - """ + raise ExecuteError - def __subprocess_upload(hdfs_path_single, datas): - for data in datas: - put_commands = ["-put", data, hdfs_path_single] - returncode, output, errors = self.__run_hdfs_cmd(put_commands, - retry_times) + return True - if returncode: - _logger.error("Put local path: {} to HDFS path: {} failed". 
- format(data, hdfs_path_single)) - return False - return True + @_handle_errors + def upload(self, local_path, fs_path): + if self.is_exist(fs_path): + raise FSFileExistsError - def get_local_files(path): - """ - get local files + local = LocalFS() + if not local.is_exist(local_path): + raise FSFileNotExistsError - Args: - path(str): local path + cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path) + ret, lines = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError - Returns: - list of local files - """ - rlist = [] + @_handle_errors + def download(self, fs_path, local_path): + if self.is_exist(local_path): + raise FSFileExistsError - if not os.path.exists(path): - return rlist + if not self.is_exist(fs_path): + raise FSFileNotExistsError - if os.path.isdir(path): - for file in os.listdir(path): - t = os.path.join(path, file) - rlist.append(t) - else: - rlist.append(path) - return rlist + cmd = "{} -get {} {}".format(self._base_cmd, fs_path, local_path) + ret, lines = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError - all_files = get_local_files(local_path) - if not all_files: - _logger.info("there are nothing need to upload, exit") + @_handle_errors + def mkdirs(self, fs_path): + if self.is_exist(fs_path): return - if self.is_exist(hdfs_path) and overwrite: - self.delete(hdfs_path) - self.makedirs(hdfs_path) - - procs = [] - for i in range(multi_processes): - process_datas = HDFSClient.split_files(all_files, i, - multi_processes) - p = multiprocessing.Process( - target=__subprocess_upload, args=( - hdfs_path, - process_datas, )) - procs.append(p) - p.start() - - # complete the processes - for proc in procs: - proc.join() - - _logger.info("Finish upload datas from {} to {}".format(local_path, - hdfs_path)) - - def upload_dir(self, dest_dir, local_dir, overwrite=False): - """ - upload dir to hdfs - Args: - dest_dir(str): hdfs dest dir - local_dir(str): hdfs local dir - overwrite(bool): is overwrite - Returns: - return code - """ - local_dir = local_dir.rstrip("/") - dest_dir = dest_dir.rstrip("/") - local_basename = os.path.basename(local_dir) - if self.is_exist(dest_dir + "/" + local_basename) and overwrite: - self.delete(dest_dir + "/" + local_basename) - if not self.is_exist(dest_dir): - self.makedirs(dest_dir) - put_command = ["-put", local_dir, dest_dir] - returncode, output, errors = self.__run_hdfs_cmd(put_command) - if returncode != 0: - _logger.error("Put local dir: {} to HDFS dir: {} failed".format( - local_dir, dest_dir)) - return False - return True - - -if __name__ == "__main__": - hadoop_home = "/home/client/hadoop-client/hadoop/" + cmd = "{} -mkdir {}".format(self._base_cmd, fs_path) + ret, lines = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError + + @_handle_errors + def mv(self, fs_src_path, fs_dst_path, test_exists=True): + if test_exists: + if not self.is_exist(fs_src_path): + raise FSFileNotExistsError + + if self.is_exist(fs_dst_path): + raise FSFileExistsError + + cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path) + ret, _ = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError + + @_handle_errors + def _rmr(self, fs_path): + cmd = "{} -rmr {}".format(self._base_cmd, fs_path) + ret, _ = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError + + @_handle_errors + def _rm(self, fs_path): + cmd = "{} -rm {}".format(self._base_cmd, fs_path) + ret, _ = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError + + def delete(self, fs_path): + if not self.is_exist(fs_path): + return - configs = { - "fs.default.name": 
"hdfs://xxx.hadoop.com:54310", - "hadoop.job.ugi": "hello,hello123" - } + is_dir = self.is_dir(fs_path) + if is_dir: + return self._rmr(fs_path) - client = HDFSClient(hadoop_home, configs) + return self._rm(fs_path) - client.ls("/user/com/train-25") - files = client.lsr("/user/com/train-25/models") + def need_upload_download(self): + return True diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 366951aa7129790ad162eca814ee7a6832d557fd..90f7781b7a437603c19ef846152c78b4a9bfb06c 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -65,6 +65,11 @@ if(WIN32) LIST(REMOVE_ITEM TEST_OPS test_math_op_patch_var_base) endif() +if(APPLE OR WIN32) + LIST(REMOVE_ITEM TEST_OPS test_hdfs) + LIST(REMOVE_ITEM TEST_OPS test_fs_interface) +endif() + if (NOT ${WITH_GPU}) LIST(REMOVE_ITEM TEST_OPS test_conv2d_fusion_op) LIST(REMOVE_ITEM TEST_OPS test_rank_attention_op) # TODO(shenliang03): rank_attention_op support CPU device in future @@ -351,7 +356,7 @@ if(WITH_DISTRIBUTE) endif() bash_test_modules(test_launch_ps MODULES test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) - set(dist_ut_port 1000) + set(dist_ut_port 20001) foreach(TEST_OP ${DIST_TEST_OPS}) bash_test_modules(${TEST_OP} MODULES dist_test.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}") MATH(EXPR dist_ut_port "${dist_ut_port}+50") diff --git a/python/paddle/fluid/tests/unittests/test_fleet_checkpoint.py b/python/paddle/fluid/tests/unittests/test_fleet_checkpoint.py index 35c2801bde31cc1b69a4cf9baf53fccc23f765c4..3318b67cadc74080468c0eab7ae1df1e834b5952 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_checkpoint.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_checkpoint.py @@ -17,11 +17,14 @@ import paddle.fluid as fluid import paddle.fluid.incubate.fleet.base.role_maker as role_maker from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus import os -from paddle.distributed.fs_wrapper import LocalFS, BDFS +import sys + +from paddle.fluid.incubate.fleet.utils.fs import LocalFS +from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient class FleetTest(unittest.TestCase): - def _test_check_point(self, fs, dir_path): + def _test_checkpoint(self, fs, dir_path): file_name = "persistables" os.environ["TRAINING_ROLE"] = "TRAINER" @@ -47,30 +50,60 @@ class FleetTest(unittest.TestCase): exe.run(fluid.default_startup_program()) status = TrainStatus(2) - fleet.save_check_point(exe, dir_path, train_status=status, fs=fs) + fleet.save_checkpoint(exe, dir_path, train_status=status, fs=fs) n1 = fleet._get_last_checkpoint_no(dir_path, fs=fs) - status2 = fleet.load_check_point(exe, dir_path, trainer_id=0, fs=fs) + status2 = fleet.load_checkpoint(exe, dir_path, trainer_id=0, fs=fs) self.assertEqual(status2, status) - fleet.save_check_point(exe, dir_path, train_status=status, fs=fs) + fleet.save_checkpoint(exe, dir_path, train_status=status, fs=fs) n2 = fleet._get_last_checkpoint_no(dir_path, fs=fs) self.assertEqual(n2, n1 + 1) - fleet.clean_redundant_check_points(dir_path, fs=fs) + fleet.clean_redundant_checkpoints(dir_path, fs=fs) - def test_hdfs_check_point(self): - try: - fs = BDFS("xxxx", "xxxx", 1 * 1000, 1 * 1000) - dir_path = "/user/Paddle_Data/gongweibao/edl_test/my_paddle_model" - self._test_check_point(fs, dir_path) - except Exception as e: - print(e) + # unnormal + # test remain_all_checkpoint + 
fleet.save_checkpoint( + exe, + dir_path, + train_status=status, + fs=fs, + remain_all_checkpoint=False) - def test_local_check_point(self): + # can't save under a file + fs = LocalFS() + cache_path = "./.load_cache" + fs.touch(cache_path) + try: + fleet.save_checkpoint( + exe, + dir_path, + train_status=status, + fs=fs, + cache_path=cache_path) + self.assertFalse(True) + except: + pass + + # can't load under a file + try: + status2 = fleet.load_checkpoint( + exe, dir_path, trainer_id=0, fs=fs, cache_path=cache_path) + self.assertFalse(True) + except: + pass + fs.delete(cache_path) + + def test_hdfs_checkpoint(self): + fs = HDFSClient("/usr/local/hadoop-2.7.7", None) + dir_path = "./checkpoint_test_hdfs" + self._test_checkpoint(fs, os.path.abspath(dir_path)) + + def test_local_checkpoint(self): fs = LocalFS() - dir_path = "./my_paddle_model" - self._test_check_point(fs, dir_path) + dir_path = "./checkpoint_test_local" + self._test_checkpoint(fs, dir_path) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_fs_interface.py b/python/paddle/fluid/tests/unittests/test_fs_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..0d87b94538f05d734cb3e621fc0dfc7c48e8fea2 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fs_interface.py @@ -0,0 +1,55 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus +import os +import sys +import inspect + +from paddle.fluid.incubate.fleet.utils.fs import LocalFS, FS +from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient +from paddle.fluid.incubate.fleet.utils.hdfs import FSTimeOut, FSFileExistsError, FSFileNotExistsError + + +class FSTest(unittest.TestCase): + def _test_method(self, func): + if sys.version_info[0] <= 2: + args = inspect.getargspec(func).args + else: + args = inspect.getfullargspec(func).args + + a = None + try: + if len(args) == 1: + func() + elif len(args) == 2: + func(a) + elif len(args) == 3: + func(a, a) + print("args:", args, len(args), "func:", func) + self.assertFalse(True) + except NotImplementedError as e: + pass + + def test(self): + fs = FS() + for name, func in inspect.getmembers(fs, predicate=inspect.ismethod): + self._test_method(func) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_hdfs.py b/python/paddle/fluid/tests/unittests/test_hdfs.py new file mode 100644 index 0000000000000000000000000000000000000000..9826542cee3732a48e1c6b6959afb74063bb09d7 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_hdfs.py @@ -0,0 +1,225 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus +import os +import sys + +from paddle.fluid.incubate.fleet.utils.fs import LocalFS +from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient +from paddle.fluid.incubate.fleet.utils.hdfs import FSTimeOut, FSFileExistsError, FSFileNotExistsError + +java_home = os.environ["JAVA_HOME"] + + +class FSTest(unittest.TestCase): + def _test_dirs(self, fs): + dir_path = os.path.abspath("./test_dir") + fs.delete(dir_path) + self.assertTrue(not fs.is_exist(dir_path)) + + fs.mkdirs(dir_path) + self.assertTrue(fs.is_exist(dir_path)) + self.assertTrue(not fs.is_file(dir_path)) + self.assertTrue(fs.is_dir(dir_path)) + + new_dir_path = os.path.abspath("./new_test_dir") + fs.delete(new_dir_path) + try: + fs.mv(new_dir_path, dir_path) + self.assertFalse(True) + except FSFileNotExistsError as e: + pass + + fs.mv(dir_path, new_dir_path) + self.assertTrue(fs.is_exist(new_dir_path)) + + fs.mv(new_dir_path, dir_path) + self.assertTrue(fs.is_exist(dir_path)) + try: + fs.mv(dir_path, dir_path) + self.assertFalse(True) + except FSFileExistsError as e: + pass + + fs.delete(dir_path) + self.assertTrue(not fs.is_exist(dir_path)) + + def _test_touch_file(self, fs): + file_path = os.path.abspath("./test_file") + + fs.delete(file_path) + self.assertTrue(not fs.is_exist(file_path)) + + fs.touch(file_path) + self.assertTrue(fs.is_exist(file_path)) + self.assertTrue(not fs.is_dir(file_path) and fs.is_file(file_path)) + + new_file_path = os.path.abspath("./new_test_file") + fs.mv(file_path, new_file_path) + self.assertTrue(fs.is_exist(new_file_path)) + + fs.mv(new_file_path, file_path) + self.assertTrue(fs.is_exist(file_path)) + + fs.delete(file_path) + self.assertTrue(not fs.is_exist(file_path)) + + def _test_upload(self, fs): + src_file = os.path.abspath("./test_upload.src") + dst_file = os.path.abspath("./test_uolpad.dst") + + try: + fs.upload(src_file, dst_file) + self.assertFalse(True) + except FSFileNotExistsError as e: + pass + + local = LocalFS() + local.touch(src_file) + fs.delete(dst_file) + + assert fs.need_upload_download() + + fs.upload(src_file, dst_file) + try: + fs.upload(src_file, dst_file) + self.assertFalse(True) + except FSFileExistsError as e: + pass + + self.assertTrue(fs.is_exist(dst_file)) + fs.delete(dst_file) + fs.delete(src_file) + + def _test_download(self, fs): + src_file = os.path.abspath("./test_download.src") + dst_file = os.path.abspath("./test_download.dst") + fs.delete(dst_file) + fs.delete(src_file) + + try: + fs.download(src_file, dst_file) + self.assertFalse(True) + except FSFileNotExistsError as e: + pass + + local = LocalFS() + local.touch(src_file) + fs.delete(dst_file) + + assert fs.need_upload_download() + + fs.download(src_file, dst_file) + try: + fs.download(src_file, dst_file) + self.assertFalse(True) + except FSFileExistsError as e: + pass + + self.assertTrue(fs.is_exist(dst_file)) + fs.delete(dst_file) + fs.delete(src_file) + + def _test_mkdirs(self, fs): + 
dir_name = "./test_mkdir" + fs.mkdirs(dir_name) + fs.mkdirs(dir_name) + + def test_exists(self): + fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000) + self.assertFalse(fs.is_exist(os.path.abspath("./xxxx"))) + self.assertFalse(fs.is_dir(os.path.abspath("./xxxx"))) + self.assertTrue(fs.is_dir(os.path.abspath("./xxx/.."))) + dirs, files = fs.ls_dir(os.path.abspath("./test_hdfs.py")) + self.assertTrue(dirs == []) + self.assertTrue(len(files) == 1) + dirs, files = fs.ls_dir(os.path.abspath("./xxx/..")) + + def test_hdfs(self): + fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000) + self._test_dirs(fs) + self._test_upload(fs) + + self._test_download(fs) + self._test_mkdirs(fs) + self._test_list_dir(fs) + + def test_local(self): + fs = LocalFS() + self._test_dirs(fs) + self._test_touch_file(fs) + self._test_mkdirs(fs) + self._test_list_dir(fs) + + def test_timeout(self): + fs = HDFSClient( + "/usr/local/hadoop-2.7.7/", + None, + time_out=6 * 1000, + sleep_inter=2000) + src = "hdfs_test_timeout" + dst = "new_hdfs_test_timeout" + fs.delete(dst) + fs.mkdirs(src) + fs.mkdirs(dst) + fs.mkdirs(dst + "/" + src) + output = "" + try: + fs.mv(src, dst, test_exists=False) + self.assertFalse(1, "can't execute cmd:{} output:{}".format(cmd, + output)) + except FSTimeOut as e: + print("execute mv {} to {} timeout".format(src, dst)) + + cmd = "{} -mv {} {}".format(fs._base_cmd, src, dst) + ret, output = fluid.core.shell_execute_cmd(cmd, 6 * 1000, 2 * 1000) + self.assertNotEqual(ret, 0) + print("second mv ret:{} output:{}".format(ret, output)) + + def test_is_dir(self): + fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000) + self.assertFalse(fs.is_dir("./test_hdfs.py")) + s = """ +java.io.IOException: Input/output error + responseErrorMsg : failed to getFileStatus, errorCode: 3, path: /user/PUBLIC_KM_Data/wangxi16/data/serving_model, lparam: d868f6bb6822c621, errorMessage: inner error + at org.apache.hadoop.util.FileSystemUtil.throwException(FileSystemUtil.java:164) + at org.apache.hadoop.util.FileSystemUtil.dealWithResponse(FileSystemUtil.java:118) + at org.apache.hadoop.lite.client.LiteClientImpl.getFileStatus(LiteClientImpl.java:696) + at org.apache.hadoop.fs.LibDFileSystemImpl.getFileStatus(LibDFileSystemImpl.java:297) + at org.apache.hadoop.fs.LiteFileSystem.getFileStatus(LiteFileSystem.java:514) + at org.apache.hadoop.fs.FsShell.test(FsShell.java:1092) + at org.apache.hadoop.fs.FsShell.run(FsShell.java:2285) + at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) + at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) + at org.apache.hadoop.fs.FsShell.main(FsShell.java:2353) + """ + + print("split lines:", s.splitlines()) + self.assertTrue(fs._test_match(s.splitlines()) != None) + + def test_config(self): + config = {"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"} + fs = HDFSClient("/usr/local/hadoop-2.7.7/", config, time_out=15 * 1000) + + def _test_list_dir(self, fs): + fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000) + fs.ls_dir("test_not_exists") + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/manylinux1/Dockerfile.cuda10_cudnn7_gcc48_ubuntu16 b/tools/manylinux1/Dockerfile.cuda10_cudnn7_gcc48_ubuntu16 index 9a763059ace11cd1cef11eaa739de9b5f2a24a88..ffef02dba4614f7bbbe13ebc30b40438a52b4590 100644 --- a/tools/manylinux1/Dockerfile.cuda10_cudnn7_gcc48_ubuntu16 +++ b/tools/manylinux1/Dockerfile.cuda10_cudnn7_gcc48_ubuntu16 @@ -100,10 +100,13 @@ WORKDIR /home/setuptools-40.6.2 RUN python 
setup.py build RUN python setup.py install WORKDIR /home -RUN wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz -RUN tar -zxvf pip-18.0.tar.gz + +RUN wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz && tar -zxvf pip-18.0.tar.gz WORKDIR pip-18.0 -RUN python setup.py install +RUN python setup.py install && \ + python3.7 setup.py install && \ + python3.6 setup.py install && \ + python3 setup.py install WORKDIR /home RUN rm Python-$version.tgz setuptools-40.6.2.zip pip-18.0.tar.gz && \ @@ -225,6 +228,9 @@ RUN wget https://paddle-ci.gz.bcebos.com/ccache-3.7.9.tar.gz && \ make -j8 && make install && \ ln -s /usr/local/ccache-3.7.9/bin/ccache /usr/local/bin/ccache +RUN wget --no-check-certificate -q https://paddle-edl.bj.bcebos.com/hadoop-2.7.7.tar.gz && \ + tar -xzf hadoop-2.7.7.tar.gz && mv hadoop-2.7.7 /usr/local/ + # Configure OpenSSH server. c.f. https://docs.docker.com/engine/examples/running_ssh_service RUN mkdir /var/run/sshd RUN echo 'root:root' | chpasswd diff --git a/tools/manylinux1/Dockerfile.cuda10_cudnn7_gcc8_ubuntu16 b/tools/manylinux1/Dockerfile.cuda10_cudnn7_gcc8_ubuntu16 index 3e1697fbd57cfe2c37b42b00e6300bf0279632c0..5bb471da0e94f47d7b90fc33790c7a296f0f0d0b 100644 --- a/tools/manylinux1/Dockerfile.cuda10_cudnn7_gcc8_ubuntu16 +++ b/tools/manylinux1/Dockerfile.cuda10_cudnn7_gcc8_ubuntu16 @@ -107,10 +107,13 @@ WORKDIR /home/setuptools-40.6.2 RUN python setup.py build RUN python setup.py install WORKDIR /home -RUN wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz -RUN tar -zxvf pip-18.0.tar.gz + +RUN wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz && tar -zxvf pip-18.0.tar.gz WORKDIR pip-18.0 -RUN python setup.py install +RUN python setup.py install && \ + python3.7 setup.py install && \ + python3.6 setup.py install && \ + python3 setup.py install WORKDIR /home RUN rm Python-$version.tgz setuptools-40.6.2.zip pip-18.0.tar.gz && \ @@ -228,6 +231,9 @@ RUN wget https://paddle-ci.gz.bcebos.com/ccache-3.7.9.tar.gz && \ make -j8 && make install && \ ln -s /usr/local/ccache-3.7.9/bin/ccache /usr/local/bin/ccache +RUN wget --no-check-certificate -q https://paddle-edl.bj.bcebos.com/hadoop-2.7.7.tar.gz && \ + tar -xzf hadoop-2.7.7.tar.gz && mv hadoop-2.7.7 /usr/local/ + # Configure OpenSSH server. c.f. https://docs.docker.com/engine/examples/running_ssh_service RUN mkdir /var/run/sshd RUN echo 'root:root' | chpasswd
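
Usage sketch for the filesystem API introduced above (assumptions: a Hadoop install at /usr/local/hadoop-2.7.7 with JAVA_HOME set, and a Paddle build exposing the new fluid.core.shell_execute_cmd binding; the paths and configs below are illustrative only):

from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient

# Local filesystem: implements the same FS interface, no upload/download step.
local_fs = LocalFS()
local_fs.mkdirs("./checkpoint_dir")   # behaves like `mkdir -p`
print(local_fs.list_dirs("."))        # returns sub-directory names only

# HDFS-backed filesystem: each call shells out to `hadoop fs` through
# fluid.core.shell_execute_cmd; time_out and sleep_inter are in milliseconds.
hdfs = HDFSClient(
    "/usr/local/hadoop-2.7.7",
    configs={"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"},
    time_out=5 * 60 * 1000,
    sleep_inter=1000)
if hdfs.is_exist("remote_ckpt"):
    hdfs.delete("remote_ckpt")
hdfs.upload("./checkpoint_dir", "remote_ckpt")  # raises FSFileExistsError if the target already exists

These are the same operations exercised by test_hdfs.py and used by fleet.save_checkpoint/load_checkpoint when need_upload_download() is true.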