hdfs.py 9.3 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
"""HDFS Utils."""
T
tangwei12 已提交
15 16 17 18 19 20 21 22 23 24

import os
import sys
import subprocess
import multiprocessing
from datetime import datetime

import re
import copy
import errno
25
import time
T
tangwei12 已提交
26
import logging
G
gongweibao 已提交
27
import six
K
kuizhiqing 已提交
28 29
#from . import fs
from paddle.distributed.fleet.utils.fs import FS, LocalFS, FSFileExistsError, FSFileNotExistsError, ExecuteError, FSTimeOut, FSShellCmdAborted
30
from paddle.fluid import core
G
gongweibao 已提交
31
import functools
T
tangwei12 已提交
32

G
gongweibao 已提交
33
import shutil
T
tangwei12 已提交
34

G
gongweibao 已提交
35
__all__ = ["HDFSClient"]
T
tangwei12 已提交
36 37


38 39 40 41 42 43 44
def _handle_errors(max_time_out=None):
    def decorator(f):
        @functools.wraps(f)
        def handler(*args, **kwargs):
            o = args[0]
            time_out = max_time_out
            if time_out is None:
G
gongweibao 已提交
45
                time_out = float(o._time_out) / 1000.0
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
            else:
                time_out /= 1000.0
            inter = float(o._sleep_inter) / 1000.0

            start = time.time()
            last_print_time = start
            while True:
                try:
                    return f(*args, **kwargs)
                #important: only ExecuteError need to retry
                except ExecuteError as e:
                    if time.time() - start >= time_out:
                        raise FSTimeOut("args:{} timeout:{}".format(
                            args, time.time() - start))

                    time.sleep(inter)
G
gongweibao 已提交
62

63 64 65 66 67 68 69 70
                if time.time() - last_print_time > 30:
                    print("hadoop operator timeout:args:{} timeout:{}".format(
                        args, time.time() - start))
                    last_print_time = time.time()

        return handler

    return decorator
G
gongweibao 已提交
71 72 73 74 75 76 77 78 79 80


