# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HDFS Utils."""

import os
import sys
import subprocess
import multiprocessing
from datetime import datetime

import re
import copy
import errno
import time
import logging
import six
from . import fs
from .fs import (FS, LocalFS, FSFileExistsError, FSFileNotExistsError,
                 ExecuteError, FSTimeOut, FSShellCmdAborted)
from paddle.fluid import core
import functools

import shutil

__all__ = ["HDFSClient"]


def _handle_errors(max_time_out=None):
    """Build a decorator that retries a method while it raises ExecuteError.

    The decorated callable must be a method: its first positional argument
    is read for the ``_time_out`` and ``_sleep_inter`` attributes (both in
    milliseconds).

    Args:
        max_time_out: retry budget in milliseconds. When None, the
            instance's ``_time_out`` is used instead.

    Returns:
        A decorator. The wrapped method returns whatever the original
        returns, and raises FSTimeOut once an ExecuteError occurs after the
        retry budget has elapsed. Any other exception propagates untouched.
    """

    def decorator(f):
        @functools.wraps(f)
        def handler(*args, **kwargs):
            o = args[0]
            time_out = max_time_out
            if time_out is None:
                time_out = float(o._time_out) / 1000.0
            else:
                time_out /= 1000.0
            inter = float(o._sleep_inter) / 1000.0

            start = time.time()
            last_print_time = start
            while True:
                try:
                    return f(*args, **kwargs)
                # important: only ExecuteError need to retry
                except ExecuteError:
                    if time.time() - start >= time_out:
                        raise FSTimeOut("args:{} timeout:{}".format(
                            args, time.time() - start))

                    time.sleep(inter)

                # Emit a progress message at most once every 30 seconds.
                if time.time() - last_print_time > 30:
                    print("hadoop operator timeout:args:{} timeout:{}".format(
                        args, time.time() - start))
                    last_print_time = time.time()

        return handler

    return decorator


