Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
d2a31fb8
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d2a31fb8
编写于
2月 07, 2018
作者:
C
chengduoZH
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refine unit test
上级
20c4a4cb
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
56 addition
and
98 deletion
+56
-98
paddle/framework/channel_test.cc
paddle/framework/channel_test.cc
+49
-98
paddle/framework/details/buffered_channel.h
paddle/framework/details/buffered_channel.h
+7
-0
未找到文件。
paddle/framework/channel_test.cc
浏览文件 @
d2a31fb8
...
@@ -115,7 +115,7 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
...
@@ -115,7 +115,7 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
sum
+=
i
;
sum
+=
i
;
}
}
});
});
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
100
));
// wait 0.
5
sec
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
100
));
// wait 0.
1
sec
EXPECT_EQ
(
sum
,
45U
);
EXPECT_EQ
(
sum
,
45U
);
CloseChannel
(
ch
);
CloseChannel
(
ch
);
...
@@ -144,38 +144,34 @@ TEST(Channel, SimpleUnbufferedChannelTest) {
...
@@ -144,38 +144,34 @@ TEST(Channel, SimpleUnbufferedChannelTest) {
delete
ch
;
delete
ch
;
}
}
// This tests that closing a buffered channel also unblocks
void
ChannelCloseUnblocksReceiversTest
(
Channel
<
int
>
*
ch
)
{
// any receivers waiting on the channel
TEST
(
Channel
,
BufferedChannelCloseUnblocksReceiversTest
)
{
auto
ch
=
MakeChannel
<
int
>
(
1
);
size_t
num_threads
=
5
;
size_t
num_threads
=
5
;
std
::
thread
t
[
num_threads
];
std
::
thread
t
[
num_threads
];
bool
thread_ended
[
num_threads
];
bool
thread_ended
[
num_threads
];
// Launches threads that try to read and are blocked because of no writers
// Launches threads that try to read and are blocked because
w
of no writers
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
thread_ended
[
i
]
=
false
;
thread_ended
[
i
]
=
false
;
t
[
i
]
=
std
::
thread
(
t
[
i
]
=
std
::
thread
(
[
&
](
bool
*
p
)
{
[
&
](
bool
*
p
)
{
int
data
;
int
data
;
// All reads should return false
EXPECT_EQ
(
ch
->
Receive
(
&
data
),
false
);
EXPECT_EQ
(
ch
->
Receive
(
&
data
),
false
);
*
p
=
true
;
*
p
=
true
;
},
},
&
thread_ended
[
i
]);
&
thread_ended
[
i
]);
}
}
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
100
));
// wait
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
100
));
// wait
0.1 sec
// Verify that all threads are blocked
// Verify that all th
e th
reads are blocked
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
EXPECT_EQ
(
thread_ended
[
i
],
false
);
EXPECT_EQ
(
thread_ended
[
i
],
false
);
}
}
// Explicitly close the
channel
// Explicitly close the
thread
// This should unblock all receivers
// This should unblock all receivers
CloseChannel
(
ch
);
CloseChannel
(
ch
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
200
));
// wait
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
100
));
// wait 0.1 sec
// Verify that all threads got unblocked
// Verify that all threads got unblocked
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
...
@@ -183,13 +179,12 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
...
@@ -183,13 +179,12 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
}
}
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
t
[
i
].
join
();
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
t
[
i
].
join
();
delete
ch
;
}
}
// This tests that closing a buffered channel also unblocks
void
ChannelCloseUnblocksSendersTest
(
Channel
<
int
>
*
ch
)
{
// any senders waiting for channel to have write space
using
paddle
::
framework
::
details
::
Buffered
;
TEST
(
Channel
,
BufferedChannelCloseUnblocksSendersTest
)
{
using
paddle
::
framework
::
details
::
UnBuffered
;
auto
ch
=
MakeChannel
<
int
>
(
1
);
size_t
num_threads
=
5
;
size_t
num_threads
=
5
;
std
::
thread
t
[
num_threads
];
std
::
thread
t
[
num_threads
];
bool
thread_ended
[
num_threads
];
bool
thread_ended
[
num_threads
];
...
@@ -209,34 +204,56 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
...
@@ -209,34 +204,56 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
}
}
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
100
));
// wait
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
100
));
// wait
// Verify that atleast 4 threads are blocked
if
(
dynamic_cast
<
Buffered
<
int
>
*>
(
ch
))
{
int
ct
=
0
;
// If ch is Buffered, atleast 4 threads must be blocked.
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
int
ct
=
0
;
if
(
thread_ended
[
i
]
==
false
)
ct
++
;
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
if
(
!
thread_ended
[
i
])
ct
++
;
}
EXPECT_GE
(
ct
,
4
);
}
else
{
// If ch is UnBuffered, all the threads should be blocked.
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
EXPECT_EQ
(
thread_ended
[
i
],
false
);
}
}
}
// Atleast 4 threads must be blocked
EXPECT_GE
(
ct
,
4
);
// Explicitly close the thread
// Explicitly close the thread
// This should unblock all senders
// This should unblock all senders
CloseChannel
(
ch
);
CloseChannel
(
ch
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
2
00
));
// wait
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
1
00
));
// wait
// Verify that all threads got unblocked
// Verify that all threads got unblocked
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
EXPECT_EQ
(
thread_ended
[
i
],
true
);
EXPECT_EQ
(
thread_ended
[
i
],
true
);
}
}
// Verify that only 1 send was successful
if
(
dynamic_cast
<
Buffered
<
int
>
*>
(
ch
))
{
ct
=
0
;
// Verify that only 1 send was successful
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
int
ct
=
0
;
if
(
send_success
[
i
])
ct
++
;
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
if
(
send_success
[
i
])
ct
++
;
}
// Only 1 send must be successful
EXPECT_EQ
(
ct
,
1
);
}
}
// Only 1 send must be successful
EXPECT_EQ
(
ct
,
1
);
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
t
[
i
].
join
();
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
t
[
i
].
join
();
}
// This tests that closing a buffered channel also unblocks
// any receivers waiting on the channel
TEST
(
Channel
,
BufferedChannelCloseUnblocksReceiversTest
)
{
auto
ch
=
MakeChannel
<
int
>
(
1
);
ChannelCloseUnblocksReceiversTest
(
ch
);
delete
ch
;
}
// This tests that closing a buffered channel also unblocks
// any senders waiting for channel to have write space
TEST
(
Channel
,
BufferedChannelCloseUnblocksSendersTest
)
{
auto
ch
=
MakeChannel
<
int
>
(
1
);
ChannelCloseUnblocksSendersTest
(
ch
);
delete
ch
;
delete
ch
;
}
}
...
@@ -244,40 +261,7 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
...
@@ -244,40 +261,7 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
// unblocks any receivers waiting for senders
// unblocks any receivers waiting for senders
TEST
(
Channel
,
UnbufferedChannelCloseUnblocksReceiversTest
)
{
TEST
(
Channel
,
UnbufferedChannelCloseUnblocksReceiversTest
)
{
auto
ch
=
MakeChannel
<
int
>
(
0
);
auto
ch
=
MakeChannel
<
int
>
(
0
);
size_t
num_threads
=
5
;
ChannelCloseUnblocksReceiversTest
(
ch
);
std
::
thread
t
[
num_threads
];
bool
thread_ended
[
num_threads
];
// Launches threads that try to read and are blocked becausew of no writers
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
thread_ended
[
i
]
=
false
;
t
[
i
]
=
std
::
thread
(
[
&
](
bool
*
p
)
{
int
data
;
EXPECT_EQ
(
ch
->
Receive
(
&
data
),
false
);
*
p
=
true
;
},
&
thread_ended
[
i
]);
}
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
500
));
// wait 0.5 sec
// Verify that all the threads are blocked
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
EXPECT_EQ
(
thread_ended
[
i
],
false
);
}
// Explicitly close the thread
// This should unblock all receivers
CloseChannel
(
ch
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
500
));
// wait 0.5 sec
// Verify that all threads got unblocked
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
EXPECT_EQ
(
thread_ended
[
i
],
true
);
}
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
t
[
i
].
join
();
delete
ch
;
delete
ch
;
}
}
...
@@ -285,40 +269,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
...
@@ -285,40 +269,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
// unblocks any senders waiting for senders
// unblocks any senders waiting for senders
TEST
(
Channel
,
UnbufferedChannelCloseUnblocksSendersTest
)
{
TEST
(
Channel
,
UnbufferedChannelCloseUnblocksSendersTest
)
{
auto
ch
=
MakeChannel
<
int
>
(
0
);
auto
ch
=
MakeChannel
<
int
>
(
0
);
size_t
num_threads
=
5
;
ChannelCloseUnblocksReceiversTest
(
ch
);
std
::
thread
t
[
num_threads
];
bool
thread_ended
[
num_threads
];
// Launches threads that try to read and are blocked becausew of no writers
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
thread_ended
[
i
]
=
false
;
t
[
i
]
=
std
::
thread
(
[
&
](
bool
*
p
)
{
int
data
=
10
;
EXPECT_EQ
(
ch
->
Send
(
&
data
),
false
);
*
p
=
true
;
},
&
thread_ended
[
i
]);
}
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
500
));
// wait 0.5 sec
// Verify that all the threads are blocked
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
EXPECT_EQ
(
thread_ended
[
i
],
false
);
}
// Explicitly close the thread
// This should unblock all receivers
CloseChannel
(
ch
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
500
));
// wait 0.5 sec
// Verify that all threads got unblocked
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
{
EXPECT_EQ
(
thread_ended
[
i
],
true
);
}
for
(
size_t
i
=
0
;
i
<
num_threads
;
i
++
)
t
[
i
].
join
();
delete
ch
;
delete
ch
;
}
}
...
...
paddle/framework/details/buffered_channel.h
浏览文件 @
d2a31fb8
...
@@ -25,6 +25,13 @@ namespace paddle {
...
@@ -25,6 +25,13 @@ namespace paddle {
namespace
framework
{
namespace
framework
{
namespace
details
{
namespace
details
{
// Four of the properties of Buffered Channel:
// - A send to a full channel blocks temporarily until a receive from the
// channel or the channel is closed
// - A receive from an empty channel blocks temporarily until a send to the
// channel or the channel is closed
// - A send to a closed channel returns false immediately
// - A receive from a closed channel returns false immediately
template
<
typename
T
>
template
<
typename
T
>
class
Buffered
:
public
paddle
::
framework
::
Channel
<
T
>
{
class
Buffered
:
public
paddle
::
framework
::
Channel
<
T
>
{
friend
Channel
<
T
>*
paddle
::
framework
::
MakeChannel
<
T
>
(
size_t
);
friend
Channel
<
T
>*
paddle
::
framework
::
MakeChannel
<
T
>
(
size_t
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录