Unverified commit d8458483, authored by tangwei12 and committed by GitHub

do some odd jobs (#18641)

do some odd jobs, test=develop
Parent: 7e3963f2
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-"""HDFS Utils"""
+"""hdfs_utils.py will move to fluid/incubate/fleet/utils/hdfs.py"""
 import os
 import sys
...
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""lookup_table_utils.py will move to fluid/incubate/fleet/utils/lookup_table.py"""
 from __future__ import print_function
...
@@ -157,19 +157,26 @@ class Fleet(object):
         Returns:
             list: files belongs to this worker.
         """
-        file_num = len(files)
         trainer_id = self.worker_index()
-        trainer_num = self.worker_num()
-        if trainer_num > file_num:
-            raise ValueError("trainer_num should be <= file_num : "
-                             "%s > %s" % (trainer_num, file_num))
-        start = 0
-        end = 0
-        for i in range(0, trainer_id + 1):
-            length = file_num / trainer_num + (i < (file_num % trainer_num))
-            start = end
-            end += length
-        return files[start:end]
+        trainers = self.worker_num()
+
+        if len(files) < trainers:
+            raise ValueError(
+                "file number must be greater than or equal to trainer number")
+
+        remainder = len(files) % trainers
+        blocksize = len(files) // trainers
+
+        blocks = [blocksize] * trainers
+        for i in range(remainder):
+            blocks[i] += 1
+
+        trainer_files = [[]] * trainers
+        begin = 0
+        for i in range(trainers):
+            trainer_files[i] = files[begin:begin + blocks[i]]
+            begin += blocks[i]
+
+        return trainer_files[trainer_id]
 
     def init(self, role_maker=None):
         """
...
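The new split_files body partitions the file list evenly, spreading the remainder over the first trainers. As a quick sanity check, here is a minimal standalone sketch of that logic (the names files, trainers, and shards are illustrative; // makes the integer division explicit for Python 3):

# 7 files over 3 trainers: blocks == [3, 2, 2]
files = ["part-%d" % i for i in range(7)]
trainers = 3

remainder = len(files) % trainers
blocksize = len(files) // trainers
blocks = [blocksize + (1 if i < remainder else 0) for i in range(trainers)]

shards, begin = [], 0
for b in blocks:
    shards.append(files[begin:begin + b])
    begin += b

print(shards[0])  # trainer 0 gets ['part-0', 'part-1', 'part-2']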
@@ -102,6 +102,11 @@ class RoleMakerBase(object):
         """
         return self._server_endpoints
 
+    def to_string(self):
+        return "role: {}, current_id: {}, worker_endpoints: {}, server_endpoints: {}".format(
+            self._role, self._current_id, self._worker_endpoints,
+            self._server_endpoints)
+
 
 class MPIRoleMaker(RoleMakerBase):
     """
...
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HDFS Utils"""
import os
import sys
import subprocess
import multiprocessing
from datetime import datetime
import re
import copy
import errno
import logging
from paddle.fluid.log_helper import get_logger
__all__ = ["HDFSClient"]
_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
class HDFSClient(object):
"""
A tool for HDFS operations.
Args:
hadoop_home (str): the Hadoop installation directory.
configs (dict): Hadoop configuration; it should contain the
keys "fs.default.name" and "hadoop.job.ugi".
Examples:
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.ls("/user/com/train-25")
files = client.lsr("/user/com/train-25/models")
"""
def __init__(self, hadoop_home, configs):
self.pre_commands = []
hadoop_bin = '%s/bin/hadoop' % hadoop_home
self.pre_commands.append(hadoop_bin)
dfs = 'fs'
self.pre_commands.append(dfs)
for k, v in configs.items():
config_command = '-D%s=%s' % (k, v)
self.pre_commands.append(config_command)
def __run_hdfs_cmd(self, commands, retry_times=5):
whole_commands = copy.deepcopy(self.pre_commands)
whole_commands.extend(commands)
ret_code = 0
ret_out = None
ret_err = None
whole_commands = " ".join(whole_commands)
for x in range(retry_times + 1):
proc = subprocess.Popen(
whole_commands,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
(output, errors) = proc.communicate()
ret_code, ret_out, ret_err = proc.returncode, output, errors
_logger.info(
'Times: %d, Running command: %s. Return code: %d, Error: %s' %
(x, whole_commands, proc.returncode, errors))
if ret_code == 0:
break
return ret_code, ret_out, ret_err
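# Example of the assembled shell command (hypothetical values):
#   /home/client/hadoop-client/hadoop/bin/hadoop fs \
#       -Dfs.default.name=hdfs://xxx.hadoop.com:54310 \
#       -Dhadoop.job.ugi=hello,hello123 -ls /user/com/train-25
# The joined command is retried until it returns 0 or retry_times is exhausted.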
def is_exist(self, hdfs_path=None):
"""
whether the remote HDFS path exists
Args:
hdfs_path(str): the hdfs file path
Returns:
True or False
"""
exist_cmd = ['-test', '-e', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
exist_cmd, retry_times=1)
if returncode:
_logger.info("HDFS path does not exist: {}".format(hdfs_path))
return False
else:
_logger.info("HDFS path exists: {}".format(hdfs_path))
return True
def is_dir(self, hdfs_path=None):
"""
whether the remote HDFS path is a directory
Args:
hdfs_path(str): the hdfs file path
Returns:
True or False
"""
if not self.is_exist(hdfs_path):
return False
dir_cmd = ['-test', '-d', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1)
if returncode:
_logger.error("HDFS path: {} failed is not a directory".format(
hdfs_path))
return False
else:
_logger.info("HDFS path: {} successfully is a directory".format(
hdfs_path))
return True
def delete(self, hdfs_path):
"""
Remove a file or directory from HDFS.
whether the remote HDFS path exists
Args:
hdfs_path: HDFS path.
Returns:
True or False
This function returns `True` if the deletion was successful and `False` if
no file or directory previously existed at `hdfs_path`.
"""
_logger.info('Deleting %r.', hdfs_path)
if not self.is_exist(hdfs_path):
_logger.warning("HDFS path: {} does not exist".format(hdfs_path))
return True
if self.is_dir(hdfs_path):
del_cmd = ['-rmr', hdfs_path]
else:
del_cmd = ['-rm', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(del_cmd, retry_times=0)
if returncode:
_logger.error("HDFS path: {} delete files failure".format(
hdfs_path))
return False
else:
_logger.info("HDFS path: {} delete files successfully".format(
hdfs_path))
return True
def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False):
"""
Move a file or folder on HDFS.
Args:
hdfs_src_path(str): HDFS source path.
hdfs_dst_path(str): HDFS destination path.
overwrite(bool|False): If the destination already exists and overwrite is False, return False.
Returns:
True or False
"""
assert hdfs_src_path is not None
assert hdfs_dst_path is not None
if not self.is_exist(hdfs_src_path):
_logger.info("HDFS source path does not exist: {}".format(hdfs_src_path))
return False
if self.is_exist(hdfs_dst_path) and not overwrite:
_logger.error("HDFS destination path exists: {} and overwrite=False".format(
hdfs_dst_path))
return False
rename_command = ['-mv', hdfs_src_path, hdfs_dst_path]
returncode, output, errors = self.__run_hdfs_cmd(
rename_command, retry_times=1)
if returncode:
_logger.error("HDFS rename path: {} to {} failed".format(
hdfs_src_path, hdfs_dst_path))
return False
else:
_logger.info("HDFS rename path: {} to {} successfully".format(
hdfs_src_path, hdfs_dst_path))
return True
@staticmethod
def make_local_dirs(local_path):
"""
Create a local directory, like mkdir -p (no error if it already exists).
Args:
local_path: the local directory path to create.
"""
try:
os.makedirs(local_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def makedirs(self, hdfs_path):
"""
Create a remote directory, recursively if necessary.
Args:
hdfs_path(str): Remote path. Intermediate directories will be created appropriately.
Returns:
True or False
"""
_logger.info('Creating directories to %r.', hdfs_path)
assert hdfs_path is not None
if self.is_exist(hdfs_path):
_logger.info("HDFS path already exists: {}".format(hdfs_path))
return True
mkdirs_commands = ['-mkdir', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
mkdirs_commands, retry_times=1)
if returncode:
_logger.error("HDFS mkdir path: {} failed".format(hdfs_path))
return False
else:
_logger.info("HDFS path: {} created successfully".format(hdfs_path))
return True
def ls(self, hdfs_path):
"""
List directory contents at the given HDFS path.
Args:
hdfs_path(str): Remote HDFS path to list.
Returns:
List: the entries under hdfs_path.
"""
assert hdfs_path is not None
if not self.is_exist(hdfs_path):
return []
ls_commands = ['-ls', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
ls_commands, retry_times=1)
if returncode:
_logger.error("HDFS list path: {} failed".format(hdfs_path))
return []
else:
_logger.info("HDFS list path: {} successfully".format(hdfs_path))
ret_lines = []
regex = re.compile(r'\s+')
out_lines = output.strip().split("\n")
for line in out_lines:
re_line = regex.split(line)
if len(re_line) == 8:
ret_lines.append(re_line[7])
return ret_lines
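# Note on the parsing above: a typical `hadoop fs -ls` line (hypothetical
# output) such as
#   -rw-r--r--   3 user group      1024 2019-07-15 10:00 /user/com/train-25/part-0
# splits into 8 whitespace-separated fields, so re_line[7] is the path and
# re_line[5], re_line[6] are the modification date and time.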
def lsr(self, hdfs_path, excludes=[]):
"""
List files under hdfs_path recursively; directories are skipped and
results keep the listing order of the -lsr output.
Args:
hdfs_path(str): Remote HDFS path.
excludes(list|[]): HDFS paths to exclude from the result.
Returns:
List: the file paths under hdfs_path.
"""
assert hdfs_path is not None
if not self.is_exist(hdfs_path):
return []
ls_commands = ['-lsr', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
ls_commands, retry_times=1)
if returncode:
_logger.error("HDFS list all files: {} failed".format(hdfs_path))
return []
else:
_logger.info("HDFS list all files: {} successfully".format(
hdfs_path))
lines = []
regex = re.compile(r'\s+')
out_lines = output.strip().split("\n")
for line_id, line in enumerate(out_lines):
re_line = regex.split(line)
if len(re_line) == 8:
if re_line[0][0] == "d":
continue
if re_line[7] in excludes:
continue
else:
lines.append((re_line[7], re_line[5] + " " + re_line[6],
line_id))
lines = sorted(lines, key=lambda line: line[2])
ret_lines = [ret[0] for ret in lines]
return ret_lines
@staticmethod
def split_files(files, trainer_id, trainers):
# Evenly split `files` into `trainers` blocks; the first `remainder`
# trainers each take one extra file.
remainder = len(files) % trainers
blocksize = len(files) // trainers
blocks = [blocksize] * trainers
for i in range(remainder):
blocks[i] += 1
trainer_files = [[]] * trainers
begin = 0
for i in range(trainers):
trainer_files[i] = files[begin:begin + blocks[i]]
begin += blocks[i]
return trainer_files[trainer_id]
def download(self,
hdfs_path,
local_path,
multi_processes=5,
overwrite=False,
retry_times=5):
"""
Download files from HDFS using multi process.
Args:
hdfs_path(str): path on hdfs
local_path(str): path on local
multi_processes(int|5): number of processes downloading concurrently, default=5
Returns:
List: local paths of the downloaded files.
"""
def __subprocess_download(local_path, datas):
"""
download file from HDFS
Args:
local_path(str): the local directory to download into
datas(list): the HDFS file paths handled by this process
Returns:
True or False
"""
for data in datas:
download_commands = ["-get", data, local_path]
returncode, output, errors = self.__run_hdfs_cmd(
download_commands, retry_times=retry_times)
if returncode:
_logger.error(
"Get data: {} from HDFS path: {} failed".format(
data, hdfs_path))
return False
return True
self.make_local_dirs(local_path)
all_files = self.ls(hdfs_path)
procs = []
for i in range(multi_processes):
process_datas = HDFSClient.split_files(all_files, i,
multi_processes)
p = multiprocessing.Process(
target=__subprocess_download,
args=(
local_path,
process_datas, ))
procs.append(p)
p.start()
# complete the processes
for proc in procs:
proc.join()
_logger.info("Finish {} multi process to download datas".format(
multi_processes))
local_downloads = []
for dirname, folder, files in os.walk(local_path):
for i in files:
t = os.path.join(dirname, i)
local_downloads.append(t)
return local_downloads
def upload(self,
hdfs_path,
local_path,
multi_processes=5,
overwrite=False,
retry_times=5):
"""
Upload files to HDFS using multi process.
Args:
hdfs_path(str): path on hdfs
local_path(str): path on local
multi_processes(int|5): number of processes uploading concurrently, default=5
overwrite(bool|False): whether to overwrite existing files on HDFS
Returns:
None
"""
def __subprocess_upload(hdfs_path_single, datas):
for data in datas:
put_commands = ["-put", data, hdfs_path_single]
returncode, output, errors = self.__run_hdfs_cmd(put_commands,
retry_times)
if returncode:
_logger.error("Put local path: {} to HDFS path: {} failed".
format(data, hdfs_path_single))
return False
return True
def get_local_files(path):
rlist = []
if not os.path.exists(path):
return rlist
if os.path.isdir(path):
for file in os.listdir(path):
t = os.path.join(path, file)
rlist.append(t)
else:
rlist.append(path)
return rlist
all_files = get_local_files(local_path)
if not all_files:
_logger.info("there are nothing need to upload, exit")
return
if self.is_exist(hdfs_path) and overwrite:
self.delete(hdfs_path)
self.makedirs(hdfs_path)
procs = []
for i in range(multi_processes):
process_datas = HDFSClient.split_files(all_files, i,
multi_processes)
p = multiprocessing.Process(
target=__subprocess_upload, args=(
hdfs_path,
process_datas, ))
procs.append(p)
p.start()
# complete the processes
for proc in procs:
proc.join()
_logger.info("Finish upload datas from {} to {}".format(local_path,
hdfs_path))
if __name__ == "__main__":
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.ls("/user/com/train-25")
files = client.lsr("/user/com/train-25/models")
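For completeness, here is a sketch of driving the multi-process download/upload helpers above; the cluster address, UGI string, and paths mirror the placeholders in the __main__ block and are not real endpoints:

client = HDFSClient("/home/client/hadoop-client/hadoop/", {
    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
    "hadoop.job.ugi": "hello,hello123"
})

# Pull a remote model directory into ./models_local with 5 processes.
local_files = client.download("/user/com/train-25/models", "./models_local")

# Push it back to a scratch directory, replacing any existing copy.
client.upload("/user/com/scratch/models", "./models_local", overwrite=True)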
@@ -90,7 +90,7 @@ def variable_to_code(var):
     return var_str
 
-def op_to_code(op, skip_op_callstack=False):
+def op_to_code(op, skip_op_callstack=True):
     """
     Get readable codes of fluid operator.
@@ -187,7 +187,7 @@ def block_to_code(block, block_idx, fout=None, skip_op_callstack=False):
     print("{0}{1}".format(get_indent_space(indent), '}'), file=fout)
 
-def program_to_code(prog, fout=None, skip_op_callstack=False):
+def program_to_code(prog, fout=None, skip_op_callstack=True):
     """
     Print readable codes of fluid program.
...
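The last two hunks flip the default of skip_op_callstack from False to True, so printed programs omit per-op Python call stacks by default. A minimal sketch of the effect (the import path and the toy network are assumptions about where these helpers live in Fluid at this commit):

import paddle.fluid as fluid
from paddle.fluid.transpiler.details import program_to_code

main = fluid.Program()
with fluid.program_guard(main):
    x = fluid.layers.data(name="x", shape=[1], dtype="float32")
    y = fluid.layers.fc(input=x, size=1)

program_to_code(main)                           # new default: compact output
program_to_code(main, skip_op_callstack=False)  # old behavior: include call stacks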