未验证 提交 1d2dd9c4 编写于 作者: A Abhinav Arora 提交者: GitHub

Close buffered channel should unblock the blocked senders and receivers (#8109)

上级 b60da672
...@@ -48,12 +48,12 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) { ...@@ -48,12 +48,12 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) {
const size_t buffer_size = 10; const size_t buffer_size = 10;
auto ch = MakeChannel<size_t>(buffer_size); auto ch = MakeChannel<size_t>(buffer_size);
for (size_t i = 0; i < buffer_size; ++i) { for (size_t i = 0; i < buffer_size; ++i) {
ch->Send(&i); // should not block EXPECT_EQ(ch->Send(&i), true); // should not block
} }
size_t out; size_t out;
for (size_t i = 0; i < buffer_size; ++i) { for (size_t i = 0; i < buffer_size; ++i) {
ch->Receive(&out); // should not block EXPECT_EQ(ch->Receive(&out), true); // should not block
EXPECT_EQ(out, i); EXPECT_EQ(out, i);
} }
CloseChannel(ch); CloseChannel(ch);
...@@ -67,7 +67,10 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { ...@@ -67,7 +67,10 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
std::thread t([&]() { std::thread t([&]() {
// Try to write more than buffer size. // Try to write more than buffer size.
for (size_t i = 0; i < 2 * buffer_size; ++i) { for (size_t i = 0; i < 2 * buffer_size; ++i) {
ch->Send(&i); // should block after 10 iterations if (i < buffer_size)
EXPECT_EQ(ch->Send(&i), true); // should block after 10 iterations
else
EXPECT_EQ(ch->Send(&i), false);
sum += i; sum += i;
} }
}); });
...@@ -84,13 +87,13 @@ TEST(Channel, SimpleUnbufferedChannelTest) { ...@@ -84,13 +87,13 @@ TEST(Channel, SimpleUnbufferedChannelTest) {
unsigned sum_send = 0; unsigned sum_send = 0;
std::thread t([&]() { std::thread t([&]() {
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
ch->Send(&i); EXPECT_EQ(ch->Send(&i), true);
sum_send += i; sum_send += i;
} }
}); });
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
int recv; int recv;
ch->Receive(&recv); EXPECT_EQ(ch->Receive(&recv), true);
EXPECT_EQ(recv, i); EXPECT_EQ(recv, i);
} }
...@@ -100,6 +103,102 @@ TEST(Channel, SimpleUnbufferedChannelTest) { ...@@ -100,6 +103,102 @@ TEST(Channel, SimpleUnbufferedChannelTest) {
delete ch; delete ch;
} }
// This tests that closing a buffered channel also unblocks
// any receivers waiting on the channel
TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
auto ch = MakeChannel<int>(1);
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
// Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
// All reads should return false
EXPECT_EQ(ch->Receive(&data), false);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that all threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
// Explicitly close the channel
// This should unblock all receivers
CloseChannel(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch;
}
// This tests that closing a buffered channel also unblocks
// any senders waiting for channel to have write space
TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
auto ch = MakeChannel<int>(1);
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
bool send_success[num_threads];
// Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
send_success[i] = false;
t[i] = std::thread(
[&](bool *ended, bool *success) {
int data = 10;
*success = ch->Send(&data);
*ended = true;
},
&thread_ended[i], &send_success[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that atleast 4 threads are blocked
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (thread_ended[i] == false) ct++;
}
// Atleast 4 threads must be blocked
EXPECT_GE(ct, 4);
// Explicitly close the thread
// This should unblock all senders
CloseChannel(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
// Verify that only 1 send was successful
ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (send_success[i]) ct++;
}
// Only 1 send must be successful
EXPECT_EQ(ct, 1);
for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch;
}
// This tests that closing an unbuffered channel also unblocks // This tests that closing an unbuffered channel also unblocks
// unblocks any receivers waiting for senders // unblocks any receivers waiting for senders
TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) { TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
...@@ -114,7 +213,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) { ...@@ -114,7 +213,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
int data; int data;
ch->Receive(&data); EXPECT_EQ(ch->Receive(&data), false);
*p = true; *p = true;
}, },
&thread_ended[i]); &thread_ended[i]);
...@@ -155,7 +254,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksSendersTest) { ...@@ -155,7 +254,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksSendersTest) {
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
int data = 10; int data = 10;
ch->Send(&data); EXPECT_EQ(ch->Send(&data), false);
*p = true; *p = true;
}, },
&thread_ended[i]); &thread_ended[i]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册