hdfs.py 9.3 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
"""HDFS Utils."""
T
tangwei12 已提交
15 16 17 18 19 20 21 22 23 24

import os
import sys
import subprocess
import multiprocessing
from datetime import datetime

import re
import copy
import errno
25
import time
T
tangwei12 已提交
26
import logging
G
gongweibao 已提交
27 28
import six
from . import fs
29 30
from .fs import FS, LocalFS, FSFileExistsError, FSFileNotExistsError, ExecuteError, FSTimeOut, FSShellCmdAborted
from paddle.fluid import core
G
gongweibao 已提交
31
import functools
T
tangwei12 已提交
32

G
gongweibao 已提交
33 34
from pathlib import PurePosixPath, Path
import shutil
T
tangwei12 已提交
35

G
gongweibao 已提交
36
__all__ = ["HDFSClient"]
T
tangwei12 已提交
37 38


39 40 41 42 43 44 45
def _handle_errors(max_time_out=None):
    def decorator(f):
        @functools.wraps(f)
        def handler(*args, **kwargs):
            o = args[0]
            time_out = max_time_out
            if time_out is None:
G
gongweibao 已提交
46
                time_out = float(o._time_out) / 1000.0
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
            else:
                time_out /= 1000.0
            inter = float(o._sleep_inter) / 1000.0

            start = time.time()
            last_print_time = start
            while True:
                try:
                    return f(*args, **kwargs)
                #important: only ExecuteError need to retry
                except ExecuteError as e:
                    if time.time() - start >= time_out:
                        raise FSTimeOut("args:{} timeout:{}".format(
                            args, time.time() - start))

                    time.sleep(inter)
G
gongweibao 已提交
63

64 65 66 67 68 69 70 71
                if time.time() - last_print_time > 30:
                    print("hadoop operator timeout:args:{} timeout:{}".format(
                        args, time.time() - start))
                    last_print_time = time.time()

        return handler

    return decorator
G
gongweibao 已提交
72 73 74 75 76 77 78 79 80 81


