Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Crayon鑫
Paddle
提交
c8bd5210
P
Paddle
项目概览
Crayon鑫
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c8bd5210
编写于
10月 18, 2018
作者:
Q
Qiao Longfei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add reader thread status
上级
71cbc8bd
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
22 addition
and
10 deletion
+22
-10
paddle/fluid/operators/reader/ctr_reader.cc
paddle/fluid/operators/reader/ctr_reader.cc
+5
-0
paddle/fluid/operators/reader/ctr_reader.h
paddle/fluid/operators/reader/ctr_reader.h
+17
-10
未找到文件。
paddle/fluid/operators/reader/ctr_reader.cc
浏览文件 @
c8bd5210
...
@@ -124,7 +124,10 @@ class MultiGzipReader : public Reader {
...
@@ -124,7 +124,10 @@ class MultiGzipReader : public Reader {
void
ReadThread
(
const
std
::
vector
<
std
::
string
>&
file_list
,
void
ReadThread
(
const
std
::
vector
<
std
::
string
>&
file_list
,
const
std
::
vector
<
std
::
string
>&
slots
,
int
batch_size
,
const
std
::
vector
<
std
::
string
>&
slots
,
int
batch_size
,
int
thread_id
,
std
::
vector
<
ReaderThreadStatus
>*
thread_status
,
std
::
shared_ptr
<
LoDTensorBlockingQueue
>
queue
)
{
std
::
shared_ptr
<
LoDTensorBlockingQueue
>
queue
)
{
(
*
thread_status
)[
thread_id
]
=
Running
;
std
::
string
line
;
std
::
string
line
;
std
::
vector
<
std
::
unordered_map
<
std
::
string
,
std
::
vector
<
int64_t
>>>
batch_data
;
std
::
vector
<
std
::
unordered_map
<
std
::
string
,
std
::
vector
<
int64_t
>>>
batch_data
;
...
@@ -181,6 +184,8 @@ void ReadThread(const std::vector<std::string>& file_list,
...
@@ -181,6 +184,8 @@ void ReadThread(const std::vector<std::string>& file_list,
queue
->
Push
(
lod_datas
);
queue
->
Push
(
lod_datas
);
}
}
(
*
thread_status
)[
thread_id
]
=
Stopped
;
}
}
}
// namespace reader
}
// namespace reader
...
...
paddle/fluid/operators/reader/ctr_reader.h
浏览文件 @
c8bd5210
...
@@ -30,8 +30,11 @@ namespace paddle {
...
@@ -30,8 +30,11 @@ namespace paddle {
namespace
operators
{
namespace
operators
{
namespace
reader
{
namespace
reader
{
enum
ReaderThreadStatus
{
Running
,
Stopped
};
void
ReadThread
(
const
std
::
vector
<
std
::
string
>&
file_list
,
void
ReadThread
(
const
std
::
vector
<
std
::
string
>&
file_list
,
const
std
::
vector
<
std
::
string
>&
slots
,
int
batch_size
,
const
std
::
vector
<
std
::
string
>&
slots
,
int
batch_size
,
int
thread_id
,
std
::
vector
<
ReaderThreadStatus
>*
thread_status
,
std
::
shared_ptr
<
LoDTensorBlockingQueue
>
queue
);
std
::
shared_ptr
<
LoDTensorBlockingQueue
>
queue
);
class
CTRReader
:
public
framework
::
FileReader
{
class
CTRReader
:
public
framework
::
FileReader
{
...
@@ -40,13 +43,16 @@ class CTRReader : public framework::FileReader {
...
@@ -40,13 +43,16 @@ class CTRReader : public framework::FileReader {
int
batch_size
,
int
thread_num
,
int
batch_size
,
int
thread_num
,
const
std
::
vector
<
std
::
string
>&
slots
,
const
std
::
vector
<
std
::
string
>&
slots
,
const
std
::
vector
<
std
::
string
>&
file_list
)
const
std
::
vector
<
std
::
string
>&
file_list
)
:
thread_num_
(
thread_num
),
:
batch_size_
(
batch_size
),
slots_
(
slots
),
file_list_
(
file_list
)
{
batch_size_
(
batch_size
),
slots_
(
slots
),
file_list_
(
file_list
)
{
PADDLE_ENFORCE
(
queue
!=
nullptr
,
"LoDTensorBlockingQueue must not be null"
);
PADDLE_ENFORCE
(
queue
!=
nullptr
,
"LoDTensorBlockingQueue must not be null"
);
PADDLE_ENFORCE_GT
(
file_list
.
size
(),
0
,
"file list should not be empty"
);
thread_num_
=
file_list_
.
size
()
>
thread_num_
?
thread_num_
:
file_list_
.
size
();
queue_
=
queue
;
queue_
=
queue
;
SplitFiles
();
SplitFiles
();
for
(
int
i
=
0
;
i
<
thread_num
;
++
i
)
{
read_thread_status_
.
push_back
(
Stopped
);
}
}
}
~
CTRReader
()
{
queue_
->
Close
();
}
~
CTRReader
()
{
queue_
->
Close
();
}
...
@@ -69,28 +75,29 @@ class CTRReader : public framework::FileReader {
...
@@ -69,28 +75,29 @@ class CTRReader : public framework::FileReader {
void
Start
()
override
{
void
Start
()
override
{
VLOG
(
3
)
<<
"Start reader"
;
VLOG
(
3
)
<<
"Start reader"
;
queue_
->
ReOpen
();
queue_
->
ReOpen
();
for
(
int
i
=
0
;
i
<
file_groups_
.
size
();
i
++
)
{
for
(
int
thread_id
=
0
;
thread_id
<
file_groups_
.
size
();
thread_id
++
)
{
read_threads_
.
emplace_back
(
new
std
::
thread
(
std
::
bind
(
read_threads_
.
emplace_back
(
new
std
::
thread
(
&
ReadThread
,
file_groups_
[
i
],
slots_
,
batch_size_
,
queue_
)));
std
::
bind
(
&
ReadThread
,
file_groups_
[
thread_id
],
slots_
,
batch_size_
,
thread_id
,
&
read_thread_status_
,
queue_
)));
}
}
}
}
private:
private:
void
SplitFiles
()
{
void
SplitFiles
()
{
file_groups_
.
resize
(
file_list_
.
size
()
>
thread_num_
?
thread_num_
file_groups_
.
resize
(
thread_num_
);
:
file_list_
.
size
());
for
(
int
i
=
0
;
i
<
file_list_
.
size
();
++
i
)
{
for
(
int
i
=
0
;
i
<
file_list_
.
size
();
++
i
)
{
file_groups_
[
i
%
thread_num_
].
push_back
(
file_list_
[
i
]);
file_groups_
[
i
%
thread_num_
].
push_back
(
file_list_
[
i
]);
}
}
}
}
private:
private:
const
int
thread_num_
;
int
thread_num_
;
const
int
batch_size_
;
const
int
batch_size_
;
const
std
::
vector
<
std
::
string
>
slots_
;
const
std
::
vector
<
std
::
string
>
slots_
;
const
std
::
vector
<
std
::
string
>
file_list_
;
const
std
::
vector
<
std
::
string
>
file_list_
;
std
::
shared_ptr
<
LoDTensorBlockingQueue
>
queue_
;
std
::
shared_ptr
<
LoDTensorBlockingQueue
>
queue_
;
std
::
vector
<
std
::
unique_ptr
<
std
::
thread
>>
read_threads_
;
std
::
vector
<
std
::
unique_ptr
<
std
::
thread
>>
read_threads_
;
std
::
vector
<
ReaderThreadStatus
>
read_thread_status_
;
std
::
vector
<
std
::
vector
<
std
::
string
>>
file_groups_
;
std
::
vector
<
std
::
vector
<
std
::
string
>>
file_groups_
;
};
};
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录