diff --git a/paddle/framework/channel_test.cc b/paddle/framework/channel_test.cc index 31ac72eda98859327f9857c18287398d0f459c7b..c3533bbb1ac5b0136b0937bcf3d1eec9fbdd0b59 100644 --- a/paddle/framework/channel_test.cc +++ b/paddle/framework/channel_test.cc @@ -67,7 +67,7 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { std::thread t([&]() { // Try to write more than buffer size. for (size_t i = 0; i < 2 * buffer_size; ++i) { - ch->Send(&i); // should not block + ch->Send(&i); // should block after 10 iterations sum += i; } }); @@ -207,3 +207,37 @@ TEST(Channel, UnbufferedLessReceiveMoreSendTest) { t.join(); delete ch; } + +TEST(Channel, UnbufferedMoreReceiveLessSendTest) { + auto ch = MakeChannel(0); + unsigned sum_send = 0; + unsigned sum_receive = 0; + // The receiver should block after 5 + // iterations, since there are only 5 senders. + std::thread t([&]() { + for (int i = 0; i < 8; i++) { + int recv; + ch->Receive(&recv); // should block after the fifth iteration. + EXPECT_EQ(recv, i); + sum_receive += i; + } + }); + for (int i = 0; i < 5; i++) { + ch->Send(&i); + sum_send += i; + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec + EXPECT_EQ(sum_send, 10U); + EXPECT_EQ(sum_receive, 10U); + // send three more elements + for (int i = 5; i < 8; i++) { + ch->Send(&i); + sum_send += i; + } + + CloseChannel(ch); + t.join(); + EXPECT_EQ(sum_send, 28U); + EXPECT_EQ(sum_receive, 28U); + delete ch; +}