From a7c5210051292177c6717e7218cfbdc2d613273a Mon Sep 17 00:00:00 2001 From: gongweibao Date: Mon, 10 Aug 2020 13:41:13 +0800 Subject: [PATCH] Fix test_hdfs bug. (#26068) * fix merge3 test=develop --- python/paddle/fleet/utils/fs.py | 289 +++++++++++++----- .../incubate/checkpoint/auto_checkpoint.py | 2 +- .../incubate/checkpoint/checkpoint_saver.py | 5 +- .../incubate/fleet/collective/__init__.py | 6 +- .../tests/unittests/test_auto_checkpoint.py | 3 +- .../tests/unittests/test_auto_checkpoint2.py | 3 +- 6 files changed, 218 insertions(+), 90 deletions(-) diff --git a/python/paddle/fleet/utils/fs.py b/python/paddle/fleet/utils/fs.py index 3fec773f27..2dbe5cefbb 100644 --- a/python/paddle/fleet/utils/fs.py +++ b/python/paddle/fleet/utils/fs.py @@ -26,6 +26,7 @@ import logging import six import abc import paddle.fluid as fluid +from paddle.fluid import core import functools from pathlib import PurePosixPath, Path @@ -33,7 +34,7 @@ import shutil __all__ = [ 'FS', 'LocalFS', 'HDFSClient', 'ExecuteError', 'FSTimeOut', - 'FSFileExistsError', 'FSFileNotExistsError' + 'FSFileExistsError', 'FSFileNotExistsError', 'FSShellCmdAborted' ] @@ -53,6 +54,10 @@ class FSTimeOut(Exception): pass +class FSShellCmdAborted(ExecuteError): + pass + + class FS(object): @abc.abstractmethod def ls_dir(self, fs_path): @@ -95,7 +100,7 @@ class FS(object): raise NotImplementedError @abc.abstractmethod - def mv(self, fs_src_path, fs_dst_path): + def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=False): raise NotImplementedError @abc.abstractmethod @@ -103,15 +108,11 @@ class FS(object): raise NotImplementedError @abc.abstractmethod - def glob(self, fs_path): - raise NotImplementedError - - @abc.abstractmethod - def stat(self, fs_path): + def list_dirs(self, fs_path): raise NotImplementedError @abc.abstractmethod - def walk(self, fs_path): + def touch(self, fs_path, exist_ok=True): raise NotImplementedError @@ -135,14 +136,8 @@ class LocalFS(FS): fs_path) os.system("mkdir -p {}".format(fs_path)) - def is_file(self, fs_path): - return os.path.isfile(fs_path) - - def is_dir(self, fs_path): - return os.path.isdir(fs_path) - - def is_exist(self, fs_path): - return os.path.exists(fs_path) + def rename(self, fs_src_path, fs_dst_path): + os.rename(fs_src_path, fs_dst_path) def _rmr(self, fs_path): shutil.rmtree(fs_path) @@ -159,24 +154,51 @@ class LocalFS(FS): return self._rmr(fs_path) - def rename(self, fs_src_path, fs_dst_path): - os.rename(fs_src_path, fs_dst_path) - def need_upload_download(self): return False - def touch(self, fs_path): - return Path(fs_path).touch() + def is_file(self, fs_path): + return os.path.isfile(fs_path) + + def is_dir(self, fs_path): + return os.path.isdir(fs_path) + + def is_exist(self, fs_path): + return os.path.exists(fs_path) + + def touch(self, fs_path, exist_ok=True): + if self.is_exist(fs_path): + if exist_ok: + return + raise FSFileExistsError + + return Path(fs_path).touch(exist_ok=True) - def mv(self, src_path, dst_path): + def mv(self, src_path, dst_path, overwrite=False, test_exists=False): if not self.is_exist(src_path): raise FSFileNotExistsError + if overwrite and self.is_exist(dst_path): + self.delete(dst_path) + if self.is_exist(dst_path): raise FSFileExistsError return self.rename(src_path, dst_path) + def list_dirs(self, fs_path): + """ + list directory under fs_path, and only give the pure name, not include the fs_path + """ + if not self.is_exist(fs_path): + return [] + + dirs = [ + f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f) + ] + + return dirs + """HDFS Utils.""" @@ -198,6 +220,41 @@ def _handle_errors(f): return functools.wraps(f)(handler) +def _handle_errors(max_time_out=None): + def decorator(f): + @functools.wraps(f) + def handler(*args, **kwargs): + o = args[0] + time_out = max_time_out + if time_out is None: + time_out = float(o._time_out) / 1000.0 + else: + time_out /= 1000.0 + inter = float(o._sleep_inter) / 1000.0 + + start = time.time() + last_print_time = start + while True: + try: + return f(*args, **kwargs) + #important: only ExecuteError need to retry + except ExecuteError as e: + if time.time() - start >= time_out: + raise FSTimeOut("args:{} timeout:{}".format( + args, time.time() - start)) + + time.sleep(inter) + + if time.time() - last_print_time > 30: + print("hadoop operator timeout:args:{} timeout:{}".format( + args, time.time() - start)) + last_print_time = time.time() + + return handler + + return decorator + + class HDFSClient(FS): def __init__( self, @@ -216,7 +273,8 @@ class HDFSClient(FS): if configs: for k, v in six.iteritems(configs): - self.pre_commands.append('-D%s=%s' % (k, v)) + config_command = '-D%s=%s' % (k, v) + self.pre_commands.append(config_command) self._time_out = time_out self._sleep_inter = sleep_inter @@ -225,10 +283,22 @@ class HDFSClient(FS): r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:') def _run_cmd(self, cmd, redirect_stderr=False): - ret, output = fluid.core.shell_execute_cmd(cmd, 0, 0, redirect_stderr) - return int(ret), output.splitlines() + exe_cmd = "{} -{}".format(self._base_cmd, cmd) + ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr) + ret = int(ret) + if ret == 134: + raise FSShellCmdAborted(cmd) + return ret, output.splitlines() + + @_handle_errors() + def list_dirs(self, fs_path): + if not self.is_exist(fs_path): + return [] - @_handle_errors + dirs, files = self._ls_dir(fs_path) + return dirs + + @_handle_errors() def ls_dir(self, fs_path): """ list directory under fs_path, and only give the pure name, not include the fs_path @@ -236,11 +306,14 @@ class HDFSClient(FS): if not self.is_exist(fs_path): return [], [] - cmd = "{} -ls {}".format(self._base_cmd, fs_path) + return self._ls_dir(fs_path) + + def _ls_dir(self, fs_path): + cmd = "ls {}".format(fs_path) ret, lines = self._run_cmd(cmd) if ret != 0: - raise ExecuteError + raise ExecuteError(cmd) dirs = [] files = [] @@ -249,9 +322,6 @@ class HDFSClient(FS): if len(arr) != 8: continue - if fs_path not in arr[7]: - continue - p = PurePosixPath(arr[7]) if arr[0][0] == 'd': dirs.append(p.name) @@ -268,18 +338,20 @@ class HDFSClient(FS): return None - @_handle_errors + @_handle_errors() def is_dir(self, fs_path): if not self.is_exist(fs_path): return False - cmd = "{} -test -d {}".format( - self._base_cmd, fs_path, redirect_stderr=True) + return self._is_dir(fs_path) + + def _is_dir(self, fs_path): + cmd = "test -d {}".format(fs_path, redirect_stderr=True) ret, lines = self._run_cmd(cmd) if ret: # other error - if self._test_match(lines) != None: - raise ExecuteError + if self._test_match(lines): + raise ExecuteError(cmd) return False @@ -289,94 +361,155 @@ class HDFSClient(FS): if not self.is_exist(fs_path): return False - return not self.is_dir(fs_path) + return not self._is_dir(fs_path) - @_handle_errors + @_handle_errors() def is_exist(self, fs_path): - cmd = "{} -ls {} ".format(self._base_cmd, fs_path) + cmd = "ls {} ".format(fs_path) ret, out = self._run_cmd(cmd, redirect_stderr=True) if ret != 0: for l in out: if "No such file or directory" in l: return False - raise ExecuteError + raise ExecuteError(cmd) return True - @_handle_errors + # can't retry def upload(self, local_path, fs_path): if self.is_exist(fs_path): - raise FSFileExistsError + raise FSFileExistsError("{} exists".format(fs_path)) local = LocalFS() if not local.is_exist(local_path): - raise FSFileNotExistsError - - cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path) - ret, lines = self._run_cmd(cmd) - if ret != 0: - raise ExecuteError - - @_handle_errors + raise FSFileNotExistsError("{} not exists".format(local_path)) + + return self._try_upload(local_path, fs_path) + + @_handle_errors() + def _try_upload(self, local_path, fs_path): + cmd = "put {} {}".format(local_path, fs_path) + ret = 0 + try: + ret, lines = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError(cmd) + except Exception as e: + self.delete(fs_path) + raise e + + # can't retry def download(self, fs_path, local_path): if self.is_exist(local_path): - raise FSFileExistsError + raise FSFileExistsError("{} exists".format(local_path)) if not self.is_exist(fs_path): - raise FSFileNotExistsError - - cmd = "{} -get {} {}".format(self._base_cmd, fs_path, local_path) - ret, lines = self._run_cmd(cmd) - if ret != 0: - raise ExecuteError - - @_handle_errors + raise FSFileNotExistsError("{} not exits".format(fs_path)) + + return self._try_download(fs_path, local_path) + + @_handle_errors() + def _try_download(self, fs_path, local_path): + cmd = "get {} {}".format(fs_path, local_path) + ret = 0 + try: + ret, lines = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError(cmd) + except Exception as e: + local_fs = LocalFS() + local_fs.delete(local_path) + raise e + + @_handle_errors() def mkdirs(self, fs_path): if self.is_exist(fs_path): return - cmd = "{} -mkdir {}".format(self._base_cmd, fs_path) - ret, lines = self._run_cmd(cmd) + out_hdfs = False + + cmd = "mkdir {} ".format(fs_path) + ret, out = self._run_cmd(cmd, redirect_stderr=True) if ret != 0: - raise ExecuteError + for l in out: + if "No such file or directory" in l: + out_hdfs = True + break + if not out_hdfs: + raise ExecuteError(cmd) + + if out_hdfs and not self.is_exist(fs_path): + cmd = "mkdir -p {}".format(fs_path) + ret, lines = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError(cmd) + + def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True): + if overwrite and self.is_exist(fs_dst_path): + self.delete(fs_dst_path) - @_handle_errors - def mv(self, fs_src_path, fs_dst_path, test_exists=True): if test_exists: if not self.is_exist(fs_src_path): - raise FSFileNotExistsError + raise FSFileNotExistsError("{} is not exists".format( + fs_src_path)) if self.is_exist(fs_dst_path): - raise FSFileExistsError + raise FSFileExistsError("{} exists already".format( + fs_src_path, fs_dst_path, fs_dst_path)) + + return self._try_mv(fs_src_path, fs_dst_path) + + @_handle_errors() + def _try_mv(self, fs_src_path, fs_dst_path): + cmd = "mv {} {}".format(fs_src_path, fs_dst_path) + ret = 0 + try: + ret, _ = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError(cmd) + except Exception as e: + if not self.is_exist(fs_src_path) and \ + self.is_exist(fs_dst_path): + return + raise e - cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path) - ret, _ = self._run_cmd(cmd) - if ret != 0: - raise ExecuteError - - @_handle_errors def _rmr(self, fs_path): - cmd = "{} -rmr {}".format(self._base_cmd, fs_path) + cmd = "rmr {}".format(fs_path) ret, _ = self._run_cmd(cmd) if ret != 0: - raise ExecuteError + raise ExecuteError(cmd) - @_handle_errors def _rm(self, fs_path): - cmd = "{} -rm {}".format(self._base_cmd, fs_path) + cmd = "rm {}".format(fs_path) ret, _ = self._run_cmd(cmd) if ret != 0: - raise ExecuteError + raise ExecuteError(cmd) + @_handle_errors() def delete(self, fs_path): if not self.is_exist(fs_path): return - is_dir = self.is_dir(fs_path) + is_dir = self._is_dir(fs_path) if is_dir: return self._rmr(fs_path) return self._rm(fs_path) + def touch(self, fs_path, exist_ok=True): + if self.is_exist(fs_path): + if exist_ok: + return + raise FSFileExistsError + + return self._touchz(fs_path) + + @_handle_errors() + def _touchz(self, fs_path): + cmd = "touchz {}".format(fs_path) + ret, _ = self._run_cmd(cmd) + if ret != 0: + raise ExecuteError + def need_upload_download(self): return True diff --git a/python/paddle/fluid/incubate/checkpoint/auto_checkpoint.py b/python/paddle/fluid/incubate/checkpoint/auto_checkpoint.py index 094aa842d0..3016ca4a50 100644 --- a/python/paddle/fluid/incubate/checkpoint/auto_checkpoint.py +++ b/python/paddle/fluid/incubate/checkpoint/auto_checkpoint.py @@ -24,7 +24,6 @@ from threading import Thread, current_thread from contextlib import contextmanager from paddle.fluid import unique_name, compiler -from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient from .checkpoint_saver import SerializableBase, CheckpointSaver, PaddleModel from paddle.fluid.framework import in_dygraph_mode, Program @@ -306,6 +305,7 @@ class TrainEpochRange(SerializableBase): if self._checker.ce_test: config = None + from paddle.fleet.utils.fs import HDFSClient self._hdfs = HDFSClient(self._checker.hdfs_home, config) self._cper = CheckpointSaver(self._hdfs) diff --git a/python/paddle/fluid/incubate/checkpoint/checkpoint_saver.py b/python/paddle/fluid/incubate/checkpoint/checkpoint_saver.py index da94f8cd14..23c6f9f7b9 100644 --- a/python/paddle/fluid/incubate/checkpoint/checkpoint_saver.py +++ b/python/paddle/fluid/incubate/checkpoint/checkpoint_saver.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ..fleet.utils.fs import FS, LocalFS -from ..fleet.utils.hdfs import HDFSClient from ...compiler import CompiledProgram @@ -81,6 +79,7 @@ class CheckpointSaver(object): tmp_path = "{}.tmp".format(real_path) saved_path = tmp_path + from paddle.fleet.utils.fs import LocalFS local_fs = LocalFS() cache_path = None @@ -121,7 +120,6 @@ class CheckpointSaver(object): Deserialize objects in slists from path Return really load path """ - if checkpoint_no is None: max_no = self._get_last_checkpoint_no(path) @@ -136,6 +134,7 @@ class CheckpointSaver(object): assert isinstance(checkpoint_no, int) assert checkpoint_no >= 0 + from paddle.fleet.utils.fs import LocalFS local_fs = LocalFS() if self._fs.need_upload_download(): cache_path = "{}/{}.{}.load_cache".format( diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py index 6ad6f61e2d..6e5aae8251 100644 --- a/python/paddle/fluid/incubate/fleet/collective/__init__.py +++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py @@ -26,7 +26,6 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Mode from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer from paddle.fluid import compiler -from paddle.fluid.incubate.fleet.utils.fs import LocalFS from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel, CheckpointSaver import os @@ -143,14 +142,13 @@ class Collective(Fleet): path, trainer_id, train_status, + fs, main_program=None, - fs=LocalFS(), local_cache_path=".cache", remain_all_checkpoint=True): """ This function save persistables and current epoch num to path. """ - if main_program == None: main_program = self._transpiled_program @@ -173,8 +171,8 @@ class Collective(Fleet): path, trainer_id, train_status, + fs, main_program=None, - fs=LocalFS(), local_cache_path=".cache", ignore_empty=True): """ diff --git a/python/paddle/fluid/tests/unittests/test_auto_checkpoint.py b/python/paddle/fluid/tests/unittests/test_auto_checkpoint.py index dd373f523a..7f02467737 100644 --- a/python/paddle/fluid/tests/unittests/test_auto_checkpoint.py +++ b/python/paddle/fluid/tests/unittests/test_auto_checkpoint.py @@ -20,8 +20,7 @@ from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet import os import sys -from paddle.fluid.incubate.fleet.utils.fs import LocalFS -from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient +from paddle.fleet.utils.fs import LocalFS, HDFSClient import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel from paddle.fluid.framework import program_guard diff --git a/python/paddle/fluid/tests/unittests/test_auto_checkpoint2.py b/python/paddle/fluid/tests/unittests/test_auto_checkpoint2.py index 281c3d7af1..f4bbb6d51c 100644 --- a/python/paddle/fluid/tests/unittests/test_auto_checkpoint2.py +++ b/python/paddle/fluid/tests/unittests/test_auto_checkpoint2.py @@ -20,8 +20,7 @@ from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet import os import sys -from paddle.fluid.incubate.fleet.utils.fs import LocalFS -from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient +from paddle.fleet.utils.fs import LocalFS, HDFSClient import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel from paddle.fluid.framework import program_guard -- GitLab