class HDFSClient(FS):
    """File-system client that drives HDFS through the ``hadoop fs`` CLI.

    Each operation is rendered as a shell string of the form
    ``<hadoop_home>/bin/hadoop fs [-D<key>=<value> ...] -<command>`` and
    executed with ``core.shell_execute_cmd``. Methods decorated with
    ``_handle_errors`` are retried while they raise ``ExecuteError`` and
    raise ``FSTimeOut`` once the retry budget is used up.

    NOTE(review): paths and config values are interpolated into the shell
    command without any quoting, so they should only come from trusted
    input.
    """

    def __init__(
            self,
            hadoop_home,
            configs,
            time_out=5 * 60 * 1000,  # ms
            sleep_inter=1000):  # ms
        """Prepare the command prefix shared by every hadoop invocation.

        Args:
            hadoop_home: Hadoop installation directory; the executable is
                taken from ``<hadoop_home>/bin/hadoop``.
            configs: Mapping of hadoop configuration keys to values, each
                emitted as a ``-D<key>=<value>`` option. May be empty or
                ``None``.
            time_out: Retry budget for retried operations, in milliseconds.
            sleep_inter: Pause between two retries, in milliseconds.
        """
        self.pre_commands = []
        hadoop_bin = '%s/bin/hadoop' % hadoop_home
        self.pre_commands.append(hadoop_bin)
        self.pre_commands.append('fs')

        if configs:
            for k, v in six.iteritems(configs):
                self.pre_commands.append('-D%s=%s' % (k, v))

        self._time_out = time_out
        self._sleep_inter = sleep_inter
        self._base_cmd = " ".join(self.pre_commands)
        # Pattern used by _test_match() to recognise an error report in the
        # command output.
        self._bd_err_re = re.compile(
            r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:')

    def _run_cmd(self, cmd, redirect_stderr=False):
        """Run ``hadoop fs -<cmd>`` and return ``(exit_code, output_lines)``.

        Args:
            cmd: Sub-command with its arguments, without the leading dash.
            redirect_stderr: Forwarded as-is to ``core.shell_execute_cmd``;
                presumably makes stderr part of the captured output --
                confirm against that API.

        Returns:
            Tuple of the integer exit code and the captured output split
            into lines.

        Raises:
            FSShellCmdAborted: If the exit code is 134.
        """
        exe_cmd = "{} -{}".format(self._base_cmd, cmd)
        # NOTE(review): the meaning of the two literal 0 arguments is defined
        # by core.shell_execute_cmd and is not visible from this file.
        ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr)
        ret = int(ret)
        if ret == 134:
            raise FSShellCmdAborted(cmd)
        return ret, output.splitlines()

    @_handle_errors()
    def list_dirs(self, fs_path):
        """Return the names of the sub-directories directly under fs_path.

        Returns an empty list when fs_path does not exist.
        """
        if not self.is_exist(fs_path):
            return []

        dirs, _ = self._ls_dir(fs_path)
        return dirs

    @_handle_errors()
    def ls_dir(self, fs_path):
        """List the entries directly under fs_path.

        Only the bare entry names are returned, without the fs_path prefix.

        Returns:
            Tuple ``(dirs, files)`` of name lists; ``([], [])`` when fs_path
            does not exist.
        """
        if not self.is_exist(fs_path):
            return [], []

        return self._ls_dir(fs_path)

    def _ls_dir(self, fs_path):
        """Run ``ls`` on fs_path and split the entries into dirs and files.

        Raises:
            ExecuteError: If the ``ls`` command exits with a non-zero code.
        """
        cmd = "ls {}".format(fs_path)
        ret, lines = self._run_cmd(cmd)

        if ret != 0:
            raise ExecuteError(cmd)

        dirs = []
        files = []
        for line in lines:
            arr = line.split()
            # Only lines made of exactly 8 whitespace-separated fields are
            # treated as entries; anything else is skipped.
            if len(arr) != 8:
                continue

            # The last field is the entry path; the first character of the
            # first field is 'd' for directories.
            name = PurePosixPath(arr[7]).name
            if arr[0][0] == 'd':
                dirs.append(name)
            else:
                files.append(name)

        return dirs, files

    def _test_match(self, lines):
        """Return the first match of the error pattern in lines, or None."""
        for line in lines:
            m = self._bd_err_re.match(line)
            if m is not None:
                return m

        return None

    @_handle_errors()
    def is_dir(self, fs_path):
        """Return True when fs_path exists and is a directory."""
        if not self.is_exist(fs_path):
            return False

        return self._is_dir(fs_path)

    def _is_dir(self, fs_path):
        """Return whether ``test -d`` succeeds for fs_path.

        Raises:
            ExecuteError: If the command fails and its output matches the
                error pattern, i.e. the failure is not a plain "not a
                directory" answer.
        """
        cmd = "test -d {}".format(fs_path)
        # stderr must be requested from _run_cmd (not from str.format, which
        # silently ignores unused keyword arguments) so that _test_match()
        # gets the error text to inspect.
        ret, lines = self._run_cmd(cmd, redirect_stderr=True)
        if ret:
            # other error
            if self._test_match(lines):
                raise ExecuteError(cmd)

            return False

        return True

    @_handle_errors()
    def is_file(self, fs_path):
        """Return True when fs_path exists and is not a directory.

        Retried like is_dir(), since the underlying directory test can
        raise ExecuteError.
        """
        if not self.is_exist(fs_path):
            return False

        return not self._is_dir(fs_path)

    @_handle_errors()
    def is_exist(self, fs_path):
        """Return True when ``ls`` succeeds for fs_path.

        Returns False when the command fails and reports
        "No such file or directory".

        Raises:
            ExecuteError: If the command fails for any other reason (this
                triggers a retry through the decorator).
        """
        cmd = "ls {} ".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            if any("No such file or directory" in line for line in out):
                return False
            raise ExecuteError(cmd)

        return True

    # can't retry
    def upload(self, local_path, fs_path):
        """Copy local_path to fs_path on HDFS.

        Raises:
            FSFileExistsError: If fs_path already exists on HDFS.
            FSFileNotExistsError: If local_path does not exist locally.
        """
        if self.is_exist(fs_path):
            raise FSFileExistsError("{} exists".format(fs_path))

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError("{} not exists".format(local_path))

        return self._try_upload(local_path, fs_path)

    @_handle_errors()
    def _try_upload(self, local_path, fs_path):
        """Run ``put``; on any failure delete fs_path before re-raising."""
        cmd = "put {} {}".format(local_path, fs_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception:
            # Remove whatever was partially written so a retry starts clean.
            self.delete(fs_path)
            raise

    # can't retry
    def download(self, fs_path, local_path):
        """Copy fs_path from HDFS to local_path.

        Raises:
            FSFileExistsError: If local_path already exists locally.
            FSFileNotExistsError: If fs_path does not exist on HDFS.
        """
        # The destination lives on the local file system, so it has to be
        # checked there. This also protects existing local data from the
        # cleanup done by _try_download() when the transfer fails.
        if LocalFS().is_exist(local_path):
            raise FSFileExistsError("{} exists".format(local_path))

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError("{} not exits".format(fs_path))

        return self._try_download(fs_path, local_path)

    @_handle_errors()
    def _try_download(self, fs_path, local_path):
        """Run ``get``; on any failure delete local_path before re-raising."""
        cmd = "get {} {}".format(fs_path, local_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception:
            # Remove whatever was partially written so a retry starts clean.
            LocalFS().delete(local_path)
            raise

    @_handle_errors()
    def mkdirs(self, fs_path):
        """Create directory fs_path; do nothing when it already exists.

        A plain ``mkdir`` is tried first. When it fails with
        "No such file or directory", ``mkdir -p`` is used instead.

        Raises:
            ExecuteError: If a mkdir command fails for another reason (this
                triggers a retry through the decorator).
        """
        if self.is_exist(fs_path):
            return

        out_hdfs = False

        cmd = "mkdir {} ".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            out_hdfs = any("No such file or directory" in line for line in out)
            if not out_hdfs:
                raise ExecuteError(cmd)

        if out_hdfs and not self.is_exist(fs_path):
            cmd = "mkdir -p {}".format(fs_path)
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)

    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
        """Move fs_src_path to fs_dst_path on HDFS.

        Args:
            fs_src_path: Existing source path.
            fs_dst_path: Destination path.
            overwrite: When True, an existing destination is deleted first.
            test_exists: When True, verify that the source exists and the
                destination does not before moving.

        Raises:
            FSFileNotExistsError: If test_exists is set and the source is
                missing.
            FSFileExistsError: If test_exists is set and the destination
                already exists.
        """
        if overwrite and self.is_exist(fs_dst_path):
            self.delete(fs_dst_path)

        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError("{} is not exists".format(
                    fs_src_path))

            if self.is_exist(fs_dst_path):
                # Report the path that actually exists: the destination.
                raise FSFileExistsError("{} exists already".format(
                    fs_dst_path))

        return self._try_mv(fs_src_path, fs_dst_path)

    @_handle_errors()
    def _try_mv(self, fs_src_path, fs_dst_path):
        """Run ``mv``; treat a failure as success if the move took effect."""
        cmd = "mv {} {}".format(fs_src_path, fs_dst_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception:
            # Source gone and destination present means the move happened
            # (for example on an earlier attempt), so there is nothing left
            # to do.
            if not self.is_exist(fs_src_path) and \
                    self.is_exist(fs_dst_path):
                return
            raise

    def _rmr(self, fs_path):
        """Run ``rmr`` on fs_path; raise ExecuteError on a non-zero exit."""
        cmd = "rmr {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def _rm(self, fs_path):
        """Run ``rm`` on fs_path; raise ExecuteError on a non-zero exit."""
        cmd = "rm {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    @_handle_errors()
    def delete(self, fs_path):
        """Delete fs_path; do nothing when it does not exist.

        Directories are removed with ``rmr``, everything else with ``rm``.
        """
        if not self.is_exist(fs_path):
            return

        if self._is_dir(fs_path):
            return self._rmr(fs_path)

        return self._rm(fs_path)

    def touch(self, fs_path, exist_ok=True):
        """Create an empty file at fs_path.

        Args:
            fs_path: Path of the file to create.
            exist_ok: When True an existing path is silently accepted.

        Raises:
            FSFileExistsError: If fs_path exists and exist_ok is False.
        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError("{} exists".format(fs_path))

        return self._touchz(fs_path)

    @_handle_errors()
    def _touchz(self, fs_path):
        """Run ``touchz`` on fs_path; raise ExecuteError on a non-zero exit."""
        cmd = "touchz {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def need_upload_download(self):
        """Return True: this client always reports that upload/download is needed."""
        return True