Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Paddle
提交
accf3f75
P
Paddle
项目概览
PaddlePaddle
/
Paddle
大约 1 年 前同步成功
通知
2298
Star
20931
Fork
5422
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1423
列表
看板
标记
里程碑
合并请求
543
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1,423
Issue
1,423
列表
看板
标记
里程碑
合并请求
543
合并请求
543
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
accf3f75
编写于
9月 27, 2018
作者:
Q
qiaolongfei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
optimize pyreader
上级
1ab7b551
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
197 addition
and
141 deletion
+197
-141
paddle/fluid/CMakeLists.txt
paddle/fluid/CMakeLists.txt
+1
-2
python/paddle/fluid/layers/io.py
python/paddle/fluid/layers/io.py
+166
-122
python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
...le/fluid/tests/unittests/test_py_reader_using_executor.py
+30
-17
未找到文件。
paddle/fluid/CMakeLists.txt
浏览文件 @
accf3f75
...
...
@@ -12,6 +12,5 @@ endif(NOT WIN32)
if
(
WITH_INFERENCE
)
# NOTE: please add subdirectory inference at last.
add_subdirectory
(
inference
)
add_subdirectory
(
train
)
endif
()
add_subdirectory
(
train
)
python/paddle/fluid/layers/io.py
浏览文件 @
accf3f75
...
...
@@ -30,7 +30,8 @@ from ..unique_name import generate as unique_name
__all__
=
[
'data'
,
'open_files'
,
'read_file'
,
'shuffle'
,
'batch'
,
'double_buffer'
,
'random_data_generator'
,
'py_reader'
,
'Preprocessor'
,
'load'
'random_data_generator'
,
'py_reader'
,
'py_reader_by_data'
,
'Preprocessor'
,
'load'
]
...
...
@@ -471,6 +472,154 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
return
monkey_patch_reader_methods
(
main_prog_var
)
def
_py_reader
(
capacity
,
shapes
,
dtypes
,
lod_levels
=
None
,
name
=
None
,
use_double_buffer
=
True
,
feed_list
=
None
):
if
feed_list
is
not
None
:
assert
isinstance
(
feed_list
,
list
)
lod_levels
=
[]
dtypes
=
[]
shape_concat
=
[]
ranks
=
[]
shapes
=
[]
for
data
in
feed_list
:
dtypes
.
append
(
data
.
dtype
)
shape_concat
.
extend
(
data
.
shape
)
ranks
.
append
(
len
(
data
.
shape
))
shapes
.
append
(
data
.
shape
)
lod_levels
.
append
(
data
.
lod_level
)
else
:
dtypes
=
[
convert_np_dtype_to_dtype_
(
dt
)
for
dt
in
dtypes
]
shape_concat
=
[]
ranks
=
[]
for
shape
in
shapes
:
shape_concat
.
extend
(
shape
)
ranks
.
append
(
len
(
shape
))
if
lod_levels
is
None
:
lod_levels
=
[
0
]
*
len
(
shapes
)
if
name
is
None
:
queue_name
=
unique_name
(
'lod_tensor_blocking_queue'
)
reader_name
=
unique_name
(
'create_py_reader'
)
double_buffer_name
=
unique_name
(
'double_buffer'
)
else
:
queue_name
=
"_"
.
join
([
name
,
"queue"
])
reader_name
=
"_"
.
join
([
name
,
"reader"
])
double_buffer_name
=
"_"
.
join
([
name
,
"double_buffer"
])
var
=
global_scope
().
var
(
queue_name
)
feed_queue
=
core
.
init_lod_tensor_blocking_queue
(
var
,
capacity
,
shapes
)
startup_blk
=
default_startup_program
().
current_block
()
startup_var
=
startup_blk
.
create_var
(
name
=
reader_name
)
startup_blk
.
append_op
(
type
=
'create_py_reader'
,
inputs
=
{
'blocking_queue'
:
[
queue_name
]},
outputs
=
{
'Out'
:
[
startup_var
]},
attrs
=
{
'shape_concat'
:
shape_concat
,
'lod_levels'
:
lod_levels
,
'ranks'
:
ranks
})
startup_var
.
desc
.
set_dtypes
(
dtypes
)
startup_var
.
desc
.
set_lod_levels
(
lod_levels
)
startup_var
.
persistable
=
True
main_prog_var
=
_copy_reader_var_
(
default_main_program
().
current_block
(),
startup_var
)
reader
=
monkey_patch_reader_methods
(
main_prog_var
)
if
use_double_buffer
:
double_buffer_reader
=
double_buffer
(
reader
,
name
=
double_buffer_name
)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader
.
reset
=
reader
.
reset
reader
=
double_buffer_reader
# monkey patch py_reader special methods
reader
.
queue
=
feed_queue
current_reset_method
=
reader
.
reset
reader
.
thread
=
None
reader
.
tensor_provider
=
None
reader
.
exited
=
False
def
start_provide_thread
(
func
):
def
__provider_thread__
():
for
tensors
in
func
():
array
=
core
.
LoDTensorArray
()
for
item
in
tensors
:
if
not
isinstance
(
item
,
core
.
LoDTensor
):
tmp
=
core
.
LoDTensor
()
tmp
.
set
(
item
,
core
.
CPUPlace
())
item
=
tmp
array
.
append
(
item
)
if
reader
.
exited
:
break
feed_queue
.
push
(
array
)
if
reader
.
exited
:
break
feed_queue
.
close
()
reader
.
thread
=
threading
.
Thread
(
target
=
__provider_thread__
)
reader
.
thread
.
daemon
=
True
reader
.
thread
.
start
()
def
__set_tensor_provider__
(
func
):
reader
.
tensor_provider
=
func
def
__set_paddle_reader__
(
paddle_reader
):
with
program_guard
(
Program
(),
Program
()):
if
feed_list
is
None
:
feed_list
=
[]
counter
=
0
for
dtype
,
shape
,
lod_level
in
zip
(
dtypes
,
shapes
,
lod_levels
):
name
=
str
(
counter
)
feed_list
.
append
(
data
(
name
=
name
,
dtype
=
dtype
,
shape
=
shape
,
lod_level
=
lod_level
))
counter
+=
1
feeder
=
DataFeeder
(
feed_list
=
feed_list
,
place
=
core
.
CPUPlace
())
paddle_reader
=
feeder
.
decorate_reader
(
paddle_reader
,
multi_devices
=
False
)
def
__tensor_provider__
():
for
slots
in
paddle_reader
():
yield
[
slots
[
str
(
idx
)]
for
idx
in
six
.
moves
.
xrange
(
counter
)]
__set_tensor_provider__
(
__tensor_provider__
)
def
__reset__
():
current_reset_method
()
if
reader
.
thread
is
not
None
and
reader
.
tensor_provider
is
not
None
:
reader
.
exited
=
True
reader
.
thread
.
join
()
reader
.
exited
=
False
def
__start__
():
start_provide_thread
(
reader
.
tensor_provider
)
reader
.
reset
=
__reset__
reader
.
decorate_tensor_provider
=
__set_tensor_provider__
reader
.
decorate_paddle_reader
=
__set_paddle_reader__
reader
.
start
=
__start__
return
reader
def
py_reader
(
capacity
,
shapes
,
dtypes
,
...
...
@@ -597,129 +746,24 @@ def py_reader(capacity,
>>> except fluid.core.EOFException:
>>> test_reader.reset()
"""
dtypes
=
[
convert_np_dtype_to_dtype_
(
dt
)
for
dt
in
dtypes
]
shape_concat
=
[]
ranks
=
[]
for
shape
in
shapes
:
shape_concat
.
extend
(
shape
)
ranks
.
append
(
len
(
shape
))
if
lod_levels
is
None
:
lod_levels
=
[
0
]
*
len
(
shapes
)
if
name
is
None
:
queue_name
=
unique_name
(
'lod_tensor_blocking_queue'
)
reader_name
=
unique_name
(
'create_py_reader'
)
double_buffer_name
=
unique_name
(
'double_buffer'
)
else
:
queue_name
=
"_"
.
join
([
name
,
"queue"
])
reader_name
=
"_"
.
join
([
name
,
"reader"
])
double_buffer_name
=
"_"
.
join
([
name
,
"double_buffer"
])
var
=
global_scope
().
var
(
queue_name
)
feed_queue
=
core
.
init_lod_tensor_blocking_queue
(
var
,
capacity
,
shapes
)
startup_blk
=
default_startup_program
().
current_block
()
startup_var
=
startup_blk
.
create_var
(
name
=
reader_name
)
startup_blk
.
append_op
(
type
=
'create_py_reader'
,
inputs
=
{
'blocking_queue'
:
[
queue_name
]},
outputs
=
{
'Out'
:
[
startup_var
]},
attrs
=
{
'shape_concat'
:
shape_concat
,
'lod_levels'
:
lod_levels
,
'ranks'
:
ranks
})
startup_var
.
desc
.
set_dtypes
(
dtypes
)
startup_var
.
desc
.
set_lod_levels
(
lod_levels
)
startup_var
.
persistable
=
True
main_prog_var
=
_copy_reader_var_
(
default_main_program
().
current_block
(),
startup_var
)
reader
=
monkey_patch_reader_methods
(
main_prog_var
)
if
use_double_buffer
:
double_buffer_reader
=
double_buffer
(
reader
,
name
=
double_buffer_name
)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader
.
reset
=
reader
.
reset
reader
=
double_buffer_reader
# monkey patch py_reader special methods
reader
.
queue
=
feed_queue
current_reset_method
=
reader
.
reset
reader
.
thread
=
None
reader
.
tensor_provider
=
None
reader
.
exited
=
False
def
start_provide_thread
(
func
):
def
__provider_thread__
():
for
tensors
in
func
():
array
=
core
.
LoDTensorArray
()
for
item
in
tensors
:
if
not
isinstance
(
item
,
core
.
LoDTensor
):
tmp
=
core
.
LoDTensor
()
tmp
.
set
(
item
,
core
.
CPUPlace
())
item
=
tmp
array
.
append
(
item
)
if
reader
.
exited
:
break
feed_queue
.
push
(
array
)
if
reader
.
exited
:
break
feed_queue
.
close
()
reader
.
thread
=
threading
.
Thread
(
target
=
__provider_thread__
)
reader
.
thread
.
daemon
=
True
reader
.
thread
.
start
()
def
__set_tensor_provider__
(
func
):
reader
.
tensor_provider
=
func
def
__set_paddle_reader__
(
paddle_reader
):
with
program_guard
(
Program
(),
Program
()):
feed_list
=
[]
counter
=
0
for
dtype
,
shape
,
lod_level
in
zip
(
dtypes
,
shapes
,
lod_levels
):
name
=
str
(
counter
)
feed_list
.
append
(
data
(
name
=
name
,
dtype
=
dtype
,
shape
=
shape
,
lod_level
=
lod_level
))
counter
+=
1
feeder
=
DataFeeder
(
feed_list
=
feed_list
,
place
=
core
.
CPUPlace
())
paddle_reader
=
feeder
.
decorate_reader
(
paddle_reader
,
multi_devices
=
False
)
def
__tensor_provider__
():
for
slots
in
paddle_reader
():
yield
[
slots
[
str
(
idx
)]
for
idx
in
six
.
moves
.
xrange
(
counter
)]
__set_tensor_provider__
(
__tensor_provider__
)
def
__reset__
():
current_reset_method
()
if
reader
.
thread
is
not
None
and
reader
.
tensor_provider
is
not
None
:
reader
.
exited
=
True
reader
.
thread
.
join
()
reader
.
exited
=
False
return
_py_reader
(
capacity
=
capacity
,
shapes
=
shapes
,
dtypes
=
dtypes
,
lod_levels
=
lod_levels
,
name
=
name
,
use_double_buffer
=
use_double_buffer
)
def
__start__
():
start_provide_thread
(
reader
.
tensor_provider
)
reader
.
reset
=
__reset__
reader
.
decorate_tensor_provider
=
__set_tensor_provider__
reader
.
decorate_paddle_reader
=
__set_paddle_reader__
reader
.
start
=
__start__
return
reader
def
py_reader_by_data
(
capacity
,
feed_list
,
name
=
None
,
use_double_buffer
=
True
):
return
_py_reader
(
capacity
=
capacity
,
shapes
=
None
,
dtypes
=
None
,
lod_levels
=
None
,
name
=
name
,
use_double_buffer
=
use_double_buffer
,
feed_list
=
feed_list
)
def
open_files
(
filenames
,
...
...
python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
浏览文件 @
accf3f75
...
...
@@ -53,13 +53,22 @@ def simple_fc_net(in_size,
hidden_sizes
,
batch_size
,
queue_capacity
,
use_double_buffer
=
False
):
reader
=
fluid
.
layers
.
py_reader
(
capacity
=
queue_capacity
,
shapes
=
[[
-
1
,
in_size
],
[
-
1
,
1
]],
lod_levels
=
[
0
,
0
],
dtypes
=
[
'float32'
,
'int64'
],
use_double_buffer
=
False
)
use_double_buffer
=
False
,
use_feed_list
=
True
):
if
use_feed_list
:
data
=
fluid
.
layers
.
data
(
name
=
"data"
,
dtype
=
'float32'
,
shape
=
[
in_size
])
label
=
fluid
.
layers
.
data
(
name
=
'label'
,
dtype
=
'int64'
,
shape
=
[
1
])
reader
=
fluid
.
layers
.
py_reader_by_data
(
capacity
=
queue_capacity
,
use_double_buffer
=
False
,
feed_list
=
[
data
,
label
])
else
:
reader
=
fluid
.
layers
.
py_reader
(
capacity
=
queue_capacity
,
shapes
=
[[
-
1
,
in_size
],
[
-
1
,
1
]],
lod_levels
=
[
0
,
0
],
dtypes
=
[
'float32'
,
'int64'
],
use_double_buffer
=
False
)
feed_queue
=
reader
.
queue
reader
=
fluid
.
layers
.
batch
(
reader
,
batch_size
=
batch_size
)
if
use_double_buffer
:
...
...
@@ -100,14 +109,15 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
if
core
.
is_compiled_with_cuda
()
else
[
False
]):
for
use_parallel_executor
in
[
False
,
True
]:
for
use_double_buffer
in
[
False
,
True
]:
print
(
'Test Parameters:'
),
print
({
'use_cuda'
:
use_cuda
,
'use_parallel_executor'
:
use_parallel_executor
,
'use_double_buffer'
:
use_double_buffer
})
self
.
main
(
use_cuda
,
use_parallel_executor
,
use_double_buffer
)
for
use_feed_list
in
[
False
,
True
]:
print
(
'Test Parameters:'
),
print
({
'use_cuda'
:
use_cuda
,
'use_parallel_executor'
:
use_parallel_executor
,
'use_double_buffer'
:
use_double_buffer
})
self
.
main
(
use_cuda
,
use_parallel_executor
,
use_double_buffer
,
use_feed_list
)
def
random_reader
(
self
):
def
reader
():
...
...
@@ -143,12 +153,14 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
def
main
(
self
,
use_cuda
=
True
,
use_parallel_executor
=
False
,
use_double_buffer
=
False
):
use_double_buffer
=
False
,
use_feed_list
=
False
):
assert
not
use_cuda
or
use_cuda
and
core
.
is_compiled_with_cuda
()
self
.
use_cuda
=
use_cuda
self
.
use_parallel_executor
=
use_parallel_executor
self
.
use_double_buffer
=
use_double_buffer
self
.
use_feed_list
=
use_feed_list
startup_program
=
fluid
.
Program
()
main_program
=
fluid
.
Program
()
...
...
@@ -160,7 +172,8 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
hidden_sizes
=
self
.
hidden_sizes
,
batch_size
=
self
.
batch_size
,
queue_capacity
=
self
.
queue_capacity
,
use_double_buffer
=
self
.
use_double_buffer
)
use_double_buffer
=
self
.
use_double_buffer
,
use_feed_list
=
self
.
use_feed_list
)
place
=
fluid
.
CUDAPlace
(
0
)
if
use_cuda
else
fluid
.
CPUPlace
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录