Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
PaddleDetection
提交
442c1503
P
PaddleDetection
项目概览
s920243400
/
PaddleDetection
与 Fork 源项目一致
Fork自
PaddlePaddle / PaddleDetection
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleDetection
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
442c1503
编写于
4月 04, 2018
作者:
F
fengjiayi
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
a draft of ThreadedReader
上级
6dcfd97a
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
125 addition
and
0 deletion
+125
-0
paddle/fluid/operators/reader/create_threaded_reader_op.cc
paddle/fluid/operators/reader/create_threaded_reader_op.cc
+125
-0
未找到文件。
paddle/fluid/operators/reader/create_threaded_reader_op.cc
0 → 100644
浏览文件 @
442c1503
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/detail/safe_ref.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace
paddle
{
namespace
operators
{
namespace
reader
{
class
ThreadedReader
:
public
framework
::
DecoratedReader
{
public:
ThreadedReader
(
ReaderBase
*
reader
,
bool
unsafe_mode
)
:
DecoratedReader
(
reader
),
unsafe_mode_
(
unsafe_mode
)
{}
void
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
override
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
if
(
!
unsafe_mode
)
{
if
(
!
reader_
->
HasNext
())
{
PADDLE_THROW
(
"There is no next data!"
);
}
reader_
->
ReadNext
(
out
);
}
else
{
auto
&
thread_buffer
=
thread_buffers_
[
std
::
this_thread
::
get_id
()];
if
(
thread_buffer
.
empty
())
{
PADDLE_THROW
(
"thread_buffer is empty! HasNext() must be invoked before "
"ReadNext() in the same thread."
);
}
*
out
=
thread_buffer
;
thread_buffer
.
clear
();
}
}
bool
HasNext
()
const
override
{
if
(
!
unsafe_mode_
)
{
PADDLE_THROW
(
"ThreadedReader::HasNext() is disabled when 'unsafe_mode' is false."
);
}
std
::
thread
::
id
thread_id
=
std
::
this_thread
::
get_id
();
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
auto
&
thread_buffer
=
thread_buffers_
[
thread_id
];
if
(
thread_buffer
.
empty
()
&&
reader_
->
HasNext
())
{
reader_
->
ReadNext
(
&
thread_buffer
);
}
return
!
threda_buffer
.
empty
();
}
void
ReInit
()
override
;
~
ThreadedReader
()
{
for
(
auto
&
p
:
thread_buffers_
)
{
if
(
!
p
.
second
.
empty
())
{
PADDLE_THROW
(
"Find an unused data batch in ThreadedReader! Maybe one thread "
"invokes 'HasNext()' without subsequent 'ReadNext()'."
);
}
}
}
private:
mutable
std
::
mutex
mutex_
;
mutable
std
::
unordered_map
<
std
::
thread
::
id
,
std
::
vector
<
framework
::
LoDTensor
>>
thread_buffers_
;
};
class
CreateThreadedReaderOp
:
public
framework
::
OperatorBase
{
public:
using
framework
::
OperatorBase
::
OperatorBase
;
private:
void
RunImpl
(
const
framework
::
Scope
&
scope
,
const
platform
::
Place
&
dev_place
)
const
override
{
auto
*
out
=
detail
::
Ref
(
scope
.
FindVar
(
Output
(
"Out"
)))
.
GetMutable
<
framework
::
ReaderHolder
>
();
if
(
out
->
Get
()
!=
nullptr
)
{
return
;
}
const
auto
&
underlying_reader
=
scope
.
FindVar
(
Input
(
"UnderlyingReader"
))
->
Get
<
framework
::
ReaderHolder
>
();
bool
unsafe_mode
=
Attr
<
bool
>
(
"unsafe_mode"
);
out
->
Reset
(
new
ThreadedReader
(
underlying_reader
.
Get
(),
unsafe_mode
));
}
};
class
CreateThreadedReaderOpMaker
:
public
DecoratedReaderMakerBase
{
public:
CreateThreadedReaderOpMaker
(
OpProto
*
op_proto
,
OpAttrChecker
*
op_checker
)
:
DecoratedReaderMakerBase
(
op_proto
,
op_checker
)
{
AddAttr
<
bool
>
(
"unsafe_mode"
,
"When 'unsafe_mode' is false, invoking 'HasNext()' or "
"'ReInit()' is not allowed to avoid unexpected bugs in "
"multi-thread environment."
)
.
SetDefault
(
false
);
AddComment
(
R"DOC(
CreateThreadedReader Operator
This operator creates a threaded reader. A threaded reader's
'ReadNext()' can be invoked by several threads at the same
time.
When the attribute 'unsafe_mode' is false, the threaded reader's
'HasNext()' and 'ReInit()' will be disabled to avoid unexpected
bugs in multi-thread environment. If you really need them, you
can enable them by setting 'unsafe_mode' true. In this case,
'HasNext()' returning true only guarantees the safety of
invoking 'ReadNext()' in the same thread. Each thread must
invoke 'HasNext()' and 'ReadNext()' in pair.
)DOC"
)
}
};
}
// namespace reader
}
// namespace operators
}
// namespace paddle
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录