# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import functools
import multiprocessing
import os
import re
import shutil
import time

# (TODO: GhostScreaming) It will be removed later.
from paddle.fluid import core

from .log_util import logger

__all__ = []


class ExecuteError(Exception):
    pass


class FSFileExistsError(Exception):
    pass


class FSFileNotExistsError(Exception):
    pass


class FSTimeOut(Exception):
    pass


class FSShellCmdAborted(ExecuteError):
    pass


class FS:
    @abc.abstractmethod
    def ls_dir(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def is_file(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def is_dir(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def is_exist(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def upload(self, local_path, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def download(self, fs_path, local_path):
        raise NotImplementedError

    @abc.abstractmethod
    def mkdirs(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def delete(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def need_upload_download(self):
        raise NotImplementedError

    @abc.abstractmethod
    def rename(self, fs_src_path, fs_dst_path):
        raise NotImplementedError

    @abc.abstractmethod
    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=False):
        raise NotImplementedError

    @abc.abstractmethod
    def upload_dir(self, local_dir, dest_dir):
        raise NotImplementedError

    @abc.abstractmethod
    def list_dirs(self, fs_path):
        raise NotImplementedError

    @abc.abstractmethod
    def touch(self, fs_path, exist_ok=True):
        raise NotImplementedError

    @abc.abstractmethod
    def cat(self, fs_path=None):
        raise NotImplementedError
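# The three concrete backends below implement this interface: LocalFS wraps
# the local filesystem via `os`/`shutil`, HDFSClient shells out to the
# `hadoop fs` command line, and AFSClient delegates to the C++
# `core.AfsWrapper` binding.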
class LocalFS(FS):
    """
    A tool for the local file system.

    Examples:
        .. code-block:: python

            from paddle.distributed.fleet.utils import LocalFS

            client = LocalFS()
            subdirs, files = client.ls_dir("./")
    """

    def ls_dir(self, fs_path):
        """
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            Tuple: Return a 2-tuple, the first is a list of all its subdirectories,
            and the second is a list of all its subfiles, e.g.
            ([subdirname1, subdirname2, ...], [filename1, filename2, ...]).

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                subdirs, files = client.ls_dir("./")
        """
        if not self.is_exist(fs_path):
            return [], []

        dirs = []
        files = []
        for f in os.listdir(fs_path):
            if os.path.isdir(os.path.join(fs_path, f)):
                dirs.append(f)
            else:
                files.append(f)

        return dirs, files

    def mkdirs(self, fs_path):
        """
        Create a local directory.

        Args:
            fs_path(str): The local directory path.

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                client.mkdirs("test_mkdirs")
                client.delete("test_mkdirs")
        """
        assert not os.path.isfile(fs_path), "{} is already a file".format(
            fs_path
        )
        os.makedirs(fs_path, exist_ok=True)

    def rename(self, fs_src_path, fs_dst_path):
        """
        Rename a file or directory.

        Args:
            fs_src_path(str): The current name of the file or directory.
            fs_dst_path(str): The new name of the file or directory.

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                client.touch("test_rename_src")
                print(client.is_exist("test_rename_src"))  # True
                client.rename("test_rename_src", "test_rename_dst")
                print(client.is_exist("test_rename_src"))  # False
                print(client.is_exist("test_rename_dst"))  # True
                client.delete("test_rename_dst")
        """
        os.rename(fs_src_path, fs_dst_path)

    def _rmr(self, fs_path):
        shutil.rmtree(fs_path)

    def _rm(self, fs_path):
        os.remove(fs_path)

    def delete(self, fs_path):
        """
        Delete the local file path, whether it's a file or directory.

        Args:
            fs_path(str): The local file path.

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                client.mkdirs("test_localFS_mkdirs")
                client.delete("test_localFS_mkdirs")
        """
        if not self.is_exist(fs_path):
            return

        if os.path.isfile(fs_path):
            return self._rm(fs_path)

        return self._rmr(fs_path)

    def need_upload_download(self):
        return False

    def is_file(self, fs_path):
        """
        Whether the local file path is a file.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                client.touch("test_is_file")
                print(client.is_file("test_is_file"))  # True
                client.delete("test_is_file")
        """
        return os.path.isfile(fs_path)

    def is_dir(self, fs_path):
        """
        Whether the local file path is a directory.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                client.mkdirs("test_is_dir")
                print(client.is_dir("test_is_dir"))  # True
                client.delete("test_is_dir")
        """
        return os.path.isdir(fs_path)

    def is_exist(self, fs_path):
        """
        Whether the local file path exists.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Whether it's a file or directory, return true if the path exists,
            otherwise return false.

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                ret = client.is_exist("test_is_exist")
        """
        return os.path.exists(fs_path)

    def touch(self, fs_path, exist_ok=True):
        """
        Create a local file.

        Args:
            fs_path(str): The local file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
                program will throw an Exception. Default is true.

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                client.touch("test_touch")
                client.delete("test_touch")
        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError

        # create an empty file; append mode creates the file when missing
        # and won't truncate an existing one
        with open(fs_path, "a"):
            pass

    def mv(self, src_path, dst_path, overwrite=False, test_exists=False):
        """
        Move a local file or directory from `src_path` to `dst_path` .

        Args:
            src_path(str): Name of the file or directory that needs to be moved.
            dst_path(str): Name of the file or directory to move to.
            overwrite(bool): Whether to overwrite `dst_path` if it exists. Default is False.
            test_exists(bool): Unused here; accepted for compatibility with `FS.mv` .

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                client.touch("test_mv_src")
                client.mv("test_mv_src", "test_mv_dst")
                client.delete("test_mv_dst")
        """
        if not self.is_exist(src_path):
            raise FSFileNotExistsError

        if overwrite and self.is_exist(dst_path):
            self.delete(dst_path)

        if self.is_exist(dst_path):
            raise FSFileExistsError

        return self.rename(src_path, dst_path)

    def list_dirs(self, fs_path):
        """
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname2, ...].

        Examples:
            .. code-block:: python

                from paddle.distributed.fleet.utils import LocalFS

                client = LocalFS()
                subdirs = client.list_dirs("./")
        """
        if not self.is_exist(fs_path):
            return []

        dirs = [
            f
            for f in os.listdir(fs_path)
            if os.path.isdir(os.path.join(fs_path, f))
        ]

        return dirs
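# Note on LocalFS.mv semantics (reading from the code above): with
# overwrite=True the destination is deleted first and the source is then
# renamed, so the delete-check-rename sequence is not atomic; a concurrent
# writer could recreate dst_path in between.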
def _handle_errors(max_time_out=None):
    def decorator(f):
        @functools.wraps(f)
        def handler(*args, **kwargs):
            o = args[0]
            time_out = max_time_out
            if time_out is None:
                time_out = float(o._time_out) / 1000.0
            else:
                time_out /= 1000.0
            inter = float(o._sleep_inter) / 1000.0

            start = time.time()
            last_print_time = start
            while True:
                try:
                    return f(*args, **kwargs)
                # important: only ExecuteError needs to be retried
                except ExecuteError:
                    if time.time() - start >= time_out:
                        raise FSTimeOut(
                            "args:{} timeout:{}".format(
                                args, time.time() - start
                            )
                        )

                    time.sleep(inter)

                if time.time() - last_print_time > 30:
                    print(
                        "hadoop operator timeout:args:{} timeout:{}".format(
                            args, time.time() - start
                        )
                    )
                    last_print_time = time.time()

        return handler

    return decorator
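# How the retry decorator behaves (reading from the code above): a method
# decorated with @_handle_errors() is retried for as long as it raises
# ExecuteError, sleeping `self._sleep_inter` ms between attempts, until
# `self._time_out` ms (or the decorator's `max_time_out`) have elapsed; both
# values are read off the bound instance and converted to seconds. A progress
# line is printed at most once every 30 seconds, and FSTimeOut is raised on
# expiry.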
class HDFSClient(FS):
    """
    A tool for HDFS.

    Args:
        hadoop_home(str): Hadoop home.
        configs(dict): Hadoop config. It is a dictionary and needs to contain the
            keys: "fs.default.name" and "hadoop.job.ugi".

    Examples:

        .. code-block:: text

            from paddle.distributed.fleet.utils import HDFSClient

            hadoop_home = "/home/client/hadoop-client/hadoop/"
            configs = {
                "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                "hadoop.job.ugi": "hello,hello123"
            }

            client = HDFSClient(hadoop_home, configs)
            client.ls_dir("hdfs:/test_hdfs_client")
    """

    def __init__(
        self,
        hadoop_home,
        configs,
        time_out=5 * 60 * 1000,  # ms
        sleep_inter=1000,  # ms
    ):
        self.pre_commands = []
        hadoop_bin = '%s/bin/hadoop' % hadoop_home
        self.pre_commands.append(hadoop_bin)
        dfs = 'fs'
        self.pre_commands.append(dfs)

        if configs:
            for k, v in configs.items():
                config_command = '-D%s=%s' % (k, v)
                self.pre_commands.append(config_command)

        self._time_out = time_out
        self._sleep_inter = sleep_inter
        self._base_cmd = " ".join(self.pre_commands)
        self._bd_err_re = re.compile(
            r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:'
        )

    def _run_cmd(self, cmd, redirect_stderr=False, retry_times=5):
        exe_cmd = "{} -{}".format(self._base_cmd, cmd)
        ret = 0
        output = None
        retry_sleep_second = 3
        for x in range(retry_times + 1):
            ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr)
            ret = int(ret)
            if ret == 0:
                break

            time.sleep(retry_sleep_second)

        if ret == 134:
            raise FSShellCmdAborted(cmd)

        return ret, output.splitlines()
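    # Example of the composed shell command (hypothetical hadoop_home): with
    # hadoop_home="/opt/hadoop" and the configs above, `_base_cmd` becomes
    #   /opt/hadoop/bin/hadoop fs -Dfs.default.name=hdfs://xxx.hadoop.com:54310 -Dhadoop.job.ugi=hello,hello123
    # and `_run_cmd("ls /path")` executes "<_base_cmd> -ls /path". Exit code
    # 134 (128 + SIGABRT) marks an aborted shell command and raises
    # FSShellCmdAborted.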
    @_handle_errors()
    def list_dirs(self, fs_path):
        """
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname2, ...].

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                subdirs = client.list_dirs("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return []

        dirs, files = self._ls_dir(fs_path)
        return dirs

    @_handle_errors()
    def ls_dir(self, fs_path):
        """
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple, the first element is the list of all its
            subdirectories, and the second one is the list of all its subfiles,
            e.g. ([subdirname1, subdirname2, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return [], []

        return self._ls_dir(fs_path)

    def _ls_dir(self, fs_path):
        cmd = "ls {}".format(fs_path)
        ret, lines = self._run_cmd(cmd)

        if ret != 0:
            raise ExecuteError(cmd)

        dirs = []
        files = []
        for line in lines:
            arr = line.split()
            if len(arr) != 8:
                continue

            p = os.path.basename(arr[7])
            if arr[0][0] == 'd':
                dirs.append(p)
            else:
                files.append(p)

        return dirs, files

    def _test_match(self, lines):
        for l in lines:
            m = self._bd_err_re.match(l)
            if m is not None:
                return m

        return None
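    # `_ls_dir` parses standard `hadoop fs -ls` output, which has 8
    # whitespace-separated columns per entry, e.g. (illustrative):
    #   drwxr-xr-x   - user group          0 2020-01-01 12:00 /path/subdir
    #   -rw-r--r--   3 user group       1024 2020-01-01 12:00 /path/file
    # A leading 'd' in the permissions column marks a directory; the 8th
    # column holds the full path, of which only the basename is kept.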
    @_handle_errors()
    def is_dir(self, fs_path):
        """
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                ret = client.is_dir("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return False

        return self._is_dir(fs_path)

    def _is_dir(self, fs_path):
        cmd = "test -d {}".format(fs_path)
        ret, lines = self._run_cmd(cmd, redirect_stderr=True, retry_times=1)
        if ret:
            # other error
            if self._test_match(lines):
                print('raise exception: ')
                print('\n'.join(lines))
                raise ExecuteError(cmd)

            return False

        return True

    def is_file(self, fs_path):
        """
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                ret = client.is_file("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return False

        return not self._is_dir(fs_path)

    @_handle_errors()
    def is_exist(self, fs_path):
        """
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Whether it's a file or directory, return true if the path exists,
            otherwise return false.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                ret = client.is_exist("hdfs:/test_hdfs_client")
        """
        cmd = "test -e {}".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True, retry_times=1)
        if ret != 0:
            return False

        return True

    def upload_dir(self, local_dir, dest_dir, overwrite=False):
        """
        Upload a local directory to HDFS.

        Args:
            local_dir(str): The local directory.
            dest_dir(str): The HDFS destination directory.
            overwrite(bool): Whether to overwrite an existing destination. Default is False.
        """
        local_dir = local_dir.rstrip("/")
        dest_dir = dest_dir.rstrip("/")
        local_basename = os.path.basename(local_dir)
        if self.is_exist(dest_dir + "/" + local_basename) and overwrite:
            self.delete(dest_dir + "/" + local_basename)
        if not self.is_exist(dest_dir):
            self.mkdirs(dest_dir)
        self._try_upload(local_dir, dest_dir)

    # can't retry
    def upload(self, local_path, fs_path, multi_processes=5, overwrite=False):
        """
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int): The number of processes used to upload data, default is 5.
            overwrite(bool): Whether to overwrite the file on HDFS. Default is False.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")
        """

        def __subprocess_upload(hdfs_path_single, datas):
            for data in datas:
                self._try_upload(data, hdfs_path_single)

        def get_local_files(path):
            """
            Get local files.

            Args:
                path(str): The local path.

            Returns:
                A list of local files.
            """
            rlist = []

            if not os.path.exists(path):
                return rlist

            if os.path.isdir(path):
                for file in os.listdir(path):
                    t = os.path.join(path, file)
                    rlist.append(t)
            else:
                rlist.append(path)
            return rlist

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError("{} does not exist".format(local_path))

        all_files = get_local_files(local_path)
        if not all_files:
            print("there is nothing to upload, function exits")
            return

        if self.is_exist(fs_path) and overwrite:
            self.delete(fs_path)
            self.mkdirs(fs_path)

        procs = []
        for i in range(multi_processes):
            process_datas = self._split_files(all_files, i, multi_processes)
            p = multiprocessing.Process(
                target=__subprocess_upload, args=(fs_path, process_datas)
            )
            procs.append(p)
            p.start()

        # complete the processes
        for proc in procs:
            proc.join()

    @_handle_errors()
    def _try_upload(self, local_path, fs_path):
        cmd = "put {} {}".format(local_path, fs_path)
        ret = 0
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            self.delete(fs_path)
            raise e
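    # Upload parallelism: the file list is partitioned across
    # `multi_processes` worker processes with `_split_files`, and each worker
    # issues its own `hadoop fs -put` per file, e.g. (illustrative):
    #
    #     client.upload("local_dir", "hdfs:/dest", multi_processes=5)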
    # can't retry
    def download(self, fs_path, local_path, multi_processes=5, overwrite=False):
        """
        Download remote HDFS path to the local.

        Args:
            fs_path(str): The HDFS path.
            local_path(str): The local path.
            multi_processes(int): The number of processes used to download data, default is 5.
            overwrite(bool): Whether to overwrite. Default is False.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                client.download("hdfs:/test_hdfs_client", "./")
        """

        def __subprocess_download(local_path, datas):
            """
            Download files from HDFS.

            Args:
                local_path(str): the local file path
                datas(list): the hdfs file path list
            """
            for data in datas:
                self._try_download(data, local_path)

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError("{} does not exist".format(fs_path))

        # download file
        if self.is_file(fs_path):
            return self._try_download(fs_path, local_path)

        # download dir
        dirs, all_filenames = self.ls_dir(fs_path)
        all_files = [fs_path + "/" + i for i in all_filenames]
        all_files.extend([fs_path + "/" + i for i in dirs])

        procs = []
        for i in range(multi_processes):
            process_datas = self._split_files(all_files, i, multi_processes)
            p = multiprocessing.Process(
                target=__subprocess_download, args=(local_path, process_datas)
            )
            procs.append(p)
            p.start()

        # complete the processes
        for proc in procs:
            proc.join()

    @_handle_errors()
    def _try_download(self, fs_path, local_path):
        cmd = "get {} {}".format(fs_path, local_path)
        ret = 0
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            local_fs = LocalFS()
            local_fs.delete(local_path)
            raise e

    @_handle_errors()
    def mkdirs(self, fs_path):
        """
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                client.mkdirs("hdfs:/test_hdfs_client")
        """
        if self.is_exist(fs_path):
            return

        out_hdfs = False

        cmd = "mkdir {}".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            for l in out:
                if "No such file or directory" in l:
                    out_hdfs = True
                    break
            if not out_hdfs:
                raise ExecuteError(cmd)

        if out_hdfs and not self.is_exist(fs_path):
            cmd = "mkdir -p {}".format(fs_path)
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
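    # Two-step mkdir: a plain `-mkdir` is attempted first; if it fails with
    # "No such file or directory" (missing parent directories), the code
    # falls back to `-mkdir -p` to create the intermediate directories.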
    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
        """
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str): Name of the file or directory that needs to be moved.
            fs_dst_path(str): Name of the file or directory to move to.
            overwrite(bool): Whether to overwrite `fs_dst_path` if it exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` .
                When `test_exists` is set true, if `fs_src_path` doesn't exist or
                `fs_dst_path` exists, program will throw an Exception.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")
        """
        if overwrite and self.is_exist(fs_dst_path):
            self.delete(fs_dst_path)

        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError(
                    "{} does not exist".format(fs_src_path)
                )

            if self.is_exist(fs_dst_path):
                raise FSFileExistsError(
                    "{} exists already".format(fs_dst_path)
                )

        return self._try_mv(fs_src_path, fs_dst_path)

    @_handle_errors()
    def _try_mv(self, fs_src_path, fs_dst_path):
        cmd = "mv {} {}".format(fs_src_path, fs_dst_path)
        ret = 0
        try:
            ret, _ = self._run_cmd(cmd, retry_times=1)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            if not self.is_exist(fs_src_path) and self.is_exist(fs_dst_path):
                return

            raise e

    def _rmr(self, fs_path):
        cmd = "rmr {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def _rm(self, fs_path):
        cmd = "rm {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    @_handle_errors()
    def delete(self, fs_path):
        """
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                client.delete("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return

        is_dir = self._is_dir(fs_path)
        if is_dir:
            return self._rmr(fs_path)

        return self._rm(fs_path)

    def touch(self, fs_path, exist_ok=True):
        """
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
                program will throw an Exception. Default is true.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                client.touch("hdfs:/test_hdfs_client")
        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError

        return self._touchz(fs_path)

    @_handle_errors()
    def _touchz(self, fs_path):
        cmd = "touchz {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def need_upload_download(self):
        return True

    def cat(self, fs_path=None):
        """
        Cat a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            file content

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import HDFSClient

                hadoop_home = "/home/client/hadoop-client/hadoop/"
                configs = {
                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                    "hadoop.job.ugi": "hello,hello123"
                }

                client = HDFSClient(hadoop_home, configs)
                client.cat("hdfs:/test_hdfs_client")
        """
        if self.is_file(fs_path):
            output = self._try_cat(fs_path)
            return "\n".join(output)
        else:
            return ""

    @_handle_errors()
    def _try_cat(self, fs_path):
        cmd = "cat {}".format(fs_path)
        ret, output = self._run_cmd(cmd, retry_times=1)
        if ret != 0:
            raise ExecuteError(cmd)
        return output

    def _split_files(self, files, trainer_id, trainers):
        """
        Split the file list.

        Args:
            files(list): file list
            trainer_id(int): trainer mpi rank id
            trainers(int): total number of trainers

        Returns:
            file_list(list): the file list assigned to the current trainer
        """
        remainder = len(files) % trainers
        blocksize = len(files) // trainers

        blocks = [blocksize] * trainers
        for i in range(remainder):
            blocks[i] += 1

        trainer_files = [[]] * trainers
        begin = 0
        for i in range(trainers):
            trainer_files[i] = files[begin : begin + blocks[i]]
            begin += blocks[i]

        return trainer_files[trainer_id]
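    # Worked example for `_split_files` (illustrative): with 5 files and
    # 3 trainers, remainder=2 and blocksize=1, so blocks=[2, 2, 1];
    # trainer 0 receives files[0:2], trainer 1 files[2:4], trainer 2 files[4:5].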
    def list_files_info(self, path_list):
        """
        List the path and size of each file.

        Args:
            path_list(list): A list of file paths.

        Returns:
            file_list(list): A list of dicts with the path and size of each file.
        """
        if len(path_list) <= 0:
            return []

        file_list = []

        # concatenating the paths into one call speeds up 'hadoop fs -ls'
        str_concat = ""
        for path in path_list:
            str_concat += path + " "
        cmd = (
            "ls "
            + str_concat
            + " | awk '{if ($8 != \"\") {print $5\" \"$8 }}'"
        )
        ret, lines = self._run_cmd(cmd)
        if len(lines) == 0:
            logger.warning("list_files empty, path[%s]" % path_list)
            return []
        for line in lines:
            arr = line.split(' ')
            if len(arr) < 2:
                continue
            file_path = arr[1]
            file_size = int(arr[0])
            file_list.append({'path': file_path, 'size': file_size})

        return file_list


class AFSClient(FS):
    """
    A tool for AFS, backed by core.AfsWrapper.

    Examples:

        .. code-block:: text

            from paddle.distributed.fleet.utils import AFSClient

            client = AFSClient()
            client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
            client.ls_dir("hdfs:/test_hdfs_client")
    """

    def __init__(
        self,
        time_out=5 * 60 * 1000,  # ms
        sleep_inter=1000,  # ms
    ):
        self._fs = core.AfsWrapper()
        self._time_out = time_out
        self._sleep_inter = sleep_inter

    def init(self, fs_name, fs_user, fs_passwd, fs_conf):
        self._fs.init(fs_name, fs_user, fs_passwd, fs_conf)

    def list_dirs(self, fs_path):
        """
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname2, ...].

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                subdirs = client.list_dirs("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return []
        # TODO:fengdanlei
        dirs, files = self._ls_dir(fs_path)
        return dirs

    def ls_dir(self, fs_path):
        """
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple, the first element is the list of all its
            subdirectories, and the second one is the list of all its subfiles,
            e.g. ([subdirname1, subdirname2, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return [], []

        return self._ls_dir(fs_path)

    def _ls_dir(self, fs_path):
        files = self._fs.list(fs_path)
        dirs = [fs_path]
        return dirs, files
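    # Note: AfsWrapper.list does not distinguish subdirectories from files,
    # so `_ls_dir` currently returns the queried path itself as the only
    # "directory" entry alongside the full listing (see the TODO in
    # list_dirs above).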
    def is_dir(self, fs_path):
        """
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                ret = client.is_dir("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return False

        return self._is_dir(fs_path)

    def _is_dir(self, fs_path):
        list_path = self._fs.list(fs_path)
        return len(list_path) > 0

    def is_file(self, fs_path):
        """
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                ret = client.is_file("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return False

        return not self._is_dir(fs_path)

    def is_exist(self, fs_path):
        """
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Whether it's a file or directory, return true if the path exists,
            otherwise return false.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                ret = client.is_exist("hdfs:/test_hdfs_client")
        """
        return self._fs.exist(fs_path)

    def upload_dir(self, local_dir, dest_dir, overwrite=False):
        """
        Upload a local directory to HDFS.

        Args:
            local_dir(str): The local directory.
            dest_dir(str): The HDFS destination directory.
            overwrite(bool): Whether to overwrite an existing destination. Default is False.
        """
        local_dir = local_dir.rstrip("/")
        dest_dir = dest_dir.rstrip("/")
        local_basename = os.path.basename(local_dir)
        if self.is_exist(dest_dir + "/" + local_basename) and overwrite:
            self.delete(dest_dir + "/" + local_basename)
        if not self.is_exist(dest_dir):
            self.mkdirs(dest_dir)
        self._fs.upload(local_dir, dest_dir)

    # can't retry
    def upload(self, local_path, fs_path, multi_processes=1, overwrite=False):
        """
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int): Accepted for interface compatibility; the upload
                is delegated to AfsWrapper in a single call. Default is 1.
            overwrite(bool): Whether to overwrite the file on HDFS. Default is False.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")
        """

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError("{} does not exist".format(local_path))

        self._fs.upload(local_path, fs_path)

    def download(self, fs_path, local_path, multi_processes=1, overwrite=False):
        """
        Download remote HDFS path to the local.

        Args:
            fs_path(str): The HDFS path.
            local_path(str): The local path.
            multi_processes(int): The number of processes used to download data, default is 1.
            overwrite(bool): Whether to overwrite. Default is False.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                client.download("hdfs:/test_hdfs_client", "./")
        """

        def __subprocess_download(local_path, datas):
            """
            Download files from HDFS.

            Args:
                local_path(str): the local file path
                datas(list): the hdfs file path list
            """
            for data in datas:
                self._fs.download(local_path, data)

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError("{} does not exist".format(fs_path))

        # download file
        if self.is_file(fs_path):
            return self._fs.download(local_path, fs_path)

        # download dir
        _, all_filenames = self.ls_dir(fs_path)
        all_files = [fs_path + i for i in all_filenames]
        procs = []
        for i in range(multi_processes):
            process_datas = self._split_files(all_files, i, multi_processes)
            p = multiprocessing.Process(
                target=__subprocess_download, args=(local_path, process_datas)
            )
            procs.append(p)
            p.start()

        # complete the processes
        for proc in procs:
            proc.join()
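    # Note the AfsWrapper argument order: `self._fs.download(local_path, data)`
    # takes the local destination first and the remote path second, the
    # reverse of HDFSClient._try_download(fs_path, local_path).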
    def mkdirs(self, fs_path):
        """
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                client.mkdirs("hdfs:/test_hdfs_client")
        """
        if self.is_exist(fs_path):
            return
        self._fs.mkdir(fs_path)

    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
        """
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str): Name of the file or directory that needs to be moved.
            fs_dst_path(str): Name of the file or directory to move to.
            overwrite(bool): Whether to overwrite `fs_dst_path` if it exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` .
                When `test_exists` is set true, if `fs_src_path` doesn't exist or
                `fs_dst_path` exists, program will throw an Exception.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")
        """
        if overwrite and self.is_exist(fs_dst_path):
            self.delete(fs_dst_path)

        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError(
                    "{} does not exist".format(fs_src_path)
                )

            if self.is_exist(fs_dst_path):
                raise FSFileExistsError(
                    "{} exists already".format(fs_dst_path)
                )

        self._fs.mv(fs_src_path, fs_dst_path)

    def delete(self, fs_path):
        """
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                client.delete("hdfs:/test_hdfs_client")
        """
        if not self.is_exist(fs_path):
            return
        self._fs.remove(fs_path)

    def touch(self, fs_path, exist_ok=True):
        """
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
                program will throw an Exception. Default is true.

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                client.touch("hdfs:/test_hdfs_client")
        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError

        return self._fs.touchz(fs_path)

    def need_upload_download(self):
        return True

    def cat(self, fs_path=None):
        """
        Cat a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            file content

        Examples:

            .. code-block:: text

                from paddle.distributed.fleet.utils import AFSClient

                client = AFSClient()
                client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                client.cat("hdfs:/test_hdfs_client")
        """
        if self.is_file(fs_path):
            return self._fs.cat(fs_path)
        else:
            return ""

    def _split_files(self, files, trainer_id, trainers):
        """
        Split the file list.

        Args:
            files(list): file list
            trainer_id(int): trainer mpi rank id
            trainers(int): total number of trainers

        Returns:
            file_list(list): the file list assigned to the current trainer
        """
        remainder = len(files) % trainers
        blocksize = len(files) // trainers

        blocks = [blocksize] * trainers
        for i in range(remainder):
            blocks[i] += 1

        trainer_files = [[]] * trainers
        begin = 0
        for i in range(trainers):
            trainer_files[i] = files[begin : begin + blocks[i]]
            begin += blocks[i]

        return trainer_files[trainer_id]
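
# A minimal smoke test for LocalFS (illustrative; uses hypothetical scratch
# paths in the current working directory and needs no Hadoop installation):
if __name__ == "__main__":
    fs = LocalFS()
    fs.mkdirs("test_fs_demo_dir")
    fs.touch("test_fs_demo_dir/a.txt")
    subdirs, files = fs.ls_dir("test_fs_demo_dir")
    print(subdirs, files)  # -> [] ['a.txt']
    fs.mv("test_fs_demo_dir/a.txt", "test_fs_demo_dir/b.txt")
    print(fs.is_file("test_fs_demo_dir/b.txt"))  # -> True
    fs.delete("test_fs_demo_dir")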