Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
c545f1ed
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c545f1ed
编写于
2月 25, 2019
作者:
S
sneaxiy
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
unify API
test=develop
上级
a8c4324d
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
164 addition
and
304 deletion
+164
-304
paddle/fluid/API.spec
paddle/fluid/API.spec
+9
-1
paddle/fluid/framework/reader.h
paddle/fluid/framework/reader.h
+5
-48
paddle/fluid/operators/reader/CMakeLists.txt
paddle/fluid/operators/reader/CMakeLists.txt
+1
-2
paddle/fluid/operators/reader/blocking_queue.h
paddle/fluid/operators/reader/blocking_queue.h
+0
-5
paddle/fluid/operators/reader/compose_reader.cc
paddle/fluid/operators/reader/compose_reader.cc
+0
-39
paddle/fluid/operators/reader/compose_reader.h
paddle/fluid/operators/reader/compose_reader.h
+0
-34
paddle/fluid/operators/reader/py_reader.cc
paddle/fluid/operators/reader/py_reader.cc
+0
-37
paddle/fluid/operators/reader/py_reader.h
paddle/fluid/operators/reader/py_reader.h
+0
-18
paddle/fluid/pybind/pybind.cc
paddle/fluid/pybind/pybind.cc
+0
-30
paddle/fluid/pybind/reader_py.cc
paddle/fluid/pybind/reader_py.cc
+3
-5
python/paddle/fluid/layers/io.py
python/paddle/fluid/layers/io.py
+4
-11
python/paddle/fluid/reader.py
python/paddle/fluid/reader.py
+112
-28
python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py
.../paddle/fluid/tests/unittests/test_decoupled_py_reader.py
+30
-46
未找到文件。
paddle/fluid/API.spec
浏览文件 @
c545f1ed
...
...
@@ -10,6 +10,9 @@ paddle.fluid.default_startup_program ArgSpec(args=[], varargs=None, keywords=Non
paddle.fluid.default_main_program ArgSpec(args=[], varargs=None, keywords=None, defaults=None)
paddle.fluid.program_guard ArgSpec(args=['main_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.name_scope ArgSpec(args=['prefix'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.cuda_places ArgSpec(args=['device_ids'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.cpu_places ArgSpec(args=['device_count'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.cuda_pinned_places ArgSpec(args=['device_count'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.Executor.__init__ ArgSpec(args=['self', 'place'], varargs=None, keywords=None, defaults=None)
paddle.fluid.Executor.close ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.Executor.run ArgSpec(args=['self', 'program', 'feed', 'fetch_list', 'feed_var_name', 'fetch_var_name', 'scope', 'return_numpy', 'use_program_cache'], varargs=None, keywords=None, defaults=(None, None, None, 'feed', 'fetch', None, True, False))
...
...
@@ -44,7 +47,7 @@ paddle.fluid.AsyncExecutor.run ArgSpec(args=['self', 'program', 'data_feed', 'fi
paddle.fluid.AsyncExecutor.save_model ArgSpec(args=['self', 'save_path'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.stop ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.CompiledProgram.__init__ ArgSpec(args=['self', 'program'], varargs=None, keywords=None, defaults=None)
paddle.fluid.CompiledProgram.with_data_parallel ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from'
], varargs=None, keywords=None, defaults=(
None, None, None, None))
paddle.fluid.CompiledProgram.with_data_parallel ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from'
, 'places'], varargs=None, keywords=None, defaults=(None,
None, None, None, None))
paddle.fluid.CompiledProgram.with_inference_optimize ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=None)
paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ParallelExecutor.ExecutionStrategy) -> None
paddle.fluid.BuildStrategy.GradientScaleStrategy.__init__ __init__(self: paddle.fluid.core.ParallelExecutor.BuildStrategy.GradientScaleStrategy, arg0: int) -> None
...
...
@@ -58,6 +61,11 @@ paddle.fluid.io.load_params ArgSpec(args=['executor', 'dirname', 'main_program',
paddle.fluid.io.load_persistables ArgSpec(args=['executor', 'dirname', 'main_program', 'filename'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.io.save_inference_model ArgSpec(args=['dirname', 'feeded_var_names', 'target_vars', 'executor', 'main_program', 'model_filename', 'params_filename', 'export_for_deployment'], varargs=None, keywords=None, defaults=(None, None, None, True))
paddle.fluid.io.load_inference_model ArgSpec(args=['dirname', 'executor', 'model_filename', 'params_filename', 'pserver_endpoints'], varargs=None, keywords=None, defaults=(None, None, None))
paddle.fluid.io.PyReader.__init__ ArgSpec(args=['self', 'feed_list', 'capacity', 'use_double_buffer', 'iterable'], varargs=None, keywords=None, defaults=(True, True))
paddle.fluid.io.PyReader.decorate_paddle_reader ArgSpec(args=['self', 'reader', 'places'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.io.PyReader.decorate_tensor_provider ArgSpec(args=['self', 'reader', 'places'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.io.PyReader.reset ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.io.PyReader.start ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.initializer.ConstantInitializer.__init__ ArgSpec(args=['self', 'value', 'force_cpu'], varargs=None, keywords=None, defaults=(0.0, False))
paddle.fluid.initializer.UniformInitializer.__init__ ArgSpec(args=['self', 'low', 'high', 'seed'], varargs=None, keywords=None, defaults=(-1.0, 1.0, 0))
paddle.fluid.initializer.NormalInitializer.__init__ ArgSpec(args=['self', 'loc', 'scale', 'seed'], varargs=None, keywords=None, defaults=(0.0, 1.0, 0))
...
...
paddle/fluid/framework/reader.h
浏览文件 @
c545f1ed
...
...
@@ -54,7 +54,6 @@ class ReaderBase {
private:
friend
class
DecoratedReader
;
friend
class
MultiDecoratedReader
;
// These methods can be only invoked inside DecoratedReader to record the
// decorating chain.
void
InsertDecoratedReader
(
...
...
@@ -63,20 +62,15 @@ class ReaderBase {
std
::
vector
<
std
::
weak_ptr
<
ReaderBase
>>
decorated_readers_
;
};
class
DecoratedReaderBase
:
public
ReaderBase
{
public:
virtual
void
RegisterDecorateChain
()
=
0
;
};
class
DecoratedReader
:
public
DecoratedReaderBase
,
class
DecoratedReader
:
public
ReaderBase
,
public
std
::
enable_shared_from_this
<
DecoratedReader
>
{
public:
explicit
DecoratedReader
(
const
std
::
shared_ptr
<
ReaderBase
>&
reader
)
:
Decorated
ReaderBase
(),
reader_
(
reader
)
{
:
ReaderBase
(),
reader_
(
reader
)
{
PADDLE_ENFORCE_NOT_NULL
(
reader_
);
}
void
RegisterDecorateChain
()
final
{
void
RegisterDecorateChain
()
{
reader_
->
InsertDecoratedReader
(
shared_from_this
());
}
...
...
@@ -90,41 +84,6 @@ class DecoratedReader : public DecoratedReaderBase,
std
::
shared_ptr
<
ReaderBase
>
reader_
;
};
class
MultiDecoratedReader
:
public
DecoratedReaderBase
,
public
std
::
enable_shared_from_this
<
MultiDecoratedReader
>
{
public:
explicit
MultiDecoratedReader
(
const
std
::
vector
<
std
::
shared_ptr
<
ReaderBase
>>&
readers
)
:
readers_
(
readers
)
{
PADDLE_ENFORCE
(
!
readers_
.
empty
());
for
(
auto
&
r
:
readers_
)
{
PADDLE_ENFORCE_NOT_NULL
(
r
);
}
}
void
RegisterDecorateChain
()
final
{
for
(
auto
&
r
:
readers_
)
{
r
->
InsertDecoratedReader
(
shared_from_this
());
}
}
protected:
void
ShutdownImpl
()
override
{
for
(
auto
&
r
:
readers_
)
{
r
->
Shutdown
();
}
}
void
StartImpl
()
override
{
for
(
auto
&
r
:
readers_
)
{
r
->
Start
();
}
}
std
::
vector
<
std
::
shared_ptr
<
ReaderBase
>>
readers_
;
};
// FileReader is just a conceptual class.
class
FileReader
:
public
ReaderBase
{};
...
...
@@ -173,10 +132,8 @@ class ReaderHolder {
};
template
<
typename
T
,
typename
...
ARGS
>
inline
std
::
shared_ptr
<
DecoratedReaderBase
>
MakeDecoratedReader
(
ARGS
&&
...
args
)
{
std
::
shared_ptr
<
DecoratedReaderBase
>
reader
(
new
T
(
std
::
forward
<
ARGS
>
(
args
)...));
inline
std
::
shared_ptr
<
DecoratedReader
>
MakeDecoratedReader
(
ARGS
&&
...
args
)
{
std
::
shared_ptr
<
DecoratedReader
>
reader
(
new
T
(
std
::
forward
<
ARGS
>
(
args
)...));
reader
->
RegisterDecorateChain
();
return
reader
;
}
...
...
paddle/fluid/operators/reader/CMakeLists.txt
浏览文件 @
c545f1ed
...
...
@@ -18,7 +18,6 @@ function(reader_library TARGET_NAME)
endfunction
()
cc_library
(
py_reader SRCS py_reader.cc DEPS reader
)
cc_library
(
compose_reader SRCS compose_reader.cc DEPS reader
)
cc_library
(
buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool
)
reader_library
(
open_files_op SRCS open_files_op.cc DEPS buffered_reader
)
...
...
@@ -41,7 +40,7 @@ cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc)
# Export local libraries to parent
# set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE)
op_library
(
read_op DEPS py_reader
compose_reader
buffered_reader
)
op_library
(
read_op DEPS py_reader buffered_reader
)
foreach
(
src
${
LOCAL_READER_LIBS
}
)
set
(
OP_LIBRARY
${
src
}
${
OP_LIBRARY
}
CACHE INTERNAL
"op libs"
)
...
...
paddle/fluid/operators/reader/blocking_queue.h
浏览文件 @
c545f1ed
...
...
@@ -114,11 +114,6 @@ class BlockingQueue {
return
queue_
.
size
();
}
void
Clear
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
queue_
.
clear
();
}
private:
size_t
capacity_
;
bool
speed_test_mode_
;
...
...
paddle/fluid/operators/reader/compose_reader.cc
已删除
100644 → 0
浏览文件 @
a8c4324d
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/reader/compose_reader.h"
namespace
paddle
{
namespace
operators
{
namespace
reader
{
ComposeReader
::
ComposeReader
(
const
std
::
vector
<
std
::
shared_ptr
<
framework
::
ReaderBase
>>
&
readers
)
:
framework
::
MultiDecoratedReader
(
readers
)
{}
void
ComposeReader
::
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>
*
out
)
{
out
->
clear
();
std
::
vector
<
framework
::
LoDTensor
>
each_ret
;
for
(
auto
&
r
:
readers_
)
{
r
->
ReadNext
(
&
each_ret
);
out
->
reserve
(
out
->
size
()
+
each_ret
.
size
());
for
(
auto
&
data
:
each_ret
)
{
out
->
emplace_back
(
std
::
move
(
data
));
}
}
}
}
// namespace reader
}
// namespace operators
}
// namespace paddle
paddle/fluid/operators/reader/compose_reader.h
已删除
100644 → 0
浏览文件 @
a8c4324d
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <vector>
#include "paddle/fluid/framework/reader.h"
namespace
paddle
{
namespace
operators
{
namespace
reader
{
class
ComposeReader
:
public
framework
::
MultiDecoratedReader
{
public:
explicit
ComposeReader
(
const
std
::
vector
<
std
::
shared_ptr
<
framework
::
ReaderBase
>>
&
readers
);
void
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>
*
out
)
override
;
};
}
// namespace reader
}
// namespace operators
}
// namespace paddle
paddle/fluid/operators/reader/py_reader.cc
浏览文件 @
c545f1ed
...
...
@@ -36,43 +36,6 @@ void PyReader::Shutdown() { queue_->Close(); }
void
PyReader
::
Start
()
{
queue_
->
ReOpen
();
}
MultiQueuePyReader
::
MultiQueuePyReader
(
const
std
::
vector
<
std
::
shared_ptr
<
LoDTensorBlockingQueue
>>&
queues
)
:
queues_
(
queues
)
{
PADDLE_ENFORCE
(
!
queues_
.
empty
());
for
(
auto
&
q
:
queues_
)
{
PADDLE_ENFORCE_NOT_NULL
(
q
);
}
}
void
MultiQueuePyReader
::
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
{
auto
idx
=
read_out_idx_
.
fetch_add
(
1
)
%
queues_
.
size
();
for
(
size_t
i
=
0
;
i
<
queues_
.
size
();
++
i
)
{
*
out
=
queues_
[
idx
]
->
Pop
();
if
(
!
out
->
empty
())
return
;
idx
=
(
idx
+
1
)
%
queues_
.
size
();
}
}
MultiQueuePyReader
::~
MultiQueuePyReader
()
{
for
(
auto
&
q
:
queues_
)
{
q
->
Close
();
}
}
void
MultiQueuePyReader
::
Shutdown
()
{
for
(
auto
&
q
:
queues_
)
{
q
->
Close
();
}
read_out_idx_
.
store
(
0
,
std
::
memory_order
::
memory_order_seq_cst
);
}
void
MultiQueuePyReader
::
Start
()
{
for
(
auto
&
q
:
queues_
)
{
q
->
ReOpen
();
}
}
}
// namespace reader
}
// namespace operators
}
// namespace paddle
paddle/fluid/operators/reader/py_reader.h
浏览文件 @
c545f1ed
...
...
@@ -39,24 +39,6 @@ class PyReader : public framework::FileReader {
std
::
shared_ptr
<
LoDTensorBlockingQueue
>
queue_
;
};
class
MultiQueuePyReader
:
public
framework
::
FileReader
{
public:
explicit
MultiQueuePyReader
(
const
std
::
vector
<
std
::
shared_ptr
<
LoDTensorBlockingQueue
>>&
queues
);
void
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
override
;
~
MultiQueuePyReader
();
void
Shutdown
()
override
;
void
Start
()
override
;
private:
std
::
vector
<
std
::
shared_ptr
<
LoDTensorBlockingQueue
>>
queues_
;
std
::
atomic
<
size_t
>
read_out_idx_
{
0
};
};
}
// namespace reader
}
// namespace operators
}
// namespace paddle
paddle/fluid/pybind/pybind.cc
浏览文件 @
c545f1ed
...
...
@@ -547,11 +547,6 @@ All parameter, weight, gradient are variables in Paddle.
using
LoDTensorBlockingQueueHolder
=
::
paddle
::
operators
::
reader
::
LoDTensorBlockingQueueHolder
;
using
LockFreeLoDTensorBlockingQueue
=
::
paddle
::
operators
::
reader
::
LockFreeLoDTensorBlockingQueue
;
using
LockFreeLoDTensorBlockingQueueHolder
=
::
paddle
::
operators
::
reader
::
LockFreeLoDTensorBlockingQueueHolder
;
py
::
class_
<
LoDTensorBlockingQueue
,
std
::
shared_ptr
<
LoDTensorBlockingQueue
>>
(
m
,
"LoDTensorBlockingQueue"
,
""
)
.
def
(
"push"
,
...
...
@@ -565,20 +560,6 @@ All parameter, weight, gradient are variables in Paddle.
.
def
(
"close"
,
&
LoDTensorBlockingQueue
::
Close
)
.
def
(
"is_closed"
,
&
LoDTensorBlockingQueue
::
IsClosed
);
py
::
class_
<
LockFreeLoDTensorBlockingQueue
,
std
::
shared_ptr
<
LockFreeLoDTensorBlockingQueue
>>
(
m
,
"LockFreeLoDTensorBlockingQueue"
,
""
)
.
def
(
"push"
,
[](
LockFreeLoDTensorBlockingQueue
&
self
,
std
::
vector
<
framework
::
LoDTensor
>
&
lod_tensor_vec
)
{
pybind11
::
gil_scoped_release
release
;
return
self
.
Push
(
std
::
move
(
lod_tensor_vec
));
})
.
def
(
"size"
,
&
LockFreeLoDTensorBlockingQueue
::
Size
)
.
def
(
"capacity"
,
&
LockFreeLoDTensorBlockingQueue
::
Cap
)
.
def
(
"close"
,
&
LockFreeLoDTensorBlockingQueue
::
Close
)
.
def
(
"is_closed"
,
&
LockFreeLoDTensorBlockingQueue
::
IsClosed
);
m
.
def
(
"init_lod_tensor_blocking_queue"
,
[](
Variable
&
var
,
size_t
capacity
)
->
std
::
shared_ptr
<
LoDTensorBlockingQueue
>
{
...
...
@@ -588,15 +569,6 @@ All parameter, weight, gradient are variables in Paddle.
},
py
::
return_value_policy
::
copy
);
m
.
def
(
"init_lock_free_lod_tensor_blocking_queue"
,
[](
Variable
&
var
,
size_t
capacity
)
->
std
::
shared_ptr
<
LockFreeLoDTensorBlockingQueue
>
{
auto
*
holder
=
var
.
GetMutable
<
LockFreeLoDTensorBlockingQueueHolder
>
();
holder
->
InitOnce
(
capacity
);
return
holder
->
GetQueue
();
},
py
::
return_value_policy
::
copy
);
py
::
class_
<
Scope
>
(
m
,
"_Scope"
,
R"DOC(
Scope is an association of a name to Variable. All variables belong to Scope.
...
...
@@ -777,8 +749,6 @@ All parameter, weight, gradient are variables in Paddle.
.
def
(
"_equals"
,
&
IsSamePlace
<
platform
::
CUDAPlace
,
platform
::
CPUPlace
>
)
.
def
(
"_equals"
,
&
IsSamePlace
<
platform
::
CUDAPlace
,
platform
::
CUDAPinnedPlace
>
)
.
def
(
"gpu_device_id"
,
[](
platform
::
CUDAPlace
&
self
)
{
return
self
.
device
;
})
.
def
(
"__str__"
,
string
::
to_string
<
const
platform
::
CUDAPlace
&>
);
py
::
class_
<
paddle
::
platform
::
CPUPlace
>
(
m
,
"CPUPlace"
)
...
...
paddle/fluid/pybind/reader_py.cc
浏览文件 @
c545f1ed
...
...
@@ -17,7 +17,6 @@
#include <vector>
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/operators/reader/buffered_reader.h"
#include "paddle/fluid/operators/reader/compose_reader.h"
#include "paddle/fluid/operators/reader/py_reader.h"
#include "paddle/fluid/platform/place.h"
#include "pybind11/stl.h"
...
...
@@ -82,7 +81,6 @@ class MultiDeviceFeedReader {
void
Reset
()
{
Shutdown
();
Start
();
ReadAsync
();
}
...
...
@@ -117,14 +115,14 @@ class MultiDeviceFeedReader {
}
}
std
::
shared_ptr
<
operators
::
reader
::
LoDTensorBlockingQueue
>
queue_
;
std
::
vector
<
std
::
string
>
names_
;
std
::
unique_ptr
<::
ThreadPool
>
pool_
;
std
::
shared_ptr
<
operators
::
reader
::
LoDTensorBlockingQueue
>
queue_
;
std
::
vector
<
std
::
unique_ptr
<
framework
::
ReaderHolder
>>
readers_
;
std
::
vector
<
std
::
future
<
bool
>>
futures_
;
std
::
vector
<
std
::
vector
<
framework
::
LoDTensor
>>
ret_
;
bool
drop_last_
;
};
namespace
py
=
pybind11
;
...
...
@@ -150,7 +148,7 @@ void BindReader(py::module *module) {
const
std
::
vector
<
std
::
string
>
&
names
,
const
std
::
vector
<
platform
::
Place
>
&
dst_places
,
bool
use_double_buffer
)
{
return
new
MultiDeviceFeedReader
(
queue
s
,
names
,
dst_places
,
return
new
MultiDeviceFeedReader
(
queue
,
names
,
dst_places
,
use_double_buffer
);
},
py
::
return_value_policy
::
take_ownership
);
...
...
python/paddle/fluid/layers/io.py
浏览文件 @
c545f1ed
...
...
@@ -486,8 +486,7 @@ def _py_reader(capacity,
lod_levels
=
None
,
name
=
None
,
use_double_buffer
=
True
,
feed_list
=
None
,
lock_free
=
False
):
feed_list
=
None
):
if
feed_list
is
not
None
:
if
not
isinstance
(
feed_list
,
list
):
...
...
@@ -527,11 +526,7 @@ def _py_reader(capacity,
double_buffer_name
=
"_"
.
join
([
name
,
"double_buffer"
])
var
=
global_scope
().
var
(
queue_name
)
if
not
lock_free
:
feed_queue
=
core
.
init_lod_tensor_blocking_queue
(
var
,
capacity
)
else
:
feed_queue
=
core
.
init_lock_free_lod_tensor_blocking_queue
(
var
,
capacity
)
feed_queue
=
core
.
init_lod_tensor_blocking_queue
(
var
,
capacity
)
startup_blk
=
default_startup_program
().
current_block
()
startup_var
=
startup_blk
.
create_var
(
name
=
reader_name
)
...
...
@@ -644,8 +639,7 @@ def py_reader(capacity,
dtypes
,
lod_levels
=
None
,
name
=
None
,
use_double_buffer
=
True
,
lock_free
=
False
):
use_double_buffer
=
True
):
"""
Create a Python reader for data feeding in Python
...
...
@@ -770,8 +764,7 @@ def py_reader(capacity,
dtypes
=
dtypes
,
lod_levels
=
lod_levels
,
name
=
name
,
use_double_buffer
=
use_double_buffer
,
lock_free
=
lock_free
)
use_double_buffer
=
use_double_buffer
)
def
create_py_reader_by_data
(
capacity
,
...
...
python/paddle/fluid/reader.py
浏览文件 @
c545f1ed
...
...
@@ -15,9 +15,11 @@
import
core
import
six
import
threading
from
.framework
import
Program
,
Variable
,
program_guard
from
.framework
import
Program
,
Variable
,
program_guard
,
default_main_program
,
default_startup_program
from
.executor
import
global_scope
from
.data_feeder
import
DataFeeder
import
paddle.reader.decorator
as
decorator
from
.layers.io
import
monkey_patch_reader_methods
,
_copy_reader_var_
,
double_buffer
import
unique_name
__all__
=
[
'PyReader'
]
...
...
@@ -37,30 +39,101 @@ def _convert_places(places):
return
ret
class
PyReader
(
Reader
):
def
__init__
(
self
,
feed_list
,
places
,
capacity
):
class
PyReader
(
object
):
unique_name_generator
=
unique_name
.
UniqueNameGenerator
()
def
__init__
(
self
,
feed_list
,
capacity
,
use_double_buffer
=
True
,
iterable
=
True
):
self
.
_tensor_reader
=
None
self
.
_thread
=
None
# TODO(zjl): to support drop_last = False
self
.
_drop_last
=
True
self
.
_iterable
=
iterable
self
.
_use_double_buffer
=
use_double_buffer
self
.
_capacity
=
capacity
self
.
_feed_list
=
feed_list
self
.
_
var_names
=
[
v
.
name
for
v
in
feed_list
]
self
.
_queues
=
[]
self
.
_
scope
=
global_scope
()
if
not
self
.
_iterable
:
self
.
_init_non_iterable
()
def
_init_iterable
(
self
,
places
):
self
.
_var_names
=
[
v
.
name
for
v
in
self
.
_feed_list
]
self
.
_places
=
_convert_places
(
places
)
self
.
_queue_capacity
=
capacity
self
.
queue
=
core
.
init_lod_tensor_blocking_queue
(
core
.
Variable
(),
self
.
_queue_capacity
)
self
.
_reader
=
core
.
create_py_reader
(
self
.
_queue
,
self
.
_var_names
,
self
.
_places
,
self
.
_drop_last
)
self
.
_queue
=
core
.
init_lod_tensor_blocking_queue
(
core
.
Variable
(),
self
.
_capacity
)
self
.
_reader
=
core
.
create_py_reader
(
self
.
queue
,
self
.
_var_names
,
self
.
_places
,
self
.
_use_double_buffer
)
def
_init_non_iterable
(
self
):
lod_levels
=
[]
dtypes
=
[]
shape_concat
=
[]
ranks
=
[]
shapes
=
[]
for
feed_data
in
self
.
_feed_list
:
dtypes
.
append
(
feed_data
.
dtype
)
shape_concat
.
extend
(
feed_data
.
shape
)
ranks
.
append
(
len
(
feed_data
.
shape
))
shapes
.
append
(
feed_data
.
shape
)
lod_levels
.
append
(
feed_data
.
lod_level
)
queue_name
=
PyReader
.
unique_name_generator
(
'lod_tensor_blocking_queue'
)
reader_name
=
PyReader
.
unique_name_generator
(
'create_py_reader'
)
double_buffer_name
=
PyReader
.
unique_name_generator
(
'double_buffer'
)
var
=
self
.
_scope
.
var
(
queue_name
)
self
.
_queue
=
core
.
init_lod_tensor_blocking_queue
(
var
,
self
.
_capacity
)
startup_blk
=
default_startup_program
().
current_block
()
startup_var
=
startup_blk
.
create_var
(
name
=
reader_name
)
startup_blk
.
append_op
(
type
=
'create_py_reader'
,
inputs
=
{
'blocking_queue'
:
[
queue_name
]},
outputs
=
{
'Out'
:
[
startup_var
]},
attrs
=
{
'shape_concat'
:
shape_concat
,
'lod_levels'
:
lod_levels
,
'ranks'
:
ranks
})
startup_var
.
desc
.
set_dtypes
(
dtypes
)
startup_var
.
persistable
=
True
main_prog_var
=
_copy_reader_var_
(
default_main_program
().
current_block
(),
startup_var
)
main_prog_var
.
stop_gradient
=
True
main_prog_var
.
persistable
=
True
reader
=
monkey_patch_reader_methods
(
main_prog_var
)
if
self
.
_use_double_buffer
:
double_buffer_reader
=
double_buffer
(
reader
,
name
=
double_buffer_name
)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader
.
reset
=
reader
.
reset
reader
=
double_buffer_reader
self
.
_reader
=
reader
default_main_program
().
current_block
().
append_op
(
type
=
'read'
,
inputs
=
{
'Reader'
:
[
self
.
_reader
]},
outputs
=
{
'Out'
:
self
.
_feed_list
})
@
property
def
queue
(
self
):
return
self
.
_queue
@
property
def
iterable
(
self
):
return
self
.
_iterable
def
__call__
(
self
):
assert
self
.
iterable
,
"PyReader is not iterable"
assert
self
.
_tensor_reader
is
not
None
,
\
"Data source of PyReader has not set yet"
...
...
@@ -80,13 +153,22 @@ class PyReader(Reader):
self
.
_reset
()
raise
StopIteration
self
.
_start
()
return
Iterator
(
self
)
def
_reset
(
self
):
if
self
.
_thread
:
self
.
_reader
.
reset
()
self
.
_thread
.
join
()
self
.
_reader
.
reset
()
self
.
_thread
.
join
()
def
start
(
self
):
assert
not
self
.
_iterable
,
"start() cannot be called when PyReader is iterable"
self
.
_start
()
def
reset
(
self
):
assert
not
self
.
_iterable
,
"reset() cannot be called when PyReader is iterable"
self
.
_reset
()
def
_start
(
self
):
def
__thread_main__
():
for
tensors
in
self
.
_tensor_reader
():
array
=
core
.
LoDTensorArray
()
...
...
@@ -98,16 +180,16 @@ class PyReader(Reader):
array
.
append
(
item
)
if
not
self
.
queue
.
push
(
array
):
if
not
self
.
_
queue
.
push
(
array
):
break
self
.
queue
.
close
()
self
.
_
queue
.
close
()
self
.
_thread
=
threading
.
Thread
(
target
=
__thread_main__
)
self
.
_thread
.
daemon
=
True
self
.
_thread
.
start
()
def
set_numpy_reader
(
self
,
reader
):
def
decorate_paddle_reader
(
self
,
reader
,
places
=
None
):
assert
self
.
_tensor_reader
is
None
,
\
"Cannot reset the data source of PyReader"
with
program_guard
(
Program
(),
Program
()):
...
...
@@ -119,10 +201,12 @@ class PyReader(Reader):
for
slots
in
paddle_reader
():
yield
[
slots
[
var
.
name
]
for
var
in
self
.
_feed_list
]
self
.
set_tensor_reader
(
__tensor_reader_impl__
)
self
.
decorate_tensor_provider
(
__tensor_reader_impl__
,
places
)
def
set_tensor_reader
(
self
,
reader
):
def
decorate_tensor_provider
(
self
,
reader
,
places
=
None
):
assert
self
.
_tensor_reader
is
None
,
\
"Cannot reset the data source of PyReader"
self
.
_tensor_reader
=
reader
self
.
_reset
()
if
self
.
_iterable
:
assert
places
is
not
None
,
"Places cannot be None when py_reader is iterable"
self
.
_init_iterable
(
places
)
python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py
浏览文件 @
c545f1ed
...
...
@@ -31,35 +31,22 @@ def random_reader():
yield
image
,
label
def
simple_fc_net
(
places
,
use_legacy_py_reader
,
lock_free
=
False
):
def
simple_fc_net
(
places
,
use_legacy_py_reader
,
use_double_buffer
):
startup_prog
=
fluid
.
Program
()
main_prog
=
fluid
.
Program
()
startup_prog
.
random_seed
=
1
main_prog
.
random_seed
=
1
reader
=
paddle
.
batch
(
random_reader
,
batch_size
=
BATCH_SIZE
)
with
fluid
.
unique_name
.
guard
():
with
fluid
.
program_guard
(
main_prog
,
startup_prog
):
if
not
use_legacy_py_reader
:
image
=
fluid
.
layers
.
data
(
name
=
'image'
,
shape
=
[
784
],
dtype
=
'float32'
)
label
=
fluid
.
layers
.
data
(
name
=
'label'
,
shape
=
[
1
],
dtype
=
'int64'
)
py_reader
=
fluid
.
io
.
PyReader
(
feed_list
=
[
image
,
label
],
places
=
places
,
capacity
=
4
,
multi_queue
=
False
)
py_reader
.
set_numpy_reader
(
reader
)
else
:
py_reader
=
fluid
.
layers
.
py_reader
(
capacity
=
4
,
shapes
=
[(
-
1
,
784
),
(
-
1
,
1
)],
dtypes
=
[
'float32'
,
'int64'
],
lock_free
=
lock_free
)
image
,
label
=
fluid
.
layers
.
read_file
(
py_reader
)
py_reader
.
decorate_paddle_reader
(
reader
)
image
=
fluid
.
layers
.
data
(
name
=
'image'
,
shape
=
[
784
],
dtype
=
'float32'
)
label
=
fluid
.
layers
.
data
(
name
=
'label'
,
shape
=
[
1
],
dtype
=
'int64'
)
py_reader
=
fluid
.
io
.
PyReader
(
feed_list
=
[
image
,
label
],
capacity
=
4
,
iterable
=
not
use_legacy_py_reader
,
use_double_buffer
=
use_double_buffer
)
hidden
=
image
for
hidden_size
in
[
10
,
20
,
30
]:
hidden
=
fluid
.
layers
.
fc
(
...
...
@@ -82,11 +69,19 @@ def simple_fc_net(places, use_legacy_py_reader, lock_free=False):
class
TestBase
(
unittest
.
TestCase
):
def
run_main
(
self
,
use_legacy_py_reader
,
with_data_parallel
,
places
):
def
run_main
(
self
,
use_legacy_py_reader
,
with_data_parallel
,
places
,
use_double_buffer
):
scope
=
fluid
.
Scope
()
with
fluid
.
scope_guard
(
scope
):
startup_prog
,
main_prog
,
py_reader
,
loss
=
simple_fc_net
(
places
,
use_legacy_py_reader
)
places
,
use_legacy_py_reader
,
use_double_buffer
)
reader
=
paddle
.
batch
(
random_reader
,
batch_size
=
BATCH_SIZE
)
ps
=
places
if
use_double_buffer
else
fluid
.
cpu_places
(
len
(
places
))
py_reader
.
decorate_paddle_reader
(
reader
,
places
=
ps
if
py_reader
.
iterable
else
None
)
exe
=
fluid
.
Executor
(
place
=
places
[
0
])
exe
.
run
(
startup_prog
)
...
...
@@ -98,7 +93,7 @@ class TestBase(unittest.TestCase):
step
=
0
step_list
=
[]
start_t
=
time
.
time
()
if
use_legacy_py_reader
:
if
not
py_reader
.
iterable
:
for
_
in
six
.
moves
.
range
(
EPOCH_NUM
):
step
=
0
py_reader
.
start
()
...
...
@@ -107,12 +102,9 @@ class TestBase(unittest.TestCase):
L
,
=
exe
.
run
(
program
=
prog
,
fetch_list
=
[
loss
],
use_program_cache
=
True
)
# print('runned', step, py_reader.queue.is_closed(), py_reader.queue.size())
step
+=
1
except
fluid
.
core
.
EOFException
:
# print('try to reset')
py_reader
.
reset
()
# print('reseted')
break
step_list
.
append
(
step
)
else
:
...
...
@@ -125,8 +117,8 @@ class TestBase(unittest.TestCase):
label
=
item
[
'label'
]
assert
image
.
shape
()
==
[
BATCH_SIZE
,
784
]
assert
label
.
shape
()
==
[
BATCH_SIZE
,
1
]
assert
image
.
_place
().
_equals
(
p
lace
s
[
i
])
assert
label
.
_place
().
_equals
(
p
lace
s
[
i
])
assert
image
.
_place
().
_equals
(
ps
[
i
])
assert
label
.
_place
().
_equals
(
ps
[
i
])
L
,
=
exe
.
run
(
program
=
prog
,
feed
=
d
,
fetch_list
=
[
loss
],
...
...
@@ -138,7 +130,7 @@ class TestBase(unittest.TestCase):
scope
.
_remove_from_pool
()
return
ret
def
prepare_places
(
self
,
with_data_parallel
,
with_cpu
=
Fals
e
,
with_gpu
=
True
):
def
prepare_places
(
self
,
with_data_parallel
,
with_cpu
=
Tru
e
,
with_gpu
=
True
):
places
=
[]
if
with_cpu
:
places
.
append
([
fluid
.
CPUPlace
()])
...
...
@@ -156,21 +148,13 @@ class TestBase(unittest.TestCase):
def
test_main
(
self
):
for
with_data_parallel
in
[
True
,
False
]:
for
p
in
self
.
prepare_places
(
with_data_parallel
):
t
=
[]
for
use_legacy_py_reader
in
[
False
]:
#[True, False]: #[False, True]:
print
(
p
,
use_legacy_py_reader
)
ret
=
self
.
run_main
(
use_legacy_py_reader
=
use_legacy_py_reader
,
with_data_parallel
=
with_data_parallel
,
places
=
p
)
ret
[
'legacy'
]
=
use_legacy_py_reader
ret
[
'data_parallel'
]
=
with_data_parallel
ret
[
'places'
]
=
p
t
.
append
([
ret
[
'step'
],
])
#, ret['places']])
print
(
t
)
for
use_double_buffer
in
[
False
,
True
]:
for
use_legacy_py_reader
in
[
False
,
True
]:
ret
=
self
.
run_main
(
use_legacy_py_reader
=
use_legacy_py_reader
,
with_data_parallel
=
with_data_parallel
,
places
=
p
,
use_double_buffer
=
use_double_buffer
)
if
__name__
==
'__main__'
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录