Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
2945a98e
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2945a98e
编写于
4月 01, 2018
作者:
F
fengjiayi
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Make MultipleReader thread-safe
上级
ef802ce9
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
25 addition
and
10 deletion
+25
-10
paddle/fluid/operators/reader/open_files_op.cc
paddle/fluid/operators/reader/open_files_op.cc
+25
-10
未找到文件。
paddle/fluid/operators/reader/open_files_op.cc
浏览文件 @
2945a98e
...
@@ -21,6 +21,22 @@ namespace reader {
...
@@ -21,6 +21,22 @@ namespace reader {
class
MultipleReader
:
public
framework
::
ReaderBase
{
class
MultipleReader
:
public
framework
::
ReaderBase
{
public:
public:
class
ThreadBufferMap
{
public:
std
::
vector
<
framework
::
LoDTensor
>&
operator
[](
const
std
::
thread
::
id
&
thread_id
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
return
buffer_
[
thread_id
];
}
void
Clear
()
{
buffer_
.
clear
();
}
private:
std
::
mutex
mutex_
;
std
::
unordered_map
<
std
::
thread
::
id
,
std
::
vector
<
framework
::
LoDTensor
>>
buffer_
;
};
MultipleReader
(
const
std
::
vector
<
std
::
string
>&
file_names
,
MultipleReader
(
const
std
::
vector
<
std
::
string
>&
file_names
,
const
std
::
vector
<
framework
::
DDim
>&
dims
,
size_t
thread_num
)
const
std
::
vector
<
framework
::
DDim
>&
dims
,
size_t
thread_num
)
:
file_names_
(
file_names
),
dims_
(
dims
)
{
:
file_names_
(
file_names
),
dims_
(
dims
)
{
...
@@ -47,28 +63,27 @@ class MultipleReader : public framework::ReaderBase {
...
@@ -47,28 +63,27 @@ class MultipleReader : public framework::ReaderBase {
framework
::
Channel
<
size_t
>*
waiting_file_idx_
;
framework
::
Channel
<
size_t
>*
waiting_file_idx_
;
framework
::
Channel
<
size_t
>*
available_thread_idx_
;
framework
::
Channel
<
size_t
>*
available_thread_idx_
;
framework
::
Channel
<
std
::
vector
<
framework
::
LoDTensor
>>*
buffer_
;
framework
::
Channel
<
std
::
vector
<
framework
::
LoDTensor
>>*
buffer_
;
mutable
std
::
vector
<
framework
::
LoDTensor
>
local_buffer
_
;
mutable
ThreadBufferMap
thread_buffer_map
_
;
};
};
void
MultipleReader
::
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
{
void
MultipleReader
::
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
{
if
(
!
HasNext
())
{
if
(
!
HasNext
())
{
PADDLE_THROW
(
"There is no next data!"
);
PADDLE_THROW
(
"There is no next data!"
);
}
}
auto
&
thread_local_buffer
=
thread_buffer_map_
[
std
::
this_thread
::
get_id
()];
if
(
local_buffer_
.
empty
())
{
*
out
=
thread_local_buffer
;
buffer_
->
Receive
(
&
local_buffer_
);
thread_local_buffer
.
clear
();
}
*
out
=
local_buffer_
;
local_buffer_
.
clear
();
}
}
bool
MultipleReader
::
HasNext
()
const
{
bool
MultipleReader
::
HasNext
()
const
{
return
local_buffer_
.
empty
()
?
buffer_
->
Receive
(
&
local_buffer_
)
:
true
;
auto
&
thread_local_buffer
=
thread_buffer_map_
[
std
::
this_thread
::
get_id
()];
return
thread_local_buffer
.
empty
()
?
buffer_
->
Receive
(
&
thread_local_buffer
)
:
true
;
}
}
void
MultipleReader
::
ReInit
()
{
void
MultipleReader
::
ReInit
()
{
EndScheduler
();
EndScheduler
();
local_buffer_
.
c
lear
();
thread_buffer_map_
.
C
lear
();
StartNewScheduler
();
StartNewScheduler
();
}
}
...
@@ -176,7 +191,7 @@ class OpenFilesOp : public framework::OperatorBase {
...
@@ -176,7 +191,7 @@ class OpenFilesOp : public framework::OperatorBase {
const
auto
&
ranks
=
Attr
<
std
::
vector
<
int
>>
(
"ranks"
);
const
auto
&
ranks
=
Attr
<
std
::
vector
<
int
>>
(
"ranks"
);
PADDLE_ENFORCE
(
!
shape_concat
.
empty
()
&&
!
ranks
.
empty
());
PADDLE_ENFORCE
(
!
shape_concat
.
empty
()
&&
!
ranks
.
empty
());
PADDLE_ENFORCE_EQ
(
std
::
accumulate
(
ranks
.
begin
(),
ranks
.
end
(),
0
),
PADDLE_ENFORCE_EQ
(
std
::
accumulate
(
ranks
.
begin
(),
ranks
.
end
(),
0
),
int
(
shape_concat
.
size
()),
static_cast
<
int
>
(
shape_concat
.
size
()),
"The accumulate of all ranks should be equal to the "
"The accumulate of all ranks should be equal to the "
"shape concat's length."
);
"shape concat's length."
);
const
auto
&
file_names
=
Attr
<
std
::
vector
<
std
::
string
>>
(
"file_names"
);
const
auto
&
file_names
=
Attr
<
std
::
vector
<
std
::
string
>>
(
"file_names"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录