Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
e58c705b
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e58c705b
编写于
4月 22, 2021
作者:
T
tianshuo78520a
提交者:
GitHub
4月 22, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Delete WITH_GRPC flag and Distributed old code (#32383)
上级
bf0ec9b8
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
2 addition
and
1786 deletion
+2
-1786
CMakeLists.txt
CMakeLists.txt
+0
-4
cmake/configure.cmake
cmake/configure.cmake
+0
-4
paddle/fluid/pybind/pybind.cc
paddle/fluid/pybind/pybind.cc
+0
-5
paddle/scripts/paddle_build.sh
paddle/scripts/paddle_build.sh
+1
-5
paddle/testing/paddle_gtest_main.cc
paddle/testing/paddle_gtest_main.cc
+1
-2
python/paddle/fluid/__init__.py
python/paddle/fluid/__init__.py
+0
-1
python/paddle/fluid/contrib/__init__.py
python/paddle/fluid/contrib/__init__.py
+0
-6
python/paddle/fluid/contrib/reader/README.md
python/paddle/fluid/contrib/reader/README.md
+0
-25
python/paddle/fluid/contrib/reader/__init__.py
python/paddle/fluid/contrib/reader/__init__.py
+0
-20
python/paddle/fluid/contrib/reader/distributed_reader.py
python/paddle/fluid/contrib/reader/distributed_reader.py
+0
-65
python/paddle/fluid/contrib/tests/test_distributed_reader.py
python/paddle/fluid/contrib/tests/test_distributed_reader.py
+0
-45
python/paddle/fluid/contrib/utils/__init__.py
python/paddle/fluid/contrib/utils/__init__.py
+0
-23
python/paddle/fluid/contrib/utils/hdfs_utils.py
python/paddle/fluid/contrib/utils/hdfs_utils.py
+0
-603
python/paddle/fluid/contrib/utils/lookup_table_utils.py
python/paddle/fluid/contrib/utils/lookup_table_utils.py
+0
-496
python/paddle/fluid/incubate/data_generator/__init__.py
python/paddle/fluid/incubate/data_generator/__init__.py
+0
-375
python/paddle/fluid/incubate/data_generator/test_data_generator.py
...ddle/fluid/incubate/data_generator/test_data_generator.py
+0
-36
python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py
...n/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py
+0
-56
python/paddle/fluid/tests/unittests/test_fleet_utils.py
python/paddle/fluid/tests/unittests/test_fleet_utils.py
+0
-10
python/paddle/incubate/__init__.py
python/paddle/incubate/__init__.py
+0
-2
python/setup.py.in
python/setup.py.in
+0
-3
未找到文件。
CMakeLists.txt
浏览文件 @
e58c705b
...
@@ -182,7 +182,6 @@ option(WITH_PSLIB "Compile with pslib support" OFF)
...
@@ -182,7 +182,6 @@ option(WITH_PSLIB "Compile with pslib support" OFF)
option
(
WITH_BOX_PS
"Compile with box_ps support"
OFF
)
option
(
WITH_BOX_PS
"Compile with box_ps support"
OFF
)
option
(
WITH_XBYAK
"Compile with xbyak support"
ON
)
option
(
WITH_XBYAK
"Compile with xbyak support"
ON
)
option
(
WITH_CONTRIB
"Compile the third-party contributation"
OFF
)
option
(
WITH_CONTRIB
"Compile the third-party contributation"
OFF
)
option
(
WITH_GRPC
"Use grpc as the default rpc framework"
${
WITH_DISTRIBUTE
}
)
option
(
WITH_PSCORE
"Compile with parameter server support"
${
WITH_DISTRIBUTE
}
)
option
(
WITH_PSCORE
"Compile with parameter server support"
${
WITH_DISTRIBUTE
}
)
option
(
WITH_HETERPS
"Compile with heterps"
OFF}
)
option
(
WITH_HETERPS
"Compile with heterps"
OFF}
)
option
(
WITH_INFERENCE_API_TEST
"Test fluid inference C++ high-level api interface"
OFF
)
option
(
WITH_INFERENCE_API_TEST
"Test fluid inference C++ high-level api interface"
OFF
)
...
@@ -259,9 +258,6 @@ endif()
...
@@ -259,9 +258,6 @@ endif()
if
(
WITH_BRPC_RDMA
)
if
(
WITH_BRPC_RDMA
)
message
(
STATUS
"Use brpc with rdma."
)
message
(
STATUS
"Use brpc with rdma."
)
if
(
WITH_GRPC
)
message
(
FATAL_ERROR
"Can't use grpc with brpc rdma."
)
endif
()
if
(
NOT WITH_DISTRIBUTE
)
if
(
NOT WITH_DISTRIBUTE
)
message
(
FATAL_ERROR
"Can't use brpc rdma in no distribute env."
)
message
(
FATAL_ERROR
"Can't use brpc rdma in no distribute env."
)
endif
()
endif
()
...
...
cmake/configure.cmake
浏览文件 @
e58c705b
...
@@ -177,10 +177,6 @@ if(WITH_HETERPS)
...
@@ -177,10 +177,6 @@ if(WITH_HETERPS)
add_definitions
(
-DPADDLE_WITH_HETERPS
)
add_definitions
(
-DPADDLE_WITH_HETERPS
)
endif
()
endif
()
if
(
WITH_GRPC
)
add_definitions
(
-DPADDLE_WITH_GRPC
)
endif
(
WITH_GRPC
)
if
(
WITH_BRPC_RDMA
)
if
(
WITH_BRPC_RDMA
)
add_definitions
(
-DPADDLE_WITH_BRPC_RDMA
)
add_definitions
(
-DPADDLE_WITH_BRPC_RDMA
)
endif
(
WITH_BRPC_RDMA
)
endif
(
WITH_BRPC_RDMA
)
...
...
paddle/fluid/pybind/pybind.cc
浏览文件 @
e58c705b
...
@@ -268,11 +268,6 @@ bool IsCompiledWithBrpc() {
...
@@ -268,11 +268,6 @@ bool IsCompiledWithBrpc() {
#ifndef PADDLE_WITH_DISTRIBUTE
#ifndef PADDLE_WITH_DISTRIBUTE
return
false
;
return
false
;
#endif
#endif
#ifdef PADDLE_WITH_GRPC
return
false
;
#endif
return
true
;
return
true
;
}
}
...
...
paddle/scripts/paddle_build.sh
浏览文件 @
e58c705b
...
@@ -227,7 +227,6 @@ function cmake_base() {
...
@@ -227,7 +227,6 @@ function cmake_base() {
fi
fi
distibuted_flag
=
${
WITH_DISTRIBUTE
:-
OFF
}
distibuted_flag
=
${
WITH_DISTRIBUTE
:-
OFF
}
grpc_flag
=
"OFF"
gloo_flag
=
${
distibuted_flag
}
gloo_flag
=
${
distibuted_flag
}
cat
<<
EOF
cat
<<
EOF
...
@@ -255,7 +254,6 @@ function cmake_base() {
...
@@ -255,7 +254,6 @@ function cmake_base() {
-DINFERENCE_DEMO_INSTALL_DIR=
${
INFERENCE_DEMO_INSTALL_DIR
}
-DINFERENCE_DEMO_INSTALL_DIR=
${
INFERENCE_DEMO_INSTALL_DIR
}
-DPY_VERSION=
${
PY_VERSION
:-
2
.7
}
-DPY_VERSION=
${
PY_VERSION
:-
2
.7
}
-DCMAKE_INSTALL_PREFIX=
${
INSTALL_PREFIX
:-
/paddle/build
}
-DCMAKE_INSTALL_PREFIX=
${
INSTALL_PREFIX
:-
/paddle/build
}
-DWITH_GRPC=
${
grpc_flag
}
-DWITH_PSCORE=
${
distibuted_flag
}
-DWITH_PSCORE=
${
distibuted_flag
}
-DWITH_GLOO=
${
gloo_flag
}
-DWITH_GLOO=
${
gloo_flag
}
-DWITH_LITE=
${
WITH_LITE
:-
OFF
}
-DWITH_LITE=
${
WITH_LITE
:-
OFF
}
...
@@ -292,7 +290,6 @@ EOF
...
@@ -292,7 +290,6 @@ EOF
-DINFERENCE_DEMO_INSTALL_DIR
=
${
INFERENCE_DEMO_INSTALL_DIR
}
\
-DINFERENCE_DEMO_INSTALL_DIR
=
${
INFERENCE_DEMO_INSTALL_DIR
}
\
-DPY_VERSION
=
${
PY_VERSION
:-
2
.7
}
\
-DPY_VERSION
=
${
PY_VERSION
:-
2
.7
}
\
-DCMAKE_INSTALL_PREFIX
=
${
INSTALL_PREFIX
:-
/paddle/build
}
\
-DCMAKE_INSTALL_PREFIX
=
${
INSTALL_PREFIX
:-
/paddle/build
}
\
-DWITH_GRPC
=
${
grpc_flag
}
\
-DWITH_PSCORE
=
${
distibuted_flag
}
\
-DWITH_PSCORE
=
${
distibuted_flag
}
\
-DWITH_GLOO
=
${
gloo_flag
}
\
-DWITH_GLOO
=
${
gloo_flag
}
\
-DLITE_GIT_TAG
=
release/v2.8
\
-DLITE_GIT_TAG
=
release/v2.8
\
...
@@ -521,8 +518,7 @@ function run_brpc_test() {
...
@@ -521,8 +518,7 @@ function run_brpc_test() {
mkdir
-p
${
PADDLE_ROOT
}
/build
mkdir
-p
${
PADDLE_ROOT
}
/build
cd
${
PADDLE_ROOT
}
/build
cd
${
PADDLE_ROOT
}
/build
if
[[
${
WITH_TESTING
:-
ON
}
==
"ON"
\
if
[[
${
WITH_TESTING
:-
ON
}
==
"ON"
\
&&
${
WITH_DISTRIBUTE
:-
OFF
}
==
"ON"
\
&&
${
WITH_DISTRIBUTE
:-
OFF
}
==
"ON"
]]
;
then
&&
${
WITH_GRPC
:-
OFF
}
==
"OFF"
]]
;
then
cat
<<
EOF
cat
<<
EOF
========================================
========================================
Running brpc unit tests ...
Running brpc unit tests ...
...
...
paddle/testing/paddle_gtest_main.cc
浏览文件 @
e58c705b
...
@@ -29,8 +29,7 @@ int main(int argc, char** argv) {
...
@@ -29,8 +29,7 @@ int main(int argc, char** argv) {
std
::
vector
<
std
::
string
>
envs
;
std
::
vector
<
std
::
string
>
envs
;
std
::
vector
<
std
::
string
>
undefok
;
std
::
vector
<
std
::
string
>
undefok
;
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_GRPC) && \
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
!defined(PADDLE_WITH_PSLIB)
std
::
string
str_max_body_size
;
std
::
string
str_max_body_size
;
if
(
::
GFLAGS_NAMESPACE
::
GetCommandLineOption
(
"max_body_size"
,
if
(
::
GFLAGS_NAMESPACE
::
GetCommandLineOption
(
"max_body_size"
,
&
str_max_body_size
))
{
&
str_max_body_size
))
{
...
...
python/paddle/fluid/__init__.py
浏览文件 @
e58c705b
...
@@ -72,7 +72,6 @@ from .data_feeder import DataFeeder
...
@@ -72,7 +72,6 @@ from .data_feeder import DataFeeder
from
.core
import
LoDTensor
,
LoDTensorArray
,
Scope
,
_Scope
from
.core
import
LoDTensor
,
LoDTensorArray
,
Scope
,
_Scope
from
.core
import
CPUPlace
,
XPUPlace
,
CUDAPlace
,
CUDAPinnedPlace
,
NPUPlace
from
.core
import
CPUPlace
,
XPUPlace
,
CUDAPlace
,
CUDAPinnedPlace
,
NPUPlace
from
.incubate
import
fleet
from
.incubate
import
fleet
from
.incubate
import
data_generator
from
.transpiler
import
DistributeTranspiler
,
\
from
.transpiler
import
DistributeTranspiler
,
\
memory_optimize
,
release_memory
,
DistributeTranspilerConfig
memory_optimize
,
release_memory
,
DistributeTranspilerConfig
from
.lod_tensor
import
create_lod_tensor
,
create_random_int_lodtensor
from
.lod_tensor
import
create_lod_tensor
,
create_random_int_lodtensor
...
...
python/paddle/fluid/contrib/__init__.py
浏览文件 @
e58c705b
...
@@ -22,11 +22,7 @@ from . import op_frequence
...
@@ -22,11 +22,7 @@ from . import op_frequence
from
.op_frequence
import
*
from
.op_frequence
import
*
from
.
import
quantize
from
.
import
quantize
from
.quantize
import
*
from
.quantize
import
*
from
.
import
reader
from
.reader
import
*
from
.
import
slim
from
.
import
slim
from
.
import
utils
from
.utils
import
*
from
.
import
extend_optimizer
from
.
import
extend_optimizer
from
.extend_optimizer
import
*
from
.extend_optimizer
import
*
from
.
import
model_stat
from
.
import
model_stat
...
@@ -42,8 +38,6 @@ __all__ += decoder.__all__
...
@@ -42,8 +38,6 @@ __all__ += decoder.__all__
__all__
+=
memory_usage_calc
.
__all__
__all__
+=
memory_usage_calc
.
__all__
__all__
+=
op_frequence
.
__all__
__all__
+=
op_frequence
.
__all__
__all__
+=
quantize
.
__all__
__all__
+=
quantize
.
__all__
__all__
+=
reader
.
__all__
__all__
+=
utils
.
__all__
__all__
+=
extend_optimizer
.
__all__
__all__
+=
extend_optimizer
.
__all__
__all__
+=
[
'mixed_precision'
]
__all__
+=
[
'mixed_precision'
]
__all__
+=
layers
.
__all__
__all__
+=
layers
.
__all__
...
...
python/paddle/fluid/contrib/reader/README.md
已删除
100644 → 0
浏览文件 @
bf0ec9b8
## CTR READER
An multi-thread cpp reader that has the same interface with py_reader. It
uses cpp multi-thread to read file and is much more faster then the Python read
thread in py_reader.
Currently, it support two types of file:
-
gzip
-
plain text file
and two types of data format:
-
cvs data format is :
*
label dense_fea,dense_fea sparse_fea,sparse_fea
-
the svm data format is :
*
label slot1:fea_sign slot2:fea_sign slot1:fea_sign
## Distributed reader
The distributed reader is mainly used by multi-process tasks, and the input must be a batch reader.
Cons:
-
It can be operated conveniently so that different processes can read different data.
Pros:
-
If batch_reader produces training data, and batch_reader loads or preprocesses data for a long time, this data reading method may be slower.
python/paddle/fluid/contrib/reader/__init__.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
from
.distributed_reader
import
*
__all__
=
[]
__all__
+=
distributed_reader
.
__all__
python/paddle/fluid/contrib/reader/distributed_reader.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
os
__all__
=
[
"distributed_batch_reader"
]
def
distributed_batch_reader
(
batch_reader
):
"""
Create a reader for multi-process training. The input must be a batch reader.
Args:
batch_reader (callable): The input reader should be a batch reader.
Examples:
.. code-block:: python
import paddle
import paddle.fluid as fluid
train_reader = paddle.batch(paddle.dataset.mnist.train(),
batch_size=32,drop_last=True)
train_reader = fluid.contrib.reader.distributed_batch_reader(
train_reader)
"""
trainers_num
=
int
(
os
.
environ
.
get
(
'PADDLE_TRAINERS_NUM'
,
1
))
trainer_id
=
int
(
os
.
getenv
(
"PADDLE_TRAINER_ID"
,
0
))
assert
trainer_id
<
trainers_num
def
decorate_for_multi_process
():
if
trainers_num
>
1
:
print
(
"start data reader (trainers_num: {}, trainer_id: {})"
.
format
(
trainers_num
,
trainer_id
))
train_data
,
idx
=
None
,
1
for
batch_id
,
data
in
enumerate
(
batch_reader
()):
if
trainers_num
>
1
:
if
idx
<
trainers_num
:
if
idx
==
trainer_id
+
1
:
train_data
=
data
idx
+=
1
else
:
if
idx
==
trainer_id
+
1
:
train_data
=
data
assert
train_data
is
not
None
,
"train data should not be None."
yield
train_data
train_data
,
idx
=
None
,
1
else
:
yield
data
return
decorate_for_multi_process
python/paddle/fluid/contrib/tests/test_distributed_reader.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
import
paddle.fluid
as
fluid
import
os
def
data_generator
():
data
=
[
0
,
1
,
2
,
3
]
for
val
in
data
:
yield
val
class
TestDistributedReader
(
unittest
.
TestCase
):
def
test_distributed_reader
(
self
):
trainer_num
=
4
os
.
environ
[
'PADDLE_TRAINER_ID'
]
=
str
(
1
)
os
.
environ
[
'PADDLE_TRAINERS_NUM'
]
=
str
(
trainer_num
)
reader
=
fluid
.
contrib
.
reader
.
distributed_batch_reader
(
data_generator
)
data
=
next
(
reader
())
assert
data
==
1
#Note: windows python3 don't have unsetenv
del
os
.
environ
[
'PADDLE_TRAINER_ID'
]
del
os
.
environ
[
'PADDLE_TRAINERS_NUM'
]
if
__name__
==
'__main__'
:
unittest
.
main
()
python/paddle/fluid/contrib/utils/__init__.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
from
.
import
lookup_table_utils
from
.lookup_table_utils
import
*
from
.
import
hdfs_utils
from
.hdfs_utils
import
*
__all__
=
[]
__all__
+=
lookup_table_utils
.
__all__
__all__
+=
hdfs_utils
.
__all__
python/paddle/fluid/contrib/utils/hdfs_utils.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""hdfs_utils.py will move to fluid/incubate/fleet/utils/hdfs.py"""
import
os
import
sys
import
subprocess
import
multiprocessing
from
datetime
import
datetime
import
re
import
copy
import
errno
import
logging
from
paddle.fluid.log_helper
import
get_logger
__all__
=
[
"HDFSClient"
,
"multi_download"
,
"multi_upload"
]
_logger
=
get_logger
(
__name__
,
logging
.
INFO
,
fmt
=
'%(asctime)s-%(levelname)s: %(message)s'
)
class
HDFSClient
(
object
):
r
"""
A tool of HDFS
Args:
hadoop_home (string): hadoop_home
configs (dict): hadoop config, it is a dict, please contain \
key "fs.default.name" and "hadoop.job.ugi"
Can be a float value
Examples:
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.ls("/user/com/train-25")
files = client.lsr("/user/com/train-25/models")
"""
def
__init__
(
self
,
hadoop_home
,
configs
):
self
.
pre_commands
=
[]
hadoop_bin
=
'%s/bin/hadoop'
%
hadoop_home
self
.
pre_commands
.
append
(
hadoop_bin
)
dfs
=
'fs'
self
.
pre_commands
.
append
(
dfs
)
for
k
,
v
in
configs
.
items
():
config_command
=
'-D%s=%s'
%
(
k
,
v
)
self
.
pre_commands
.
append
(
config_command
)
def
__run_hdfs_cmd
(
self
,
commands
,
retry_times
=
5
):
whole_commands
=
copy
.
deepcopy
(
self
.
pre_commands
)
whole_commands
.
extend
(
commands
)
print
(
'Running system command: {0}'
.
format
(
' '
.
join
(
whole_commands
)))
ret_code
=
0
ret_out
=
None
ret_err
=
None
whole_commands
=
" "
.
join
(
whole_commands
)
for
x
in
range
(
retry_times
+
1
):
proc
=
subprocess
.
Popen
(
whole_commands
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
,
shell
=
True
)
(
output
,
errors
)
=
proc
.
communicate
()
ret_code
,
ret_out
,
ret_err
=
proc
.
returncode
,
output
,
errors
if
ret_code
:
_logger
.
warn
(
'Times: %d, Error running command: %s. Return code: %d, Error: %s'
%
(
x
,
' '
.
join
(
whole_commands
),
proc
.
returncode
,
errors
))
else
:
break
return
ret_code
,
ret_out
,
ret_err
def
upload
(
self
,
hdfs_path
,
local_path
,
overwrite
=
False
,
retry_times
=
5
):
"""
upload the local file to hdfs
Args:
hdfs_path(str): the hdfs file path
local_path(str): the local file path
overwrite(bool|None): will overwrite the file on HDFS or not
retry_times(int|5): retry times
Returns:
True or False
"""
assert
hdfs_path
is
not
None
assert
local_path
is
not
None
and
os
.
path
.
exists
(
local_path
)
if
os
.
path
.
isdir
(
local_path
):
_logger
.
warn
(
"The Local path: {} is dir and I will support it later, return"
.
format
(
local_path
))
return
False
base
=
os
.
path
.
basename
(
local_path
)
if
not
self
.
is_exist
(
hdfs_path
):
self
.
makedirs
(
hdfs_path
)
else
:
if
self
.
is_exist
(
os
.
path
.
join
(
hdfs_path
,
base
)):
if
overwrite
:
_logger
.
error
(
"The HDFS path: {} is exist and overwrite is True, delete it"
.
format
(
hdfs_path
))
self
.
delete
(
hdfs_path
)
else
:
_logger
.
error
(
"The HDFS path: {} is exist and overwrite is False, return"
.
format
(
hdfs_path
))
return
False
put_commands
=
[
"-put"
,
local_path
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
put_commands
,
retry_times
)
if
returncode
:
_logger
.
error
(
"Put local path: {} to HDFS path: {} failed"
.
format
(
local_path
,
hdfs_path
))
return
False
else
:
_logger
.
info
(
"Put local path: {} to HDFS path: {} successfully"
.
format
(
local_path
,
hdfs_path
))
return
True
def
download
(
self
,
hdfs_path
,
local_path
,
overwrite
=
False
,
unzip
=
False
):
"""
download file from HDFS
Args:
hdfs_path(str): the hdfs file path
local_path(str): the local file path
overwrite(bool|None): will overwrite the file on HDFS or not
unzip(bool|False): if the download file is compressed by zip, unzip it or not.
Returns:
True or False
"""
_logger
.
info
(
'Downloading %r to %r.'
,
hdfs_path
,
local_path
)
_logger
.
info
(
'Download of %s to %r complete.'
,
hdfs_path
,
local_path
)
if
not
self
.
is_exist
(
hdfs_path
):
print
(
"HDFS path: {} do not exist"
.
format
(
hdfs_path
))
return
False
if
self
.
is_dir
(
hdfs_path
):
_logger
.
error
(
"The HDFS path: {} is dir and I will support it later, return"
.
format
(
hdfs_path
))
if
os
.
path
.
exists
(
local_path
):
base
=
os
.
path
.
basename
(
hdfs_path
)
local_file
=
os
.
path
.
join
(
local_path
,
base
)
if
os
.
path
.
exists
(
local_file
):
if
overwrite
:
os
.
remove
(
local_file
)
else
:
_logger
.
error
(
"The Local path: {} is exist and overwrite is False, return"
.
format
(
local_file
))
return
False
self
.
make_local_dirs
(
local_path
)
download_commands
=
[
"-get"
,
hdfs_path
,
local_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
download_commands
)
if
returncode
:
_logger
.
error
(
"Get local path: {} from HDFS path: {} failed"
.
format
(
local_path
,
hdfs_path
))
return
False
else
:
_logger
.
info
(
"Get local path: {} from HDFS path: {} successfully"
.
format
(
local_path
,
hdfs_path
))
return
True
def
is_exist
(
self
,
hdfs_path
=
None
):
"""
whether the remote HDFS path exists
Args:
hdfs_path(str): the hdfs file path
Returns:
True or False
"""
exist_cmd
=
[
'-test'
,
'-e'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
exist_cmd
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS is_exist HDFS path: {} failed"
.
format
(
hdfs_path
))
return
False
else
:
_logger
.
info
(
"HDFS is_exist HDFS path: {} successfully"
.
format
(
hdfs_path
))
return
True
def
is_dir
(
self
,
hdfs_path
=
None
):
"""
whether the remote HDFS path is directory
Args:
hdfs_path(str): the hdfs file path
Returns:
True or False
"""
if
not
self
.
is_exist
(
hdfs_path
):
return
False
dir_cmd
=
[
'-test'
,
'-d'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
dir_cmd
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS path: {} failed is not a directory"
.
format
(
hdfs_path
))
return
False
else
:
_logger
.
info
(
"HDFS path: {} successfully is a directory"
.
format
(
hdfs_path
))
return
True
def
delete
(
self
,
hdfs_path
):
"""
Remove a file or directory from HDFS.
whether the remote HDFS path exists
Args:
hdfs_path: HDFS path.
Returns:
True or False
This function returns `True` if the deletion was successful and `False` if
no file or directory previously existed at `hdfs_path`.
"""
_logger
.
info
(
'Deleting %r.'
,
hdfs_path
)
if
not
self
.
is_exist
(
hdfs_path
):
_logger
.
warn
(
"HDFS path: {} do not exist"
.
format
(
hdfs_path
))
return
True
if
self
.
is_dir
(
hdfs_path
):
del_cmd
=
[
'-rmr'
,
hdfs_path
]
else
:
del_cmd
=
[
'-rm'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
del_cmd
,
retry_times
=
0
)
if
returncode
:
_logger
.
error
(
"HDFS path: {} delete files failure"
.
format
(
hdfs_path
))
return
False
else
:
_logger
.
info
(
"HDFS path: {} delete files successfully"
.
format
(
hdfs_path
))
return
True
def
rename
(
self
,
hdfs_src_path
,
hdfs_dst_path
,
overwrite
=
False
):
"""
Move a file or folder on HDFS.
Args:
hdfs_path(str): HDFS path.
overwrite(bool|False): If the path already exists and overwrite is False, will return False.
Returns:
True or False
"""
assert
hdfs_src_path
is
not
None
assert
hdfs_dst_path
is
not
None
if
not
self
.
is_exist
(
hdfs_src_path
):
_logger
.
info
(
"HDFS path do not exist: {}"
.
format
(
hdfs_src_path
))
if
self
.
is_exist
(
hdfs_dst_path
)
and
not
overwrite
:
_logger
.
error
(
"HDFS path is exist: {} and overwrite=False"
.
format
(
hdfs_dst_path
))
rename_command
=
[
'-mv'
,
hdfs_src_path
,
hdfs_dst_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
rename_command
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS rename path: {} to {} failed"
.
format
(
hdfs_src_path
,
hdfs_dst_path
))
return
False
else
:
_logger
.
info
(
"HDFS rename path: {} to {} successfully"
.
format
(
hdfs_src_path
,
hdfs_dst_path
))
return
True
@
staticmethod
def
make_local_dirs
(
local_path
):
"""
create a directory local, is same to mkdir
Args:
local_path: local path that wants to create a directory.
"""
try
:
os
.
makedirs
(
local_path
)
except
OSError
as
e
:
if
e
.
errno
!=
errno
.
EEXIST
:
raise
def
makedirs
(
self
,
hdfs_path
):
"""
Create a remote directory, recursively if necessary.
Args:
hdfs_path(str): Remote path. Intermediate directories will be created appropriately.
Returns:
True or False
"""
_logger
.
info
(
'Creating directories to %r.'
,
hdfs_path
)
assert
hdfs_path
is
not
None
if
self
.
is_exist
(
hdfs_path
):
_logger
.
error
(
"HDFS path is exist: {}"
.
format
(
hdfs_path
))
return
mkdirs_commands
=
[
'-mkdir'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
mkdirs_commands
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS mkdir path: {} failed"
.
format
(
hdfs_path
))
return
False
else
:
_logger
.
error
(
"HDFS mkdir path: {} successfully"
.
format
(
hdfs_path
))
return
True
def
ls
(
self
,
hdfs_path
):
"""
ls directory contents about HDFS hdfs_path
Args:
hdfs_path(str): Remote HDFS path will be ls.
Returns:
List: a contents list about hdfs_path.
"""
assert
hdfs_path
is
not
None
if
not
self
.
is_exist
(
hdfs_path
):
return
[]
ls_commands
=
[
'-ls'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
ls_commands
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS list path: {} failed"
.
format
(
hdfs_path
))
return
[]
else
:
_logger
.
info
(
"HDFS list path: {} successfully"
.
format
(
hdfs_path
))
ret_lines
=
[]
regex
=
re
.
compile
(
r
'\s+'
)
out_lines
=
output
.
strip
().
split
(
"
\n
"
)
for
line
in
out_lines
:
re_line
=
regex
.
split
(
line
)
if
len
(
re_line
)
==
8
:
ret_lines
.
append
(
re_line
[
7
])
return
ret_lines
def
lsr
(
self
,
hdfs_path
,
only_file
=
True
,
sort
=
True
):
"""
list directory contents about HDFS hdfs_path recursively
Args:
hdfs_path(str): Remote HDFS path.
only_file(bool|True): will discard folders.
sort(bool|True): will be sorted by create time.
Returns:
List: a contents list about hdfs_path.
"""
def
sort_by_time
(
v1
,
v2
):
v1_time
=
datetime
.
strptime
(
v1
[
1
],
'%Y-%m-%d %H:%M'
)
v2_time
=
datetime
.
strptime
(
v2
[
1
],
'%Y-%m-%d %H:%M'
)
return
v1_time
>
v2_time
assert
hdfs_path
is
not
None
if
not
self
.
is_exist
(
hdfs_path
):
return
[]
ls_commands
=
[
'-lsr'
,
hdfs_path
]
returncode
,
output
,
errors
=
self
.
__run_hdfs_cmd
(
ls_commands
,
retry_times
=
1
)
if
returncode
:
_logger
.
error
(
"HDFS list all files: {} failed"
.
format
(
hdfs_path
))
return
[]
else
:
_logger
.
info
(
"HDFS list all files: {} successfully"
.
format
(
hdfs_path
))
lines
=
[]
regex
=
re
.
compile
(
r
'\s+'
)
out_lines
=
output
.
strip
().
split
(
"
\n
"
)
for
line
in
out_lines
:
re_line
=
regex
.
split
(
line
)
if
len
(
re_line
)
==
8
:
if
only_file
and
re_line
[
0
][
0
]
==
"d"
:
continue
else
:
lines
.
append
(
(
re_line
[
7
],
re_line
[
5
]
+
" "
+
re_line
[
6
]))
if
sort
:
sorted
(
lines
,
cmp
=
sort_by_time
)
ret_lines
=
[
ret
[
0
]
for
ret
in
lines
]
return
ret_lines
def
multi_download
(
client
,
hdfs_path
,
local_path
,
trainer_id
,
trainers
,
multi_processes
=
5
):
"""
Download files from HDFS using multi process.
Args:
client(HDFSClient): instance of HDFSClient
hdfs_path(str): path on hdfs
local_path(str): path on local
trainer_id(int): current trainer id
trainers(int): all trainers number
multi_processes(int|5): the download data process at the same time, default=5
Returns:
List:
Download files in local folder.
"""
def
__subprocess_download
(
datas
):
for
data
in
datas
:
re_path
=
os
.
path
.
relpath
(
os
.
path
.
dirname
(
data
),
hdfs_path
)
if
re_path
==
os
.
curdir
:
sub_local_re_path
=
local_path
else
:
sub_local_re_path
=
os
.
path
.
join
(
local_path
,
re_path
)
client
.
download
(
data
,
sub_local_re_path
)
assert
isinstance
(
client
,
HDFSClient
)
client
.
make_local_dirs
(
local_path
)
_logger
.
info
(
"Make local dir {} successfully"
.
format
(
local_path
))
all_need_download
=
client
.
lsr
(
hdfs_path
,
sort
=
True
)
need_download
=
all_need_download
[
trainer_id
::
trainers
]
_logger
.
info
(
"Get {} files From all {} files need to be download from {}"
.
format
(
len
(
need_download
),
len
(
all_need_download
),
hdfs_path
))
_logger
.
info
(
"Start {} multi process to download datas"
.
format
(
multi_processes
))
procs
=
[]
for
i
in
range
(
multi_processes
):
process_datas
=
need_download
[
i
::
multi_processes
]
p
=
multiprocessing
.
Process
(
target
=
__subprocess_download
,
args
=
(
process_datas
,
))
procs
.
append
(
p
)
p
.
start
()
# complete the processes
for
proc
in
procs
:
proc
.
join
()
_logger
.
info
(
"Finish {} multi process to download datas"
.
format
(
multi_processes
))
local_downloads
=
[]
for
data
in
need_download
:
data_name
=
os
.
path
.
basename
(
data
)
re_path
=
os
.
path
.
relpath
(
os
.
path
.
dirname
(
data
),
hdfs_path
)
if
re_path
==
os
.
curdir
:
local_re_path
=
os
.
path
.
join
(
local_path
,
data_name
)
else
:
local_re_path
=
os
.
path
.
join
(
local_path
,
re_path
,
data_name
)
local_downloads
.
append
(
local_re_path
)
return
local_downloads
def
getfilelist
(
path
):
rlist
=
[]
for
dir
,
folder
,
file
in
os
.
walk
(
path
):
for
i
in
file
:
t
=
os
.
path
.
join
(
dir
,
i
)
rlist
.
append
(
t
)
for
r
in
rlist
:
print
(
r
)
def
multi_upload
(
client
,
hdfs_path
,
local_path
,
multi_processes
=
5
,
overwrite
=
False
,
sync
=
True
):
"""
Upload files to HDFS using multi process.
Args:
client(HDFSClient): instance of HDFSClient
hdfs_path(str): path on hdfs
local_path(str): path on local
multi_processes(int|5): the upload data process at the same time, default=5
overwrite(bool|False): will overwrite file on HDFS or not
sync(bool|True): upload files sync or not.
Returns:
None
"""
def
__subprocess_upload
(
datas
):
for
data
in
datas
:
re_path
=
os
.
path
.
relpath
(
os
.
path
.
dirname
(
data
),
local_path
)
hdfs_re_path
=
os
.
path
.
join
(
hdfs_path
,
re_path
)
client
.
upload
(
hdfs_re_path
,
data
,
overwrite
,
retry_times
=
5
)
def
get_local_files
(
path
):
rlist
=
[]
if
not
os
.
path
.
isdir
(
path
):
return
rlist
for
dirname
,
folder
,
files
in
os
.
walk
(
path
):
for
i
in
files
:
t
=
os
.
path
.
join
(
dirname
,
i
)
rlist
.
append
(
t
)
return
rlist
assert
isinstance
(
client
,
HDFSClient
)
all_files
=
get_local_files
(
local_path
)
if
not
all_files
:
_logger
.
info
(
"there are nothing need to upload, exit"
)
return
_logger
.
info
(
"Start {} multi process to upload datas"
.
format
(
multi_processes
))
procs
=
[]
for
i
in
range
(
multi_processes
):
process_datas
=
all_files
[
i
::
multi_processes
]
p
=
multiprocessing
.
Process
(
target
=
__subprocess_upload
,
args
=
(
process_datas
,
))
procs
.
append
(
p
)
p
.
start
()
# complete the processes
for
proc
in
procs
:
proc
.
join
()
_logger
.
info
(
"Finish {} multi process to upload datas"
.
format
(
multi_processes
))
if
__name__
==
"__main__"
:
hadoop_home
=
"/home/client/hadoop-client/hadoop/"
configs
=
{
"fs.default.name"
:
"hdfs://xxx.hadoop.com:54310"
,
"hadoop.job.ugi"
:
"hello,hello123"
}
client
=
HDFSClient
(
hadoop_home
,
configs
)
client
.
ls
(
"/user/com/train-25"
)
files
=
client
.
lsr
(
"/user/com/train-25/models"
)
downloads
=
multi_download
(
client
,
"/user/com/train-25/model"
,
"/home/xx/data1"
,
1
,
5
,
100
,
multi_processes
=
5
)
multi_upload
(
client
,
"/user/com/train-25/model"
,
"/home/xx/data1"
)
python/paddle/fluid/contrib/utils/lookup_table_utils.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""lookup_table_utils.py will move to fluid/incubate/fleet/utils/lookup_table.py"""
from
__future__
import
print_function
import
os
import
time
import
logging
import
paddle
from
paddle.fluid
import
core
from
paddle.fluid
import
io
from
paddle.fluid
import
Program
from
paddle.fluid.log_helper
import
get_logger
__all__
=
[
"load_persistables_for_increment"
,
"load_persistables_for_inference"
,
"convert_dist_to_sparse_program"
]
_logger
=
get_logger
(
'lookup_table_utils'
,
logging
.
INFO
,
fmt
=
'%(asctime)s-%(levelname)s: %(message)s'
)
model_filename
=
"__model__"
lookup_table_dir
=
"__lookup_table__"
def
__insert_lookup_sparse_table_op
(
main_program
,
idx
,
ids
,
w
,
out
):
main_program
.
global_block
().
_insert_op
(
index
=
idx
,
type
=
"lookup_sparse_table"
,
inputs
=
{
"Ids"
:
[
ids
],
"W"
:
[
w
]},
outputs
=
{
"Out"
:
[
out
]},
attrs
=
{
"is_distributed"
:
False
,
"is_sparse"
:
True
,
"grad_inplace"
:
False
})
def
__get_prefetch_op_tuples
(
main_program
):
# current lookup tables op is split_ids->prefetch->merge_ids
prefetch_op_tuples
=
None
op_types
=
[
op
.
type
for
op
in
main_program
.
global_block
().
ops
]
for
i
in
range
(
len
(
op_types
)):
if
op_types
[
i
]
==
"prefetch"
:
if
op_types
[
i
-
1
]
==
"split_ids"
and
op_types
[
i
+
1
]
==
"merge_ids"
:
split_ids_op_id
=
i
-
1
split_ids_inputs
=
main_program
.
global_block
().
ops
[
i
-
1
].
input
(
"Ids"
)
prefetch_op_inputs
=
main_program
.
global_block
().
ops
[
i
].
input
(
"X"
)
prefetch_op_outputs
=
main_program
.
global_block
().
ops
[
i
].
output
(
"Out"
)
merge_ids_outputs
=
main_program
.
global_block
().
ops
[
i
+
1
].
output
(
"Out"
)
need_delete_vars
=
[]
need_delete_vars
.
extend
(
prefetch_op_inputs
)
need_delete_vars
.
extend
(
prefetch_op_outputs
)
prefetch_op_tuples
=
(
split_ids_op_id
,
split_ids_inputs
,
merge_ids_outputs
,
need_delete_vars
)
break
return
prefetch_op_tuples
def
convert_dist_to_sparse_program
(
program
):
"""
WARNING: this function will only be used for distributed training with distributed lookup table.
when we train model with distributed lookup table but want to do the local inference, we can use
this function to convert the train program with distributed lookup table to sparse lookup table.
Args:
program(Program): the program must be the trainer program, which will be get by the distribute transpiler.
Returns:
program: The `program` is a Program, it's the program replace distributed lookup table to sparse lookup table.
"""
if
not
program
.
_distributed_lookup_table
:
_logger
.
warn
(
"There are no distributed lookup tables need to be converted"
)
return
# create table param and grad var in pserver program
origin_emb_var
=
"{}.origin"
.
format
(
program
.
_distributed_lookup_table
)
emb_var
=
program
.
_distributed_lookup_table
program
.
global_block
().
_rename_var
(
emb_var
,
origin_emb_var
)
origin_param_var
=
program
.
global_block
().
vars
[
origin_emb_var
]
param_var
=
program
.
global_block
().
create_var
(
name
=
emb_var
,
shape
=
origin_param_var
.
shape
,
dtype
=
origin_param_var
.
dtype
,
type
=
core
.
VarDesc
.
VarType
.
SELECTED_ROWS
,
persistable
=
True
)
# parameter must be selected rows
param_var
.
desc
.
set_type
(
core
.
VarDesc
.
VarType
.
SELECTED_ROWS
)
program
.
_sync_with_cpp
()
prefetch_op_tuples
=
__get_prefetch_op_tuples
(
program
)
split_ids_id
=
prefetch_op_tuples
[
0
]
for
idx
in
range
(
split_ids_id
+
2
,
split_ids_id
-
1
,
-
1
):
program
.
global_block
().
_remove_op
(
idx
)
program
.
desc
.
flush
()
in_out_pairs
=
zip
(
prefetch_op_tuples
[
1
],
prefetch_op_tuples
[
2
])
for
in_out_pair
in
in_out_pairs
:
idx
=
split_ids_id
ids
=
program
.
global_block
().
vars
[
in_out_pair
[
0
]]
out
=
program
.
global_block
().
vars
[
in_out_pair
[
1
]]
__insert_lookup_sparse_table_op
(
program
,
idx
,
ids
,
param_var
,
out
)
program
.
desc
.
flush
()
return
program
def
load_persistables_for_increment
(
dirname
,
executor
,
program
,
lookup_table_var
,
lookup_table_var_path
):
"""
WARNING: this function will only be used for distributed training with distributed lookup table.
for increment training, the pserver will not only load dense variables,
but also load the suitable lookup table var. Because of sliced lookup table
var with HASH, we must load the correct sliced var.
Args:
dirname(str): The directory path
executor(Executor): The executor to run for loading inference model.
program(Program): The parameter server program, which will run on Pserver.
lookup_table_var: the distributed lookup tables var name.
lookup_table_var_path: the the distributed lookup tables var location.
Returns:
None
"""
def
_load_persistable_vars
(
executor
,
dirname
,
need_load_vars
):
load_prog
=
Program
()
load_block
=
load_prog
.
global_block
()
need_delete_vars
=
[]
for
param
in
need_load_vars
:
origin_var
=
param
.
origin
slice_var
=
param
.
slice
is_slice
=
param
.
is_slice
offset
=
param
.
offset
if
is_slice
:
origin
=
load_block
.
create_var
(
name
=
"{}.load"
.
format
(
origin_var
.
name
),
type
=
origin_var
.
type
,
shape
=
origin_var
.
shape
,
dtype
=
origin_var
.
dtype
,
persistable
=
True
)
load_block
.
append_op
(
type
=
'load'
,
inputs
=
{},
outputs
=
{
'Out'
:
[
origin
]},
attrs
=
{
'file_path'
:
os
.
path
.
join
(
dirname
,
origin_var
.
name
)
})
slice
=
load_block
.
create_var
(
name
=
slice_var
.
name
,
type
=
slice_var
.
type
,
shape
=
slice_var
.
shape
,
dtype
=
slice_var
.
dtype
,
persistable
=
True
)
dim1_flatten
=
reduce
(
lambda
x
,
y
:
x
*
y
,
slice
.
shape
[
1
:])
start
=
int
(
offset
/
dim1_flatten
)
end
=
int
(
offset
/
dim1_flatten
+
slice
.
shape
[
0
])
load_block
.
append_op
(
type
=
"slice"
,
inputs
=
{
'Input'
:
origin
},
outputs
=
{
'Out'
:
slice
},
attrs
=
{
'axes'
:
[
0
],
'starts'
:
[
start
],
'ends'
:
[
end
]})
need_delete_vars
.
append
(
origin
)
else
:
origin
=
load_block
.
create_var
(
name
=
"{}"
.
format
(
origin_var
.
name
),
type
=
origin_var
.
type
,
shape
=
origin_var
.
shape
,
dtype
=
origin_var
.
dtype
,
persistable
=
True
)
load_block
.
append_op
(
type
=
'load'
,
inputs
=
{},
outputs
=
{
'Out'
:
[
origin
]},
attrs
=
{
'file_path'
:
os
.
path
.
join
(
dirname
,
origin_var
.
name
)
})
load_block
.
append_op
(
type
=
'delete_var'
,
inputs
=
{
'X'
:
need_delete_vars
},
)
executor
.
run
(
load_prog
)
def
__load_lookup_table_vars
(
executor
,
main_program
,
lookup_table_var
,
lookup_table_var_path
):
emb_var
=
main_program
.
global_block
().
var
(
lookup_table_var
)
load_program
=
Program
()
load_block
=
load_program
.
global_block
()
load_block
.
append_op
(
type
=
'load'
,
inputs
=
{},
outputs
=
{
'Out'
:
[
emb_var
]},
attrs
=
{
'file_path'
:
lookup_table_var_path
})
executor
.
run
(
load_program
)
if
not
os
.
path
.
isdir
(
dirname
):
raise
ValueError
(
"There is no directory named '%s'"
,
dirname
)
if
not
os
.
path
.
exists
(
lookup_table_var_path
):
raise
ValueError
(
"There is no file named '%s'"
,
lookup_table_var_path
)
if
not
isinstance
(
program
,
Program
):
raise
ValueError
(
"program must be an instance of fluid.Program"
)
_logger
.
info
(
"Start Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}"
.
format
(
dirname
,
time
.
ctime
()))
need_load_vars
=
program
.
_parameters_on_pservers
.
get_distributed_vars_by_ep
(
program
.
_ps_endpoint
)
_load_persistable_vars
(
executor
,
dirname
,
need_load_vars
)
__load_lookup_table_vars
(
executor
,
program
,
lookup_table_var
,
lookup_table_var_path
)
_logger
.
info
(
"Finish Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}"
.
format
(
dirname
,
time
.
ctime
()))
def
load_persistables_for_inference
(
dirname
,
executor
,
program
,
lookup_table_var_name
):
"""
WARNING: this function will only be used for inference with distributed lookup table.
Inference with distributed lookup table is a little funky, this function will load distributed
lookup table vars into sparse var, can be used in local inference mode.
Args:
dirname(str): The directory path
executor(Executor): The executor to run for loading inference model.
program(Program): The parameter server program, which will run on Pserver.
lookup_table_var_name: the distributed lookup tables var name.
Returns:
None
"""
def
_load_persistable_vars
(
executor
,
dirname
,
program
,
lookup_table_vars
):
def
_is_checkpoint_var
(
exclude_fluid_vars
=
None
):
"""
the checkpoint will not save or load all the variables.
var type is FEED_MINIBATCH/FETCH_LIST/RAW or var name ends with @GRAD are discarded.
: param var(Variable)
"""
if
exclude_fluid_vars
is
None
:
exclude_fluid_vars
=
[]
def
is_valid
(
var
):
if
var
.
desc
.
type
()
==
core
.
VarDesc
.
VarType
.
FEED_MINIBATCH
or
\
var
.
desc
.
type
()
==
core
.
VarDesc
.
VarType
.
FETCH_LIST
or
\
var
.
desc
.
type
()
==
core
.
VarDesc
.
VarType
.
RAW
:
return
False
# @GRAD are named for gradient variables, checkpoint will not save it.
if
"@GRAD"
in
var
.
name
:
return
False
# .trainer_ are named for distribute train variables, checkpoint will not save it.
if
".trainer_"
in
var
.
name
:
return
False
# .block is named for distribute train variables, checkpoint will not save it.
if
".block"
in
var
.
name
:
return
False
if
"tmp_"
in
var
.
name
:
return
False
if
var
.
name
in
exclude_fluid_vars
:
return
False
return
var
.
persistable
return
is_valid
io
.
load_vars
(
executor
,
dirname
=
dirname
,
main_program
=
program
,
predicate
=
_is_checkpoint_var
(
lookup_table_vars
),
filename
=
None
)
def
_load_lookup_table_vars
(
executor
,
dirname
,
main_program
,
lookup_table_vars
):
if
not
os
.
path
.
isdir
(
dirname
):
raise
ValueError
(
"There is no directory named '%s'"
,
dirname
)
lookup_table_dirname
=
os
.
path
.
join
(
dirname
,
lookup_table_dir
)
emb_var_name
=
lookup_table_vars
[
0
]
emb_var
=
main_program
.
global_block
().
var
(
emb_var_name
)
emb_files
=
[]
for
emb_name
in
os
.
listdir
(
lookup_table_dirname
):
if
emb_var_name
in
emb_name
:
emb_files
.
append
(
emb_name
)
convert_program
=
Program
()
global_block
=
convert_program
.
global_block
()
emb_var
=
global_block
.
create_var
(
name
=
emb_var
.
name
,
shape
=
emb_var
.
shape
,
dtype
=
emb_var
.
dtype
,
type
=
core
.
VarDesc
.
VarType
.
SELECTED_ROWS
,
persistable
=
True
)
emb_var
.
desc
.
set_type
(
core
.
VarDesc
.
VarType
.
SELECTED_ROWS
)
sums
=
[]
for
i
,
emb_file
in
enumerate
(
emb_files
):
var_name
=
"{}_{}"
.
format
(
emb_var
.
name
,
i
)
param_var
=
global_block
.
create_var
(
name
=
var_name
,
shape
=
emb_var
.
shape
,
dtype
=
emb_var
.
dtype
,
type
=
core
.
VarDesc
.
VarType
.
SELECTED_ROWS
,
persistable
=
True
)
param_var
.
desc
.
set_type
(
core
.
VarDesc
.
VarType
.
SELECTED_ROWS
)
global_block
.
append_op
(
type
=
'load'
,
inputs
=
{},
outputs
=
{
'Out'
:
[
param_var
]},
attrs
=
{
'file_path'
:
os
.
path
.
join
(
lookup_table_dirname
,
var_name
)
})
sums
.
append
(
param_var
)
global_block
.
append_op
(
type
=
'merge_sparse_lookup_table'
,
inputs
=
{
"X"
:
sums
},
outputs
=
{
'Out'
:
emb_var
},
attrs
=
{})
global_block
.
append_op
(
type
=
'save'
,
inputs
=
{
"X"
:
[
emb_var
]},
outputs
=
{},
attrs
=
{
'file_path'
:
os
.
path
.
join
(
lookup_table_dirname
,
emb_var
.
name
)
})
global_block
.
append_op
(
type
=
'delete_var'
,
inputs
=
{
'X'
:
sums
})
executor
.
run
(
convert_program
)
if
not
os
.
path
.
isdir
(
dirname
):
raise
ValueError
(
"There is no directory named '%s'"
,
dirname
)
if
program
:
if
not
isinstance
(
program
,
Program
):
raise
ValueError
(
"program must be an instance of fluid.Program"
)
else
:
local_model
=
os
.
path
.
join
(
dirname
,
model_filename
)
with
open
(
local_model
,
"rb"
)
as
f
:
program_desc_str
=
f
.
read
()
program
=
Program
.
parse_from_string
(
program_desc_str
)
if
not
core
.
_is_program_version_supported
(
program
.
_version
()):
raise
ValueError
(
"Unsupported program version: %d
\n
"
%
program
.
_version
())
_logger
.
info
(
"Start Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}"
.
format
(
dirname
,
time
.
ctime
()))
_load_persistable_vars
(
executor
,
dirname
,
program
,
[
lookup_table_var_name
])
_load_lookup_table_vars
(
executor
,
dirname
,
program
,
[
lookup_table_var_name
])
_logger
.
info
(
"Finish Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}"
.
format
(
dirname
,
time
.
ctime
()))
return
program
def
get_inference_model
(
main_program
,
feeded_var_names
,
target_vars
):
"""
Prune the given `main_program` to build a new program especially for inference with distributed lookup table ,
and then add `feeded_vars` and `target_vars` in this program.
Args:
main_program(Program|None): The original program, which will be pruned to
build the inference model. If is set None,
the default main program will be used.
Default: None.
feeded_var_names(list[str]): Names of variables that need to be fed data
during inference.
target_vars(list[Variable]): Variables from which we can get inference
results.
Returns:
program(Program)
Raises:
ValueError: If `feed_var_names` is not a list of basestring.
ValueError: If `target_vars` is not a list of Variable.
"""
def
prepend_feed_ops
(
inference_program
,
feed_target_names
,
feed_holder_name
=
'feed'
):
if
len
(
feed_target_names
)
==
0
:
return
global_block
=
inference_program
.
global_block
()
feed_var
=
global_block
.
create_var
(
name
=
feed_holder_name
,
type
=
core
.
VarDesc
.
VarType
.
FEED_MINIBATCH
,
persistable
=
True
)
for
i
,
name
in
enumerate
(
feed_target_names
):
out
=
global_block
.
var
(
name
)
global_block
.
_prepend_op
(
type
=
'feed'
,
inputs
=
{
'X'
:
[
feed_var
]},
outputs
=
{
'Out'
:
[
out
]},
attrs
=
{
'col'
:
i
})
def
append_fetch_ops
(
inference_program
,
fetch_target_names
,
fetch_holder_name
=
'fetch'
):
global_block
=
inference_program
.
global_block
()
fetch_var
=
global_block
.
create_var
(
name
=
fetch_holder_name
,
type
=
core
.
VarDesc
.
VarType
.
FETCH_LIST
,
persistable
=
True
)
for
i
,
name
in
enumerate
(
fetch_target_names
):
global_block
.
append_op
(
type
=
'fetch'
,
inputs
=
{
'X'
:
[
name
]},
outputs
=
{
'Out'
:
[
fetch_var
]},
attrs
=
{
'col'
:
i
})
origin_program
=
main_program
.
clone
()
main_program
=
main_program
.
clone
()
global_block
=
main_program
.
global_block
()
need_to_remove_op_index
=
[]
for
i
,
op
in
enumerate
(
global_block
.
ops
):
op
.
desc
.
set_is_target
(
False
)
if
op
.
type
==
"feed"
or
op
.
type
==
"fetch"
:
need_to_remove_op_index
.
append
(
i
)
for
index
in
need_to_remove_op_index
[::
-
1
]:
global_block
.
_remove_op
(
index
)
main_program
.
desc
.
flush
()
main_program
=
main_program
.
_prune
(
targets
=
target_vars
)
main_program
=
main_program
.
_inference_optimize
(
prune_read_op
=
True
)
fetch_var_names
=
[
v
.
name
for
v
in
target_vars
]
prepend_feed_ops
(
main_program
,
feeded_var_names
)
append_fetch_ops
(
main_program
,
fetch_var_names
)
return
main_program
python/paddle/fluid/incubate/data_generator/__init__.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
os
import
sys
__all__
=
[
'MultiSlotDataGenerator'
,
'MultiSlotStringDataGenerator'
]
class
DataGenerator
(
object
):
"""
DataGenerator is a general Base class for user to inherit
A user who wants to define his/her own python processing logic
with paddle.fluid.dataset should inherit this class.
"""
def
__init__
(
self
):
self
.
_proto_info
=
None
self
.
batch_size_
=
32
def
_set_line_limit
(
self
,
line_limit
):
if
not
isinstance
(
line_limit
,
int
):
raise
ValueError
(
"line_limit%s must be in int type"
%
type
(
line_limit
))
if
line_limit
<
1
:
raise
ValueError
(
"line_limit can not less than 1"
)
self
.
_line_limit
=
line_limit
def
set_batch
(
self
,
batch_size
):
'''
Set batch size of current DataGenerator
This is necessary only if a user wants to define generator_batch
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
int_words = [int(x) for x in line.split()]
yield ("words", int_words)
return local_iter
def generate_batch(self, samples):
def local_iter():
for s in samples:
yield ("words", s[1].extend([s[1][0]]))
mydata = MyData()
mydata.set_batch(128)
'''
self
.
batch_size_
=
batch_size
def
run_from_memory
(
self
):
'''
This function generator data from memory, it is usually used for
debug and benchmarking
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
yield ("words", [1, 2, 3, 4])
return local_iter
mydata = MyData()
mydata.run_from_memory()
'''
batch_samples
=
[]
line_iter
=
self
.
generate_sample
(
None
)
for
user_parsed_line
in
line_iter
():
if
user_parsed_line
==
None
:
continue
batch_samples
.
append
(
user_parsed_line
)
if
len
(
batch_samples
)
==
self
.
batch_size_
:
batch_iter
=
self
.
generate_batch
(
batch_samples
)
for
sample
in
batch_iter
():
sys
.
stdout
.
write
(
self
.
_gen_str
(
sample
))
batch_samples
=
[]
if
len
(
batch_samples
)
>
0
:
batch_iter
=
self
.
generate_batch
(
batch_samples
)
for
sample
in
batch_iter
():
sys
.
stdout
.
write
(
self
.
_gen_str
(
sample
))
def
run_from_stdin
(
self
):
'''
This function reads the data row from stdin, parses it with the
process function, and further parses the return value of the
process function with the _gen_str function. The parsed data will
be wrote to stdout and the corresponding protofile will be
generated.
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
int_words = [int(x) for x in line.split()]
yield ("words", [int_words])
return local_iter
mydata = MyData()
mydata.run_from_stdin()
'''
batch_samples
=
[]
for
line
in
sys
.
stdin
:
line_iter
=
self
.
generate_sample
(
line
)
for
user_parsed_line
in
line_iter
():
if
user_parsed_line
==
None
:
continue
batch_samples
.
append
(
user_parsed_line
)
if
len
(
batch_samples
)
==
self
.
batch_size_
:
batch_iter
=
self
.
generate_batch
(
batch_samples
)
for
sample
in
batch_iter
():
sys
.
stdout
.
write
(
self
.
_gen_str
(
sample
))
batch_samples
=
[]
if
len
(
batch_samples
)
>
0
:
batch_iter
=
self
.
generate_batch
(
batch_samples
)
for
sample
in
batch_iter
():
sys
.
stdout
.
write
(
self
.
_gen_str
(
sample
))
def
_gen_str
(
self
,
line
):
'''
Further processing the output of the process() function rewritten by
user, outputting data that can be directly read by the datafeed,and
updating proto_info information.
Args:
line(str): the output of the process() function rewritten by user.
Returns:
Return a string data that can be read directly by the datafeed.
'''
raise
NotImplementedError
(
"pls use MultiSlotDataGenerator or PairWiseDataGenerator"
)
def
generate_sample
(
self
,
line
):
'''
This function needs to be overridden by the user to process the
original data row into a list or tuple.
Args:
line(str): the original data row
Returns:
Returns the data processed by the user.
The data format is list or tuple:
[(name, [feasign, ...]), ...]
or ((name, [feasign, ...]), ...)
For example:
[("words", [1926, 08, 17]), ("label", [1])]
or (("words", [1926, 08, 17]), ("label", [1]))
Note:
The type of feasigns must be in int or float. Once the float
element appears in the feasign, the type of that slot will be
processed into a float.
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
int_words = [int(x) for x in line.split()]
yield ("words", [int_words])
return local_iter
'''
raise
NotImplementedError
(
"Please rewrite this function to return a list or tuple: "
+
"[(name, [feasign, ...]), ...] or ((name, [feasign, ...]), ...)"
)
def
generate_batch
(
self
,
samples
):
'''
This function needs to be overridden by the user to process the
generated samples from generate_sample(self, str) function
It is usually used as batch processing when a user wants to
do preprocessing on a batch of samples, e.g. padding according to
the max length of a sample in the batch
Args:
samples(list tuple): generated sample from generate_sample
Returns:
a python generator, the same format as return value of generate_sample
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
int_words = [int(x) for x in line.split()]
yield ("words", int_words)
return local_iter
def generate_batch(self, samples):
def local_iter():
for s in samples:
yield ("words", s[1].extend([s[1][0]]))
mydata = MyData()
mydata.set_batch(128)
'''
def
local_iter
():
for
sample
in
samples
:
yield
sample
return
local_iter
# TODO: guru4elephant
# add more generalized DataGenerator that can adapt user-defined slot
# for example, [(name, float_list), (name, str_list), (name, int_list)]
class
MultiSlotStringDataGenerator
(
DataGenerator
):
def
_gen_str
(
self
,
line
):
'''
Further processing the output of the process() function rewritten by
user, outputting data that can be directly read by the MultiSlotDataFeed,
and updating proto_info information.
The input line will be in this format:
>>> [(name, [str(feasign), ...]), ...]
>>> or ((name, [str(feasign), ...]), ...)
The output will be in this format:
>>> [ids_num id1 id2 ...] ...
For example, if the input is like this:
>>> [("words", ["1926", "08", "17"]), ("label", ["1"])]
>>> or (("words", ["1926", "08", "17"]), ("label", ["1"]))
the output will be:
>>> 3 1234 2345 3456 1 1
Args:
line(str): the output of the process() function rewritten by user.
Returns:
Return a string data that can be read directly by the MultiSlotDataFeed.
'''
if
not
isinstance
(
line
,
list
)
and
not
isinstance
(
line
,
tuple
):
raise
ValueError
(
"the output of process() must be in list or tuple type"
"Examples: [('words', ['1926', '08', '17']), ('label', ['1'])]"
)
output
=
""
for
index
,
item
in
enumerate
(
line
):
name
,
elements
=
item
if
output
:
output
+=
" "
out_str
=
[]
out_str
.
append
(
str
(
len
(
elements
)))
out_str
.
extend
(
elements
)
output
+=
" "
.
join
(
out_str
)
return
output
+
"
\n
"
class
MultiSlotDataGenerator
(
DataGenerator
):
def
_gen_str
(
self
,
line
):
'''
Further processing the output of the process() function rewritten by
user, outputting data that can be directly read by the MultiSlotDataFeed,
and updating proto_info information.
The input line will be in this format:
>>> [(name, [feasign, ...]), ...]
>>> or ((name, [feasign, ...]), ...)
The output will be in this format:
>>> [ids_num id1 id2 ...] ...
The proto_info will be in this format:
>>> [(name, type), ...]
For example, if the input is like this:
>>> [("words", [1926, 08, 17]), ("label", [1])]
>>> or (("words", [1926, 08, 17]), ("label", [1]))
the output will be:
>>> 3 1234 2345 3456 1 1
the proto_info will be:
>>> [("words", "uint64"), ("label", "uint64")]
Args:
line(str): the output of the process() function rewritten by user.
Returns:
Return a string data that can be read directly by the MultiSlotDataFeed.
'''
if
not
isinstance
(
line
,
list
)
and
not
isinstance
(
line
,
tuple
):
raise
ValueError
(
"the output of process() must be in list or tuple type"
"Example: [('words', [1926, 08, 17]), ('label', [1])]"
)
output
=
""
if
self
.
_proto_info
is
None
:
self
.
_proto_info
=
[]
for
item
in
line
:
name
,
elements
=
item
if
not
isinstance
(
name
,
str
):
raise
ValueError
(
"name%s must be in str type"
%
type
(
name
))
if
not
isinstance
(
elements
,
list
):
raise
ValueError
(
"elements%s must be in list type"
%
type
(
elements
))
if
not
elements
:
raise
ValueError
(
"the elements of each field can not be empty, you need padding it in process()."
)
self
.
_proto_info
.
append
((
name
,
"uint64"
))
if
output
:
output
+=
" "
output
+=
str
(
len
(
elements
))
for
elem
in
elements
:
if
isinstance
(
elem
,
float
):
self
.
_proto_info
[
-
1
]
=
(
name
,
"float"
)
elif
not
isinstance
(
elem
,
int
)
and
not
isinstance
(
elem
,
long
):
raise
ValueError
(
"the type of element%s must be in int or float"
%
type
(
elem
))
output
+=
" "
+
str
(
elem
)
else
:
if
len
(
line
)
!=
len
(
self
.
_proto_info
):
raise
ValueError
(
"the complete field set of two given line are inconsistent."
)
for
index
,
item
in
enumerate
(
line
):
name
,
elements
=
item
if
not
isinstance
(
name
,
str
):
raise
ValueError
(
"name%s must be in str type"
%
type
(
name
))
if
not
isinstance
(
elements
,
list
):
raise
ValueError
(
"elements%s must be in list type"
%
type
(
elements
))
if
not
elements
:
raise
ValueError
(
"the elements of each field can not be empty, you need padding it in process()."
)
if
name
!=
self
.
_proto_info
[
index
][
0
]:
raise
ValueError
(
"the field name of two given line are not match: require<%s>, get<%s>."
%
(
self
.
_proto_info
[
index
][
0
],
name
))
if
output
:
output
+=
" "
output
+=
str
(
len
(
elements
))
for
elem
in
elements
:
if
self
.
_proto_info
[
index
][
1
]
!=
"float"
:
if
isinstance
(
elem
,
float
):
self
.
_proto_info
[
index
]
=
(
name
,
"float"
)
elif
not
isinstance
(
elem
,
int
)
and
not
isinstance
(
elem
,
long
):
raise
ValueError
(
"the type of element%s must be in int or float"
%
type
(
elem
))
output
+=
" "
+
str
(
elem
)
return
output
+
"
\n
"
python/paddle/fluid/incubate/data_generator/test_data_generator.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from
__init__
import
*
class
SyntheticData
(
MultiSlotDataGenerator
):
def
generate_sample
(
self
,
line
):
def
data_iter
():
for
i
in
range
(
10000
):
yield
(
"words"
,
[
1
,
2
,
3
,
4
]),
(
"label"
,
[
0
])
return
data_iter
class
SyntheticStringData
(
MultiSlotStringDataGenerator
):
def
generate_sample
(
self
,
line
):
def
data_iter
():
for
i
in
range
(
10000
):
yield
(
"words"
,
[
"1"
,
"2"
,
"3"
,
"4"
],
(
"label"
,
[
"0"
]))
sd
=
SyntheticData
()
sd
.
run_from_memory
()
sd2
=
SyntheticStringData
()
sd
.
run_from_memory
()
python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py
已删除
100644 → 0
浏览文件 @
bf0ec9b8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler
import
fleet
from
paddle.fluid.contrib.utils
import
HDFSClient
import
os
import
time
def
check_all_trainers_ready
(
ready_path
,
epoch
):
trainer_num
=
fleet
.
worker_num
()
trainer_id
=
fleet
.
worker_index
()
hadoop_home
=
os
.
getenv
(
"HADOOP_HOME"
)
configs
=
{
"fs.default.name"
:
os
.
getenv
(
"FS_NAME"
),
"hadoop.job.ugi"
:
os
.
getenv
(
"FS_UGI"
)
}
node_ready
=
"ready.{}.{}.done"
.
format
(
epoch
,
trainer_id
)
with
open
(
node_ready
,
"w"
)
as
node
:
node
.
write
(
""
)
client
=
HDFSClient
(
hadoop_home
,
configs
)
if
not
client
.
is_dir
(
ready_path
):
client
.
makedirs
(
ready_path
)
client
.
upload
(
hdfs_path
=
ready_path
,
local_path
=
node_ready
,
overwrite
=
True
,
retry_times
=
0
)
print
(
"PUT {} ON HDFS {} OK"
.
format
(
node_ready
,
ready_path
))
while
True
:
ready_num
=
len
(
client
.
ls
(
ready_path
))
print
(
"have {} trainers need to be ready"
.
format
(
trainer_num
-
ready_num
%
trainer_num
))
if
ready_num
%
trainer_num
==
0
:
break
time
.
sleep
(
10
)
ready_num
=
len
(
client
.
ls
(
ready_path
))
print
(
"All trainers are ready, continue training"
)
python/paddle/fluid/tests/unittests/test_fleet_utils.py
浏览文件 @
e58c705b
...
@@ -24,7 +24,6 @@ import sys
...
@@ -24,7 +24,6 @@ import sys
from
paddle.dataset.common
import
download
,
DATA_HOME
from
paddle.dataset.common
import
download
,
DATA_HOME
import
paddle.fluid.incubate.fleet.base.role_maker
as
role_maker
import
paddle.fluid.incubate.fleet.base.role_maker
as
role_maker
from
paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler
import
fleet
from
paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler
import
fleet
from
paddle.fluid.incubate.fleet.utils.fleet_barrier_util
import
check_all_trainers_ready
from
paddle.fluid.incubate.fleet.utils.fleet_util
import
FleetUtil
from
paddle.fluid.incubate.fleet.utils.fleet_util
import
FleetUtil
import
paddle.fluid.incubate.fleet.utils.utils
as
utils
import
paddle.fluid.incubate.fleet.utils.utils
as
utils
...
@@ -50,15 +49,6 @@ class TestFleetUtils(unittest.TestCase):
...
@@ -50,15 +49,6 @@ class TestFleetUtils(unittest.TestCase):
fleet_util_transpiler
=
FleetUtil
(
mode
=
"transpiler"
)
fleet_util_transpiler
=
FleetUtil
(
mode
=
"transpiler"
)
self
.
assertRaises
(
Exception
,
FleetUtil
,
"other"
)
self
.
assertRaises
(
Exception
,
FleetUtil
,
"other"
)
def
test_fleet_barrier
(
self
):
role
=
role_maker
.
UserDefinedRoleMaker
(
current_id
=
0
,
role
=
role_maker
.
Role
.
WORKER
,
worker_num
=
1
,
server_endpoints
=
[
'127.0.0.1'
])
fleet
.
init
(
role
)
check_all_trainers_ready
(
"/ready_path/"
,
0
)
def
test_program_type_trans
(
self
):
def
test_program_type_trans
(
self
):
data_dir
=
self
.
download_files
()
data_dir
=
self
.
download_files
()
program_dir
=
os
.
path
.
join
(
data_dir
,
self
.
pruned_dir
)
program_dir
=
os
.
path
.
join
(
data_dir
,
self
.
pruned_dir
)
...
...
python/paddle/incubate/__init__.py
浏览文件 @
e58c705b
...
@@ -14,10 +14,8 @@
...
@@ -14,10 +14,8 @@
from
.
import
optimizer
from
.
import
optimizer
from
.
import
checkpoint
from
.
import
checkpoint
from
..fluid.contrib
import
reader
from
..fluid.layer_helper
import
LayerHelper
from
..fluid.layer_helper
import
LayerHelper
__all__
=
[]
__all__
=
[]
__all__
+=
[
"reader"
]
__all__
+=
optimizer
.
__all__
__all__
+=
optimizer
.
__all__
__all__
+=
checkpoint
.
__all__
__all__
+=
checkpoint
.
__all__
python/setup.py.in
浏览文件 @
e58c705b
...
@@ -177,11 +177,9 @@ packages=['paddle',
...
@@ -177,11 +177,9 @@ packages=['paddle',
'paddle.fluid.contrib',
'paddle.fluid.contrib',
'paddle.fluid.contrib.decoder',
'paddle.fluid.contrib.decoder',
'paddle.fluid.contrib.quantize',
'paddle.fluid.contrib.quantize',
'paddle.fluid.contrib.reader',
'paddle.fluid.contrib.slim',
'paddle.fluid.contrib.slim',
'paddle.fluid.contrib.slim.quantization',
'paddle.fluid.contrib.slim.quantization',
'paddle.fluid.contrib.slim.quantization.imperative',
'paddle.fluid.contrib.slim.quantization.imperative',
'paddle.fluid.contrib.utils',
'paddle.fluid.contrib.extend_optimizer',
'paddle.fluid.contrib.extend_optimizer',
'paddle.fluid.contrib.mixed_precision',
'paddle.fluid.contrib.mixed_precision',
'paddle.fluid.contrib.mixed_precision.bf16',
'paddle.fluid.contrib.mixed_precision.bf16',
...
@@ -189,7 +187,6 @@ packages=['paddle',
...
@@ -189,7 +187,6 @@ packages=['paddle',
'paddle.fluid.transpiler',
'paddle.fluid.transpiler',
'paddle.fluid.transpiler.details',
'paddle.fluid.transpiler.details',
'paddle.fluid.incubate',
'paddle.fluid.incubate',
'paddle.fluid.incubate.data_generator',
'paddle.fluid.incubate.fleet',
'paddle.fluid.incubate.fleet',
'paddle.fluid.incubate.checkpoint',
'paddle.fluid.incubate.checkpoint',
'paddle.fluid.incubate.fleet.base',
'paddle.fluid.incubate.fleet.base',
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录