"""
Utilities for file system io (local disk and afs/hdfs)
"""
import os
import time
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient

def is_afs_path(path):
    """
    Tell whether a path is routed to the remote (afs/hdfs) client
    Args:
        path(string): path to classify
    Returns:
        bool: True when path starts with "afs" or "hdfs", False otherwise
    """
    # Plain prefix match, so a relative local path such as "afs_dir/x" also matches
    return path.startswith(("afs", "hdfs"))

class LocalFSClient:
    """
    Util for local disk file_system io
    """

    def __init__(self):
        """
        Nothing to set up: the client keeps no state
        """
        pass

    def _ensure_parent_dir(self, path):
        """
        Create the parent directory of path when it is missing
        Args:
            path(string): file path whose parent directory must exist
        """
        parent_dir = os.path.dirname(path)
        # dirname is "" for a bare file name; os.makedirs("") would raise,
        # and the current directory needs no creation anyway
        if parent_dir and not os.path.exists(parent_dir):
            os.makedirs(parent_dir)

    def write(self, content, path, mode):
        """
        write to file, creating the parent directory when needed
        Args:
            content(string)
            path(string)
            mode(string): w/a  w:clear_write a:append_write
        """
        self._ensure_parent_dir(path)
        # the with-block closes (and flushes) the file even if write raises
        with open(path, mode) as f:
            f.write(content)

    def cp(self, org_path, dest_path):
        """
        Copy org_path to dest_path with `cp -r`, creating the parent
        directory of dest_path when needed
        Args:
            org_path(string): source path
            dest_path(string): destination path
        Returns:
            int: status returned by os.system for the cp command
        """
        self._ensure_parent_dir(dest_path)
        # NOTE(review): the paths are pasted into a shell command unquoted, so
        # they must be trusted and free of spaces; kept as is because callers
        # may rely on shell glob expansion
        return os.system("cp -r " + org_path + " " + dest_path)

    def cat(self, file_path):
        """
        Read a whole file
        Args:
            file_path(string): file to read
        Returns:
            string: full content of the file
        """
        with open(file_path) as f:
            return f.read()

    def mkdir(self, dir_name):
        """
        Create dir_name together with any missing parent directories
        os.makedirs raises when dir_name already exists
        Args:
            dir_name(string): directory to create
        """
        os.makedirs(dir_name)

    def remove(self, path):
        """
        Delete path with `rm -rf`; the command status is not checked
        Args:
            path(string): file or directory to delete
        """
        # NOTE(review): path goes into a shell command unquoted, see cp
        os.system("rm -rf " + path)

    def is_exist(self, path):
        """
        Check existence of path by running `ls` on it
        Args:
            path(string): path to check
        Returns:
            bool: True when the ls command exits with status 0
        """
        # ls output goes to the process stdout/stderr as a side effect
        return os.system("ls " + path) == 0

    def ls(self, path):
        """
        List the entries of a directory
        Args:
            path(string): directory to list
        Returns:
            list: one string per entry, built as path + '/' + entry name
        """
        return [path + '/' + entry for entry in os.listdir(path)]

class FileHandler:
    """
    A Smart file handler. auto judge local/afs by path
    """

    def __init__(self, config):
        """
        Build the local client and, when configured, the hdfs client
        Args:
            config(dict): when it contains 'fs_name', an HDFSClient is created
                from config['fs_ugi'] and config['fs_name']; otherwise only
                local paths can be served
        """
        # Always define the attribute, so a missing remote config shows up as
        # a None client instead of a missing attribute
        self._hdfs_client = None
        if 'fs_name' in config:
            # Passed literally; presumably expanded by the shell commands that
            # HDFSClient runs -- TODO confirm
            hadoop_home = "$HADOOP_HOME"
            hdfs_configs = {
                "hadoop.job.ugi": config['fs_ugi'],
                "fs.default.name": config['fs_name']
            }
            self._hdfs_client = HDFSClient(hadoop_home, hdfs_configs)
        self._local_fs_client = LocalFSClient()

    def is_exist(self, path):
        """
        Check whether path exists, on hdfs or on local disk depending on path
        Args:
            path(string): path to check
        Returns:
            result of the is_exist call of the selected client
        """
        if is_afs_path(path):
            return self._hdfs_client.is_exist(path)
        return self._local_fs_client.is_exist(path)

    def get_file_name(self, path):
        """
        Return the part of path after the last '/'
        Args:
            path(string): '/'-separated path
        Returns:
            string: last path component ("" when path ends with '/')
        """
        return path.split('/')[-1]

    def write(self, content, dest_path, mode='w'):
        """
        Write content to dest_path, on hdfs or on local disk depending on path
        For an afs/hdfs path the content is staged in ./tmp/<file name>,
        uploaded as dest_path + ".tmp", then swapped in by rename while the
        previous file is kept as dest_path + ".bak"
        Args:
            content(string): text to write
            dest_path(string): target path
            mode(string): w/a  w:clear_write a:append_write
        """
        if not is_afs_path(dest_path):
            self._local_fs_client.write(content, dest_path, mode)
            return

        file_name = self.get_file_name(dest_path)
        temp_local_file = "./tmp/" + file_name
        # start from a clean staging file so an 'a' mode cannot pick up leftovers
        self._local_fs_client.remove(temp_local_file)
        if mode.find('a') >= 0:
            # append: existing remote content first, new content after it,
            # matching what mode 'a' does for a local path
            content = self._hdfs_client.cat(dest_path) + content
        # fleet hdfs_client only support upload, so write tmp file
        self._local_fs_client.write(content, temp_local_file, mode)
        self._hdfs_client.delete(dest_path + ".tmp")
        self._hdfs_client.upload(dest_path + ".tmp", temp_local_file)
        self._hdfs_client.delete(dest_path + ".bak")
        self._hdfs_client.rename(dest_path, dest_path + '.bak')
        self._hdfs_client.rename(dest_path + ".tmp", dest_path)

    def cat(self, path):
        """
        Read the content of path, from hdfs or local disk depending on path
        Args:
            path(string): file to read
        Returns:
            content returned by the cat call of the selected client
        """
        if is_afs_path(path):
            return self._hdfs_client.cat(path)
        return self._local_fs_client.cat(path)

    def ls(self, path):
        """
        List path, on hdfs or on local disk depending on path
        Args:
            path(string): directory to list
        Returns:
            result of the ls call of the selected client
        """
        if is_afs_path(path):
            return self._hdfs_client.ls(path)
        return self._local_fs_client.ls(path)

    def cp(self, org_path, dest_path):
        """
        Copy org_path to dest_path
        local -> local uses the local client, local -> hdfs uploads,
        hdfs -> local downloads; hdfs -> hdfs is not supported
        Args:
            org_path(string): source path
            dest_path(string): destination path
        Returns:
            result of the underlying cp/upload/download call, or None for the
            unsupported hdfs -> hdfs case
        """
        org_is_afs = is_afs_path(org_path)
        dest_is_afs = is_afs_path(dest_path)
        if not org_is_afs and not dest_is_afs:
            return self._local_fs_client.cp(org_path, dest_path)
        if not org_is_afs and dest_is_afs:
            return self._hdfs_client.upload(dest_path, org_path)
        if org_is_afs and not dest_is_afs:
            return self._hdfs_client.download(org_path, dest_path)
        # both paths are remote: only reported, nothing is copied
        print("Not Suppor hdfs cp currently")