提交 9f488783 编写于 作者: Q qiaolongfei

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into change-mklml-download-url

...@@ -17,6 +17,7 @@ limitations under the License. */ ...@@ -17,6 +17,7 @@ limitations under the License. */
#include <deque> #include <deque>
#include <memory> #include <memory>
#include <set> #include <set>
#include <string>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
...@@ -96,6 +97,8 @@ class BlockDesc { ...@@ -96,6 +97,8 @@ class BlockDesc {
*/ */
void RemoveOp(size_t s, size_t e); void RemoveOp(size_t s, size_t e);
// Removes the variable registered under `name` by erasing that key from vars_.
// NOTE(review): what happens when `name` is absent depends on the container
// type of vars_, which is declared outside this view -- confirm before relying
// on it being a no-op.
void RemoveVar(const std::string &name) { vars_.erase(name); }
std::vector<OpDesc *> AllOps() const; std::vector<OpDesc *> AllOps() const;
size_t OpSize() const { return ops_.size(); } size_t OpSize() const { return ops_.size(); }
......
...@@ -14,8 +14,8 @@ limitations under the License. */ ...@@ -14,8 +14,8 @@ limitations under the License. */
#pragma once #pragma once
#include <stddef.h> // for size_t #include <stddef.h> // for size_t
#include <condition_variable> #include <condition_variable> // NOLINT
#include <typeindex> #include <typeindex>
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
...@@ -216,7 +216,8 @@ class ChannelHolder { ...@@ -216,7 +216,8 @@ class ChannelHolder {
template <typename T> template <typename T>
struct PlaceholderImpl : public Placeholder { struct PlaceholderImpl : public Placeholder {
PlaceholderImpl(size_t buffer_size) : type_(std::type_index(typeid(T))) { explicit PlaceholderImpl(size_t buffer_size)
: type_(std::type_index(typeid(T))) {
channel_.reset(MakeChannel<T>(buffer_size)); channel_.reset(MakeChannel<T>(buffer_size));
} }
......
...@@ -15,7 +15,7 @@ limitations under the License. */ ...@@ -15,7 +15,7 @@ limitations under the License. */
#pragma once #pragma once
#include <stddef.h> // for size_t #include <stddef.h> // for size_t
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable> // NOLINT
#include <deque> #include <deque>
#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
...@@ -38,7 +38,7 @@ class ChannelImpl : public paddle::framework::Channel<T> { ...@@ -38,7 +38,7 @@ class ChannelImpl : public paddle::framework::Channel<T> {
virtual void Unlock(); virtual void Unlock();
virtual bool IsClosed(); virtual bool IsClosed();
virtual void Close(); virtual void Close();
ChannelImpl(size_t); explicit ChannelImpl(size_t);
virtual ~ChannelImpl(); virtual ~ChannelImpl();
virtual void AddToSendQ(const void *referrer, T *data, virtual void AddToSendQ(const void *referrer, T *data,
...@@ -60,7 +60,7 @@ class ChannelImpl : public paddle::framework::Channel<T> { ...@@ -60,7 +60,7 @@ class ChannelImpl : public paddle::framework::Channel<T> {
const void *referrer; // TODO(thuan): figure out better way to do this const void *referrer; // TODO(thuan): figure out better way to do this
std::function<bool(ChannelAction)> callback; std::function<bool(ChannelAction)> callback;
QueueMessage(T *item) explicit QueueMessage(T *item)
: data(item), cond(std::make_shared<std::condition_variable_any>()) {} : data(item), cond(std::make_shared<std::condition_variable_any>()) {}
QueueMessage(T *item, std::shared_ptr<std::condition_variable_any> cond) QueueMessage(T *item, std::shared_ptr<std::condition_variable_any> cond)
...@@ -88,15 +88,15 @@ class ChannelImpl : public paddle::framework::Channel<T> { ...@@ -88,15 +88,15 @@ class ChannelImpl : public paddle::framework::Channel<T> {
} }
std::shared_ptr<QueueMessage> get_first_message( std::shared_ptr<QueueMessage> get_first_message(
std::deque<std::shared_ptr<QueueMessage>> &queue, ChannelAction action) { std::deque<std::shared_ptr<QueueMessage>> *queue, ChannelAction action) {
while (!queue.empty()) { while (!queue->empty()) {
// Check whether this message was added by Select // Check whether this message was added by Select
// If this was added by Select then execute the callback // If this was added by Select then execute the callback
// to check if you can execute this message. The callback // to check if you can execute this message. The callback
// can return false if some other case was executed in Select. // can return false if some other case was executed in Select.
// In that case just discard this QueueMessage and process next. // In that case just discard this QueueMessage and process next.
std::shared_ptr<QueueMessage> m = queue.front(); std::shared_ptr<QueueMessage> m = queue->front();
queue.pop_front(); queue->pop_front();
if (m->callback == nullptr || m->callback(action)) return m; if (m->callback == nullptr || m->callback(action)) return m;
} }
return nullptr; return nullptr;
...@@ -147,7 +147,7 @@ void ChannelImpl<T>::Send(T *item) { ...@@ -147,7 +147,7 @@ void ChannelImpl<T>::Send(T *item) {
// to send to the receiver, bypassing the channel buffer if any // to send to the receiver, bypassing the channel buffer if any
if (!recvq.empty()) { if (!recvq.empty()) {
std::shared_ptr<QueueMessage> m = std::shared_ptr<QueueMessage> m =
get_first_message(recvq, ChannelAction::SEND); get_first_message(&recvq, ChannelAction::SEND);
if (m != nullptr) { if (m != nullptr) {
*(m->data) = std::move(*item); *(m->data) = std::move(*item);
...@@ -198,7 +198,7 @@ bool ChannelImpl<T>::Receive(T *item) { ...@@ -198,7 +198,7 @@ bool ChannelImpl<T>::Receive(T *item) {
// buffer and move front of send queue to the buffer // buffer and move front of send queue to the buffer
if (!sendq.empty()) { if (!sendq.empty()) {
std::shared_ptr<QueueMessage> m = std::shared_ptr<QueueMessage> m =
get_first_message(sendq, ChannelAction::RECEIVE); get_first_message(&sendq, ChannelAction::RECEIVE);
if (buf_.size() > 0) { if (buf_.size() > 0) {
// Case 1 : Channel is Buffered // Case 1 : Channel is Buffered
// Do Data transfer from front of buffer // Do Data transfer from front of buffer
...@@ -219,8 +219,9 @@ bool ChannelImpl<T>::Receive(T *item) { ...@@ -219,8 +219,9 @@ bool ChannelImpl<T>::Receive(T *item) {
if (m != nullptr) { if (m != nullptr) {
*item = std::move(*(m->data)); *item = std::move(*(m->data));
m->Notify(); m->Notify();
} else } else {
return recv_return(Receive(item)); return recv_return(Receive(item));
}
} }
return recv_return(true); return recv_return(true);
} }
......
...@@ -14,8 +14,8 @@ limitations under the License. */ ...@@ -14,8 +14,8 @@ limitations under the License. */
#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/channel.h"
#include <chrono> #include <chrono> // NOLINT
#include <thread> #include <thread> // NOLINT
#include "gtest/gtest.h" #include "gtest/gtest.h"
using paddle::framework::Channel; using paddle::framework::Channel;
...@@ -166,9 +166,9 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { ...@@ -166,9 +166,9 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
std::thread t([&]() { std::thread t([&]() {
// Try to write more than buffer size. // Try to write more than buffer size.
for (size_t i = 0; i < 2 * buffer_size; ++i) { for (size_t i = 0; i < 2 * buffer_size; ++i) {
if (i < buffer_size) if (i < buffer_size) {
ch->Send(&i); // should block after 10 iterations ch->Send(&i); // should block after 10 iterations
else { } else {
bool is_exception = false; bool is_exception = false;
try { try {
ch->Send(&i); ch->Send(&i);
...@@ -212,12 +212,12 @@ TEST(Channel, RecevingOrderEqualToSendingOrderWithBufferedChannel3) { ...@@ -212,12 +212,12 @@ TEST(Channel, RecevingOrderEqualToSendingOrderWithBufferedChannel3) {
} }
void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) { void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) {
size_t num_threads = 5; const size_t kNumThreads = 5;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
// Launches threads that try to read and are blocked because of no writers // Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
...@@ -230,7 +230,7 @@ void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) { ...@@ -230,7 +230,7 @@ void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec
// Verify that all the threads are blocked // Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
...@@ -241,21 +241,21 @@ void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) { ...@@ -241,21 +241,21 @@ void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
void ChannelCloseUnblocksSendersTest(Channel<int> *ch, bool isBuffered) { void ChannelCloseUnblocksSendersTest(Channel<int> *ch, bool isBuffered) {
size_t num_threads = 5; const size_t kNumThreads = 5;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
bool send_success[num_threads]; bool send_success[kNumThreads];
// Launches threads that try to write and are blocked because of no readers // Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
send_success[i] = false; send_success[i] = false;
t[i] = std::thread( t[i] = std::thread(
...@@ -277,13 +277,13 @@ void ChannelCloseUnblocksSendersTest(Channel<int> *ch, bool isBuffered) { ...@@ -277,13 +277,13 @@ void ChannelCloseUnblocksSendersTest(Channel<int> *ch, bool isBuffered) {
if (isBuffered) { if (isBuffered) {
// If ch is Buffered, at least 4 threads must be blocked. // If ch is Buffered, at least 4 threads must be blocked.
int ct = 0; int ct = 0;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
if (!thread_ended[i]) ct++; if (!thread_ended[i]) ct++;
} }
EXPECT_GE(ct, 4); EXPECT_GE(ct, 4);
} else { } else {
// If ch is UnBuffered, all the threads should be blocked. // If ch is UnBuffered, all the threads should be blocked.
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
} }
...@@ -294,21 +294,21 @@ void ChannelCloseUnblocksSendersTest(Channel<int> *ch, bool isBuffered) { ...@@ -294,21 +294,21 @@ void ChannelCloseUnblocksSendersTest(Channel<int> *ch, bool isBuffered) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
if (isBuffered) { if (isBuffered) {
// Verify that only 1 send was successful // Verify that only 1 send was successful
int ct = 0; int ct = 0;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
if (send_success[i]) ct++; if (send_success[i]) ct++;
} }
// Only 1 send must be successful // Only 1 send must be successful
EXPECT_EQ(ct, 1); EXPECT_EQ(ct, 1);
} }
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
// This tests that closing a buffered channel also unblocks // This tests that closing a buffered channel also unblocks
...@@ -409,13 +409,13 @@ TEST(Channel, UnbufferedMoreReceiveLessSendTest) { ...@@ -409,13 +409,13 @@ TEST(Channel, UnbufferedMoreReceiveLessSendTest) {
// This tests that destroying a channel unblocks // This tests that destroying a channel unblocks
// any senders waiting for channel to have write space // any senders waiting for channel to have write space
void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) { void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) {
size_t num_threads = 5; const size_t kNumThreads = 5;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
bool send_success[num_threads]; bool send_success[kNumThreads];
// Launches threads that try to write and are blocked because of no readers // Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
send_success[i] = false; send_success[i] = false;
t[i] = std::thread( t[i] = std::thread(
...@@ -438,14 +438,14 @@ void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) { ...@@ -438,14 +438,14 @@ void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) {
if (isBuffered) { if (isBuffered) {
// If channel is buffered, verify that at least 4 threads are blocked // If channel is buffered, verify that at least 4 threads are blocked
int ct = 0; int ct = 0;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
if (thread_ended[i] == false) ct++; if (thread_ended[i] == false) ct++;
} }
// At least 4 threads must be blocked // At least 4 threads must be blocked
EXPECT_GE(ct, 4); EXPECT_GE(ct, 4);
} else { } else {
// Verify that all the threads are blocked // Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
} }
...@@ -454,13 +454,13 @@ void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) { ...@@ -454,13 +454,13 @@ void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
// Count number of successful sends // Count number of successful sends
int ct = 0; int ct = 0;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
if (send_success[i]) ct++; if (send_success[i]) ct++;
} }
...@@ -473,18 +473,18 @@ void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) { ...@@ -473,18 +473,18 @@ void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) {
} }
// Join all threads // Join all threads
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
// This tests that destroying a channel also unblocks // This tests that destroying a channel also unblocks
// any receivers waiting on the channel // any receivers waiting on the channel
void ChannelDestroyUnblockReceivers(Channel<int> *ch) { void ChannelDestroyUnblockReceivers(Channel<int> *ch) {
size_t num_threads = 5; const size_t kNumThreads = 5;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
// Launches threads that try to read and are blocked because of no writers // Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
...@@ -498,18 +498,18 @@ void ChannelDestroyUnblockReceivers(Channel<int> *ch) { ...@@ -498,18 +498,18 @@ void ChannelDestroyUnblockReceivers(Channel<int> *ch) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that all threads are blocked // Verify that all threads are blocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
// delete the channel // delete the channel
delete ch; delete ch;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
TEST(Channel, BufferedChannelDestroyUnblocksReceiversTest) { TEST(Channel, BufferedChannelDestroyUnblocksReceiversTest) {
...@@ -679,12 +679,12 @@ TEST(ChannelHolder, TypeMismatchReceiveTest) { ...@@ -679,12 +679,12 @@ TEST(ChannelHolder, TypeMismatchReceiveTest) {
} }
void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) {
size_t num_threads = 5; const size_t kNumThreads = 5;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
// Launches threads that try to read and are blocked because of no writers // Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
...@@ -697,7 +697,7 @@ void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { ...@@ -697,7 +697,7 @@ void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec
// Verify that all the threads are blocked // Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
...@@ -708,21 +708,21 @@ void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) { ...@@ -708,21 +708,21 @@ void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) {
size_t num_threads = 5; const size_t kNumThreads = 5;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
bool send_success[num_threads]; bool send_success[kNumThreads];
// Launches threads that try to write and are blocked because of no readers // Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
send_success[i] = false; send_success[i] = false;
t[i] = std::thread( t[i] = std::thread(
...@@ -744,13 +744,13 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { ...@@ -744,13 +744,13 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) {
if (isBuffered) { if (isBuffered) {
// If ch is Buffered, at least 4 threads must be blocked. // If ch is Buffered, at least 4 threads must be blocked.
int ct = 0; int ct = 0;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
if (!thread_ended[i]) ct++; if (!thread_ended[i]) ct++;
} }
EXPECT_GE(ct, 4); EXPECT_GE(ct, 4);
} else { } else {
// If ch is UnBuffered, all the threads should be blocked. // If ch is UnBuffered, all the threads should be blocked.
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
} }
...@@ -761,21 +761,21 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { ...@@ -761,21 +761,21 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
if (isBuffered) { if (isBuffered) {
// Verify that only 1 send was successful // Verify that only 1 send was successful
int ct = 0; int ct = 0;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
if (send_success[i]) ct++; if (send_success[i]) ct++;
} }
// Only 1 send must be successful // Only 1 send must be successful
EXPECT_EQ(ct, 1); EXPECT_EQ(ct, 1);
} }
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
// This tests that closing a channelholder unblocks // This tests that closing a channelholder unblocks
...@@ -813,13 +813,13 @@ TEST(Channel, ChannelHolderCloseUnblocksSendersTest) { ...@@ -813,13 +813,13 @@ TEST(Channel, ChannelHolderCloseUnblocksSendersTest) {
// This tests that destroying a channelholder unblocks // This tests that destroying a channelholder unblocks
// any senders waiting for channel // any senders waiting for channel
void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) {
size_t num_threads = 5; const size_t kNumThreads = 5;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
bool send_success[num_threads]; bool send_success[kNumThreads];
// Launches threads that try to write and are blocked because of no readers // Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
send_success[i] = false; send_success[i] = false;
t[i] = std::thread( t[i] = std::thread(
...@@ -841,14 +841,14 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { ...@@ -841,14 +841,14 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) {
if (isBuffered) { if (isBuffered) {
// If channel is buffered, verify that at least 4 threads are blocked // If channel is buffered, verify that at least 4 threads are blocked
int ct = 0; int ct = 0;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
if (thread_ended[i] == false) ct++; if (thread_ended[i] == false) ct++;
} }
// At least 4 threads must be blocked // At least 4 threads must be blocked
EXPECT_GE(ct, 4); EXPECT_GE(ct, 4);
} else { } else {
// Verify that all the threads are blocked // Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
} }
...@@ -857,13 +857,13 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { ...@@ -857,13 +857,13 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
// Count number of successful sends // Count number of successful sends
int ct = 0; int ct = 0;
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
if (send_success[i]) ct++; if (send_success[i]) ct++;
} }
...@@ -876,18 +876,18 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { ...@@ -876,18 +876,18 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) {
} }
// Join all threads // Join all threads
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
// This tests that destroying a channelholder also unblocks // This tests that destroying a channelholder also unblocks
// any receivers waiting on the channel // any receivers waiting on the channel
void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) { void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) {
size_t num_threads = 5; const size_t kNumThreads = 5;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
// Launches threads that try to read and are blocked because of no writers // Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
...@@ -901,18 +901,18 @@ void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) { ...@@ -901,18 +901,18 @@ void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads are blocked // Verify that all threads are blocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
// delete the channel // delete the channel
delete ch; delete ch;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
TEST(ChannelHolder, ChannelHolderDestroyUnblocksReceiversTest) { TEST(ChannelHolder, ChannelHolderDestroyUnblocksReceiversTest) {
...@@ -945,12 +945,12 @@ TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) { ...@@ -945,12 +945,12 @@ TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) {
// This tests closing a channelholder many times. // This tests closing a channelholder many times.
void ChannelHolderManyTimesClose(ChannelHolder *ch) { void ChannelHolderManyTimesClose(ChannelHolder *ch) {
const int num_threads = 15; const int kNumThreads = 15;
std::thread t[num_threads]; std::thread t[kNumThreads];
bool thread_ended[num_threads]; bool thread_ended[kNumThreads];
// Launches threads that try to send data to channel. // Launches threads that try to send data to channel.
for (size_t i = 0; i < num_threads / 3; i++) { for (size_t i = 0; i < kNumThreads / 3; i++) {
thread_ended[i] = false; thread_ended[i] = false;
t[i] = std::thread( t[i] = std::thread(
[&](bool *ended) { [&](bool *ended) {
...@@ -962,7 +962,7 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { ...@@ -962,7 +962,7 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) {
} }
// Launches threads that try to receive data from channel. // Launches threads that try to receive data from channel.
for (size_t i = num_threads / 3; i < 2 * num_threads / 3; i++) { for (size_t i = kNumThreads / 3; i < 2 * kNumThreads / 3; i++) {
thread_ended[i] = false; thread_ended[i] = false;
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
...@@ -976,7 +976,7 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { ...@@ -976,7 +976,7 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) {
} }
// Launches threads that try to close the channel. // Launches threads that try to close the channel.
for (size_t i = 2 * num_threads / 3; i < num_threads; i++) { for (size_t i = 2 * kNumThreads / 3; i < kNumThreads; i++) {
thread_ended[i] = false; thread_ended[i] = false;
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
...@@ -991,13 +991,13 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) { ...@@ -991,13 +991,13 @@ void ChannelHolderManyTimesClose(ChannelHolder *ch) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that all threads are unblocked // Verify that all threads are unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < kNumThreads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
EXPECT_TRUE(ch->IsClosed()); EXPECT_TRUE(ch->IsClosed());
// delete the channel // delete the channel
delete ch; delete ch;
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < kNumThreads; i++) t[i].join();
} }
TEST(ChannelHolder, ChannelHolderManyTimesCloseTest) { TEST(ChannelHolder, ChannelHolderManyTimesCloseTest) {
......
...@@ -142,6 +142,7 @@ class LoDTensor : public Tensor { ...@@ -142,6 +142,7 @@ class LoDTensor : public Tensor {
return (lod_)[level].size() - 1; return (lod_)[level].size() - 1;
} }
// Split LoDTensor and copy to each place specified in places.
std::vector<LoDTensor> SplitLoDTensor( std::vector<LoDTensor> SplitLoDTensor(
const std::vector<platform::Place> places) const; const std::vector<platform::Place> places) const;
......
...@@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs( ...@@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif #endif
} }
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors, void ParallelExecutor::Run(
const std::string &fetched_var_name) { const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name,
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
platform::RecordBlock b(0); platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);
auto fetch_data = member_->executor_->Run(fetch_tensors); auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() = *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data; fetch_data;
} }
// Distributes every fed tensor over the executor's places.
//
// For each (name, tensor) entry in feed_tensors, the tensor is split with
// LoDTensor::SplitLoDTensor(member_->places_). Piece j is then handed to the
// variable `name` of member_->local_scopes_[j] through ShareDataWith.
//
// NOTE(review): assumes SplitLoDTensor returns at least one piece per place
// and that local_scopes_ has one scope per place -- confirm against the
// implementation; neither is checked here.
void ParallelExecutor::SplitTensorToPlaces(
    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
  // Bind by const reference: iterating by value would copy each
  // (name, LoDTensor) pair on every iteration.
  for (const auto &it : feed_tensors) {
    auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
    for (size_t j = 0; j < member_->places_.size(); ++j) {
      // TODO(panxy0718): Do I need to delete this var?
      member_->local_scopes_[j]
          ->Var(it.first)
          ->GetMutable<LoDTensor>()
          ->ShareDataWith(lod_tensors[j]);
    }
  }
}
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -42,9 +42,13 @@ class ParallelExecutor { ...@@ -42,9 +42,13 @@ class ParallelExecutor {
bool allow_op_delay); bool allow_op_delay);
void Run(const std::vector<std::string>& fetch_tensors, void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name = "fetched_var"); const std::string& fetched_var_name,
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
private: private:
void SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
ParallelExecutorPrivate* member_; ParallelExecutorPrivate* member_;
void BCastParamsToGPUs(const ProgramDesc& startup_program) const; void BCastParamsToGPUs(const ProgramDesc& startup_program) const;
......
...@@ -128,10 +128,32 @@ class CUDNNConvOpKernel : public framework::OpKernel<T> { ...@@ -128,10 +128,32 @@ class CUDNNConvOpKernel : public framework::OpKernel<T> {
handle, cudnn_input_desc, cudnn_filter_desc, cudnn_conv_desc, handle, cudnn_input_desc, cudnn_filter_desc, cudnn_conv_desc,
cudnn_output_desc, CUDNN_CONVOLUTION_FWD_SPECIFY_WORKSPACE_LIMIT, cudnn_output_desc, CUDNN_CONVOLUTION_FWD_SPECIFY_WORKSPACE_LIMIT,
workspace_size_limit, &algo)); workspace_size_limit, &algo));
#if CUDA_VERSION >= 9000 && CUDNN_VERSION_MIN(7, 0, 1)
// Tensor cores are available on Volta and newer GPUs (compute capability
// >= 70) and are only enabled when input and filter data are float16
if (dev_ctx.GetComputeCapability() >= 70 &&
std::type_index(typeid(T)) ==
std::type_index(typeid(platform::float16))) {
PADDLE_ENFORCE(platform::dynload::cudnnSetConvolutionMathType(
cudnn_conv_desc, CUDNN_TENSOR_OP_MATH));
// Currently tensor core is only enabled using this algo
algo = CUDNN_CONVOLUTION_FWD_ALGO_IMPLICIT_PRECOMP_GEMM;
} else {
PADDLE_ENFORCE(platform::dynload::cudnnSetConvolutionMathType(
cudnn_conv_desc, CUDNN_DEFAULT_MATH));
}
#endif
// get workspace size able to allocate // get workspace size able to allocate
PADDLE_ENFORCE(platform::dynload::cudnnGetConvolutionForwardWorkspaceSize( PADDLE_ENFORCE(platform::dynload::cudnnGetConvolutionForwardWorkspaceSize(
handle, cudnn_input_desc, cudnn_filter_desc, cudnn_conv_desc, handle, cudnn_input_desc, cudnn_filter_desc, cudnn_conv_desc,
cudnn_output_desc, algo, &workspace_size_in_bytes)); cudnn_output_desc, algo, &workspace_size_in_bytes));
// It is possible for float16 on Volta GPU to allocate more memory than
// the limit because the algo is overridden to use tensor core.
PADDLE_ENFORCE_LE(workspace_size_in_bytes, workspace_size_limit,
"workspace_size to be allocated exceeds the limit");
// Allocate on GPU memory // Allocate on GPU memory
platform::CUDAPlace gpu = boost::get<platform::CUDAPlace>(ctx.GetPlace()); platform::CUDAPlace gpu = boost::get<platform::CUDAPlace>(ctx.GetPlace());
cudnn_workspace = paddle::memory::Alloc(gpu, workspace_size_in_bytes); cudnn_workspace = paddle::memory::Alloc(gpu, workspace_size_in_bytes);
......
...@@ -27,8 +27,8 @@ template <typename T> ...@@ -27,8 +27,8 @@ template <typename T>
class MKLDNNMD { class MKLDNNMD {
public: public:
explicit MKLDNNMD(const T* in, const T* w, bool bias) explicit MKLDNNMD(const T* in, const T* w, bool bias)
: in{paddle::framework::vectorize2int(in->dims())}, : in(paddle::framework::vectorize2int(in->dims())),
w{paddle::framework::vectorize2int(w->dims())} { w(paddle::framework::vectorize2int(w->dims())) {
with_bias_ = bias; with_bias_ = bias;
} }
...@@ -78,7 +78,7 @@ class MKLDNNMD { ...@@ -78,7 +78,7 @@ class MKLDNNMD {
class MKLDNNMemory { class MKLDNNMemory {
public: public:
MKLDNNMemory(MKLDNNMD<Tensor>* t, const mkldnn::engine& e) MKLDNNMemory(MKLDNNMD<Tensor>* t, const mkldnn::engine& e)
: md_{t}, engine_{e} {} : md_(t), engine_(e) {}
virtual ~MKLDNNMemory() = default; virtual ~MKLDNNMemory() = default;
template <typename Output> template <typename Output>
......
...@@ -257,9 +257,11 @@ class ScopedConvolutionDescriptor { ...@@ -257,9 +257,11 @@ class ScopedConvolutionDescriptor {
} }
#endif #endif
cudnnDataType_t compute_type =
(type == CUDNN_DATA_DOUBLE) ? CUDNN_DATA_DOUBLE : CUDNN_DATA_FLOAT;
PADDLE_ENFORCE(dynload::cudnnSetConvolutionNdDescriptor( PADDLE_ENFORCE(dynload::cudnnSetConvolutionNdDescriptor(
desc_, pads.size(), pads.data(), strides.data(), dilations.data(), desc_, pads.size(), pads.data(), strides.data(), dilations.data(),
CUDNN_CROSS_CORRELATION, type)); CUDNN_CROSS_CORRELATION, compute_type));
return desc_; return desc_;
} }
......
...@@ -16,7 +16,7 @@ limitations under the License. */ ...@@ -16,7 +16,7 @@ limitations under the License. */
#include <cudnn.h> #include <cudnn.h>
#include <dlfcn.h> #include <dlfcn.h>
#include <mutex> #include <mutex> // NOLINT
#include "paddle/fluid/platform/dynload/dynamic_loader.h" #include "paddle/fluid/platform/dynload/dynamic_loader.h"
namespace paddle { namespace paddle {
...@@ -140,7 +140,8 @@ CUDNN_DNN_ROUTINE_EACH_R5(DECLARE_DYNAMIC_LOAD_CUDNN_WRAP) ...@@ -140,7 +140,8 @@ CUDNN_DNN_ROUTINE_EACH_R5(DECLARE_DYNAMIC_LOAD_CUDNN_WRAP)
#if CUDNN_VERSION >= 7001 #if CUDNN_VERSION >= 7001
#define CUDNN_DNN_ROUTINE_EACH_R7(__macro) \ #define CUDNN_DNN_ROUTINE_EACH_R7(__macro) \
__macro(cudnnSetConvolutionGroupCount); __macro(cudnnSetConvolutionGroupCount); \
__macro(cudnnSetConvolutionMathType);
CUDNN_DNN_ROUTINE_EACH_R7(DECLARE_DYNAMIC_LOAD_CUDNN_WRAP) CUDNN_DNN_ROUTINE_EACH_R7(DECLARE_DYNAMIC_LOAD_CUDNN_WRAP)
#endif #endif
......
...@@ -15,6 +15,8 @@ limitations under the License. */ ...@@ -15,6 +15,8 @@ limitations under the License. */
#include "paddle/fluid/pybind/protobuf.h" #include "paddle/fluid/pybind/protobuf.h"
#include <deque> #include <deque>
#include <iostream> #include <iostream>
#include <string>
#include <tuple>
#include "paddle/fluid/framework/backward.h" #include "paddle/fluid/framework/backward.h"
#include "paddle/fluid/framework/block_desc.h" #include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/op_desc.h" #include "paddle/fluid/framework/op_desc.h"
...@@ -98,7 +100,7 @@ namespace pybind { ...@@ -98,7 +100,7 @@ namespace pybind {
using namespace paddle::framework; // NOLINT using namespace paddle::framework; // NOLINT
template <typename T> template <typename T>
static py::bytes SerializeMessage(T &self) { static py::bytes SerializeMessage(T &self) { // NOLINT
// Check IsInitialized in Python // Check IsInitialized in Python
std::string retv; std::string retv;
PADDLE_ENFORCE(self.Proto()->SerializePartialToString(&retv), PADDLE_ENFORCE(self.Proto()->SerializePartialToString(&retv),
...@@ -107,7 +109,7 @@ static py::bytes SerializeMessage(T &self) { ...@@ -107,7 +109,7 @@ static py::bytes SerializeMessage(T &self) {
} }
// Bind Methods // Bind Methods
void BindProgramDesc(py::module &m) { void BindProgramDesc(py::module &m) { // NOLINT
py::class_<ProgramDesc>(m, "ProgramDesc", "") py::class_<ProgramDesc>(m, "ProgramDesc", "")
.def(py::init<>()) .def(py::init<>())
.def("__init__", .def("__init__",
...@@ -151,7 +153,7 @@ void BindProgramDesc(py::module &m) { ...@@ -151,7 +153,7 @@ void BindProgramDesc(py::module &m) {
}); });
} }
void BindBlockDesc(py::module &m) { void BindBlockDesc(py::module &m) { // NOLINT
py::class_<BlockDesc>(m, "BlockDesc", "") py::class_<BlockDesc>(m, "BlockDesc", "")
.def_property_readonly("id", &BlockDesc::ID) .def_property_readonly("id", &BlockDesc::ID)
.def_property_readonly("parent", &BlockDesc::Parent) .def_property_readonly("parent", &BlockDesc::Parent)
...@@ -200,13 +202,19 @@ void BindBlockDesc(py::module &m) { ...@@ -200,13 +202,19 @@ void BindBlockDesc(py::module &m) {
return self.FindVarRecursive(name); return self.FindVarRecursive(name);
}, },
py::return_value_policy::reference) py::return_value_policy::reference)
.def("remove_var",
[](BlockDesc &self, py::bytes byte_name) {
std::string name = byte_name;
return self.RemoveVar(name);
},
py::return_value_policy::reference)
.def("all_vars", &BlockDesc::AllVars, py::return_value_policy::reference) .def("all_vars", &BlockDesc::AllVars, py::return_value_policy::reference)
.def("op_size", &BlockDesc::OpSize) .def("op_size", &BlockDesc::OpSize)
.def("op", &BlockDesc::Op, py::return_value_policy::reference) .def("op", &BlockDesc::Op, py::return_value_policy::reference)
.def("serialize_to_string", SerializeMessage<BlockDesc>); .def("serialize_to_string", SerializeMessage<BlockDesc>);
} }
void BindVarDsec(py::module &m) { void BindVarDsec(py::module &m) { // NOLINT
py::class_<VarDesc> var_desc(m, "VarDesc", ""); py::class_<VarDesc> var_desc(m, "VarDesc", "");
var_desc var_desc
.def("name", .def("name",
...@@ -257,7 +265,7 @@ void BindVarDsec(py::module &m) { ...@@ -257,7 +265,7 @@ void BindVarDsec(py::module &m) {
.value("RAW", proto::VarType::RAW); .value("RAW", proto::VarType::RAW);
} }
void BindOpDesc(py::module &m) { void BindOpDesc(py::module &m) { // NOLINT
py::enum_<proto::AttrType>(m, "AttrType", "") py::enum_<proto::AttrType>(m, "AttrType", "")
.value("INT", proto::AttrType::INT) .value("INT", proto::AttrType::INT)
.value("INTS", proto::AttrType::INTS) .value("INTS", proto::AttrType::INTS)
......
...@@ -26,25 +26,29 @@ class ParallelExecutor(object): ...@@ -26,25 +26,29 @@ class ParallelExecutor(object):
use_cuda, use_cuda,
num_threads=None, num_threads=None,
allow_op_delay=False): allow_op_delay=False):
places = [] self._places = []
self._act_places = []
if use_cuda: if use_cuda:
for i in xrange(core.get_cuda_device_count()): for i in xrange(core.get_cuda_device_count()):
p = core.Place() p = core.Place()
p.set_place(core.CUDAPlace(i)) self._act_places.append(core.CUDAPlace(i))
places.append(p) p.set_place(self._act_places[-1])
self._places.append(p)
else: else:
for i in xrange(multiprocessing.cpu_count()): for i in xrange(multiprocessing.cpu_count()):
p = core.Place() p = core.Place()
p.set_place(core.CPUPlace()) self._act_places.append(core.CPUPlace(i))
places.append(p) p.set_place(self._act_places[-1])
self._places.append(p)
assert self._places, "no place for execution"
if num_threads is None: if num_threads is None:
if use_cuda: if use_cuda:
# Experiments on se-resnext show that too many threads hurt # Experiments on se-resnext show that too many threads hurt
# performance. Worth tuning for other models in the future. # performance. Worth tuning for other models in the future.
num_threads = len(places) num_threads = len(self._places)
else: else:
min(len(places) * 2, multiprocessing.cpu_count()) min(len(self._places) * 2, multiprocessing.cpu_count())
startup = framework.default_startup_program() startup = framework.default_startup_program()
main = framework.default_main_program() main = framework.default_main_program()
...@@ -53,7 +57,7 @@ class ParallelExecutor(object): ...@@ -53,7 +57,7 @@ class ParallelExecutor(object):
self.executor = core.ParallelExecutor( self.executor = core.ParallelExecutor(
num_threads, num_threads,
True if use_cuda else False, # use_event True if use_cuda else False, # use_event
places, self._places,
set([ set([
p.name for p in main.global_block().iter_parameters() p.name for p in main.global_block().iter_parameters()
if not p.stop_gradient if not p.stop_gradient
...@@ -65,8 +69,25 @@ class ParallelExecutor(object): ...@@ -65,8 +69,25 @@ class ParallelExecutor(object):
allow_op_delay) allow_op_delay)
self.scope = scope self.scope = scope
def run(self, fetch_list): def run(self, fetch_list, feed_dict={}):
"""
:param fetch_list: A list of variable names that will be fetched.
:param feed_dict: A dict mapping for feed variable name to LoDTensor
or numpy array.
:return: fetched value list.
"""
if not isinstance(feed_dict, dict):
raise TypeError("feed_dict should be a dict")
feed_tensor_dict = {}
for i, feed_name in enumerate(feed_dict):
feed_tensor = feed_dict[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
feed_tensor.set(feed_dict[feed_name], self._act_places[0])
feed_tensor_dict[feed_name] = feed_tensor
fetch_var_name = '@FETCHED_VAR_NAME@' fetch_var_name = '@FETCHED_VAR_NAME@'
self.executor.run(fetch_list, fetch_var_name) self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array() arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
return [arr[i] for i in range(len(arr))] return [arr[i] for i in range(len(arr))]
...@@ -21,13 +21,17 @@ import paddle.dataset.mnist as mnist ...@@ -21,13 +21,17 @@ import paddle.dataset.mnist as mnist
import paddle.dataset.wmt16 as wmt16 import paddle.dataset.wmt16 as wmt16
def simple_fc_net(): def simple_fc_net(use_feed):
reader = fluid.layers.open_recordio_file( if use_feed:
filename='./mnist.recordio', img = fluid.layers.data(name='image', shape=[784], dtype='float32')
shapes=[[-1, 784], [-1, 1]], label = fluid.layers.data(name='label', shape=[1], dtype='int64')
lod_levels=[0, 0], else:
dtypes=['float32', 'int64']) reader = fluid.layers.open_recordio_file(
img, label = fluid.layers.read_file(reader) filename='./mnist.recordio',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
hidden = img hidden = img
for _ in xrange(4): for _ in xrange(4):
hidden = fluid.layers.fc( hidden = fluid.layers.fc(
...@@ -42,13 +46,18 @@ def simple_fc_net(): ...@@ -42,13 +46,18 @@ def simple_fc_net():
return loss return loss
def fc_with_batchnorm(): def fc_with_batchnorm(use_feed):
reader = fluid.layers.open_recordio_file( if use_feed:
filename='./mnist.recordio', img = fluid.layers.data(name='image', shape=[784], dtype='float32')
shapes=[[-1, 784], [-1, 1]], label = fluid.layers.data(name='label', shape=[1], dtype='int64')
lod_levels=[0, 0], else:
dtypes=['float32', 'int64']) reader = fluid.layers.open_recordio_file(
img, label = fluid.layers.read_file(reader) filename='./mnist.recordio',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
hidden = img hidden = img
for _ in xrange(1): for _ in xrange(1):
hidden = fluid.layers.fc( hidden = fluid.layers.fc(
...@@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio): ...@@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
return fluid.layers.elementwise_add(x=short, y=scale, act='relu') return fluid.layers.elementwise_add(x=short, y=scale, act='relu')
def SE_ResNeXt152Small(batch_size=2): def SE_ResNeXt152Small(batch_size=2, use_feed=False):
assert not use_feed, "SE_ResNeXt doesn't support feed yet"
img = fluid.layers.fill_constant( img = fluid.layers.fill_constant(
shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0) shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0)
label = fluid.layers.fill_constant( label = fluid.layers.fill_constant(
...@@ -185,30 +196,28 @@ class TestParallelExecutorBase(unittest.TestCase): ...@@ -185,30 +196,28 @@ class TestParallelExecutorBase(unittest.TestCase):
memory_opt=True, memory_opt=True,
iter=10, iter=10,
batch_size=None, batch_size=None,
allow_op_delay=False): allow_op_delay=False,
feed_dict={}):
main = fluid.Program() main = fluid.Program()
startup = fluid.Program() startup = fluid.Program()
with fluid.program_guard(main, startup): with fluid.program_guard(main, startup):
loss = method() loss = method(use_feed=len(feed_dict) > 0)
adam = fluid.optimizer.Adam() adam = fluid.optimizer.Adam()
adam.minimize(loss) adam.minimize(loss)
if memory_opt: if memory_opt:
fluid.memory_optimize(main) fluid.memory_optimize(main)
exe = fluid.ParallelExecutor( exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True)
loss_name=loss.name,
use_cuda=True,
allow_op_delay=allow_op_delay)
if batch_size is not None: if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count() batch_size *= fluid.core.get_cuda_device_count()
begin = time.time() begin = time.time()
first_loss, = exe.run([loss.name]) first_loss, = exe.run([loss.name], feed_dict=feed_dict)
first_loss = numpy.array(first_loss) first_loss = numpy.array(first_loss)
for i in xrange(iter): for i in xrange(iter):
exe.run([]) exe.run([], feed_dict=feed_dict)
last_loss, = exe.run([loss.name]) last_loss, = exe.run([loss.name], feed_dict=feed_dict)
end = time.time() end = time.time()
if batch_size is not None: if batch_size is not None:
...@@ -242,9 +251,19 @@ class TestMNIST(TestParallelExecutorBase): ...@@ -242,9 +251,19 @@ class TestMNIST(TestParallelExecutorBase):
self.check_network_convergence(simple_fc_net) self.check_network_convergence(simple_fc_net)
self.check_network_convergence(simple_fc_net, allow_op_delay=True) self.check_network_convergence(simple_fc_net, allow_op_delay=True)
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
simple_fc_net, feed_dict={"image": img,
"label": label})
def test_batchnorm_fc(self): def test_batchnorm_fc(self):
self.check_network_convergence(fc_with_batchnorm) self.check_network_convergence(fc_with_batchnorm)
self.check_network_convergence(fc_with_batchnorm, allow_op_delay=True) img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
fc_with_batchnorm, feed_dict={"image": img,
"label": label})
class TestResnet(TestParallelExecutorBase): class TestResnet(TestParallelExecutorBase):
...@@ -400,7 +419,8 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head): ...@@ -400,7 +419,8 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
import transformer_model import transformer_model
def transformer(): def transformer(use_feed):
assert not use_feed, "transfomer doesn't support feed yet"
return transformer_model.transformer( return transformer_model.transformer(
ModelHyperParams.src_vocab_size + 1, ModelHyperParams.src_vocab_size + 1,
ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1, ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1,
......
...@@ -19,9 +19,9 @@ from paddle.fluid.framework import Program ...@@ -19,9 +19,9 @@ from paddle.fluid.framework import Program
class TestOpDesc(unittest.TestCase): class TestOpDesc(unittest.TestCase):
def test_op_desc(self): def test_op_desc(self):
prog = core.ProgramDesc() program_desc = core.ProgramDesc()
self.assertIsNotNone(prog) self.assertIsNotNone(program_desc)
block = prog.block(0) block = program_desc.block(0)
self.assertIsNotNone(block) self.assertIsNotNone(block)
op = block.append_op() op = block.append_op()
self.assertIsNotNone(op) self.assertIsNotNone(op)
...@@ -67,7 +67,7 @@ class TestOpDesc(unittest.TestCase): ...@@ -67,7 +67,7 @@ class TestOpDesc(unittest.TestCase):
self.assertEqual(8, len(op.attr_names())) self.assertEqual(8, len(op.attr_names()))
op.set_block_attr("block_attr", prog.block(0)) op.set_block_attr("block_attr", program_desc.block(0))
self.assertEqual(0, op.block_attr("block_attr")) self.assertEqual(0, op.block_attr("block_attr"))
mul_op = block.append_op() mul_op = block.append_op()
...@@ -88,20 +88,20 @@ class TestProgramDesc(unittest.TestCase): ...@@ -88,20 +88,20 @@ class TestProgramDesc(unittest.TestCase):
del program_desc del program_desc
def test_append_block(self): def test_append_block(self):
prog_desc = core.ProgramDesc() program_desc = core.ProgramDesc()
self.assertIsNotNone(prog_desc) self.assertIsNotNone(program_desc)
block_root = prog_desc.block(0) block_root = program_desc.block(0)
self.assertIsNotNone(block_root) self.assertIsNotNone(block_root)
self.assertEqual(block_root.id, 0) self.assertEqual(block_root.id, 0)
block1 = prog_desc.append_block(block_root) block1 = program_desc.append_block(block_root)
block2 = prog_desc.append_block(block1) block2 = program_desc.append_block(block1)
self.assertIsNotNone(block1) self.assertIsNotNone(block1)
self.assertEqual(block1.id, block2.parent) self.assertEqual(block1.id, block2.parent)
self.assertEqual(block_root.id, block1.parent) self.assertEqual(block_root.id, block1.parent)
block3 = prog_desc.append_block(block_root) block3 = program_desc.append_block(block_root)
self.assertEqual(block3.parent, block_root.id) self.assertEqual(block3.parent, block_root.id)
self.assertEqual(prog_desc.block(1).id, 1) self.assertEqual(program_desc.block(1).id, 1)
self.assertEqual(4, prog_desc.num_blocks()) self.assertEqual(4, program_desc.num_blocks())
class TestVarDesc(unittest.TestCase): class TestVarDesc(unittest.TestCase):
...@@ -162,9 +162,9 @@ class TestVarDesc(unittest.TestCase): ...@@ -162,9 +162,9 @@ class TestVarDesc(unittest.TestCase):
class TestBlockDesc(unittest.TestCase): class TestBlockDesc(unittest.TestCase):
def test_add_var(self): def test_add_var(self):
prog = core.ProgramDesc() program_desc = core.ProgramDesc()
self.assertIsNotNone(prog) self.assertIsNotNone(program_desc)
block = prog.block(0) block = program_desc.block(0)
self.assertIsNotNone(block) self.assertIsNotNone(block)
var1 = block.var("var1") var1 = block.var("var1")
var2 = block.var("var2") var2 = block.var("var2")
...@@ -175,9 +175,9 @@ class TestBlockDesc(unittest.TestCase): ...@@ -175,9 +175,9 @@ class TestBlockDesc(unittest.TestCase):
self.assertEqual(var2_re, var2) self.assertEqual(var2_re, var2)
def test_add_op(self): def test_add_op(self):
prog = core.ProgramDesc() program_desc = core.ProgramDesc()
self.assertIsNotNone(prog) self.assertIsNotNone(program_desc)
block = prog.block(0) block = program_desc.block(0)
self.assertIsNotNone(block) self.assertIsNotNone(block)
op1 = block.append_op() op1 = block.append_op()
op2 = block.append_op() op2 = block.append_op()
...@@ -189,9 +189,9 @@ class TestBlockDesc(unittest.TestCase): ...@@ -189,9 +189,9 @@ class TestBlockDesc(unittest.TestCase):
def test_remove_op(self): def test_remove_op(self):
program = Program() program = Program()
prog = program.desc program_desc = program.desc
self.assertIsNotNone(prog) self.assertIsNotNone(program_desc)
block = prog.block(0) block = program_desc.block(0)
self.assertIsNotNone(block) self.assertIsNotNone(block)
op0 = block.append_op() op0 = block.append_op()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册