diff --git a/python/paddle/fluid/contrib/utils/hdfs_utils.py b/python/paddle/fluid/contrib/utils/hdfs_utils.py
index 1bfc966de88e5e816497eea0a7a0d0f2dd667355..2ed37a9be39f1de9b57abcc9e2d2c34cc956950f 100644
--- a/python/paddle/fluid/contrib/utils/hdfs_utils.py
+++ b/python/paddle/fluid/contrib/utils/hdfs_utils.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-"""HDFS Utils"""
+"""hdfs_utils.py will move to fluid/incubate/fleet/utils/hdfs.py"""
 
 import os
 import sys
diff --git a/python/paddle/fluid/contrib/utils/lookup_table_utils.py b/python/paddle/fluid/contrib/utils/lookup_table_utils.py
index 402850e70ab269e3b2e7d1993ad43686c02e9eb9..2d18a9a8620e210d5b0f6fbb90c3b59e31ac8086 100644
--- a/python/paddle/fluid/contrib/utils/lookup_table_utils.py
+++ b/python/paddle/fluid/contrib/utils/lookup_table_utils.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""lookup_table_utils.py will move to fluid/incubate/fleet/utils/lookup_table.py"""
 
 from __future__ import print_function
 
diff --git a/python/paddle/fluid/incubate/fleet/base/fleet_base.py b/python/paddle/fluid/incubate/fleet/base/fleet_base.py
index abba985848acf78de028ccde3398d4f9958b21a7..a52970fad1220b150fd56b365358cdab9a8ae199 100644
--- a/python/paddle/fluid/incubate/fleet/base/fleet_base.py
+++ b/python/paddle/fluid/incubate/fleet/base/fleet_base.py
@@ -157,19 +157,26 @@ class Fleet(object):
         Returns:
             list: files belongs to this worker.
         """
-        file_num = len(files)
         trainer_id = self.worker_index()
-        trainer_num = self.worker_num()
-        if trainer_num > file_num:
-            raise ValueError("trainer_num should be <= file_num : "
-                             "%s > %s" % (trainer_num, file_num))
-        start = 0
-        end = 0
-        for i in range(0, trainer_id + 1):
-            length = file_num / trainer_num + (i < (file_num % trainer_num))
-            start = end
-            end += length
-        return files[start:end]
+        trainers = self.worker_num()
+
+        if len(files) < trainers:
+            raise ValueError("number of files must be >= number of trainers")
+
+        remainder = len(files) % trainers
+        blocksize = len(files) // trainers
+
+        blocks = [blocksize] * trainers
+        for i in range(remainder):
+            blocks[i] += 1
+
+        trainer_files = [[]] * trainers
+        begin = 0
+        for i in range(trainers):
+            trainer_files[i] = files[begin:begin + blocks[i]]
+            begin += blocks[i]
+
+        return trainer_files[trainer_id]
 
     def init(self, role_maker=None):
         """
diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index 046015503fbc80a87eb305859b3c174bc862f5b7..7fe45e49fb38fd3c8997561f255da7e2b0e29554 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -102,6 +102,11 @@ class RoleMakerBase(object):
         """
         return self._server_endpoints
 
+    def to_string(self):
+        return "role: {}, current_id: {}, worker_endpoints: {}, server_endpoints: {}".format(
+            self._role, self._current_id, self._worker_endpoints,
+            self._server_endpoints)
+
 
 class MPIRoleMaker(RoleMakerBase):
     """
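The rewritten Fleet.split_files above hands each worker one contiguous block of files, with the first len(files) % trainers workers taking one extra file. Below is a minimal standalone sketch of the same partitioning; the names here are illustrative and not part of the patch.

# Sketch of the contiguous split used by Fleet.split_files; `trainer_id` and
# `trainers` stand in for worker_index() and worker_num().
def split_files(files, trainer_id, trainers):
    if len(files) < trainers:
        raise ValueError("number of files must be >= number of trainers")

    remainder = len(files) % trainers
    blocksize = len(files) // trainers

    blocks = [blocksize] * trainers
    for i in range(remainder):
        blocks[i] += 1          # the first `remainder` workers get one extra file

    shards = []
    begin = 0
    for block in blocks:
        shards.append(files[begin:begin + block])
        begin += block
    return shards[trainer_id]

# 5 files over 2 trainers: worker 0 reads 3 files, worker 1 reads 2.
files = ["part-%05d" % i for i in range(5)]
assert split_files(files, 0, 2) == ["part-00000", "part-00001", "part-00002"]
assert split_files(files, 1, 2) == ["part-00003", "part-00004"]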
diff --git a/python/paddle/fluid/incubate/fleet/utils/__init__.py b/python/paddle/fluid/incubate/fleet/utils/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..d0c32e26092f6ea25771279418582a24ea449ab2
--- /dev/null
+++ b/python/paddle/fluid/incubate/fleet/utils/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/paddle/fluid/incubate/fleet/utils/hdfs.py b/python/paddle/fluid/incubate/fleet/utils/hdfs.py
new file mode 100644
index 0000000000000000000000000000000000000000..5468df42505208d61ae292b9f77b0d8dfe82b159
--- /dev/null
+++ b/python/paddle/fluid/incubate/fleet/utils/hdfs.py
@@ -0,0 +1,510 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""HDFS Utils"""
+
+import os
+import sys
+import subprocess
+import multiprocessing
+from datetime import datetime
+
+import re
+import copy
+import errno
+
+import logging
+from paddle.fluid.log_helper import get_logger
+
+__all__ = ["HDFSClient"]
+
+_logger = get_logger(
+    __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
+
+
+class HDFSClient(object):
+    """
+    A tool of HDFS
+
+    Args:
+        hadoop_home (string): hadoop_home
+        configs (dict): hadoop config, a dict that must contain \
+            the keys "fs.default.name" and "hadoop.job.ugi"
+
+    Examples:
+        hadoop_home = "/home/client/hadoop-client/hadoop/"
+
+        configs = {
+            "fs.default.name": "hdfs://xxx.hadoop.com:54310",
+            "hadoop.job.ugi": "hello,hello123"
+        }
+
+        client = HDFSClient(hadoop_home, configs)
+
+        client.ls("/user/com/train-25")
+        files = client.lsr("/user/com/train-25/models")
+    """
+
+    def __init__(self, hadoop_home, configs):
+        self.pre_commands = []
+        hadoop_bin = '%s/bin/hadoop' % hadoop_home
+        self.pre_commands.append(hadoop_bin)
+        dfs = 'fs'
+        self.pre_commands.append(dfs)
+
+        for k, v in configs.iteritems():
+            config_command = '-D%s=%s' % (k, v)
+            self.pre_commands.append(config_command)
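For reference, the pre_commands list built in __init__ above is the prefix of every call; __run_hdfs_cmd (defined just below) joins it with the per-call arguments into a single shell string and retries it up to retry_times + 1 times. A rough sketch of the command assembled for an ls call, using only the placeholder values from the class docstring:

# Sketch of the shell line HDFSClient assembles; values are the docstring placeholders.
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
    "hadoop.job.ugi": "hello,hello123",
}

pre_commands = ["%s/bin/hadoop" % hadoop_home, "fs"]
pre_commands += ["-D%s=%s" % (k, v) for k, v in configs.items()]

whole_command = " ".join(pre_commands + ["-ls", "/user/com/train-25"])
# -> .../bin/hadoop fs -Dfs.default.name=hdfs://xxx.hadoop.com:54310
#    -Dhadoop.job.ugi=hello,hello123 -ls /user/com/train-25
print(whole_command)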
+
+    def __run_hdfs_cmd(self, commands, retry_times=5):
+        whole_commands = copy.deepcopy(self.pre_commands)
+        whole_commands.extend(commands)
+
+        ret_code = 0
+        ret_out = None
+        ret_err = None
+        whole_commands = " ".join(whole_commands)
+        for x in range(retry_times + 1):
+            proc = subprocess.Popen(
+                whole_commands,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                shell=True)
+            (output, errors) = proc.communicate()
+            ret_code, ret_out, ret_err = proc.returncode, output, errors
+
+            _logger.info(
+                'Times: %d, Running command: %s. Return code: %d, Error: %s' %
+                (x, whole_commands, proc.returncode, errors))
+
+            if ret_code == 0:
+                break
+
+        return ret_code, ret_out, ret_err
+
+    def is_exist(self, hdfs_path=None):
+        """
+        whether the remote HDFS path exists
+
+        Args:
+            hdfs_path(str): the hdfs file path
+
+        Returns:
+            True or False
+        """
+        exist_cmd = ['-test', '-e', hdfs_path]
+        returncode, output, errors = self.__run_hdfs_cmd(
+            exist_cmd, retry_times=1)
+
+        if returncode:
+            _logger.error("HDFS is_exist HDFS path: {} failed".format(
+                hdfs_path))
+            return False
+        else:
+            _logger.info("HDFS is_exist HDFS path: {} successfully".format(
+                hdfs_path))
+            return True
+
+    def is_dir(self, hdfs_path=None):
+        """
+        whether the remote HDFS path is a directory
+
+        Args:
+            hdfs_path(str): the hdfs file path
+
+        Returns:
+            True or False
+        """
+
+        if not self.is_exist(hdfs_path):
+            return False
+
+        dir_cmd = ['-test', '-d', hdfs_path]
+        returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1)
+
+        if returncode:
+            _logger.error("HDFS path: {} is not a directory".format(
+                hdfs_path))
+            return False
+        else:
+            _logger.info("HDFS path: {} is a directory".format(
+                hdfs_path))
+            return True
+
+    def delete(self, hdfs_path):
+        """
+        Remove a file or directory from HDFS.
+
+        Args:
+            hdfs_path: HDFS path.
+
+        Returns:
+            True or False
+            This function returns `True` if the deletion was successful and `False` if
+            no file or directory previously existed at `hdfs_path`.
+        """
+        _logger.info('Deleting %r.', hdfs_path)
+
+        if not self.is_exist(hdfs_path):
+            _logger.warn("HDFS path: {} does not exist".format(hdfs_path))
+            return True
+
+        if self.is_dir(hdfs_path):
+            del_cmd = ['-rmr', hdfs_path]
+        else:
+            del_cmd = ['-rm', hdfs_path]
+
+        returncode, output, errors = self.__run_hdfs_cmd(del_cmd, retry_times=0)
+
+        if returncode:
+            _logger.error("HDFS path: {} delete files failure".format(
+                hdfs_path))
+            return False
+        else:
+            _logger.info("HDFS path: {} delete files successfully".format(
+                hdfs_path))
+            return True
+
+    def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False):
+        """
+        Move a file or folder on HDFS.
+
+        Args:
+            hdfs_src_path(str): HDFS source path.
+            hdfs_dst_path(str): HDFS destination path.
+            overwrite(bool|False): If the path already exists and overwrite is False, will return False.
+
+        Returns:
+            True or False
+        """
+        assert hdfs_src_path is not None
+        assert hdfs_dst_path is not None
+
+        if not self.is_exist(hdfs_src_path):
+            _logger.info("HDFS path does not exist: {}".format(hdfs_src_path))
+        if self.is_exist(hdfs_dst_path) and not overwrite:
+            _logger.error("HDFS path already exists: {} and overwrite=False".
+                          format(hdfs_dst_path))
+            return False
+
+        rename_command = ['-mv', hdfs_src_path, hdfs_dst_path]
+        returncode, output, errors = self.__run_hdfs_cmd(
+            rename_command, retry_times=1)
+
+        if returncode:
+            _logger.error("HDFS rename path: {} to {} failed".format(
+                hdfs_src_path, hdfs_dst_path))
+            return False
+        else:
+            _logger.info("HDFS rename path: {} to {} successfully".format(
+                hdfs_src_path, hdfs_dst_path))
+            return True
+
+    @staticmethod
+    def make_local_dirs(local_path):
+        """
+        create a local directory, same as mkdir -p
+        Args:
+            local_path: the local directory path to create.
+        """
+        try:
+            os.makedirs(local_path)
+        except OSError as e:
+            if e.errno != errno.EEXIST:
+                raise
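A short usage sketch for the predicates and the delete/rename helpers above; the client and the paths are the placeholder values from the class docstring, not real data.

# Hypothetical paths; `client` is an HDFSClient built as in the class docstring.
src = "/user/com/train-25/models/pass-0"
dst = "/user/com/train-25/models/pass-0.bak"

if client.is_exist(src) and not client.is_dir(src):
    if not client.rename(src, dst, overwrite=False):
        # destination already existed: drop it, then retry the move
        client.delete(dst)
        client.rename(src, dst)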
+
+    def makedirs(self, hdfs_path):
+        """
+        Create a remote directory, recursively if necessary.
+
+        Args:
+            hdfs_path(str): Remote path. Intermediate directories will be created appropriately.
+
+        Returns:
+            True or False
+        """
+        _logger.info('Creating directories to %r.', hdfs_path)
+        assert hdfs_path is not None
+
+        if self.is_exist(hdfs_path):
+            _logger.error("HDFS path already exists: {}".format(hdfs_path))
+            return
+
+        mkdirs_commands = ['-mkdir', hdfs_path]
+        returncode, output, errors = self.__run_hdfs_cmd(
+            mkdirs_commands, retry_times=1)
+
+        if returncode:
+            _logger.error("HDFS mkdir path: {} failed".format(hdfs_path))
+            return False
+        else:
+            _logger.info("HDFS mkdir path: {} successfully".format(hdfs_path))
+            return True
+
+    def ls(self, hdfs_path):
+        """
+        list directory contents of an HDFS path
+
+        Args:
+            hdfs_path(str): Remote HDFS path to be listed.
+
+        Returns:
+            List: a contents list about hdfs_path.
+        """
+        assert hdfs_path is not None
+
+        if not self.is_exist(hdfs_path):
+            return []
+
+        ls_commands = ['-ls', hdfs_path]
+        returncode, output, errors = self.__run_hdfs_cmd(
+            ls_commands, retry_times=1)
+
+        if returncode:
+            _logger.error("HDFS list path: {} failed".format(hdfs_path))
+            return []
+        else:
+            _logger.info("HDFS list path: {} successfully".format(hdfs_path))
+
+            ret_lines = []
+            regex = re.compile(r'\s+')
+            out_lines = output.strip().split("\n")
+            for line in out_lines:
+                re_line = regex.split(line)
+                if len(re_line) == 8:
+                    ret_lines.append(re_line[7])
+            return ret_lines
+
+    def lsr(self, hdfs_path, excludes=[]):
+        """
+        list directory contents of an HDFS path recursively
+
+        Args:
+            hdfs_path(str): Remote HDFS path.
+            excludes(list|[]): HDFS paths to exclude from the result.
+                Folders are discarded and files keep the lsr output order.
+
+        Returns:
+            List: a contents list about hdfs_path.
+        """
+
+        assert hdfs_path is not None
+
+        if not self.is_exist(hdfs_path):
+            return []
+
+        ls_commands = ['-lsr', hdfs_path]
+        returncode, output, errors = self.__run_hdfs_cmd(
+            ls_commands, retry_times=1)
+
+        if returncode:
+            _logger.error("HDFS list all files: {} failed".format(hdfs_path))
+            return []
+        else:
+            _logger.info("HDFS list all files: {} successfully".format(
+                hdfs_path))
+            lines = []
+            regex = re.compile(r'\s+')
+            out_lines = output.strip().split("\n")
+            for line_id, line in enumerate(out_lines):
+                re_line = regex.split(line)
+                if len(re_line) == 8:
+                    if re_line[0][0] == "d":
+                        continue
+                    if re_line[7] in excludes:
+                        continue
+                    else:
+                        lines.append((re_line[7], re_line[5] + " " + re_line[6],
+                                      line_id))
+            lines = sorted(lines, key=lambda line: line[2])
+            ret_lines = [ret[0] for ret in lines]
+            return ret_lines
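Both ls and lsr above work by parsing the text that hadoop fs -ls / -lsr prints: each entry line splits on whitespace into 8 fields, field 0 is the permission string (a leading 'd' marks a directory, which lsr skips) and field 7 is the path. A small sketch with a made-up output line:

import re

# Made-up `hadoop fs -lsr` entry: perms, replicas, owner, group, size, date, time, path
sample = "-rw-r--r--   3 work supergroup  1048576 2019-04-01 12:00 /user/com/train-25/models/part-00000"

fields = re.split(r'\s+', sample)
assert len(fields) == 8
assert fields[0][0] != "d"   # a regular file, so lsr keeps it
print(fields[7])             # /user/com/train-25/models/part-00000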
+ """ + + def __subprocess_download(local_path, datas): + """ + download file from HDFS + + Args: + hdfs_path(str): the hdfs file path + local_path(str): the local file path + overwrite(bool|None): will overwrite the file on HDFS or not + retry_times(int|5): retry times + + Returns: + True or False + """ + for data in datas: + download_commands = ["-get", data, local_path] + + returncode, output, errors = self.__run_hdfs_cmd( + download_commands, retry_times=retry_times) + + if returncode: + _logger.error( + "Get local path: {} from HDFS path: {} failed".format( + local_path, hdfs_path)) + return False + return True + + self.make_local_dirs(local_path) + + all_files = client.ls(hdfs_path) + + procs = [] + for i in range(multi_processes): + process_datas = HDFSClient.split_flies(all_files, i, + multi_processes) + p = multiprocessing.Process( + target=__subprocess_download, + args=( + local_path, + process_datas, )) + procs.append(p) + p.start() + + # complete the processes + for proc in procs: + proc.join() + + _logger.info("Finish {} multi process to download datas".format( + multi_processes)) + + local_downloads = [] + for dirname, folder, files in os.walk(local_path): + for i in files: + t = os.path.join(dirname, i) + local_downloads.append(t) + return local_downloads + + def upload(self, + hdfs_path, + local_path, + multi_processes=5, + overwrite=False, + retry_times=5): + """ + Upload files to HDFS using multi process. + + Args: + hdfs_path(str): path on hdfs + local_path(str): path on local + multi_processes(int|5): the upload data process at the same time, default=5 + overwrite(bool|False): will overwrite file on HDFS or not + sync(bool|True): upload files sync or not. + + Returns: + None + """ + + def __subprocess_upload(hdfs_path_single, datas): + for data in datas: + put_commands = ["-put", data, hdfs_path_single] + returncode, output, errors = self.__run_hdfs_cmd(put_commands, + retry_times) + + if returncode: + _logger.error("Put local path: {} to HDFS path: {} failed". 
+
+    def upload(self,
+               hdfs_path,
+               local_path,
+               multi_processes=5,
+               overwrite=False,
+               retry_times=5):
+        """
+        Upload files to HDFS using multi process.
+
+        Args:
+            hdfs_path(str): path on hdfs
+            local_path(str): path on local
+            multi_processes(int|5): the number of upload processes to run at the same time, default=5
+            overwrite(bool|False): will overwrite file on HDFS or not
+            retry_times(int|5): retry times for each put command.
+
+        Returns:
+            None
+        """
+
+        def __subprocess_upload(hdfs_path_single, datas):
+            for data in datas:
+                put_commands = ["-put", data, hdfs_path_single]
+                returncode, output, errors = self.__run_hdfs_cmd(put_commands,
+                                                                 retry_times)
+
+                if returncode:
+                    _logger.error("Put local path: {} to HDFS path: {} failed".
+                                  format(data, hdfs_path_single))
+                    return False
+            return True
+
+        def get_local_files(path):
+            rlist = []
+
+            if not os.path.exists(path):
+                return rlist
+
+            if os.path.isdir(path):
+                for file in os.listdir(path):
+                    t = os.path.join(path, file)
+                    rlist.append(t)
+            else:
+                rlist.append(path)
+            return rlist
+
+        all_files = get_local_files(local_path)
+        if not all_files:
+            _logger.info("there is nothing to upload, exit")
+            return
+
+        if self.is_exist(hdfs_path) and overwrite:
+            self.delete(hdfs_path)
+            self.makedirs(hdfs_path)
+
+        procs = []
+        for i in range(multi_processes):
+            process_datas = HDFSClient.split_flies(all_files, i,
+                                                   multi_processes)
+            p = multiprocessing.Process(
+                target=__subprocess_upload, args=(
+                    hdfs_path,
+                    process_datas, ))
+            procs.append(p)
+            p.start()
+
+        # complete the processes
+        for proc in procs:
+            proc.join()
+
+        _logger.info("Finished uploading data from {} to {}".format(local_path,
+                                                                    hdfs_path))
+
+
+if __name__ == "__main__":
+    hadoop_home = "/home/client/hadoop-client/hadoop/"
+
+    configs = {
+        "fs.default.name": "hdfs://xxx.hadoop.com:54310",
+        "hadoop.job.ugi": "hello,hello123"
+    }
+
+    client = HDFSClient(hadoop_home, configs)
+
+    client.ls("/user/com/train-25")
+    files = client.lsr("/user/com/train-25/models")
diff --git a/python/paddle/fluid/transpiler/details/program_utils.py b/python/paddle/fluid/transpiler/details/program_utils.py
index 65807d1b9f60a51136facd9cd295dbecdc71d858..dc78ffe70b3dfda75a799583e85b76d8d921e078 100644
--- a/python/paddle/fluid/transpiler/details/program_utils.py
+++ b/python/paddle/fluid/transpiler/details/program_utils.py
@@ -90,7 +90,7 @@ def variable_to_code(var):
     return var_str
 
 
-def op_to_code(op, skip_op_callstack=False):
+def op_to_code(op, skip_op_callstack=True):
     """
     Get readable codes of fluid operator.
 
@@ -187,7 +187,7 @@ def block_to_code(block, block_idx, fout=None, skip_op_callstack=False):
     print("{0}{1}".format(get_indent_space(indent), '}'), file=fout)
 
 
-def program_to_code(prog, fout=None, skip_op_callstack=False):
+def program_to_code(prog, fout=None, skip_op_callstack=True):
     """
     Print readable codes of fluid program.
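The last two hunks flip the default of skip_op_callstack to True, so op_to_code and program_to_code now omit the Python call stack recorded on each operator unless the caller asks for it. A hedged sketch, assuming program_to_code is importable from paddle.fluid.transpiler.details as in this tree:

# Sketch only: assumes this import path exposes program_to_code.
import paddle.fluid as fluid
from paddle.fluid.transpiler.details import program_to_code

main = fluid.Program()
with fluid.program_guard(main):
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.fc(input=x, size=1)

program_to_code(main)                           # call stacks skipped by default now
program_to_code(main, skip_op_callstack=False)  # previous, verbose output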