Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
10d85208
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
10d85208
编写于
4月 22, 2020
作者:
K
Kaipeng Deng
提交者:
GitHub
4月 22, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix test_multiprocess_dataloader_base timeout. test=develop (#24053)
上级
d053dfd5
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
162 addition
and
114 deletion
+162
-114
python/paddle/fluid/tests/unittests/CMakeLists.txt
python/paddle/fluid/tests/unittests/CMakeLists.txt
+4
-2
python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py
...d/tests/unittests/test_multiprocess_dataloader_dynamic.py
+132
-0
python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py
...id/tests/unittests/test_multiprocess_dataloader_static.py
+26
-112
未找到文件。
python/paddle/fluid/tests/unittests/CMakeLists.txt
浏览文件 @
10d85208
...
@@ -211,7 +211,8 @@ if (APPLE OR WIN32)
...
@@ -211,7 +211,8 @@ if (APPLE OR WIN32)
list
(
REMOVE_ITEM TEST_OPS test_imperative_data_loader_fds_clear
)
list
(
REMOVE_ITEM TEST_OPS test_imperative_data_loader_fds_clear
)
list
(
REMOVE_ITEM TEST_OPS test_imperative_data_loader_exit_func
)
list
(
REMOVE_ITEM TEST_OPS test_imperative_data_loader_exit_func
)
list
(
REMOVE_ITEM TEST_OPS test_imperative_signal_handler
)
list
(
REMOVE_ITEM TEST_OPS test_imperative_signal_handler
)
list
(
REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_base
)
list
(
REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_static
)
list
(
REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dynamic
)
list
(
REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_exception
)
list
(
REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_exception
)
endif
()
endif
()
...
@@ -383,6 +384,7 @@ if(NOT WIN32 AND NOT APPLE)
...
@@ -383,6 +384,7 @@ if(NOT WIN32 AND NOT APPLE)
set_tests_properties
(
test_imperative_data_loader_base PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
set_tests_properties
(
test_imperative_data_loader_base PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
set_tests_properties
(
test_imperative_data_loader_fds_clear PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
set_tests_properties
(
test_imperative_data_loader_fds_clear PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
# set_tests_properties(test_imperative_data_loader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" RUN_SERIAL TRUE)
# set_tests_properties(test_imperative_data_loader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" RUN_SERIAL TRUE)
set_tests_properties
(
test_multiprocess_dataloader_base PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
set_tests_properties
(
test_multiprocess_dataloader_static PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
set_tests_properties
(
test_multiprocess_dataloader_dynamic PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
set_tests_properties
(
test_multiprocess_dataloader_exception PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
set_tests_properties
(
test_multiprocess_dataloader_exception PROPERTIES LABELS
"RUN_TYPE=EXCLUSIVE"
RUN_SERIAL TRUE
)
endif
()
endif
()
python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py
0 → 100644
浏览文件 @
10d85208
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
division
import
os
import
sys
import
six
import
time
import
unittest
import
multiprocessing
import
numpy
as
np
import
paddle.fluid
as
fluid
from
paddle.io
import
Dataset
,
BatchSampler
,
DataLoader
from
paddle.fluid.dygraph.nn
import
Linear
from
paddle.fluid.dygraph.base
import
to_variable
from
test_multiprocess_dataloader_static
import
RandomDataset
,
prepare_places
EPOCH_NUM
=
5
BATCH_SIZE
=
16
IMAGE_SIZE
=
784
SAMPLE_NUM
=
400
CLASS_NUM
=
10
class
SimpleFCNet
(
fluid
.
dygraph
.
Layer
):
def
__init__
(
self
):
super
(
SimpleFCNet
,
self
).
__init__
()
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Constant
(
value
=
0.8
))
bias_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Constant
(
value
=
0.5
))
self
.
_fcs
=
[]
in_channel
=
IMAGE_SIZE
for
hidden_size
in
[
10
,
20
,
30
]:
self
.
_fcs
.
append
(
Linear
(
in_channel
,
hidden_size
,
act
=
'tanh'
,
param_attr
=
param_attr
,
bias_attr
=
bias_attr
))
in_channel
=
hidden_size
self
.
_fcs
.
append
(
Linear
(
in_channel
,
CLASS_NUM
,
act
=
'softmax'
,
param_attr
=
param_attr
,
bias_attr
=
bias_attr
))
def
forward
(
self
,
image
):
out
=
image
for
fc
in
self
.
_fcs
:
out
=
fc
(
out
)
return
out
class
TestDygraphDataLoader
(
unittest
.
TestCase
):
def
run_main
(
self
,
num_workers
,
places
):
fluid
.
default_startup_program
().
random_seed
=
1
fluid
.
default_main_program
().
random_seed
=
1
with
fluid
.
dygraph
.
guard
(
places
[
0
]):
fc_net
=
SimpleFCNet
()
optimizer
=
fluid
.
optimizer
.
Adam
(
parameter_list
=
fc_net
.
parameters
())
dataset
=
RandomDataset
(
SAMPLE_NUM
,
CLASS_NUM
)
dataloader
=
DataLoader
(
dataset
,
places
=
places
,
num_workers
=
num_workers
,
batch_size
=
BATCH_SIZE
,
drop_last
=
True
)
assert
len
(
dataloader
)
==
int
(
SAMPLE_NUM
/
BATCH_SIZE
)
step_list
=
[]
loss_list
=
[]
start_t
=
time
.
time
()
for
_
in
six
.
moves
.
range
(
EPOCH_NUM
):
step
=
0
for
image
,
label
in
dataloader
():
out
=
fc_net
(
image
)
loss
=
fluid
.
layers
.
cross_entropy
(
out
,
label
)
avg_loss
=
fluid
.
layers
.
reduce_mean
(
loss
)
avg_loss
.
backward
()
optimizer
.
minimize
(
avg_loss
)
fc_net
.
clear_gradients
()
loss_list
.
append
(
np
.
mean
(
avg_loss
.
numpy
()))
step
+=
1
step_list
.
append
(
step
)
end_t
=
time
.
time
()
ret
=
{
"time"
:
end_t
-
start_t
,
"step"
:
step_list
,
"loss"
:
np
.
array
(
loss_list
)
}
print
(
"time cost"
,
ret
[
'time'
],
'step_list'
,
ret
[
'step'
])
return
ret
def
test_main
(
self
):
# dynamic graph do not run with_data_parallel
for
p
in
prepare_places
(
False
):
results
=
[]
for
num_workers
in
[
0
,
2
]:
print
(
self
.
__class__
.
__name__
,
p
,
num_workers
)
sys
.
stdout
.
flush
()
ret
=
self
.
run_main
(
num_workers
=
num_workers
,
places
=
p
)
results
.
append
(
ret
)
diff
=
np
.
max
(
np
.
abs
(
results
[
0
][
'loss'
]
-
results
[
1
][
'loss'
])
/
np
.
abs
(
results
[
0
][
'loss'
]))
self
.
assertLess
(
diff
,
1e-2
)
if
__name__
==
'__main__'
:
unittest
.
main
()
python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_
base
.py
→
python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_
static
.py
浏览文件 @
10d85208
...
@@ -24,8 +24,6 @@ import numpy as np
...
@@ -24,8 +24,6 @@ import numpy as np
import
paddle.fluid
as
fluid
import
paddle.fluid
as
fluid
from
paddle.io
import
Dataset
,
BatchSampler
,
DataLoader
from
paddle.io
import
Dataset
,
BatchSampler
,
DataLoader
from
paddle.fluid.dygraph.nn
import
Linear
from
paddle.fluid.dygraph.base
import
to_variable
EPOCH_NUM
=
5
EPOCH_NUM
=
5
BATCH_SIZE
=
16
BATCH_SIZE
=
16
...
@@ -86,42 +84,24 @@ def simple_fc_net_static():
...
@@ -86,42 +84,24 @@ def simple_fc_net_static():
return
startup_prog
,
main_prog
,
image
,
label
,
loss
return
startup_prog
,
main_prog
,
image
,
label
,
loss
class
SimpleFCNet
(
fluid
.
dygraph
.
Layer
):
def
prepare_places
(
with_data_parallel
,
with_cpu
=
False
,
with_gpu
=
True
):
def
__init__
(
self
):
places
=
[]
super
(
SimpleFCNet
,
self
).
__init__
()
if
with_cpu
:
places
.
append
([
fluid
.
CPUPlace
()])
if
with_data_parallel
:
places
.
append
([
fluid
.
CPUPlace
()]
*
2
)
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Constant
(
if
with_gpu
and
fluid
.
core
.
is_compiled_with_cuda
():
value
=
0.8
))
tmp
=
fluid
.
cuda_places
()[:
2
]
bias_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Constant
(
assert
len
(
tmp
)
>
0
,
"no gpu detected"
value
=
0.5
))
if
with_data_parallel
:
self
.
_fcs
=
[]
places
.
append
(
tmp
)
in_channel
=
IMAGE_SIZE
places
.
append
([
tmp
[
0
]])
for
hidden_size
in
[
10
,
20
,
30
]:
return
places
self
.
_fcs
.
append
(
Linear
(
in_channel
,
hidden_size
,
act
=
'tanh'
,
param_attr
=
param_attr
,
bias_attr
=
bias_attr
))
in_channel
=
hidden_size
self
.
_fcs
.
append
(
Linear
(
in_channel
,
CLASS_NUM
,
act
=
'softmax'
,
param_attr
=
param_attr
,
bias_attr
=
bias_attr
))
def
forward
(
self
,
image
):
out
=
image
for
fc
in
self
.
_fcs
:
out
=
fc
(
out
)
return
out
class
TestStaticDataLoader
(
unittest
.
TestCase
):
class
TestStaticDataLoader
(
unittest
.
TestCase
):
def
run_main
(
self
,
num_workers
,
places
,
with_data_parallel
):
def
run_main
(
self
,
num_workers
,
places
):
scope
=
fluid
.
Scope
()
scope
=
fluid
.
Scope
()
with
fluid
.
scope_guard
(
scope
):
with
fluid
.
scope_guard
(
scope
):
startup_prog
,
main_prog
,
image
,
label
,
loss
=
simple_fc_net_static
()
startup_prog
,
main_prog
,
image
,
label
,
loss
=
simple_fc_net_static
()
...
@@ -140,7 +120,7 @@ class TestStaticDataLoader(unittest.TestCase):
...
@@ -140,7 +120,7 @@ class TestStaticDataLoader(unittest.TestCase):
exe
.
run
(
startup_prog
)
exe
.
run
(
startup_prog
)
prog
=
fluid
.
CompiledProgram
(
main_prog
)
prog
=
fluid
.
CompiledProgram
(
main_prog
)
if
with_data_parallel
:
if
len
(
places
)
>
1
:
prog
=
prog
.
with_data_parallel
(
prog
=
prog
.
with_data_parallel
(
loss_name
=
loss
.
name
,
places
=
places
)
loss_name
=
loss
.
name
,
places
=
places
)
...
@@ -176,84 +156,18 @@ class TestStaticDataLoader(unittest.TestCase):
...
@@ -176,84 +156,18 @@ class TestStaticDataLoader(unittest.TestCase):
print
(
"time cost"
,
ret
[
'time'
],
'step_list'
,
ret
[
'step'
])
print
(
"time cost"
,
ret
[
'time'
],
'step_list'
,
ret
[
'step'
])
return
ret
return
ret
def
prepare_places
(
self
,
with_data_parallel
,
with_cpu
=
True
,
with_gpu
=
True
):
places
=
[]
# FIXME: PR_CI_Py35 may hang on Multi-CPUs with multiprocess, but it
# works fine locally, this should be fixed. OTOH, multiprocessing
# is not recommended when running on CPU generally
if
with_cpu
and
not
sys
.
version
.
startswith
(
'3.5'
):
places
.
append
([
fluid
.
CPUPlace
()])
if
with_data_parallel
:
places
.
append
([
fluid
.
CPUPlace
()]
*
2
)
if
with_gpu
and
fluid
.
core
.
is_compiled_with_cuda
():
tmp
=
fluid
.
cuda_places
()[:
2
]
assert
len
(
tmp
)
>
0
,
"no gpu detected"
if
with_data_parallel
:
places
.
append
(
tmp
)
places
.
append
([
tmp
[
0
]])
return
places
def
test_main
(
self
):
def
test_main
(
self
):
for
with_data_parallel
in
[
False
]
if
self
.
__class__
.
__name__
\
for
p
in
prepare_places
(
True
):
==
"TestDygraphDataLoader"
else
[
True
,
False
]:
results
=
[]
for
p
in
self
.
prepare_places
(
with_data_parallel
):
for
num_workers
in
[
0
,
2
]:
results
=
[]
print
(
self
.
__class__
.
__name__
,
p
,
num_workers
)
for
num_workers
in
[
0
,
2
]:
sys
.
stdout
.
flush
()
print
(
self
.
__class__
.
__name__
,
p
,
num_workers
)
ret
=
self
.
run_main
(
num_workers
=
num_workers
,
places
=
p
)
ret
=
self
.
run_main
(
results
.
append
(
ret
)
num_workers
=
num_workers
,
diff
=
np
.
max
(
places
=
p
,
np
.
abs
(
results
[
0
][
'loss'
]
-
results
[
1
][
'loss'
])
/
with_data_parallel
=
with_data_parallel
)
np
.
abs
(
results
[
0
][
'loss'
]))
results
.
append
(
ret
)
self
.
assertLess
(
diff
,
1e-2
)
diff
=
np
.
max
(
np
.
abs
(
results
[
0
][
'loss'
]
-
results
[
1
][
'loss'
])
/
np
.
abs
(
results
[
0
][
'loss'
]))
self
.
assertLess
(
diff
,
1e-2
)
class
TestDygraphDataLoader
(
TestStaticDataLoader
):
def
run_main
(
self
,
num_workers
,
places
,
with_data_parallel
):
fluid
.
default_startup_program
().
random_seed
=
1
fluid
.
default_main_program
().
random_seed
=
1
with
fluid
.
dygraph
.
guard
(
places
[
0
]):
fc_net
=
SimpleFCNet
()
optimizer
=
fluid
.
optimizer
.
Adam
(
parameter_list
=
fc_net
.
parameters
())
dataset
=
RandomDataset
(
SAMPLE_NUM
,
CLASS_NUM
)
dataloader
=
DataLoader
(
dataset
,
places
=
places
,
num_workers
=
num_workers
,
batch_size
=
BATCH_SIZE
,
drop_last
=
True
)
assert
len
(
dataloader
)
==
int
(
SAMPLE_NUM
/
BATCH_SIZE
)
step_list
=
[]
loss_list
=
[]
start_t
=
time
.
time
()
for
_
in
six
.
moves
.
range
(
EPOCH_NUM
):
step
=
0
for
image
,
label
in
dataloader
():
out
=
fc_net
(
image
)
loss
=
fluid
.
layers
.
cross_entropy
(
out
,
label
)
avg_loss
=
fluid
.
layers
.
reduce_mean
(
loss
)
avg_loss
.
backward
()
optimizer
.
minimize
(
avg_loss
)
fc_net
.
clear_gradients
()
loss_list
.
append
(
np
.
mean
(
avg_loss
.
numpy
()))
step
+=
1
step_list
.
append
(
step
)
end_t
=
time
.
time
()
ret
=
{
"time"
:
end_t
-
start_t
,
"step"
:
step_list
,
"loss"
:
np
.
array
(
loss_list
)
}
print
(
"time cost"
,
ret
[
'time'
],
'step_list'
,
ret
[
'step'
])
return
ret
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录