Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
models
提交
fedaad1b
M
models
项目概览
PaddlePaddle
/
models
1 年多 前同步成功
通知
222
Star
6828
Fork
2962
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
602
列表
看板
标记
里程碑
合并请求
255
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
models
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
602
Issue
602
列表
看板
标记
里程碑
合并请求
255
合并请求
255
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
fedaad1b
编写于
12月 05, 2018
作者:
T
tangwei12
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
remove hdfs utils
上级
4ff282b0
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
0 addition
and
499 deletion
+0
-499
fluid/PaddleRec/ctr/hdfs_utils.py
fluid/PaddleRec/ctr/hdfs_utils.py
+0
-499
未找到文件。
fluid/PaddleRec/ctr/hdfs_utils.py
已删除
100644 → 0
浏览文件 @
4ff282b0
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HDFS Utils"""
import
os
import
sys
import
subprocess
import
multiprocessing
from
datetime
import
datetime
import
re
import
copy
import
errno
import
logging
__all__
=
[
"HDFSClient"
,
"multi_download"
]
logging
.
basicConfig
(
format
=
'%(asctime)s - %(levelname)s - %(message)s'
)
_logger
=
logging
.
getLogger
(
"hdfs_utils"
)
_logger
.
setLevel
(
logging
.
INFO
)
class
HDFSClient
(
object
):
def
__init__
(
self
,
hadoop_home
,
configs
):
self
.
pre_commands
=
[]
hadoop_bin
=
'%s/bin/hadoop'
%
hadoop_home
self
.
pre_commands
.
append
(
hadoop_bin
)
dfs
=
'fs'
self
.
pre_commands
.
append
(
dfs
)
for
k
,
v
in
configs
.
iteritems
():
config_command
=
'-D%s=%s'
%
(
k
,
v
)
self
.
pre_commands
.
append
(
config_command
)
def
__run_hdfs_cmd
(
self
,
commands
,
retry_times
=
5
):
whole_commands
=
copy
.
deepcopy
(
self
.
pre_commands
)
whole_commands
.
extend
(
commands
)
print
(
'Running system command: {0}'
.
format
(
' '
.
join
(
whole_commands
)))
ret_code
=
0
ret_out
=
None
ret_err
=
None
for
x
in
range
(
retry_times
+
1
):
proc
=
subprocess
.
Popen
(
whole_commands
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
)
(
output
,
errors
)
=
proc
.
communicate
()
ret_code
,
ret_out
,
ret_err
=
proc
.
returncode
,
output
,
errors
if
ret_code
:
_logger
.
warn
(
'Times: %d, Error running command: %s. Return code: %d, Error: %s'
%
(
x
,
' '
.
join
(
whole_commands
),
proc
.
returncode
,
errors
))
else
:
break
return
ret_code
,
ret_out
,
ret_err
def
upload
(
self
,
hdfs_path
,
local_path
,
overwrite
=
False
,
retry_times
=
5
):
"""
upload the local file to hdfs
args:
local_file_path: the local file path
remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp)
return:
True or False
"""
assert
hdfs_path
is
not
None
assert
local_path
is
not
None
and
os
.
path
.
exists
(
local_path
)
if
os
.
path
.
isdir
(
local_path
):
_logger
.
warn
(
"The Local path: {} is dir and I will support it later, return"
.
format
(
local_path
))
return
base
=
os
.
path
.
basename
(
local_path
)
if
not
self
.
is_exist
(
hdfs_path
):
self
.
makedirs
(
hdfs_path
)
else
:
if
self
.
is_exist
(
os
.
path
.
join
(
hdfs_path
,
base
)):
if
overwrite
:
_logger
.
error
(
"The HDFS path: {} is exist and overwrite is True, delete it"
.
format
(
hdfs_path
))
self
.
delete
(
hdfs_path
)
else
:
_logger
.
error
(
"The HDFS path: {} is exist and overwrite is False, return"
.
format
(
hdfs_path
))
return
False
put_commands
=
[
"-put"
,
local_path
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
put_commands
,
retry_times
)
if
returncode
:
_logger
.
error
(
"Put local path: {} to HDFS path: {} failed"
.
format
(
local_path
,
hdfs_path
))
return
False
else
:
_logger
.
info
(
"Put local path: {} to HDFS path: {} successfully"
.
format
(
local_path
,
hdfs_path
))
return
True
def
download
(
self
,
hdfs_path
,
local_path
,
overwrite
=
False
,
unzip
=
False
):
"""
download from hdfs
args:
local_file_path: the local file path
remote_file_path: remote dir on hdfs
return:
True or False
"""
_logger
.
info
(
'Downloading %r to %r.'
,
hdfs_path
,
local_path
)
_logger
.
info
(
'Download of %s to %r complete.'
,
hdfs_path
,
local_path
)
if
not
self
.
is_exist
(
hdfs_path
):
print
(
"HDFS path: {} do not exist"
.
format
(
hdfs_path
))
return
False
if
self
.
is_dir
(
hdfs_path
):
_logger
.
error
(
"The HDFS path: {} is dir and I will support it later, return"
.
format
(
hdfs_path
))
if
os
.
path
.
exists
(
local_path
):
base
=
os
.
path
.
basename
(
hdfs_path
)
local_file
=
os
.
path
.
join
(
local_path
,
base
)
if
os
.
path
.
exists
(
local_file
):
if
overwrite
:
os
.
remove
(
local_file
)
else
:
_logger
.
error
(
"The Local path: {} is exist and overwrite is False, return"
.
format
(
local_file
))
return
False
self
.
make_local_dirs
(
local_path
)
download_commands
=
[
"-get"
,
hdfs_path
,
local_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
download_commands
)
if
returncode
:
_logger
.
error
(
"Get local path: {} from HDFS path: {} failed"
.
format
(
local_path
,
hdfs_path
))
return
False
else
:
_logger
.
info
(
"Get local path: {} from HDFS path: {} successfully"
.
format
(
local_path
,
hdfs_path
))
return
True
def
is_exist
(
self
,
hdfs_path
=
None
):
"""
whether the remote hdfs path exists?
args:
remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp)
fs_name: The default values are the same as in the job configuration
fs_ugi: The default values are the same as in the job configuration
return:
True or False
"""
exist_cmd
=
[
'-test'
,
'-e'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
exist_cmd
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS is_exist HDFS path: {} failed"
.
format
(
hdfs_path
))
return
False
else
:
_logger
.
info
(
"HDFS is_exist HDFS path: {} successfully"
.
format
(
hdfs_path
))
return
True
def
is_dir
(
self
,
hdfs_path
=
None
):
"""
whether the remote hdfs path exists?
args:
remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp)
fs_name: The default values are the same as in the job configuration
fs_ugi: The default values are the same as in the job configuration
return:
True or False
"""
if
not
self
.
is_exist
(
hdfs_path
):
return
False
dir_cmd
=
[
'-test'
,
'-d'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
dir_cmd
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS path: {} failed is not a directory"
.
format
(
hdfs_path
))
return
False
else
:
_logger
.
info
(
"HDFS path: {} successfully is a directory"
.
format
(
hdfs_path
))
return
True
def
delete
(
self
,
hdfs_path
):
"""Remove a file or directory from HDFS.
:param hdfs_path: HDFS path.
:param recursive: Recursively delete files and directories. By default,
this method will raise an :class:`HdfsError` if trying to delete a
non-empty directory.
This function returns `True` if the deletion was successful and `False` if
no file or directory previously existed at `hdfs_path`.
"""
_logger
.
info
(
'Deleting %r.'
,
hdfs_path
)
if
not
self
.
is_exist
(
hdfs_path
):
_logger
.
warn
(
"HDFS path: {} do not exist"
.
format
(
hdfs_path
))
return
True
if
self
.
is_dir
(
hdfs_path
):
del_cmd
=
[
'-rmr'
,
hdfs_path
]
else
:
del_cmd
=
[
'-rm'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
del_cmd
,
retry_times
=
0
)
if
returncode
:
_logger
.
error
(
"HDFS path: {} delete files failure"
.
format
(
hdfs_path
))
return
False
else
:
_logger
.
info
(
"HDFS path: {} delete files successfully"
.
format
(
hdfs_path
))
return
True
def
rename
(
self
,
hdfs_src_path
,
hdfs_dst_path
,
overwrite
=
False
):
"""Move a file or folder.
:param hdfs_src_path: Source path.
:param hdfs_dst_path: Destination path. If the path already exists and is
a directory, the source will be moved into it. If the path exists and is
a file, or if a parent destination directory is missing, this method will
raise an :class:`HdfsError`.
"""
assert
hdfs_src_path
is
not
None
assert
hdfs_dst_path
is
not
None
if
not
self
.
is_exist
(
hdfs_src_path
):
_logger
.
info
(
"HDFS path do not exist: {}"
.
format
(
hdfs_src_path
))
if
self
.
is_exist
(
hdfs_dst_path
)
and
not
overwrite
:
_logger
.
error
(
"HDFS path is exist: {} and overwrite=False"
.
format
(
hdfs_dst_path
))
rename_command
=
[
'-mv'
,
hdfs_src_path
,
hdfs_dst_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
rename_command
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS rename path: {} to {} failed"
.
format
(
hdfs_src_path
,
hdfs_dst_path
))
return
False
else
:
_logger
.
info
(
"HDFS rename path: {} to {} successfully"
.
format
(
hdfs_src_path
,
hdfs_dst_path
))
return
True
@
staticmethod
def
make_local_dirs
(
local_path
):
try
:
os
.
makedirs
(
local_path
)
except
OSError
as
e
:
if
e
.
errno
!=
errno
.
EEXIST
:
raise
def
makedirs
(
self
,
hdfs_path
):
"""Create a remote directory, recursively if necessary.
:param hdfs_path: Remote path. Intermediate directories will be created
appropriately.
"""
_logger
.
info
(
'Creating directories to %r.'
,
hdfs_path
)
assert
hdfs_path
is
not
None
if
self
.
is_exist
(
hdfs_path
):
_logger
.
error
(
"HDFS path is exist: {}"
.
format
(
hdfs_path
))
return
mkdirs_commands
=
[
'-mkdir'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
mkdirs_commands
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS mkdir path: {} failed"
.
format
(
hdfs_path
))
return
False
else
:
_logger
.
error
(
"HDFS mkdir path: {} successfully"
.
format
(
hdfs_path
))
return
True
def
ls
(
self
,
hdfs_path
):
assert
hdfs_path
is
not
None
if
not
self
.
is_exist
(
hdfs_path
):
return
[]
ls_commands
=
[
'-ls'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
ls_commands
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS list path: {} failed"
.
format
(
hdfs_path
))
return
[]
else
:
_logger
.
info
(
"HDFS list path: {} successfully"
.
format
(
hdfs_path
))
ret_lines
=
[]
regex
=
re
.
compile
(
'\s+'
)
out_lines
=
output
.
strip
().
split
(
"
\n
"
)
for
line
in
out_lines
:
re_line
=
regex
.
split
(
line
)
if
len
(
re_line
)
==
8
:
ret_lines
.
append
(
re_line
[
7
])
return
ret_lines
def
lsr
(
self
,
hdfs_path
,
only_file
=
True
,
sort
=
True
):
def
sort_by_time
(
v1
,
v2
):
v1_time
=
datetime
.
strptime
(
v1
[
1
],
'%Y-%m-%d %H:%M'
)
v2_time
=
datetime
.
strptime
(
v2
[
1
],
'%Y-%m-%d %H:%M'
)
return
v1_time
>
v2_time
assert
hdfs_path
is
not
None
if
not
self
.
is_exist
(
hdfs_path
):
return
[]
ls_commands
=
[
'-lsr'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
ls_commands
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS list all files: {} failed"
.
format
(
hdfs_path
))
return
[]
else
:
_logger
.
info
(
"HDFS list all files: {} successfully"
.
format
(
hdfs_path
))
lines
=
[]
regex
=
re
.
compile
(
'\s+'
)
out_lines
=
output
.
strip
().
split
(
"
\n
"
)
for
line
in
out_lines
:
re_line
=
regex
.
split
(
line
)
if
len
(
re_line
)
==
8
:
if
only_file
and
re_line
[
0
][
0
]
==
"d"
:
continue
else
:
lines
.
append
((
re_line
[
7
],
re_line
[
5
]
+
" "
+
re_line
[
6
]))
if
sort
:
sorted
(
lines
,
cmp
=
sort_by_time
)
ret_lines
=
[
ret
[
0
]
for
ret
in
lines
]
return
ret_lines
def
multi_download
(
client
,
hdfs_path
,
local_path
,
trainer_id
,
trainers
,
multi_processes
=
5
):
"""
multi_download
:param client: instance of HDFSClient
:param hdfs_path: path on hdfs
:param local_path: path on local
:param trainer_id: current trainer id
:param trainers: all trainers number
:param multi_processes: the download data process at the same time, default=5
:return: None
"""
def
__subprocess_download
(
datas
):
for
data
in
datas
:
re_path
=
os
.
path
.
relpath
(
os
.
path
.
dirname
(
data
),
hdfs_path
)
local_re_path
=
os
.
path
.
join
(
local_path
,
re_path
)
client
.
download
(
data
,
local_re_path
)
assert
isinstance
(
client
,
HDFSClient
)
client
.
make_local_dirs
(
local_path
)
_logger
.
info
(
"Make local dir {} successfully"
.
format
(
local_path
))
all_need_download
=
client
.
lsr
(
hdfs_path
,
sort
=
True
)
need_download
=
all_need_download
[
trainer_id
::
trainers
]
_logger
.
info
(
"Get {} files From all {} files need to be download from {}"
.
format
(
len
(
need_download
),
len
(
all_need_download
),
hdfs_path
))
_logger
.
info
(
"Start {} multi process to download datas"
.
format
(
multi_processes
))
procs
=
[]
for
i
in
range
(
multi_processes
):
process_datas
=
need_download
[
i
::
multi_processes
]
p
=
multiprocessing
.
Process
(
target
=
__subprocess_download
,
args
=
(
process_datas
,))
procs
.
append
(
p
)
p
.
start
()
# complete the processes
for
proc
in
procs
:
proc
.
join
()
_logger
.
info
(
"Finish {} multi process to download datas"
.
format
(
multi_processes
))
local_downloads
=
[]
for
data
in
need_download
:
data_name
=
os
.
path
.
basename
(
data
)
re_path
=
os
.
path
.
relpath
(
os
.
path
.
dirname
(
data
),
hdfs_path
)
if
re_path
==
"."
:
local_re_path
=
os
.
path
.
join
(
local_path
,
data_name
)
else
:
local_re_path
=
os
.
path
.
join
(
local_path
,
re_path
,
data_name
)
local_downloads
.
append
(
local_re_path
)
return
local_downloads
def
getfilelist
(
path
):
rlist
=
[]
for
dir
,
folder
,
file
in
os
.
walk
(
path
):
for
i
in
file
:
t
=
os
.
path
.
join
(
dir
,
i
)
rlist
.
append
(
t
)
for
r
in
rlist
:
print
(
r
)
def
multi_upload
(
client
,
hdfs_path
,
local_path
,
multi_processes
=
5
,
overwrite
=
False
,
sync
=
True
):
"""
:param client:
:param hdfs_path:
:param local_path:
:param sync:
:return:
"""
def
__subprocess_upload
(
datas
):
for
data
in
datas
:
re_path
=
os
.
path
.
relpath
(
os
.
path
.
dirname
(
data
),
local_path
)
hdfs_re_path
=
os
.
path
.
join
(
hdfs_path
,
re_path
)
client
.
upload
(
hdfs_re_path
,
data
,
overwrite
,
retry_times
=
5
)
def
get_local_files
(
path
):
rlist
=
[]
if
not
os
.
path
.
isdir
(
path
):
return
rlist
for
dirname
,
folder
,
files
in
os
.
walk
(
path
):
for
i
in
files
:
t
=
os
.
path
.
join
(
dirname
,
i
)
rlist
.
append
(
t
)
return
rlist
assert
isinstance
(
client
,
HDFSClient
)
all_files
=
get_local_files
(
local_path
)
if
not
all_files
:
_logger
.
info
(
"there are nothing need to upload, exit"
)
return
_logger
.
info
(
"Start {} multi process to upload datas"
.
format
(
multi_processes
))
procs
=
[]
for
i
in
range
(
multi_processes
):
process_datas
=
all_files
[
i
::
multi_processes
]
p
=
multiprocessing
.
Process
(
target
=
__subprocess_upload
,
args
=
(
process_datas
,))
procs
.
append
(
p
)
p
.
start
()
# complete the processes
for
proc
in
procs
:
proc
.
join
()
_logger
.
info
(
"Finish {} multi process to upload datas"
.
format
(
multi_processes
))
if
__name__
==
"__main__"
:
pass
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录