#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HDFS Utils."""

import os
import sys
import subprocess
import multiprocessing
from datetime import datetime

import re
import copy
import errno
import time
import logging
import six
from . import fs
from .fs import FS, LocalFS, FSFileExistsError, FSFileNotExistsError, ExecuteError, FSTimeOut
import paddle.fluid as fluid
import functools

from pathlib import PurePosixPath, Path
import shutil

__all__ = ["HDFSClient"]


def _handle_errors(f):
    def handler(*args, **kwargs):
        start = time.time()
        while True:
            try:
                return f(*args, **kwargs)
            except ExecuteError as e:
                o = args[0]
                time_out = float(o._time_out) / 1000.0
                inter = float(o._sleep_inter) / 1000.0
                if time.time() - start >= time_out:
                    raise FSTimeOut
                time.sleep(inter)

    return functools.wraps(f)(handler)


class HDFSClient(FS):
    """File-system client that drives HDFS through the ``hadoop fs`` command.

    Every operation builds a shell command line from ``_base_cmd`` and runs
    it with ``_run_cmd``. Methods decorated with ``_handle_errors`` are
    retried when they raise ``ExecuteError`` and raise ``FSTimeOut`` once
    ``time_out`` milliseconds have elapsed.
    """

    def __init__(
            self,
            hadoop_home,
            configs,
            time_out=5 * 60 * 1000,  #ms
            sleep_inter=1000):  #ms
        """Build the base ``hadoop fs`` command line.

        Args:
            hadoop_home: Hadoop installation directory; the executable used
                is ``<hadoop_home>/bin/hadoop``.
            configs: Mapping of Hadoop property names to values, each passed
                to the command as ``-D<name>=<value>``. May be empty or
                ``None``.
            time_out: Total retry budget, in milliseconds, for operations
                decorated with ``_handle_errors``.
            sleep_inter: Pause between retries, in milliseconds.

        Raises:
            KeyError: If the ``JAVA_HOME`` environment variable is not set.
        """
        # Looked up only for its side effect: fail fast with KeyError when
        # JAVA_HOME is missing from the environment.
        java_home = os.environ["JAVA_HOME"]

        self.pre_commands = []
        hadoop_bin = '%s/bin/hadoop' % hadoop_home
        self.pre_commands.append(hadoop_bin)
        dfs = 'fs'
        self.pre_commands.append(dfs)

        if configs:
            for k, v in six.iteritems(configs):
                config_command = '-D%s=%s' % (k, v)
                # Without this append the option is built and then thrown
                # away, so the caller's configs never reach hadoop.
                self.pre_commands.append(config_command)

        self._time_out = time_out
        self._sleep_inter = sleep_inter
        self._base_cmd = " ".join(self.pre_commands)
        # Pattern checked against command output in is_dir() to tell an
        # error report apart from a plain negative answer.
        self._bd_err_re = re.compile(
            r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:')

    def _run_cmd(self, cmd, redirect_stderr=False):
        """Run ``cmd`` through ``fluid.core.shell_execute_cmd``.

        Args:
            cmd: Full command line string.
            redirect_stderr: Forwarded to ``shell_execute_cmd``; presumably
                merges stderr into the captured output -- confirm against
                that helper.

        Returns:
            A tuple ``(ret, lines)``: the return code as ``int`` and the
            captured output split into lines.
        """
        # NOTE(review): the two literal zeros are positional arguments of
        # shell_execute_cmd whose meaning is not visible here.
        ret, output = fluid.core.shell_execute_cmd(cmd, 0, 0, redirect_stderr)
        return int(ret), output.splitlines()

    def list_dirs(self, fs_path):
        """Return the names of the directories directly under ``fs_path``.

        Returns an empty list when ``fs_path`` does not exist.
        """
        if not self.is_exist(fs_path):
            return []

        dirs, _ = self.ls_dir(fs_path)
        return dirs

    @_handle_errors
    def ls_dir(self, fs_path):
        """List the entries under ``fs_path``.

        Only the final path component of each entry is returned; the
        ``fs_path`` prefix is not included.

        Returns:
            A tuple ``(dirs, files)`` of name lists. Both are empty when
            ``fs_path`` does not exist.

        Raises:
            ExecuteError: If the ``-ls`` command exits with a non-zero code.
        """
        if not self.is_exist(fs_path):
            return [], []

        cmd = "{} -ls {}".format(self._base_cmd, fs_path)
        ret, lines = self._run_cmd(cmd)

        if ret != 0:
            raise ExecuteError

        dirs = []
        files = []
        for line in lines:
            arr = line.split()
            # Only lines with exactly eight whitespace-separated fields are
            # treated as entries; anything else (e.g. a summary line) is
            # skipped.
            if len(arr) != 8:
                continue

            # The last field is the entry path; ignore lines whose path does
            # not contain the requested path.
            if fs_path not in arr[7]:
                continue

            p = PurePosixPath(arr[7])
            # The first field starts with 'd' for directories.
            if arr[0][0] == 'd':
                dirs.append(p.name)
            else:
                files.append(p.name)

        return dirs, files

    def _test_match(self, lines):
        """Return the first match of ``_bd_err_re`` in ``lines``, else None."""
        for l in lines:
            m = self._bd_err_re.match(l)
            if m is not None:
                return m

        return None

    @_handle_errors
    def is_dir(self, fs_path):
        """Return True when ``fs_path`` exists and is a directory.

        Raises:
            ExecuteError: If ``-test -d`` fails and its output matches the
                error pattern ``_bd_err_re``.
        """
        if not self.is_exist(fs_path):
            return False

        cmd = "{} -test -d {}".format(self._base_cmd, fs_path)
        # stderr must be captured here: the error pattern checked below can
        # only match if the error text is part of the returned lines.
        ret, lines = self._run_cmd(cmd, redirect_stderr=True)
        if ret:
            # other error
            if self._test_match(lines) is not None:
                raise ExecuteError

            return False

        return True

    def is_file(self, fs_path):
        """Return True when ``fs_path`` exists and is not a directory."""
        if not self.is_exist(fs_path):
            return False

        return not self.is_dir(fs_path)

    @_handle_errors
    def is_exist(self, fs_path):
        """Return True when ``fs_path`` exists.

        Raises:
            ExecuteError: If ``-ls`` fails and no output line contains
                "No such file or directory".
        """
        cmd = "{} -ls {} ".format(self._base_cmd, fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            for l in out:
                if "No such file or directory" in l:
                    return False
            raise ExecuteError

        return True

    @_handle_errors
    def upload(self, local_path, fs_path):
        """Copy ``local_path`` to ``fs_path`` with ``-put``.

        Raises:
            FSFileExistsError: If ``fs_path`` already exists.
            FSFileNotExistsError: If ``local_path`` does not exist locally.
            ExecuteError: If the ``-put`` command exits with a non-zero code.
        """
        if self.is_exist(fs_path):
            raise FSFileExistsError

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError

        cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path)
        ret, lines = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError

    @_handle_errors
    def download(self, fs_path, local_path):
        """Copy ``fs_path`` to ``local_path`` with ``-get``.

        Raises:
            FSFileExistsError: If ``local_path`` already exists locally.
            FSFileNotExistsError: If ``fs_path`` does not exist.
            ExecuteError: If the ``-get`` command exits with a non-zero code.
        """
        # local_path is on the local file system, so its existence has to be
        # checked with LocalFS (as upload() does), not with the HDFS query.
        local = LocalFS()
        if local.is_exist(local_path):
            raise FSFileExistsError

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError

        cmd = "{} -get {} {}".format(self._base_cmd, fs_path, local_path)
        ret, lines = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError

    @_handle_errors
    def mkdirs(self, fs_path):
        """Create ``fs_path`` with ``-mkdir``; do nothing if it exists.

        Raises:
            ExecuteError: If the ``-mkdir`` command exits with a non-zero
                code.
        """
        if self.is_exist(fs_path):
            return

        cmd = "{} -mkdir {}".format(self._base_cmd, fs_path)
        ret, lines = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError

    @_handle_errors
    def mv(self, fs_src_path, fs_dst_path, test_exists=True):
        """Move ``fs_src_path`` to ``fs_dst_path`` with ``-mv``.

        Args:
            fs_src_path: Source path.
            fs_dst_path: Destination path.
            test_exists: When true, verify beforehand that the source exists
                and the destination does not.

        Raises:
            FSFileNotExistsError: If ``test_exists`` and the source is
                missing.
            FSFileExistsError: If ``test_exists`` and the destination exists.
            ExecuteError: If the ``-mv`` command exits with a non-zero code.
        """
        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError

            if self.is_exist(fs_dst_path):
                raise FSFileExistsError

        cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError

    @_handle_errors
    def _rmr(self, fs_path):
        """Run ``-rmr`` on ``fs_path``; raise ExecuteError on failure."""
        cmd = "{} -rmr {}".format(self._base_cmd, fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError

    @_handle_errors
    def _rm(self, fs_path):
        """Run ``-rm`` on ``fs_path``; raise ExecuteError on failure."""
        cmd = "{} -rm {}".format(self._base_cmd, fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError

    def delete(self, fs_path):
        """Delete ``fs_path`` if it exists.

        Directories are removed with ``-rmr``, anything else with ``-rm``.
        """
        if not self.is_exist(fs_path):
            return

        is_dir = self.is_dir(fs_path)
        if is_dir:
            return self._rmr(fs_path)

        return self._rm(fs_path)

    def need_upload_download(self):
        """Return True: this client always reports that upload/download is needed."""
        return True