# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import shutil
import subprocess
import time

from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient

# Module-wide logging: configure the root handler format, then expose the
# root logger (raised to INFO verbosity) as ``logger``.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)

def is_afs_path(path):
    """Return True if ``path`` refers to a remote AFS/HDFS location.

    A path counts as remote when it starts with an ``afs:`` or ``hdfs:``
    scheme prefix (e.g. ``afs:/user/x`` or ``hdfs://ns/user/x``). Requiring
    the colon keeps local paths such as ``afs_output/part-0`` or
    ``hdfs_backup`` from being routed to the HDFS client.

    Args:
        path(string): file or directory path
    Returns:
        bool: True for afs/hdfs paths, False otherwise
    """
    return path.startswith(("afs:", "hdfs:"))


class LocalFSClient(object):
    """
    Util for local disk file_system io.

    All methods take plain local paths. External commands are run with an
    argv list (never through a shell), so paths containing spaces or shell
    metacharacters are used verbatim.
    """

    def __init__(self):
        """The client keeps no state; nothing to initialise."""
        pass

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the parent directory of ``path`` if it does not exist."""
        parent = os.path.dirname(path)
        # dirname() is '' for a bare file name: the parent is then the current
        # directory, which exists (and os.makedirs('') would raise).
        if parent and not os.path.exists(parent):
            os.makedirs(parent)

    def write(self, content, path, mode):
        """
        write to file, creating missing parent directories first
        Args:
            content(string)
            path(string)
            mode(string): w/a  w:clear_write a:append_write
        """
        self._ensure_parent_dir(path)
        # The with-block closes (and thereby flushes) the file even on error.
        with open(path, mode) as f:
            f.write(content)

    def cp(self, org_path, dest_path):
        """
        Recursively copy org_path to dest_path with `cp -r`, creating the
        parent directory of dest_path if needed.
        Args:
            org_path(string)
            dest_path(string)
        Returns:
            int: exit code of `cp` (0 on success). Paths are not subject
            to shell glob expansion.
        """
        self._ensure_parent_dir(dest_path)
        return subprocess.call(["cp", "-r", org_path, dest_path])

    def cat(self, file_path):
        """
        Read a whole file.
        Args:
            file_path(string)
        Returns:
            string: the file content
        """
        with open(file_path) as f:
            return f.read()

    def mkdir(self, dir_name):
        """
        Create dir_name including missing parents (raises if it exists).
        Args:
            dir_name(string)
        """
        os.makedirs(dir_name)

    def remove(self, path):
        """
        Delete a file or a whole directory tree, like `rm -rf`: a missing
        path is not an error and deletion failures are ignored.
        Args:
            path(string)
        """
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
        elif os.path.lexists(path):
            try:
                os.remove(path)
            except OSError:
                # Best effort, matching `rm -rf`: never raise on failure.
                pass

    def is_exist(self, path):
        """
        Check whether path exists (a dangling symlink counts as existing).
        Args:
            path(string)
        Returns:
            bool
        """
        return os.path.lexists(path)

    def ls(self, path):
        """
        List the entry names (not full paths) of directory path.
        Args:
            path(string)
        Returns:
            list: names as returned by os.listdir
        """
        files = os.listdir(path)
        return files


class FileHandler(object):
    """
    A smart file handler: each operation is dispatched to the local disk or
    to HDFS/AFS depending on the path (see ``is_afs_path``).
    """

    def __init__(self, config):
        """
        Args:
            config(dict): when it contains 'fs_name', an HDFSClient is built
                from config['fs_name'] and config['fs_ugi'] so afs/hdfs paths
                can be served; otherwise only local paths are supported.
        """
        # Always define the attribute so that a remote path used without an
        # HDFS configuration fails with a clear message (see _get_hdfs_client)
        # instead of an AttributeError.
        self._hdfs_client = None
        if 'fs_name' in config:
            # Passed through literally; presumably HDFSClient puts it into a
            # shell command where $HADOOP_HOME is expanded - confirm.
            hadoop_home = "$HADOOP_HOME"
            hdfs_configs = {
                "hadoop.job.ugi": config['fs_ugi'],
                "fs.default.name": config['fs_name']
            }
            self._hdfs_client = HDFSClient(hadoop_home, hdfs_configs)
        self._local_fs_client = LocalFSClient()

    def _get_hdfs_client(self, path):
        """
        Return the HDFS client to use for the remote path.
        Raises:
            ValueError: if the handler was configured without 'fs_name'.
        """
        if self._hdfs_client is None:
            raise ValueError(
                "'%s' is an afs/hdfs path but FileHandler was configured "
                "without 'fs_name'" % path)
        return self._hdfs_client

    def is_exist(self, path):
        """Return whether path exists on the file system it refers to."""
        if is_afs_path(path):
            return self._get_hdfs_client(path).is_exist(path)
        return self._local_fs_client.is_exist(path)

    def get_file_name(self, path):
        """Return the last '/'-separated component of path."""
        sub_paths = path.split('/')
        return sub_paths[-1]

    def write(self, content, dest_path, mode='w'):
        """
        Write content to dest_path.
        Args:
            content(string)
            dest_path(string): local or afs/hdfs path
            mode(string): w/a  w:clear_write a:append_write
        For afs/hdfs targets the full content is staged in ./tmp/<file name>,
        uploaded to <dest_path>.tmp and renamed into place; the previous file
        is kept as <dest_path>.bak.
        """
        if not is_afs_path(dest_path):
            self._local_fs_client.write(content, dest_path, mode)
            return

        hdfs_client = self._get_hdfs_client(dest_path)
        temp_local_file = "./tmp/" + self.get_file_name(dest_path)
        self._local_fs_client.remove(temp_local_file)
        if 'a' in mode:
            # Appending: the existing remote content must come BEFORE the new
            # content. NOTE(review): confirm HDFSClient.cat keeps trailing
            # newlines, otherwise the two parts run together.
            org_content = hdfs_client.cat(dest_path) or ""
            content = org_content + content
        # fleet hdfs_client only supports upload, so write a local tmp file.
        # It was just removed, so 'a' and 'w' behave the same here.
        self._local_fs_client.write(content, temp_local_file, mode)
        tmp_path = dest_path + ".tmp"
        bak_path = dest_path + ".bak"
        hdfs_client.delete(tmp_path)
        hdfs_client.upload(tmp_path, temp_local_file)
        hdfs_client.delete(bak_path)
        hdfs_client.rename(dest_path, bak_path)
        hdfs_client.rename(tmp_path, dest_path)
        # The staging copy is not needed once it has been uploaded.
        self._local_fs_client.remove(temp_local_file)

    def cat(self, path):
        """Return the content of the file at path."""
        if is_afs_path(path):
            return self._get_hdfs_client(path).cat(path)
        return self._local_fs_client.cat(path)

    def ls(self, path):
        """
        List the directory path.
        Returns:
            list: entries formatted as path + '/' + name
        """
        if is_afs_path(path):
            # Keep only the last component of each entry HDFSClient returns.
            names = [self.get_file_name(fi)
                     for fi in self._get_hdfs_client(path).ls(path)]
        else:
            names = self._local_fs_client.ls(path)
        return [path + '/' + name for name in names]

    def cp(self, org_path, dest_path):
        """
        Copy org_path to dest_path. Supported: local->local, local->afs
        (upload) and afs->local (download); afs->afs is not supported.
        Returns:
            the result of the underlying client call, or None for afs->afs.
        """
        org_is_afs = is_afs_path(org_path)
        dest_is_afs = is_afs_path(dest_path)
        if not org_is_afs and not dest_is_afs:
            return self._local_fs_client.cp(org_path, dest_path)
        if not org_is_afs and dest_is_afs:
            return self._get_hdfs_client(dest_path).upload(
                dest_path, org_path)
        if org_is_afs and not dest_is_afs:
            return self._get_hdfs_client(org_path).download(
                org_path, dest_path)
        logger.warning("Not support hdfs to hdfs cp currently: %s -> %s",
                       org_path, dest_path)
        return None