class HDFSClient(FS):
    """HDFS client implemented on top of the ``hadoop fs`` command line.

    Each operation runs ``<hadoop_home>/bin/hadoop fs [-Dk=v ...] -<cmd>``
    through ``core.shell_execute_cmd``. Methods decorated with
    ``_handle_errors()`` are retried on ExecuteError until ``time_out``
    milliseconds have elapsed, after which FSTimeOut is raised.

    Args:
        hadoop_home (str): Hadoop installation directory; the client runs
            ``<hadoop_home>/bin/hadoop``.
        configs (dict|None): extra properties appended to every command as
            ``-Dkey=value`` options; may be empty or None.
        time_out (int): retry budget in milliseconds (default: 5 minutes).
        sleep_inter (int): pause between retries in milliseconds.
    """

    def __init__(
            self,
            hadoop_home,
            configs,
            time_out=5 * 60 * 1000,  # ms
            sleep_inter=1000):  # ms
        self.pre_commands = []
        hadoop_bin = '%s/bin/hadoop' % hadoop_home
        self.pre_commands.append(hadoop_bin)
        dfs = 'fs'
        self.pre_commands.append(dfs)

        if configs:
            for k, v in six.iteritems(configs):
                config_command = '-D%s=%s' % (k, v)
                self.pre_commands.append(config_command)

        self._time_out = time_out
        self._sleep_inter = sleep_inter
        self._base_cmd = " ".join(self.pre_commands)
        # Used by _test_match to recognise an error report in command output,
        # as opposed to a plain "condition is false" exit status.
        self._bd_err_re = re.compile(
            r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:')

    def _run_cmd(self, cmd, redirect_stderr=False):
        """Run ``hadoop fs -<cmd>``.

        Args:
            cmd (str): sub-command and its arguments, without the leading dash.
            redirect_stderr (bool): forwarded to ``core.shell_execute_cmd``;
                set it when the caller must inspect error messages.

        Returns:
            tuple: ``(exit_code, output_lines)``.

        Raises:
            FSShellCmdAborted: if the command exits with status 134
                (presumably 128 + SIGABRT, i.e. the process aborted).
        """
        # NOTE(review): paths are interpolated unquoted into the command
        # string; paths with spaces or shell metacharacters are unsupported.
        exe_cmd = "{} -{}".format(self._base_cmd, cmd)
        ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr)
        ret = int(ret)
        if ret == 134:
            raise FSShellCmdAborted(cmd)
        return ret, output.splitlines()

    @_handle_errors()
    def list_dirs(self, fs_path):
        """Return the base names of the sub-directories of ``fs_path``.

        Returns an empty list when ``fs_path`` does not exist.
        """
        if not self.is_exist(fs_path):
            return []

        dirs, _ = self._ls_dir(fs_path)
        return dirs

    @_handle_errors()
    def ls_dir(self, fs_path):
        """List the entries directly under ``fs_path``.

        Only base names are returned, not full paths.

        Returns:
            tuple: ``(dirs, files)``; both empty when ``fs_path`` does not
            exist.
        """
        if not self.is_exist(fs_path):
            return [], []

        return self._ls_dir(fs_path)

    def _ls_dir(self, fs_path):
        """Parse ``hadoop fs -ls`` output into ``(dirs, files)`` base names.

        Raises:
            ExecuteError: if the ``ls`` command fails.
        """
        cmd = "ls {}".format(fs_path)
        ret, lines = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

        dirs = []
        files = []
        for line in lines:
            fields = line.split()
            # Only lines with exactly 8 whitespace-separated columns are
            # treated as entries (last column = path, first column starting
            # with 'd' = directory); other lines are presumably headers.
            if len(fields) != 8:
                continue

            name = os.path.basename(fields[7])
            if fields[0][0] == 'd':
                dirs.append(name)
            else:
                files.append(name)

        return dirs, files

    def _test_match(self, lines):
        """Return the first match of the HDFS error pattern in ``lines``, else None."""
        for line in lines:
            m = self._bd_err_re.match(line)
            if m is not None:
                return m

        return None

    @_handle_errors()
    def is_dir(self, fs_path):
        """Return True if ``fs_path`` exists and is a directory."""
        if not self.is_exist(fs_path):
            return False

        return self._is_dir(fs_path)

    def _is_dir(self, fs_path):
        """Return True if ``test -d`` succeeds for ``fs_path``.

        Raises:
            ExecuteError: if the command fails and its output contains an
                HDFS error report (i.e. it is not just "not a directory").
        """
        cmd = "test -d {}".format(fs_path)
        # stderr must be captured, otherwise _test_match never sees the
        # error report and real failures are misreported as "not a dir".
        ret, lines = self._run_cmd(cmd, redirect_stderr=True)
        if ret:
            # other error
            if self._test_match(lines):
                raise ExecuteError(cmd)

            return False

        return True

    def is_file(self, fs_path):
        """Return True if ``fs_path`` exists and is not a directory."""
        if not self.is_exist(fs_path):
            return False

        return not self._is_dir(fs_path)

    @_handle_errors()
    def is_exist(self, fs_path):
        """Return True if ``fs_path`` exists on HDFS.

        Raises:
            ExecuteError: if ``ls`` fails for a reason other than
                "No such file or directory" (retried by the decorator).
        """
        cmd = "ls {} ".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            for line in out:
                if "No such file or directory" in line:
                    return False
            raise ExecuteError(cmd)

        return True

    # can't retry: the retryable step is _try_upload, which removes a
    # partial copy before the decorator retries it.
    def upload(self, local_path, fs_path):
        """Copy the local ``local_path`` to ``fs_path`` on HDFS.

        Raises:
            FSFileExistsError: if ``fs_path`` already exists on HDFS.
            FSFileNotExistsError: if ``local_path`` does not exist locally.
        """
        if self.is_exist(fs_path):
            raise FSFileExistsError("{} exists".format(fs_path))

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError("{} not exists".format(local_path))

        return self._try_upload(local_path, fs_path)

    @_handle_errors()
    def _try_upload(self, local_path, fs_path):
        """Run ``put``; on any failure delete the HDFS target and re-raise."""
        cmd = "put {} {}".format(local_path, fs_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            self.delete(fs_path)
            raise e

    # can't retry: the retryable step is _try_download, which removes a
    # partial local copy before the decorator retries it.
    def download(self, fs_path, local_path):
        """Copy HDFS ``fs_path`` to the local ``local_path``.

        Raises:
            FSFileExistsError: if ``local_path`` already exists locally.
            FSFileNotExistsError: if ``fs_path`` does not exist on HDFS.
        """
        # The destination is local, so check it on the local file system
        # rather than on HDFS.
        local = LocalFS()
        if local.is_exist(local_path):
            raise FSFileExistsError("{} exists".format(local_path))

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError("{} not exists".format(fs_path))

        return self._try_download(fs_path, local_path)

    @_handle_errors()
    def _try_download(self, fs_path, local_path):
        """Run ``get``; on any failure delete the local target and re-raise."""
        cmd = "get {} {}".format(fs_path, local_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            local_fs = LocalFS()
            local_fs.delete(local_path)
            raise e

    @_handle_errors()
    def mkdirs(self, fs_path):
        """Create ``fs_path`` on HDFS; a no-op if it already exists.

        A plain ``mkdir`` is tried first. If it reports "No such file or
        directory", ``mkdir -p`` is used to create missing parents.

        Raises:
            ExecuteError: if directory creation fails (retried by the
                decorator).
        """
        if self.is_exist(fs_path):
            return

        # True when the plain mkdir failed with "No such file or directory".
        out_hdfs = False

        cmd = "mkdir {} ".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            for line in out:
                if "No such file or directory" in line:
                    out_hdfs = True
                    break
            if not out_hdfs:
                raise ExecuteError(cmd)

        if out_hdfs and not self.is_exist(fs_path):
            cmd = "mkdir -p {}".format(fs_path)
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)

    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
        """Move ``fs_src_path`` to ``fs_dst_path`` on HDFS.

        Args:
            overwrite (bool): delete an existing destination first.
            test_exists (bool): verify that the source exists and the
                destination does not before moving.

        Raises:
            FSFileNotExistsError: source missing (when ``test_exists``).
            FSFileExistsError: destination present (when ``test_exists``).
        """
        if overwrite and self.is_exist(fs_dst_path):
            self.delete(fs_dst_path)

        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError("{} is not exists".format(
                    fs_src_path))

            if self.is_exist(fs_dst_path):
                raise FSFileExistsError("{} exists already".format(fs_dst_path))

        return self._try_mv(fs_src_path, fs_dst_path)

    @_handle_errors()
    def _try_mv(self, fs_src_path, fs_dst_path):
        """Run ``mv``; a failure is ignored if the move evidently completed."""
        cmd = "mv {} {}".format(fs_src_path, fs_dst_path)
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            # Source gone and destination present: treat as success even
            # though the command reported a failure.
            if not self.is_exist(fs_src_path) and \
                    self.is_exist(fs_dst_path):
                return
            raise e

    def _rmr(self, fs_path):
        """Recursively remove ``fs_path``; raises ExecuteError on failure."""
        cmd = "rmr {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def _rm(self, fs_path):
        """Remove the single path ``fs_path``; raises ExecuteError on failure."""
        cmd = "rm {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    @_handle_errors()
    def delete(self, fs_path):
        """Delete ``fs_path`` (recursively for directories); no-op if absent."""
        if not self.is_exist(fs_path):
            return

        if self._is_dir(fs_path):
            return self._rmr(fs_path)

        return self._rm(fs_path)

    def touch(self, fs_path, exist_ok=True):
        """Create an empty file at ``fs_path``.

        Raises:
            FSFileExistsError: if ``fs_path`` exists and ``exist_ok`` is False.
        """
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError("{} exists".format(fs_path))

        return self._touchz(fs_path)

    @_handle_errors()
    def _touchz(self, fs_path):
        """Run ``touchz`` on ``fs_path``; raises ExecuteError on failure."""
        cmd = "touchz {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def need_upload_download(self):
        """Return True; presumably signals callers to stage files via upload/download."""
        return True