class HDFSClient(FS):
    """File-system client that drives the ``hadoop fs`` command line tool.

    Every operation is executed as a shell command built from
    ``<hadoop_home>/bin/hadoop fs [-Dkey=value ...] -<sub-command>``.
    Methods wrapped with ``_handle_errors`` are retried on ExecuteError
    until the configured time-out is exceeded.
    """

    def __init__(
            self,
            hadoop_home,
            configs,
            time_out=5 * 60 * 1000,  # ms
            sleep_inter=1000):  # ms
        """Prepare the base ``hadoop fs`` command line.

        Args:
            hadoop_home: directory containing ``bin/hadoop``.
            configs: mapping of Hadoop properties, each passed as
                ``-Dkey=value``; may be None or empty.
            time_out: retry budget for retried operations, in milliseconds.
            sleep_inter: pause between retries, in milliseconds.
        """
        # NOTE: no JAVA_HOME (or hadoop binary) validation is performed here;
        # a broken environment only surfaces when a command is executed.
        self.pre_commands = []
        hadoop_bin = '%s/bin/hadoop' % hadoop_home
        self.pre_commands.append(hadoop_bin)
        dfs = 'fs'
        self.pre_commands.append(dfs)

        if configs:
            for k, v in six.iteritems(configs):
                config_command = '-D%s=%s' % (k, v)
                self.pre_commands.append(config_command)

        self._time_out = time_out
        self._sleep_inter = sleep_inter
        self._base_cmd = " ".join(self.pre_commands)
        # Pattern used by _test_match to recognise an error report in the
        # command output.
        self._bd_err_re = re.compile(
            r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:')

    def _run_cmd(self, cmd, redirect_stderr=False):
        """Run ``hadoop fs -<cmd>`` and return ``(exit_code, output_lines)``.

        Args:
            cmd: sub-command and its arguments, without the leading dash.
            redirect_stderr: forwarded to ``core.shell_execute_cmd``;
                callers set it when they need to inspect error text in the
                returned lines.

        Raises:
            FSShellCmdAborted: if the command exits with status 134.
        """
        exe_cmd = "{} -{}".format(self._base_cmd, cmd)
        ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr)
        ret = int(ret)
        if ret == 134:
            raise FSShellCmdAborted(cmd)
        return ret, output.splitlines()

    @_handle_errors()
    def list_dirs(self, fs_path):
        """Return the names of the sub-directories directly under fs_path.

        Returns an empty list when fs_path does not exist.
        """
        if not self.is_exist(fs_path):
            return []

        dirs, _ = self._ls_dir(fs_path)
        return dirs

    @_handle_errors()
    def ls_dir(self, fs_path):
        """
        list directory under fs_path, and only give the pure name, not include the fs_path

        Returns a ``(dirs, files)`` pair of name lists; both are empty when
        fs_path does not exist.
        """
        if not self.is_exist(fs_path):
            return [], []

        return self._ls_dir(fs_path)

    def _ls_dir(self, fs_path):
        """Run ``ls`` on fs_path and split the entries into (dirs, files).

        Only output lines with exactly eight whitespace-separated fields are
        treated as entries; everything else (e.g. summary lines) is skipped.

        Raises:
            ExecuteError: if the command exits with a non-zero status.
        """
        cmd = "ls {}".format(fs_path)
        ret, lines = self._run_cmd(cmd)

        if ret != 0:
            raise ExecuteError(cmd)

        dirs = []
        files = []
        for line in lines:
            arr = line.split()
            if len(arr) != 8:
                continue

            p = os.path.basename(arr[7])
            # The first field is the permission string; a leading 'd'
            # marks a directory.
            if arr[0][0] == 'd':
                dirs.append(p)
            else:
                files.append(p)

        return dirs, files

    def _test_match(self, lines):
        """Return the first error-pattern match found in lines, else None."""
        for line in lines:
            m = self._bd_err_re.match(line)
            if m is not None:
                return m

        return None

    @_handle_errors()
    def is_dir(self, fs_path):
        """Return True if fs_path exists and is a directory."""
        if not self.is_exist(fs_path):
            return False

        return self._is_dir(fs_path)

    def _is_dir(self, fs_path):
        """Return True if ``test -d`` succeeds for fs_path.

        Raises:
            ExecuteError: if the command fails and its output contains an
                error report (see ``_test_match``) rather than a plain
                "not a directory" result.
        """
        cmd = "test -d {}".format(fs_path)
        # stderr must be captured, otherwise _test_match never sees the
        # error text and real failures are reported as "not a directory".
        ret, lines = self._run_cmd(cmd, redirect_stderr=True)
        if ret:
            # other error
            if self._test_match(lines):
                raise ExecuteError(cmd)

            return False

        return True

    @_handle_errors()
    def is_file(self, fs_path):
        """Return True if fs_path exists and is not a directory.

        Retried on ExecuteError in the same way as ``is_dir``.
        """
        if not self.is_exist(fs_path):
            return False

        return not self._is_dir(fs_path)

    @_handle_errors()
    def is_exist(self, fs_path):
        """Return True if ``ls`` succeeds on fs_path.

        Returns False when the command fails with "No such file or
        directory" in its output.

        Raises:
            ExecuteError: for any other failure (retried by the decorator).
        """
        cmd = "ls {} ".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            if any("No such file or directory" in line for line in out):
                return False
            raise ExecuteError(cmd)

        return True

    # can't retry
    def upload(self, local_path, fs_path):
        """Copy local_path to fs_path with ``put``.

        Raises:
            FSFileExistsError: if fs_path already exists.
            FSFileNotExistsError: if local_path does not exist locally.
        """
        if self.is_exist(fs_path):
            raise FSFileExistsError("{} exists".format(fs_path))

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError("{} not exists".format(local_path))

        return self._try_upload(local_path, fs_path)

    @_handle_errors()
    def _try_upload(self, local_path, fs_path):
        """Run ``put``; on any failure remove fs_path and re-raise."""
        cmd = "put {} {}".format(local_path, fs_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception:
            # Remove whatever was partially written before the error
            # propagates (and possibly triggers a retry).
            self.delete(fs_path)
            raise

    # can't retry
    def download(self, fs_path, local_path):
        """Copy fs_path to local_path with ``get``.

        Raises:
            FSFileExistsError: if local_path already exists locally.
            FSFileNotExistsError: if fs_path does not exist.
        """
        # local_path lives on the local file system, so it must be checked
        # with LocalFS rather than with an HDFS ``ls``.
        if LocalFS().is_exist(local_path):
            raise FSFileExistsError("{} exists".format(local_path))

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError("{} not exits".format(fs_path))

        return self._try_download(fs_path, local_path)

    @_handle_errors()
    def _try_download(self, fs_path, local_path):
        """Run ``get``; on any failure remove local_path and re-raise."""
        cmd = "get {} {}".format(fs_path, local_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception:
            # Remove the partial local copy before the error propagates.
            local_fs = LocalFS()
            local_fs.delete(local_path)
            raise

    @_handle_errors()
    def mkdirs(self, fs_path):
        """Create fs_path, falling back to ``mkdir -p`` when needed.

        Does nothing if fs_path already exists.

        Raises:
            ExecuteError: if a mkdir command fails for a reason other than
                "No such file or directory" (retried by the decorator).
        """
        if self.is_exist(fs_path):
            return

        out_hdfs = False

        cmd = "mkdir {} ".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            out_hdfs = any(
                "No such file or directory" in line for line in out)
            if not out_hdfs:
                raise ExecuteError(cmd)

        # A plain mkdir reported a missing path component: retry with -p,
        # which also creates the parents.
        if out_hdfs and not self.is_exist(fs_path):
            cmd = "mkdir -p {}".format(fs_path)
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)

    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
        """Move fs_src_path to fs_dst_path.

        Args:
            fs_src_path: source path.
            fs_dst_path: destination path.
            overwrite: delete an existing destination first.
            test_exists: verify beforehand that the source exists and the
                destination does not.

        Raises:
            FSFileNotExistsError: test_exists is set and the source is
                missing.
            FSFileExistsError: test_exists is set and the destination
                exists.
        """
        if overwrite and self.is_exist(fs_dst_path):
            self.delete(fs_dst_path)

        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError("{} is not exists".format(
                    fs_src_path))

            if self.is_exist(fs_dst_path):
                raise FSFileExistsError("{} exists already".format(
                    fs_dst_path))

        return self._try_mv(fs_src_path, fs_dst_path)

    @_handle_errors()
    def _try_mv(self, fs_src_path, fs_dst_path):
        """Run ``mv``; treat a failure as success if the move took effect."""
        cmd = "mv {} {}".format(fs_src_path, fs_dst_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception:
            # The source is gone and the destination is present, so the
            # move did happen despite the reported error.
            if not self.is_exist(fs_src_path) and \
                    self.is_exist(fs_dst_path):
                return
            raise

    def _rmr(self, fs_path):
        """Run ``rmr`` on fs_path; raise ExecuteError on failure."""
        cmd = "rmr {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def _rm(self, fs_path):
        """Run ``rm`` on fs_path; raise ExecuteError on failure."""
        cmd = "rm {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    @_handle_errors()
    def delete(self, fs_path):
        """Remove fs_path (directory or file); no-op if it does not exist."""
        if not self.is_exist(fs_path):
            return

        if self._is_dir(fs_path):
            return self._rmr(fs_path)

        return self._rm(fs_path)

    def touch(self, fs_path, exist_ok=True):
        """Create an empty file at fs_path with ``touchz``.

        Args:
            fs_path: path of the file to create.
            exist_ok: when True an existing path is silently accepted.

        Raises:
            FSFileExistsError: if fs_path exists and exist_ok is False.
        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError("{} exists".format(fs_path))

        return self._touchz(fs_path)

    @_handle_errors()
    def _touchz(self, fs_path):
        """Run ``touchz`` on fs_path; raise ExecuteError on failure."""
        cmd = "touchz {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def need_upload_download(self):
        """Return True: this client always reports that upload/download is needed."""
        return True