未验证 提交 a7c52100 编写于 作者: G gongweibao 提交者: GitHub

Fix test_hdfs bug. (#26068)

* fix merge3 test=develop
上级 50f149a4
...@@ -26,6 +26,7 @@ import logging ...@@ -26,6 +26,7 @@ import logging
import six import six
import abc import abc
import paddle.fluid as fluid import paddle.fluid as fluid
from paddle.fluid import core
import functools import functools
from pathlib import PurePosixPath, Path from pathlib import PurePosixPath, Path
...@@ -33,7 +34,7 @@ import shutil ...@@ -33,7 +34,7 @@ import shutil
__all__ = [ __all__ = [
'FS', 'LocalFS', 'HDFSClient', 'ExecuteError', 'FSTimeOut', 'FS', 'LocalFS', 'HDFSClient', 'ExecuteError', 'FSTimeOut',
'FSFileExistsError', 'FSFileNotExistsError' 'FSFileExistsError', 'FSFileNotExistsError', 'FSShellCmdAborted'
] ]
...@@ -53,6 +54,10 @@ class FSTimeOut(Exception): ...@@ -53,6 +54,10 @@ class FSTimeOut(Exception):
pass pass
class FSShellCmdAborted(ExecuteError):
pass
class FS(object): class FS(object):
@abc.abstractmethod @abc.abstractmethod
def ls_dir(self, fs_path): def ls_dir(self, fs_path):
...@@ -95,7 +100,7 @@ class FS(object): ...@@ -95,7 +100,7 @@ class FS(object):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def mv(self, fs_src_path, fs_dst_path): def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=False):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
...@@ -103,15 +108,11 @@ class FS(object): ...@@ -103,15 +108,11 @@ class FS(object):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def glob(self, fs_path): def list_dirs(self, fs_path):
raise NotImplementedError
@abc.abstractmethod
def stat(self, fs_path):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def walk(self, fs_path): def touch(self, fs_path, exist_ok=True):
raise NotImplementedError raise NotImplementedError
...@@ -135,14 +136,8 @@ class LocalFS(FS): ...@@ -135,14 +136,8 @@ class LocalFS(FS):
fs_path) fs_path)
os.system("mkdir -p {}".format(fs_path)) os.system("mkdir -p {}".format(fs_path))
def is_file(self, fs_path): def rename(self, fs_src_path, fs_dst_path):
return os.path.isfile(fs_path) os.rename(fs_src_path, fs_dst_path)
def is_dir(self, fs_path):
return os.path.isdir(fs_path)
def is_exist(self, fs_path):
return os.path.exists(fs_path)
def _rmr(self, fs_path): def _rmr(self, fs_path):
shutil.rmtree(fs_path) shutil.rmtree(fs_path)
...@@ -159,24 +154,51 @@ class LocalFS(FS): ...@@ -159,24 +154,51 @@ class LocalFS(FS):
return self._rmr(fs_path) return self._rmr(fs_path)
def rename(self, fs_src_path, fs_dst_path):
os.rename(fs_src_path, fs_dst_path)
def need_upload_download(self): def need_upload_download(self):
return False return False
def touch(self, fs_path): def is_file(self, fs_path):
return Path(fs_path).touch() return os.path.isfile(fs_path)
def is_dir(self, fs_path):
return os.path.isdir(fs_path)
def is_exist(self, fs_path):
return os.path.exists(fs_path)
def touch(self, fs_path, exist_ok=True):
if self.is_exist(fs_path):
if exist_ok:
return
raise FSFileExistsError
return Path(fs_path).touch(exist_ok=True)
def mv(self, src_path, dst_path): def mv(self, src_path, dst_path, overwrite=False, test_exists=False):
if not self.is_exist(src_path): if not self.is_exist(src_path):
raise FSFileNotExistsError raise FSFileNotExistsError
if overwrite and self.is_exist(dst_path):
self.delete(dst_path)
if self.is_exist(dst_path): if self.is_exist(dst_path):
raise FSFileExistsError raise FSFileExistsError
return self.rename(src_path, dst_path) return self.rename(src_path, dst_path)
def list_dirs(self, fs_path):
"""
list directory under fs_path, and only give the pure name, not include the fs_path
"""
if not self.is_exist(fs_path):
return []
dirs = [
f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f)
]
return dirs
"""HDFS Utils.""" """HDFS Utils."""
...@@ -198,6 +220,41 @@ def _handle_errors(f): ...@@ -198,6 +220,41 @@ def _handle_errors(f):
return functools.wraps(f)(handler) return functools.wraps(f)(handler)
def _handle_errors(max_time_out=None):
def decorator(f):
@functools.wraps(f)
def handler(*args, **kwargs):
o = args[0]
time_out = max_time_out
if time_out is None:
time_out = float(o._time_out) / 1000.0
else:
time_out /= 1000.0
inter = float(o._sleep_inter) / 1000.0
start = time.time()
last_print_time = start
while True:
try:
return f(*args, **kwargs)
#important: only ExecuteError need to retry
except ExecuteError as e:
if time.time() - start >= time_out:
raise FSTimeOut("args:{} timeout:{}".format(
args, time.time() - start))
time.sleep(inter)
if time.time() - last_print_time > 30:
print("hadoop operator timeout:args:{} timeout:{}".format(
args, time.time() - start))
last_print_time = time.time()
return handler
return decorator
class HDFSClient(FS): class HDFSClient(FS):
def __init__( def __init__(
self, self,
...@@ -216,7 +273,8 @@ class HDFSClient(FS): ...@@ -216,7 +273,8 @@ class HDFSClient(FS):
if configs: if configs:
for k, v in six.iteritems(configs): for k, v in six.iteritems(configs):
self.pre_commands.append('-D%s=%s' % (k, v)) config_command = '-D%s=%s' % (k, v)
self.pre_commands.append(config_command)
self._time_out = time_out self._time_out = time_out
self._sleep_inter = sleep_inter self._sleep_inter = sleep_inter
...@@ -225,10 +283,22 @@ class HDFSClient(FS): ...@@ -225,10 +283,22 @@ class HDFSClient(FS):
r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:') r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:')
def _run_cmd(self, cmd, redirect_stderr=False): def _run_cmd(self, cmd, redirect_stderr=False):
ret, output = fluid.core.shell_execute_cmd(cmd, 0, 0, redirect_stderr) exe_cmd = "{} -{}".format(self._base_cmd, cmd)
return int(ret), output.splitlines() ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr)
ret = int(ret)
if ret == 134:
raise FSShellCmdAborted(cmd)
return ret, output.splitlines()
@_handle_errors()
def list_dirs(self, fs_path):
if not self.is_exist(fs_path):
return []
@_handle_errors dirs, files = self._ls_dir(fs_path)
return dirs
@_handle_errors()
def ls_dir(self, fs_path): def ls_dir(self, fs_path):
""" """
list directory under fs_path, and only give the pure name, not include the fs_path list directory under fs_path, and only give the pure name, not include the fs_path
...@@ -236,11 +306,14 @@ class HDFSClient(FS): ...@@ -236,11 +306,14 @@ class HDFSClient(FS):
if not self.is_exist(fs_path): if not self.is_exist(fs_path):
return [], [] return [], []
cmd = "{} -ls {}".format(self._base_cmd, fs_path) return self._ls_dir(fs_path)
def _ls_dir(self, fs_path):
cmd = "ls {}".format(fs_path)
ret, lines = self._run_cmd(cmd) ret, lines = self._run_cmd(cmd)
if ret != 0: if ret != 0:
raise ExecuteError raise ExecuteError(cmd)
dirs = [] dirs = []
files = [] files = []
...@@ -249,9 +322,6 @@ class HDFSClient(FS): ...@@ -249,9 +322,6 @@ class HDFSClient(FS):
if len(arr) != 8: if len(arr) != 8:
continue continue
if fs_path not in arr[7]:
continue
p = PurePosixPath(arr[7]) p = PurePosixPath(arr[7])
if arr[0][0] == 'd': if arr[0][0] == 'd':
dirs.append(p.name) dirs.append(p.name)
...@@ -268,18 +338,20 @@ class HDFSClient(FS): ...@@ -268,18 +338,20 @@ class HDFSClient(FS):
return None return None
@_handle_errors @_handle_errors()
def is_dir(self, fs_path): def is_dir(self, fs_path):
if not self.is_exist(fs_path): if not self.is_exist(fs_path):
return False return False
cmd = "{} -test -d {}".format( return self._is_dir(fs_path)
self._base_cmd, fs_path, redirect_stderr=True)
def _is_dir(self, fs_path):
cmd = "test -d {}".format(fs_path, redirect_stderr=True)
ret, lines = self._run_cmd(cmd) ret, lines = self._run_cmd(cmd)
if ret: if ret:
# other error # other error
if self._test_match(lines) != None: if self._test_match(lines):
raise ExecuteError raise ExecuteError(cmd)
return False return False
...@@ -289,94 +361,155 @@ class HDFSClient(FS): ...@@ -289,94 +361,155 @@ class HDFSClient(FS):
if not self.is_exist(fs_path): if not self.is_exist(fs_path):
return False return False
return not self.is_dir(fs_path) return not self._is_dir(fs_path)
@_handle_errors @_handle_errors()
def is_exist(self, fs_path): def is_exist(self, fs_path):
cmd = "{} -ls {} ".format(self._base_cmd, fs_path) cmd = "ls {} ".format(fs_path)
ret, out = self._run_cmd(cmd, redirect_stderr=True) ret, out = self._run_cmd(cmd, redirect_stderr=True)
if ret != 0: if ret != 0:
for l in out: for l in out:
if "No such file or directory" in l: if "No such file or directory" in l:
return False return False
raise ExecuteError raise ExecuteError(cmd)
return True return True
@_handle_errors # can't retry
def upload(self, local_path, fs_path): def upload(self, local_path, fs_path):
if self.is_exist(fs_path): if self.is_exist(fs_path):
raise FSFileExistsError raise FSFileExistsError("{} exists".format(fs_path))
local = LocalFS() local = LocalFS()
if not local.is_exist(local_path): if not local.is_exist(local_path):
raise FSFileNotExistsError raise FSFileNotExistsError("{} not exists".format(local_path))
cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path) return self._try_upload(local_path, fs_path)
ret, lines = self._run_cmd(cmd)
if ret != 0: @_handle_errors()
raise ExecuteError def _try_upload(self, local_path, fs_path):
cmd = "put {} {}".format(local_path, fs_path)
@_handle_errors ret = 0
try:
ret, lines = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
except Exception as e:
self.delete(fs_path)
raise e
# can't retry
def download(self, fs_path, local_path): def download(self, fs_path, local_path):
if self.is_exist(local_path): if self.is_exist(local_path):
raise FSFileExistsError raise FSFileExistsError("{} exists".format(local_path))
if not self.is_exist(fs_path): if not self.is_exist(fs_path):
raise FSFileNotExistsError raise FSFileNotExistsError("{} not exits".format(fs_path))
cmd = "{} -get {} {}".format(self._base_cmd, fs_path, local_path) return self._try_download(fs_path, local_path)
ret, lines = self._run_cmd(cmd)
if ret != 0: @_handle_errors()
raise ExecuteError def _try_download(self, fs_path, local_path):
cmd = "get {} {}".format(fs_path, local_path)
@_handle_errors ret = 0
try:
ret, lines = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
except Exception as e:
local_fs = LocalFS()
local_fs.delete(local_path)
raise e
@_handle_errors()
def mkdirs(self, fs_path): def mkdirs(self, fs_path):
if self.is_exist(fs_path): if self.is_exist(fs_path):
return return
cmd = "{} -mkdir {}".format(self._base_cmd, fs_path) out_hdfs = False
ret, lines = self._run_cmd(cmd)
cmd = "mkdir {} ".format(fs_path)
ret, out = self._run_cmd(cmd, redirect_stderr=True)
if ret != 0: if ret != 0:
raise ExecuteError for l in out:
if "No such file or directory" in l:
out_hdfs = True
break
if not out_hdfs:
raise ExecuteError(cmd)
if out_hdfs and not self.is_exist(fs_path):
cmd = "mkdir -p {}".format(fs_path)
ret, lines = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
if overwrite and self.is_exist(fs_dst_path):
self.delete(fs_dst_path)
@_handle_errors
def mv(self, fs_src_path, fs_dst_path, test_exists=True):
if test_exists: if test_exists:
if not self.is_exist(fs_src_path): if not self.is_exist(fs_src_path):
raise FSFileNotExistsError raise FSFileNotExistsError("{} is not exists".format(
fs_src_path))
if self.is_exist(fs_dst_path): if self.is_exist(fs_dst_path):
raise FSFileExistsError raise FSFileExistsError("{} exists already".format(
fs_src_path, fs_dst_path, fs_dst_path))
return self._try_mv(fs_src_path, fs_dst_path)
@_handle_errors()
def _try_mv(self, fs_src_path, fs_dst_path):
cmd = "mv {} {}".format(fs_src_path, fs_dst_path)
ret = 0
try:
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
except Exception as e:
if not self.is_exist(fs_src_path) and \
self.is_exist(fs_dst_path):
return
raise e
cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError
@_handle_errors
def _rmr(self, fs_path): def _rmr(self, fs_path):
cmd = "{} -rmr {}".format(self._base_cmd, fs_path) cmd = "rmr {}".format(fs_path)
ret, _ = self._run_cmd(cmd) ret, _ = self._run_cmd(cmd)
if ret != 0: if ret != 0:
raise ExecuteError raise ExecuteError(cmd)
@_handle_errors
def _rm(self, fs_path): def _rm(self, fs_path):
cmd = "{} -rm {}".format(self._base_cmd, fs_path) cmd = "rm {}".format(fs_path)
ret, _ = self._run_cmd(cmd) ret, _ = self._run_cmd(cmd)
if ret != 0: if ret != 0:
raise ExecuteError raise ExecuteError(cmd)
@_handle_errors()
def delete(self, fs_path): def delete(self, fs_path):
if not self.is_exist(fs_path): if not self.is_exist(fs_path):
return return
is_dir = self.is_dir(fs_path) is_dir = self._is_dir(fs_path)
if is_dir: if is_dir:
return self._rmr(fs_path) return self._rmr(fs_path)
return self._rm(fs_path) return self._rm(fs_path)
def touch(self, fs_path, exist_ok=True):
if self.is_exist(fs_path):
if exist_ok:
return
raise FSFileExistsError
return self._touchz(fs_path)
@_handle_errors()
def _touchz(self, fs_path):
cmd = "touchz {}".format(fs_path)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError
def need_upload_download(self): def need_upload_download(self):
return True return True
...@@ -24,7 +24,6 @@ from threading import Thread, current_thread ...@@ -24,7 +24,6 @@ from threading import Thread, current_thread
from contextlib import contextmanager from contextlib import contextmanager
from paddle.fluid import unique_name, compiler from paddle.fluid import unique_name, compiler
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
from .checkpoint_saver import SerializableBase, CheckpointSaver, PaddleModel from .checkpoint_saver import SerializableBase, CheckpointSaver, PaddleModel
from paddle.fluid.framework import in_dygraph_mode, Program from paddle.fluid.framework import in_dygraph_mode, Program
...@@ -306,6 +305,7 @@ class TrainEpochRange(SerializableBase): ...@@ -306,6 +305,7 @@ class TrainEpochRange(SerializableBase):
if self._checker.ce_test: if self._checker.ce_test:
config = None config = None
from paddle.fleet.utils.fs import HDFSClient
self._hdfs = HDFSClient(self._checker.hdfs_home, config) self._hdfs = HDFSClient(self._checker.hdfs_home, config)
self._cper = CheckpointSaver(self._hdfs) self._cper = CheckpointSaver(self._hdfs)
......
...@@ -12,8 +12,6 @@ ...@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from ..fleet.utils.fs import FS, LocalFS
from ..fleet.utils.hdfs import HDFSClient
from ...compiler import CompiledProgram from ...compiler import CompiledProgram
...@@ -81,6 +79,7 @@ class CheckpointSaver(object): ...@@ -81,6 +79,7 @@ class CheckpointSaver(object):
tmp_path = "{}.tmp".format(real_path) tmp_path = "{}.tmp".format(real_path)
saved_path = tmp_path saved_path = tmp_path
from paddle.fleet.utils.fs import LocalFS
local_fs = LocalFS() local_fs = LocalFS()
cache_path = None cache_path = None
...@@ -121,7 +120,6 @@ class CheckpointSaver(object): ...@@ -121,7 +120,6 @@ class CheckpointSaver(object):
Deserialize objects in slists from path Deserialize objects in slists from path
Return really load path Return really load path
""" """
if checkpoint_no is None: if checkpoint_no is None:
max_no = self._get_last_checkpoint_no(path) max_no = self._get_last_checkpoint_no(path)
...@@ -136,6 +134,7 @@ class CheckpointSaver(object): ...@@ -136,6 +134,7 @@ class CheckpointSaver(object):
assert isinstance(checkpoint_no, int) assert isinstance(checkpoint_no, int)
assert checkpoint_no >= 0 assert checkpoint_no >= 0
from paddle.fleet.utils.fs import LocalFS
local_fs = LocalFS() local_fs = LocalFS()
if self._fs.need_upload_download(): if self._fs.need_upload_download():
cache_path = "{}/{}.{}.load_cache".format( cache_path = "{}/{}.{}.load_cache".format(
......
...@@ -26,7 +26,6 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Mode ...@@ -26,7 +26,6 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Mode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid import compiler from paddle.fluid import compiler
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel, CheckpointSaver from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel, CheckpointSaver
import os import os
...@@ -143,14 +142,13 @@ class Collective(Fleet): ...@@ -143,14 +142,13 @@ class Collective(Fleet):
path, path,
trainer_id, trainer_id,
train_status, train_status,
fs,
main_program=None, main_program=None,
fs=LocalFS(),
local_cache_path=".cache", local_cache_path=".cache",
remain_all_checkpoint=True): remain_all_checkpoint=True):
""" """
This function save persistables and current epoch num to path. This function save persistables and current epoch num to path.
""" """
if main_program == None: if main_program == None:
main_program = self._transpiled_program main_program = self._transpiled_program
...@@ -173,8 +171,8 @@ class Collective(Fleet): ...@@ -173,8 +171,8 @@ class Collective(Fleet):
path, path,
trainer_id, trainer_id,
train_status, train_status,
fs,
main_program=None, main_program=None,
fs=LocalFS(),
local_cache_path=".cache", local_cache_path=".cache",
ignore_empty=True): ignore_empty=True):
""" """
......
...@@ -20,8 +20,7 @@ from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet ...@@ -20,8 +20,7 @@ from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os import os
import sys import sys
from paddle.fluid.incubate.fleet.utils.fs import LocalFS from paddle.fleet.utils.fs import LocalFS, HDFSClient
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel
from paddle.fluid.framework import program_guard from paddle.fluid.framework import program_guard
......
...@@ -20,8 +20,7 @@ from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet ...@@ -20,8 +20,7 @@ from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os import os
import sys import sys
from paddle.fluid.incubate.fleet.utils.fs import LocalFS from paddle.fleet.utils.fs import LocalFS, HDFSClient
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel
from paddle.fluid.framework import program_guard from paddle.fluid.framework import program_guard
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册