diff --git a/paddle/fluid/framework/block_desc.h b/paddle/fluid/framework/block_desc.h index 468423e0e8e7b8c9ebc14b7568c9c3bd21645ea7..873969b2a884f6d9e133fe87bf72725c36ce8b98 100644 --- a/paddle/fluid/framework/block_desc.h +++ b/paddle/fluid/framework/block_desc.h @@ -17,6 +17,7 @@ limitations under the License. */ #include #include #include +#include #include #include @@ -96,6 +97,8 @@ class BlockDesc { */ void RemoveOp(size_t s, size_t e); + void RemoveVar(const std::string &name) { vars_.erase(name); } + std::vector AllOps() const; size_t OpSize() const { return ops_.size(); } diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h index 019bea600f496a6b58579ad0aa8af836cd6134a9..722bf8e8ecba0c9cbc5e3ad737dbf73148d2873c 100644 --- a/paddle/fluid/framework/channel.h +++ b/paddle/fluid/framework/channel.h @@ -14,8 +14,8 @@ limitations under the License. */ #pragma once -#include // for size_t -#include +#include // for size_t +#include // NOLINT #include #include "paddle/fluid/platform/enforce.h" @@ -216,7 +216,8 @@ class ChannelHolder { template struct PlaceholderImpl : public Placeholder { - PlaceholderImpl(size_t buffer_size) : type_(std::type_index(typeid(T))) { + explicit PlaceholderImpl(size_t buffer_size) + : type_(std::type_index(typeid(T))) { channel_.reset(MakeChannel(buffer_size)); } diff --git a/paddle/fluid/framework/channel_impl.h b/paddle/fluid/framework/channel_impl.h index e056779ea0dd0a31191b628f82724298efaf50ff..26d454534e1ae38c4f83376c0836a45781ea9101 100644 --- a/paddle/fluid/framework/channel_impl.h +++ b/paddle/fluid/framework/channel_impl.h @@ -15,7 +15,7 @@ limitations under the License. */ #pragma once #include // for size_t #include -#include +#include // NOLINT #include #include "paddle/fluid/framework/channel.h" #include "paddle/fluid/platform/enforce.h" @@ -38,7 +38,7 @@ class ChannelImpl : public paddle::framework::Channel { virtual void Unlock(); virtual bool IsClosed(); virtual void Close(); - ChannelImpl(size_t); + explicit ChannelImpl(size_t); virtual ~ChannelImpl(); virtual void AddToSendQ(const void *referrer, T *data, @@ -60,7 +60,7 @@ class ChannelImpl : public paddle::framework::Channel { const void *referrer; // TODO(thuan): figure out better way to do this std::function callback; - QueueMessage(T *item) + explicit QueueMessage(T *item) : data(item), cond(std::make_shared()) {} QueueMessage(T *item, std::shared_ptr cond) @@ -88,15 +88,15 @@ class ChannelImpl : public paddle::framework::Channel { } std::shared_ptr get_first_message( - std::deque> &queue, ChannelAction action) { - while (!queue.empty()) { + std::deque> *queue, ChannelAction action) { + while (!queue->empty()) { // Check whether this message was added by Select // If this was added by Select then execute the callback // to check if you can execute this message. The callback // can return false if some other case was executed in Select. // In that case just discard this QueueMessage and process next. - std::shared_ptr m = queue.front(); - queue.pop_front(); + std::shared_ptr m = queue->front(); + queue->pop_front(); if (m->callback == nullptr || m->callback(action)) return m; } return nullptr; @@ -147,7 +147,7 @@ void ChannelImpl::Send(T *item) { // to send to the receiver, bypassing the channel buffer if any if (!recvq.empty()) { std::shared_ptr m = - get_first_message(recvq, ChannelAction::SEND); + get_first_message(&recvq, ChannelAction::SEND); if (m != nullptr) { *(m->data) = std::move(*item); @@ -198,7 +198,7 @@ bool ChannelImpl::Receive(T *item) { // buffer and move front of send queue to the buffer if (!sendq.empty()) { std::shared_ptr m = - get_first_message(sendq, ChannelAction::RECEIVE); + get_first_message(&sendq, ChannelAction::RECEIVE); if (buf_.size() > 0) { // Case 1 : Channel is Buffered // Do Data transfer from front of buffer @@ -219,8 +219,9 @@ bool ChannelImpl::Receive(T *item) { if (m != nullptr) { *item = std::move(*(m->data)); m->Notify(); - } else + } else { return recv_return(Receive(item)); + } } return recv_return(true); } diff --git a/paddle/fluid/framework/channel_test.cc b/paddle/fluid/framework/channel_test.cc index 1184bfdae1940286fb72d9091ae4f23ff7f84a54..542d791f6bbdf7d68a4786998ccc0233fff6473d 100644 --- a/paddle/fluid/framework/channel_test.cc +++ b/paddle/fluid/framework/channel_test.cc @@ -14,8 +14,8 @@ limitations under the License. */ #include "paddle/fluid/framework/channel.h" -#include -#include +#include // NOLINT +#include // NOLINT #include "gtest/gtest.h" using paddle::framework::Channel; @@ -166,9 +166,9 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { std::thread t([&]() { // Try to write more than buffer size. for (size_t i = 0; i < 2 * buffer_size; ++i) { - if (i < buffer_size) + if (i < buffer_size) { ch->Send(&i); // should block after 10 iterations - else { + } else { bool is_exception = false; try { ch->Send(&i); @@ -212,12 +212,12 @@ TEST(Channel, RecevingOrderEqualToSendingOrderWithBufferedChannel3) { } void ChannelCloseUnblocksReceiversTest(Channel *ch) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to read and are blocked because of no writers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -230,7 +230,7 @@ void ChannelCloseUnblocksReceiversTest(Channel *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec // Verify that all the threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } @@ -241,21 +241,21 @@ void ChannelCloseUnblocksReceiversTest(Channel *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } void ChannelCloseUnblocksSendersTest(Channel *ch, bool isBuffered) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; - bool send_success[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; + bool send_success[kNumThreads]; // Launches threads that try to write and are blocked because of no readers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; send_success[i] = false; t[i] = std::thread( @@ -277,13 +277,13 @@ void ChannelCloseUnblocksSendersTest(Channel *ch, bool isBuffered) { if (isBuffered) { // If ch is Buffered, atleast 4 threads must be blocked. int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (!thread_ended[i]) ct++; } EXPECT_GE(ct, 4); } else { // If ch is UnBuffered, all the threads should be blocked. - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } } @@ -294,21 +294,21 @@ void ChannelCloseUnblocksSendersTest(Channel *ch, bool isBuffered) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } if (isBuffered) { // Verify that only 1 send was successful int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (send_success[i]) ct++; } // Only 1 send must be successful EXPECT_EQ(ct, 1); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } // This tests that closing a buffered channel also unblocks @@ -409,13 +409,13 @@ TEST(Channel, UnbufferedMoreReceiveLessSendTest) { // This tests that destroying a channel unblocks // any senders waiting for channel to have write space void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; - bool send_success[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; + bool send_success[kNumThreads]; // Launches threads that try to write and are blocked because of no readers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; send_success[i] = false; t[i] = std::thread( @@ -438,14 +438,14 @@ void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { if (isBuffered) { // If channel is buffered, verify that atleast 4 threads are blocked int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (thread_ended[i] == false) ct++; } // Atleast 4 threads must be blocked EXPECT_GE(ct, 4); } else { // Verify that all the threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } } @@ -454,13 +454,13 @@ void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } // Count number of successful sends int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (send_success[i]) ct++; } @@ -473,18 +473,18 @@ void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { } // Join all threads - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } // This tests that destroying a channel also unblocks // any receivers waiting on the channel void ChannelDestroyUnblockReceivers(Channel *ch) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to read and are blocked because of no writers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -498,18 +498,18 @@ void ChannelDestroyUnblockReceivers(Channel *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait // Verify that all threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } // delete the channel delete ch; std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } TEST(Channel, BufferedChannelDestroyUnblocksReceiversTest) { @@ -679,12 +679,12 @@ TEST(ChannelHolder, TypeMismatchReceiveTest) { } void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to read and are blocked because of no writers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -697,7 +697,7 @@ void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec // Verify that all the threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } @@ -708,21 +708,21 @@ void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; - bool send_success[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; + bool send_success[kNumThreads]; // Launches threads that try to write and are blocked because of no readers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; send_success[i] = false; t[i] = std::thread( @@ -744,13 +744,13 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { if (isBuffered) { // If ch is Buffered, atleast 4 threads must be blocked. int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (!thread_ended[i]) ct++; } EXPECT_GE(ct, 4); } else { // If ch is UnBuffered, all the threads should be blocked. - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } } @@ -761,21 +761,21 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } if (isBuffered) { // Verify that only 1 send was successful int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (send_success[i]) ct++; } // Only 1 send must be successful EXPECT_EQ(ct, 1); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } // This tests that closing a channelholder unblocks @@ -813,13 +813,13 @@ TEST(Channel, ChannelHolderCloseUnblocksSendersTest) { // This tests that destroying a channelholder unblocks // any senders waiting for channel void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; - bool send_success[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; + bool send_success[kNumThreads]; // Launches threads that try to write and are blocked because of no readers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; send_success[i] = false; t[i] = std::thread( @@ -841,14 +841,14 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { if (isBuffered) { // If channel is buffered, verify that atleast 4 threads are blocked int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (thread_ended[i] == false) ct++; } // Atleast 4 threads must be blocked EXPECT_GE(ct, 4); } else { // Verify that all the threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } } @@ -857,13 +857,13 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } // Count number of successfuld sends int ct = 0; - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { if (send_success[i]) ct++; } @@ -876,18 +876,18 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { } // Join all threads - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } // This tests that destroying a channelholder also unblocks // any receivers waiting on the channel void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) { - size_t num_threads = 5; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const size_t kNumThreads = 5; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to read and are blocked because of no writers - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -901,18 +901,18 @@ void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads are blocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], false); } // delete the channel delete ch; std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait // Verify that all threads got unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } TEST(ChannelHolder, ChannelHolderDestroyUnblocksReceiversTest) { @@ -945,12 +945,12 @@ TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) { // This tests that closing a channelholder many times. void ChannelHolderManyTimesClose(ChannelHolder *ch) { - const int num_threads = 15; - std::thread t[num_threads]; - bool thread_ended[num_threads]; + const int kNumThreads = 15; + std::thread t[kNumThreads]; + bool thread_ended[kNumThreads]; // Launches threads that try to send data to channel. - for (size_t i = 0; i < num_threads / 3; i++) { + for (size_t i = 0; i < kNumThreads / 3; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *ended) { @@ -962,7 +962,7 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { } // Launches threads that try to receive data to channel. - for (size_t i = num_threads / 3; i < 2 * num_threads / 3; i++) { + for (size_t i = kNumThreads / 3; i < 2 * kNumThreads / 3; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -976,7 +976,7 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { } // Launches threads that try to close the channel. - for (size_t i = 2 * num_threads / 3; i < num_threads; i++) { + for (size_t i = 2 * kNumThreads / 3; i < kNumThreads; i++) { thread_ended[i] = false; t[i] = std::thread( [&](bool *p) { @@ -991,13 +991,13 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait // Verify that all threads are unblocked - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < kNumThreads; i++) { EXPECT_EQ(thread_ended[i], true); } EXPECT_TRUE(ch->IsClosed()); // delete the channel delete ch; - for (size_t i = 0; i < num_threads; i++) t[i].join(); + for (size_t i = 0; i < kNumThreads; i++) t[i].join(); } TEST(ChannelHolder, ChannelHolderManyTimesCloseTest) { diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h index dee505fee0dccd8d60bb290a8bec4df243e504a2..4f130d265900483ec7a7c541f2610d17a352913f 100644 --- a/paddle/fluid/framework/lod_tensor.h +++ b/paddle/fluid/framework/lod_tensor.h @@ -142,6 +142,7 @@ class LoDTensor : public Tensor { return (lod_)[level].size() - 1; } + // Split LoDTensor and copy to each place specified in places. std::vector SplitLoDTensor( const std::vector places) const; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 17885143247f0e0db8f12931e3c3412e7114ef3d..7be93fa6002ae93c3e1b75c8f7fe5ca5f40b271f 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs( #endif } -void ParallelExecutor::Run(const std::vector &fetch_tensors, - const std::string &fetched_var_name) { +void ParallelExecutor::Run( + const std::vector &fetch_tensors, + const std::string &fetched_var_name, + const std::unordered_map &feed_tensors) { platform::RecordBlock b(0); + SplitTensorToPlaces(feed_tensors); auto fetch_data = member_->executor_->Run(fetch_tensors); *member_->global_scope_->Var(fetched_var_name)->GetMutable() = fetch_data; } +void ParallelExecutor::SplitTensorToPlaces( + const std::unordered_map &feed_tensors) { + for (auto it : feed_tensors) { + auto lod_tensors = it.second.SplitLoDTensor(member_->places_); + for (size_t j = 0; j < member_->places_.size(); ++j) { + // TODO(panxy0718): Do I need to delete this var? + member_->local_scopes_[j] + ->Var(it.first) + ->GetMutable() + ->ShareDataWith(lod_tensors[j]); + } + } +} + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 964b476234e622cae934d41bc3793bc3114a5f1a..c7c58b2b808383621a6d492f9188b0d36bfa6858 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -42,9 +42,13 @@ class ParallelExecutor { bool allow_op_delay); void Run(const std::vector& fetch_tensors, - const std::string& fetched_var_name = "fetched_var"); + const std::string& fetched_var_name, + const std::unordered_map& feed_tensors); private: + void SplitTensorToPlaces( + const std::unordered_map& feed_tensors); + ParallelExecutorPrivate* member_; void BCastParamsToGPUs(const ProgramDesc& startup_program) const; diff --git a/paddle/fluid/operators/conv_cudnn_op.cu.cc b/paddle/fluid/operators/conv_cudnn_op.cu.cc index a32aba4c1ff2f5e775aeb41f25b02322dbc6a64a..c70e3cc3c9198008d9eca5f462000aa67ff7e5ba 100644 --- a/paddle/fluid/operators/conv_cudnn_op.cu.cc +++ b/paddle/fluid/operators/conv_cudnn_op.cu.cc @@ -128,10 +128,32 @@ class CUDNNConvOpKernel : public framework::OpKernel { handle, cudnn_input_desc, cudnn_filter_desc, cudnn_conv_desc, cudnn_output_desc, CUDNN_CONVOLUTION_FWD_SPECIFY_WORKSPACE_LIMIT, workspace_size_limit, &algo)); + +#if CUDA_VERSION >= 9000 && CUDNN_VERSION_MIN(7, 0, 1) + // Tensor core is supported since the volta GPU and + // is only enabled when input and filter data are float16 + if (dev_ctx.GetComputeCapability() >= 70 && + std::type_index(typeid(T)) == + std::type_index(typeid(platform::float16))) { + PADDLE_ENFORCE(platform::dynload::cudnnSetConvolutionMathType( + cudnn_conv_desc, CUDNN_TENSOR_OP_MATH)); + // Currently tensor core is only enabled using this algo + algo = CUDNN_CONVOLUTION_FWD_ALGO_IMPLICIT_PRECOMP_GEMM; + } else { + PADDLE_ENFORCE(platform::dynload::cudnnSetConvolutionMathType( + cudnn_conv_desc, CUDNN_DEFAULT_MATH)); + } +#endif + // get workspace size able to allocate PADDLE_ENFORCE(platform::dynload::cudnnGetConvolutionForwardWorkspaceSize( handle, cudnn_input_desc, cudnn_filter_desc, cudnn_conv_desc, cudnn_output_desc, algo, &workspace_size_in_bytes)); + // It is possible for float16 on Volta GPU to allocate more memory than + // the limit because the algo is overrided to use tensor core. + PADDLE_ENFORCE_LE(workspace_size_in_bytes, workspace_size_limit, + "workspace_size to be allocated exceeds the limit"); + // Allocate on GPU memory platform::CUDAPlace gpu = boost::get(ctx.GetPlace()); cudnn_workspace = paddle::memory::Alloc(gpu, workspace_size_in_bytes); diff --git a/paddle/fluid/operators/fc_mkldnn_op.cc b/paddle/fluid/operators/fc_mkldnn_op.cc index 9c704a2949f7100e0812eafe1e58ef04bf71f840..847b7b0c12e1679501dbe83d578b23ca2aef3e9e 100644 --- a/paddle/fluid/operators/fc_mkldnn_op.cc +++ b/paddle/fluid/operators/fc_mkldnn_op.cc @@ -27,8 +27,8 @@ template class MKLDNNMD { public: explicit MKLDNNMD(const T* in, const T* w, bool bias) - : in{paddle::framework::vectorize2int(in->dims())}, - w{paddle::framework::vectorize2int(w->dims())} { + : in(paddle::framework::vectorize2int(in->dims())), + w(paddle::framework::vectorize2int(w->dims())) { with_bias_ = bias; } @@ -78,7 +78,7 @@ class MKLDNNMD { class MKLDNNMemory { public: MKLDNNMemory(MKLDNNMD* t, const mkldnn::engine& e) - : md_{t}, engine_{e} {} + : md_(t), engine_(e) {} virtual ~MKLDNNMemory() = default; template diff --git a/paddle/fluid/platform/cudnn_helper.h b/paddle/fluid/platform/cudnn_helper.h index 7c604e14eb245232ed92f53a00b9bde45c2fbaec..c0d399d078f73743836fc2a0c1d4b1e6b31ecd83 100644 --- a/paddle/fluid/platform/cudnn_helper.h +++ b/paddle/fluid/platform/cudnn_helper.h @@ -257,9 +257,11 @@ class ScopedConvolutionDescriptor { } #endif + cudnnDataType_t compute_type = + (type == CUDNN_DATA_DOUBLE) ? CUDNN_DATA_DOUBLE : CUDNN_DATA_FLOAT; PADDLE_ENFORCE(dynload::cudnnSetConvolutionNdDescriptor( desc_, pads.size(), pads.data(), strides.data(), dilations.data(), - CUDNN_CROSS_CORRELATION, type)); + CUDNN_CROSS_CORRELATION, compute_type)); return desc_; } diff --git a/paddle/fluid/platform/dynload/cudnn.h b/paddle/fluid/platform/dynload/cudnn.h index 81acc445bd3803dede158ff09507a72fb6e293ac..49a54d8478e9a4e507d31a67b924802def356bfa 100644 --- a/paddle/fluid/platform/dynload/cudnn.h +++ b/paddle/fluid/platform/dynload/cudnn.h @@ -16,7 +16,7 @@ limitations under the License. */ #include #include -#include +#include // NOLINT #include "paddle/fluid/platform/dynload/dynamic_loader.h" namespace paddle { @@ -140,7 +140,8 @@ CUDNN_DNN_ROUTINE_EACH_R5(DECLARE_DYNAMIC_LOAD_CUDNN_WRAP) #if CUDNN_VERSION >= 7001 #define CUDNN_DNN_ROUTINE_EACH_R7(__macro) \ - __macro(cudnnSetConvolutionGroupCount); + __macro(cudnnSetConvolutionGroupCount); \ + __macro(cudnnSetConvolutionMathType); CUDNN_DNN_ROUTINE_EACH_R7(DECLARE_DYNAMIC_LOAD_CUDNN_WRAP) #endif diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index 45a64f43846e79c27295e52c59dca6bdfaa120a3..985984983a2239f6961bf519bae27fcbb9e7d6d3 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -15,6 +15,8 @@ limitations under the License. */ #include "paddle/fluid/pybind/protobuf.h" #include #include +#include +#include #include "paddle/fluid/framework/backward.h" #include "paddle/fluid/framework/block_desc.h" #include "paddle/fluid/framework/op_desc.h" @@ -98,7 +100,7 @@ namespace pybind { using namespace paddle::framework; // NOLINT template -static py::bytes SerializeMessage(T &self) { +static py::bytes SerializeMessage(T &self) { // NOLINT // Check IsInitialized in Python std::string retv; PADDLE_ENFORCE(self.Proto()->SerializePartialToString(&retv), @@ -107,7 +109,7 @@ static py::bytes SerializeMessage(T &self) { } // Bind Methods -void BindProgramDesc(py::module &m) { +void BindProgramDesc(py::module &m) { // NOLINT py::class_(m, "ProgramDesc", "") .def(py::init<>()) .def("__init__", @@ -151,7 +153,7 @@ void BindProgramDesc(py::module &m) { }); } -void BindBlockDesc(py::module &m) { +void BindBlockDesc(py::module &m) { // NOLINT py::class_(m, "BlockDesc", "") .def_property_readonly("id", &BlockDesc::ID) .def_property_readonly("parent", &BlockDesc::Parent) @@ -200,13 +202,19 @@ void BindBlockDesc(py::module &m) { return self.FindVarRecursive(name); }, py::return_value_policy::reference) + .def("remove_var", + [](BlockDesc &self, py::bytes byte_name) { + std::string name = byte_name; + return self.RemoveVar(name); + }, + py::return_value_policy::reference) .def("all_vars", &BlockDesc::AllVars, py::return_value_policy::reference) .def("op_size", &BlockDesc::OpSize) .def("op", &BlockDesc::Op, py::return_value_policy::reference) .def("serialize_to_string", SerializeMessage); } -void BindVarDsec(py::module &m) { +void BindVarDsec(py::module &m) { // NOLINT py::class_ var_desc(m, "VarDesc", ""); var_desc .def("name", @@ -257,7 +265,7 @@ void BindVarDsec(py::module &m) { .value("RAW", proto::VarType::RAW); } -void BindOpDesc(py::module &m) { +void BindOpDesc(py::module &m) { // NOLINT py::enum_(m, "AttrType", "") .value("INT", proto::AttrType::INT) .value("INTS", proto::AttrType::INTS) diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index a2c830b3c943b114f3024f23f73f78bf87e1da34..1b3ba414ecb50cc4d75dcaecd1f31265334c9aec 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -26,25 +26,29 @@ class ParallelExecutor(object): use_cuda, num_threads=None, allow_op_delay=False): - places = [] + self._places = [] + self._act_places = [] if use_cuda: for i in xrange(core.get_cuda_device_count()): p = core.Place() - p.set_place(core.CUDAPlace(i)) - places.append(p) + self._act_places.append(core.CUDAPlace(i)) + p.set_place(self._act_places[-1]) + self._places.append(p) else: for i in xrange(multiprocessing.cpu_count()): p = core.Place() - p.set_place(core.CPUPlace()) - places.append(p) + self._act_places.append(core.CPUPlace(i)) + p.set_place(self._act_places[-1]) + self._places.append(p) + assert self._places, "no place for execution" if num_threads is None: if use_cuda: # Experiments on se-resnext shows that too many threads hurt # performance. Worth tunning for other models in the future. - num_threads = len(places) + num_threads = len(self._places) else: - min(len(places) * 2, multiprocessing.cpu_count()) + min(len(self._places) * 2, multiprocessing.cpu_count()) startup = framework.default_startup_program() main = framework.default_main_program() @@ -53,7 +57,7 @@ class ParallelExecutor(object): self.executor = core.ParallelExecutor( num_threads, True if use_cuda else False, # use_event - places, + self._places, set([ p.name for p in main.global_block().iter_parameters() if not p.stop_gradient @@ -65,8 +69,25 @@ class ParallelExecutor(object): allow_op_delay) self.scope = scope - def run(self, fetch_list): + def run(self, fetch_list, feed_dict={}): + """ + :param fetch_list: A list of variable names that will be fetched. + :param feed_dict: A dict mapping for feed variable name to LoDTensor + or numpy array. + :return: fetched value list. + """ + if not isinstance(feed_dict, dict): + raise TypeError("feed_dict should be a dict") + + feed_tensor_dict = {} + for i, feed_name in enumerate(feed_dict): + feed_tensor = feed_dict[feed_name] + if not isinstance(feed_tensor, core.LoDTensor): + feed_tensor = core.LoDTensor() + feed_tensor.set(feed_dict[feed_name], self._act_places[0]) + feed_tensor_dict[feed_name] = feed_tensor + fetch_var_name = '@FETCHED_VAR_NAME@' - self.executor.run(fetch_list, fetch_var_name) + self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict) arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array() return [arr[i] for i in range(len(arr))] diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index a79e4b3e183eaef06be27a724893799923e84ac1..0cbef82e33c306e2b89ea4f3d66d48da3840ccbd 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -21,13 +21,17 @@ import paddle.dataset.mnist as mnist import paddle.dataset.wmt16 as wmt16 -def simple_fc_net(): - reader = fluid.layers.open_recordio_file( - filename='./mnist.recordio', - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - img, label = fluid.layers.read_file(reader) +def simple_fc_net(use_feed): + if use_feed: + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + else: + reader = fluid.layers.open_recordio_file( + filename='./mnist.recordio', + shapes=[[-1, 784], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + img, label = fluid.layers.read_file(reader) hidden = img for _ in xrange(4): hidden = fluid.layers.fc( @@ -42,13 +46,18 @@ def simple_fc_net(): return loss -def fc_with_batchnorm(): - reader = fluid.layers.open_recordio_file( - filename='./mnist.recordio', - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - img, label = fluid.layers.read_file(reader) +def fc_with_batchnorm(use_feed): + if use_feed: + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + else: + reader = fluid.layers.open_recordio_file( + filename='./mnist.recordio', + shapes=[[-1, 784], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + img, label = fluid.layers.read_file(reader) + hidden = img for _ in xrange(1): hidden = fluid.layers.fc( @@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio): return fluid.layers.elementwise_add(x=short, y=scale, act='relu') -def SE_ResNeXt152Small(batch_size=2): +def SE_ResNeXt152Small(batch_size=2, use_feed=False): + assert not use_feed, "SE_ResNeXt doesn't support feed yet" + img = fluid.layers.fill_constant( shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0) label = fluid.layers.fill_constant( @@ -185,30 +196,28 @@ class TestParallelExecutorBase(unittest.TestCase): memory_opt=True, iter=10, batch_size=None, - allow_op_delay=False): + allow_op_delay=False, + feed_dict={}): main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): - loss = method() + loss = method(use_feed=len(feed_dict) > 0) adam = fluid.optimizer.Adam() adam.minimize(loss) if memory_opt: fluid.memory_optimize(main) - exe = fluid.ParallelExecutor( - loss_name=loss.name, - use_cuda=True, - allow_op_delay=allow_op_delay) + exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True) if batch_size is not None: batch_size *= fluid.core.get_cuda_device_count() begin = time.time() - first_loss, = exe.run([loss.name]) + first_loss, = exe.run([loss.name], feed_dict=feed_dict) first_loss = numpy.array(first_loss) for i in xrange(iter): - exe.run([]) + exe.run([], feed_dict=feed_dict) - last_loss, = exe.run([loss.name]) + last_loss, = exe.run([loss.name], feed_dict=feed_dict) end = time.time() if batch_size is not None: @@ -242,9 +251,19 @@ class TestMNIST(TestParallelExecutorBase): self.check_network_convergence(simple_fc_net) self.check_network_convergence(simple_fc_net, allow_op_delay=True) + img = numpy.zeros(shape=[32, 784], dtype='float32') + label = numpy.ones(shape=[32, 1], dtype='int64') + self.check_network_convergence( + simple_fc_net, feed_dict={"image": img, + "label": label}) + def test_batchnorm_fc(self): self.check_network_convergence(fc_with_batchnorm) - self.check_network_convergence(fc_with_batchnorm, allow_op_delay=True) + img = numpy.zeros(shape=[32, 784], dtype='float32') + label = numpy.ones(shape=[32, 1], dtype='int64') + self.check_network_convergence( + fc_with_batchnorm, feed_dict={"image": img, + "label": label}) class TestResnet(TestParallelExecutorBase): @@ -400,7 +419,8 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head): import transformer_model -def transformer(): +def transformer(use_feed): + assert not use_feed, "transfomer doesn't support feed yet" return transformer_model.transformer( ModelHyperParams.src_vocab_size + 1, ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1, diff --git a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py index e4cf4a8bce8a53c0348130716dc18c61ac9a5913..f98a8bbc68a4315df3ae761f2e52b8f11cb620c6 100644 --- a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py +++ b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py @@ -19,9 +19,9 @@ from paddle.fluid.framework import Program class TestOpDesc(unittest.TestCase): def test_op_desc(self): - prog = core.ProgramDesc() - self.assertIsNotNone(prog) - block = prog.block(0) + program_desc = core.ProgramDesc() + self.assertIsNotNone(program_desc) + block = program_desc.block(0) self.assertIsNotNone(block) op = block.append_op() self.assertIsNotNone(op) @@ -67,7 +67,7 @@ class TestOpDesc(unittest.TestCase): self.assertEqual(8, len(op.attr_names())) - op.set_block_attr("block_attr", prog.block(0)) + op.set_block_attr("block_attr", program_desc.block(0)) self.assertEqual(0, op.block_attr("block_attr")) mul_op = block.append_op() @@ -88,20 +88,20 @@ class TestProgramDesc(unittest.TestCase): del program_desc def test_append_block(self): - prog_desc = core.ProgramDesc() - self.assertIsNotNone(prog_desc) - block_root = prog_desc.block(0) + program_desc = core.ProgramDesc() + self.assertIsNotNone(program_desc) + block_root = program_desc.block(0) self.assertIsNotNone(block_root) self.assertEqual(block_root.id, 0) - block1 = prog_desc.append_block(block_root) - block2 = prog_desc.append_block(block1) + block1 = program_desc.append_block(block_root) + block2 = program_desc.append_block(block1) self.assertIsNotNone(block1) self.assertEqual(block1.id, block2.parent) self.assertEqual(block_root.id, block1.parent) - block3 = prog_desc.append_block(block_root) + block3 = program_desc.append_block(block_root) self.assertEqual(block3.parent, block_root.id) - self.assertEqual(prog_desc.block(1).id, 1) - self.assertEqual(4, prog_desc.num_blocks()) + self.assertEqual(program_desc.block(1).id, 1) + self.assertEqual(4, program_desc.num_blocks()) class TestVarDesc(unittest.TestCase): @@ -162,9 +162,9 @@ class TestVarDesc(unittest.TestCase): class TestBlockDesc(unittest.TestCase): def test_add_var(self): - prog = core.ProgramDesc() - self.assertIsNotNone(prog) - block = prog.block(0) + program_desc = core.ProgramDesc() + self.assertIsNotNone(program_desc) + block = program_desc.block(0) self.assertIsNotNone(block) var1 = block.var("var1") var2 = block.var("var2") @@ -175,9 +175,9 @@ class TestBlockDesc(unittest.TestCase): self.assertEqual(var2_re, var2) def test_add_op(self): - prog = core.ProgramDesc() - self.assertIsNotNone(prog) - block = prog.block(0) + program_desc = core.ProgramDesc() + self.assertIsNotNone(program_desc) + block = program_desc.block(0) self.assertIsNotNone(block) op1 = block.append_op() op2 = block.append_op() @@ -189,9 +189,9 @@ class TestBlockDesc(unittest.TestCase): def test_remove_op(self): program = Program() - prog = program.desc - self.assertIsNotNone(prog) - block = prog.block(0) + program_desc = program.desc + self.assertIsNotNone(program_desc) + block = program_desc.block(0) self.assertIsNotNone(block) op0 = block.append_op()