diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h index 019bea600f496a6b58579ad0aa8af836cd6134a9..722bf8e8ecba0c9cbc5e3ad737dbf73148d2873c 100644 --- a/paddle/fluid/framework/channel.h +++ b/paddle/fluid/framework/channel.h @@ -14,8 +14,8 @@ limitations under the License. */ #pragma once -#include // for size_t -#include +#include // for size_t +#include // NOLINT #include #include "paddle/fluid/platform/enforce.h" @@ -216,7 +216,8 @@ class ChannelHolder { template struct PlaceholderImpl : public Placeholder { - PlaceholderImpl(size_t buffer_size) : type_(std::type_index(typeid(T))) { + explicit PlaceholderImpl(size_t buffer_size) + : type_(std::type_index(typeid(T))) { channel_.reset(MakeChannel(buffer_size)); } diff --git a/paddle/fluid/framework/channel_impl.h b/paddle/fluid/framework/channel_impl.h index e056779ea0dd0a31191b628f82724298efaf50ff..26d454534e1ae38c4f83376c0836a45781ea9101 100644 --- a/paddle/fluid/framework/channel_impl.h +++ b/paddle/fluid/framework/channel_impl.h @@ -15,7 +15,7 @@ limitations under the License. */ #pragma once #include // for size_t #include -#include +#include // NOLINT #include #include "paddle/fluid/framework/channel.h" #include "paddle/fluid/platform/enforce.h" @@ -38,7 +38,7 @@ class ChannelImpl : public paddle::framework::Channel { virtual void Unlock(); virtual bool IsClosed(); virtual void Close(); - ChannelImpl(size_t); + explicit ChannelImpl(size_t); virtual ~ChannelImpl(); virtual void AddToSendQ(const void *referrer, T *data, @@ -60,7 +60,7 @@ class ChannelImpl : public paddle::framework::Channel { const void *referrer; // TODO(thuan): figure out better way to do this std::function callback; - QueueMessage(T *item) + explicit QueueMessage(T *item) : data(item), cond(std::make_shared()) {} QueueMessage(T *item, std::shared_ptr cond) @@ -88,15 +88,15 @@ class ChannelImpl : public paddle::framework::Channel { } std::shared_ptr get_first_message( - std::deque> &queue, ChannelAction action) { - while (!queue.empty()) { + std::deque> *queue, ChannelAction action) { + while (!queue->empty()) { // Check whether this message was added by Select // If this was added by Select then execute the callback // to check if you can execute this message. The callback // can return false if some other case was executed in Select. // In that case just discard this QueueMessage and process next. - std::shared_ptr m = queue.front(); - queue.pop_front(); + std::shared_ptr m = queue->front(); + queue->pop_front(); if (m->callback == nullptr || m->callback(action)) return m; } return nullptr; @@ -147,7 +147,7 @@ void ChannelImpl::Send(T *item) { // to send to the receiver, bypassing the channel buffer if any if (!recvq.empty()) { std::shared_ptr m = - get_first_message(recvq, ChannelAction::SEND); + get_first_message(&recvq, ChannelAction::SEND); if (m != nullptr) { *(m->data) = std::move(*item); @@ -198,7 +198,7 @@ bool ChannelImpl::Receive(T *item) { // buffer and move front of send queue to the buffer if (!sendq.empty()) { std::shared_ptr m = - get_first_message(sendq, ChannelAction::RECEIVE); + get_first_message(&sendq, ChannelAction::RECEIVE); if (buf_.size() > 0) { // Case 1 : Channel is Buffered // Do Data transfer from front of buffer @@ -219,8 +219,9 @@ bool ChannelImpl::Receive(T *item) { if (m != nullptr) { *item = std::move(*(m->data)); m->Notify(); - } else + } else { return recv_return(Receive(item)); + } } return recv_return(true); } diff --git a/paddle/fluid/framework/channel_test.cc b/paddle/fluid/framework/channel_test.cc index 1184bfdae1940286fb72d9091ae4f23ff7f84a54..542d791f6bbdf7d68a4786998ccc0233fff6473d 100644 --- a/paddle/fluid/framework/channel_test.cc +++ b/paddle/fluid/framework/channel_test.cc @@ -14,8 +14,8 @@ limitations under the License. */ #include "paddle/fluid/framework/channel.h" -#include -#include +#include // NOLINT +#include // NOLINT #include "gtest/gtest.h" using paddle::framework::Channel; @@ -166,9 +166,9 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { std::thread t([&]() { // Try to write more than buffer size. for (size_t i = 0; i < 2 * buffer_size; ++i) { - if (i < buffer_size) + if (i < buffer_size) { ch->Send(&i); // should block after 10 iterations - else { + } else { bool is_exception = false; try { ch->Send(&i); @@ -212,12 +212,12 @@ TEST(Channel, RecevingOrderEqualToSendingOrderWithBufferedChannel3) { } void ChannelCloseUnblocksReceiversTest(Channel *ch) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to read and are blocked because of no writers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -230,7 +230,7 @@ void ChannelCloseUnblocksReceiversTest(Channel *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec // Verify that all the threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } @@ -241,21 +241,21 @@ void ChannelCloseUnblocksReceiversTest(Channel *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } void ChannelCloseUnblocksSendersTest(Channel *ch, bool isBuffered) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; - bool send_success[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; + bool send_success[kNumThreads]; // Launches threads that try to write and are blocked because of no readers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; send_success[i] = false; t[i] = std::thread( @@ -277,13 +277,13 @@ void ChannelCloseUnblocksSendersTest(Channel *ch, bool isBuffered) { if (isBuffered) { // If ch is Buffered, atleast 4 threads must be blocked. int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (!thread_ended[i]) ct++; } EXPECT_GE(ct, 4); } else { // If ch is UnBuffered, all the threads should be blocked. - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } } @@ -294,21 +294,21 @@ void ChannelCloseUnblocksSendersTest(Channel *ch, bool isBuffered) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } if (isBuffered) { // Verify that only 1 send was successful int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (send_success[i]) ct++; } // Only 1 send must be successful EXPECT_EQ(ct, 1); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } // This tests that closing a buffered channel also unblocks @@ -409,13 +409,13 @@ TEST(Channel, UnbufferedMoreReceiveLessSendTest) { // This tests that destroying a channel unblocks // any senders waiting for channel to have write space void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; - bool send_success[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; + bool send_success[kNumThreads]; // Launches threads that try to write and are blocked because of no readers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; send_success[i] = false; t[i] = std::thread( @@ -438,14 +438,14 @@ void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { if (isBuffered) { // If channel is buffered, verify that atleast 4 threads are blocked int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (thread_ended[i] == false) ct++; } // Atleast 4 threads must be blocked EXPECT_GE(ct, 4); } else { // Verify that all the threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } } @@ -454,13 +454,13 @@ void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } // Count number of successful sends int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (send_success[i]) ct++; } @@ -473,18 +473,18 @@ void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { } // Join all threads - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } // This tests that destroying a channel also unblocks // any receivers waiting on the channel void ChannelDestroyUnblockReceivers(Channel *ch) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to read and are blocked because of no writers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -498,18 +498,18 @@ void ChannelDestroyUnblockReceivers(Channel *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait // Verify that all threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } // delete the channel delete ch; std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } TEST(Channel, BufferedChannelDestroyUnblocksReceiversTest) { @@ -679,12 +679,12 @@ TEST(ChannelHolder, TypeMismatchReceiveTest) { } void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to read and are blocked because of no writers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -697,7 +697,7 @@ void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec // Verify that all the threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } @@ -708,21 +708,21 @@ void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; - bool send_success[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; + bool send_success[kNumThreads]; // Launches threads that try to write and are blocked because of no readers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; send_success[i] = false; t[i] = std::thread( @@ -744,13 +744,13 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { if (isBuffered) { // If ch is Buffered, atleast 4 threads must be blocked. int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (!thread_ended[i]) ct++; } EXPECT_GE(ct, 4); } else { // If ch is UnBuffered, all the threads should be blocked. - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } } @@ -761,21 +761,21 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } if (isBuffered) { // Verify that only 1 send was successful int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (send_success[i]) ct++; } // Only 1 send must be successful EXPECT_EQ(ct, 1); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } // This tests that closing a channelholder unblocks @@ -813,13 +813,13 @@ TEST(Channel, ChannelHolderCloseUnblocksSendersTest) { // This tests that destroying a channelholder unblocks // any senders waiting for channel void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; - bool send_success[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; + bool send_success[kNumThreads]; // Launches threads that try to write and are blocked because of no readers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; send_success[i] = false; t[i] = std::thread( @@ -841,14 +841,14 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { if (isBuffered) { // If channel is buffered, verify that atleast 4 threads are blocked int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (thread_ended[i] == false) ct++; } // Atleast 4 threads must be blocked EXPECT_GE(ct, 4); } else { // Verify that all the threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } } @@ -857,13 +857,13 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } // Count number of successfuld sends int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (send_success[i]) ct++; } @@ -876,18 +876,18 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { } // Join all threads - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } // This tests that destroying a channelholder also unblocks // any receivers waiting on the channel void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to read and are blocked because of no writers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -901,18 +901,18 @@ void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } // delete the channel delete ch; std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } TEST(ChannelHolder, ChannelHolderDestroyUnblocksReceiversTest) { @@ -945,12 +945,12 @@ TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) { // This tests that closing a channelholder many times. void ChannelHolderManyTimesClose(ChannelHolder *ch) { - const int num_threads = 15; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const int kNumThreads = 15; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to send data to channel. - for (size_t i = 0; i < num_threads / 3; i++) { + for (size_t i = 0; i < kNumThreads / 3; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *ended) { @@ -962,7 +962,7 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { } // Launches threads that try to receive data to channel. - for (size_t i = num_threads / 3; i < 2 * num_threads / 3; i++) { + for (size_t i = kNumThreads / 3; i < 2 * kNumThreads / 3; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -976,7 +976,7 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { } // Launches threads that try to close the channel. - for (size_t i = 2 * num_threads / 3; i < num_threads; i++) { + for (size_t i = 2 * kNumThreads / 3; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -991,13 +991,13 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait // Verify that all threads are unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } EXPECT_TRUE(ch->IsClosed()); // delete the channel delete ch; - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } TEST(ChannelHolder, ChannelHolderManyTimesCloseTest) {