“f6c9f56394838021af5db26d046f3c90606a17fc”上不存在“develop/doc/api/v2/fluid/regularizer.html”
提交 0439ba2f 编写于 作者: Y Yu Yang

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into feature/rewrite_vector

快速开始 快速开始
======== ========
快速安装
--------
PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14.04以及MacOS 10.12,并安装有Python2.7。 PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14.04以及MacOS 10.12,并安装有Python2.7。
执行下面的命令完成快速安装,版本为cpu_avx_openblas: 执行下面的命令完成快速安装,版本为cpu_avx_openblas:
...@@ -16,6 +19,9 @@ PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14. ...@@ -16,6 +19,9 @@ PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14.
更详细的安装和编译方法参考::ref:`install_steps` 。 更详细的安装和编译方法参考::ref:`install_steps` 。
快速使用
--------
创建一个 housing.py 并粘贴此Python代码: 创建一个 housing.py 并粘贴此Python代码:
.. code-block:: python .. code-block:: python
......
Quick Start Quick Start
============ ============
Quick Install
-------------
You can use pip to install PaddlePaddle with a single command, supports You can use pip to install PaddlePaddle with a single command, supports
CentOS 6 above, Ubuntu 14.04 above or MacOS 10.12, with Python 2.7 installed. CentOS 6 above, Ubuntu 14.04 above or MacOS 10.12, with Python 2.7 installed.
Simply run the following command to install, the version is cpu_avx_openblas: Simply run the following command to install, the version is cpu_avx_openblas:
...@@ -17,6 +20,9 @@ If you need to install GPU version (cuda7.5_cudnn5_avx_openblas), run: ...@@ -17,6 +20,9 @@ If you need to install GPU version (cuda7.5_cudnn5_avx_openblas), run:
For more details about installation and build: :ref:`install_steps` . For more details about installation and build: :ref:`install_steps` .
Quick Use
---------
Create a new file called housing.py, and paste this Python Create a new file called housing.py, and paste this Python
code: code:
......
分布式训练 分布式训练
========== ==========
本节将介绍如何使用PaddlePaddle在不同的集群框架下完成分布式训练。分布式训练架构如下图所示:
.. image:: src/ps_cn.png
:width: 500
- 数据分片(Data shard): 用于训练神经网络的数据,被切分成多个部分,每个部分分别给每个trainer使用。
- 计算节点(Trainer): 每个trainer启动后读取切分好的一部分数据,开始神经网络的“前馈”和“后馈”计算,并和参数服务器通信。在完成一定量数据的训练后,上传计算得出的梯度(gradients),然后下载优化更新后的神经网络参数(parameters)。
- 参数服务器(Parameter server):每个参数服务器只保存整个神经网络所有参数的一部分。参数服务器接收从计算节点上传的梯度,并完成参数优化更新,再将更新后的参数下发到每个计算节点。
这样,通过计算节点和参数服务器的分布式协作,可以完成神经网络的SGD方法的训练。PaddlePaddle可以同时支持同步随机梯度下降(SGD)和异步随机梯度下降。
在使用同步SGD训练神经网络时,PaddlePaddle使用同步屏障(barrier),使梯度的提交和参数的更新按照顺序方式执行。在异步SGD中,则并不会等待所有trainer提交梯度才更新参数,这样极大地提高了计算的并行性:参数服务器之间不相互依赖,并行地接收梯度和更新参数,参数服务器也不会等待计算节点全部都提交梯度之后才开始下一步,计算节点之间也不会相互依赖,并行地执行模型的训练。可以看出,虽然异步SGD方式会提高参数更新并行度, 但是并不能保证参数同步更新,在任意时间某一台参数服务器上保存的参数可能比另一台要更新,与同步SGD相比,梯度会有噪声。
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
introduction_cn.md
preparations_cn.md preparations_cn.md
cmd_argument_cn.md cmd_argument_cn.md
multi_cluster/index_cn.rst multi_cluster/index_cn.rst
Distributed Training Distributed Training
==================== ====================
In this section, we'll explain how to run distributed training jobs with PaddlePaddle on different types of clusters. The diagram below shows the main architecture of a distributed trainning job:
.. image:: src/ps_en.png
:width: 500
- Data shard: training data will be split into multiple partitions, trainers use the partitions of the whole dataset to do the training job.
- Trainer: each trainer reads the data shard, and train the neural network. Then the trainer will upload calculated "gradients" to parameter servers, and wait for parameters to be optimized on the parameter server side. When that finishes, the trainer download optimized parameters and continues its training.
- Parameter server: every parameter server stores part of the whole neural network model data. They will do optimization calculations when gradients are uploaded from trainers, and then send updated parameters to trainers.
PaddlePaddle can support both synchronize stochastic gradient descent (SGD) and asynchronous SGD.
When training with synchronize SGD, PaddlePaddle uses an internal "synchronize barrier" which makes gradients update and parameter download in strict order. On the other hand, asynchronous SGD won't wait for all trainers to finish upload at a single step, this will increase the parallelism of distributed training: parameter servers do not depend on each other, they'll do parameter optimization concurrently. Parameter servers will not wait for trainers, so trainers will also do their work concurrently. But asynchronous SGD will introduce more randomness and noises in the gradient.
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
introduction_en.md
preparations_en.md preparations_en.md
cmd_argument_en.md cmd_argument_en.md
multi_cluster/index_en.rst multi_cluster/index_en.rst
## 概述
本节将介绍如何使用PaddlePaddle在不同的集群框架下完成分布式训练。分布式训练架构如下图所示:
<img src="https://user-images.githubusercontent.com/13348433/31772175-5f419eca-b511-11e7-9db7-5231fe3d9ccb.png" width="500">
- 数据分片(Data shard): 用于训练神经网络的数据,被切分成多个部分,每个部分分别给每个trainer使用。
- 计算节点(Trainer): 每个trainer启动后读取切分好的一部分数据,开始神经网络的“前馈”和“后馈”计算,并和参数服务器通信。在完成一定量数据的训练后,上传计算得出的梯度(gradients),然后下载优化更新后的神经网络参数(parameters)。
- 参数服务器(Parameter server):每个参数服务器只保存整个神经网络所有参数的一部分。参数服务器接收从计算节点上传的梯度,并完成参数优化更新,再将更新后的参数下发到每个计算节点。
这样,通过计算节点和参数服务器的分布式协作,可以完成神经网络的SGD方法的训练。PaddlePaddle可以同时支持同步随机梯度下降(SGD)和异步随机梯度下降。
在使用同步SGD训练神经网络时,PaddlePaddle使用同步屏障(barrier),使梯度的提交和参数的更新按照顺序方式执行。在异步SGD中,则并不会等待所有trainer提交梯度才更新参数,这样极大地提高了计算的并行性:参数服务器之间不相互依赖,并行地接收梯度和更新参数,参数服务器也不会等待计算节点全部都提交梯度之后才开始下一步,计算节点之间也不会相互依赖,并行地执行模型的训练。可以看出,虽然异步SGD方式会提高参数更新并行度, 但是并不能保证参数同步更新,在任意时间某一台参数服务器上保存的参数可能比另一台要更新,与同步SGD相比,梯度会有噪声。
## Introduction
In this section, we'll explain how to run distributed training jobs with PaddlePaddle on different types of clusters. The diagram below shows the main architecture of a distributed trainning job:
<img src="https://user-images.githubusercontent.com/13348433/31772146-41523d84-b511-11e7-8a12-a69fd136c283.png" width="500">
- Data shard: training data will be split into multiple partitions, trainers use the partitions of the whole dataset to do the training job.
- Trainer: each trainer reads the data shard, and train the neural network. Then the trainer will upload calculated "gradients" to parameter servers, and wait for parameters to be optimized on the parameter server side. When that finishes, the trainer download optimized parameters and continues its training.
- Parameter server: every parameter server stores part of the whole neural network model data. They will do optimization calculations when gradients are uploaded from trainers, and then send updated parameters to trainers.
PaddlePaddle can support both synchronize stochastic gradient descent (SGD) and asynchronous SGD.
When training with synchronize SGD, PaddlePaddle uses an internal "synchronize barrier" which makes gradients update and parameter download in strict order. On the other hand, asynchronous SGD won't wait for all trainers to finish upload at a single step, this will increase the parallelism of distributed training: parameter servers do not depend on each other, they'll do parameter optimization concurrently. Parameter servers will not wait for trainers, so trainers will also do their work concurrently. But asynchronous SGD will introduce more randomness and noises in the gradient.
RNN相关模型 RNN模型
=========== ===========
.. toctree:: .. toctree::
......
...@@ -162,9 +162,8 @@ BlockDesc::BlockDesc(const BlockDesc &other, proto::BlockDesc *desc, ...@@ -162,9 +162,8 @@ BlockDesc::BlockDesc(const BlockDesc &other, proto::BlockDesc *desc,
: prog_(prog), desc_(desc) { : prog_(prog), desc_(desc) {
need_update_ = true; need_update_ = true;
for (auto &op : other.ops_) { for (auto &op : other.ops_) {
ops_.emplace_back(new OpDesc(*op, this)); ops_.emplace_back(new OpDesc(*op->Proto(), prog, this));
} }
for (auto &it : other.vars_) { for (auto &it : other.vars_) {
auto *var = new VarDesc(*it.second); auto *var = new VarDesc(*it.second);
vars_[it.first].reset(var); vars_[it.first].reset(var);
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. /* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. /* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
...@@ -22,6 +22,28 @@ limitations under the License. */ ...@@ -22,6 +22,28 @@ limitations under the License. */
using paddle::framework::Channel; using paddle::framework::Channel;
using paddle::framework::MakeChannel; using paddle::framework::MakeChannel;
using paddle::framework::CloseChannel; using paddle::framework::CloseChannel;
using paddle::framework::details::Buffered;
using paddle::framework::details::UnBuffered;
void RecevingOrderEqualToSendingOrder(Channel<int> *ch) {
unsigned sum_send = 0;
std::thread t([&]() {
for (int i = 0; i < 5; i++) {
EXPECT_EQ(ch->Send(&i), true);
sum_send += i;
}
});
for (int i = 0; i < 5; i++) {
int recv;
EXPECT_EQ(ch->Receive(&recv), true);
EXPECT_EQ(recv, i);
}
CloseChannel(ch);
t.join();
EXPECT_EQ(sum_send, 10U);
delete ch;
}
TEST(Channel, MakeAndClose) { TEST(Channel, MakeAndClose) {
using paddle::framework::details::Buffered; using paddle::framework::details::Buffered;
...@@ -60,13 +82,54 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) { ...@@ -60,13 +82,54 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) {
delete ch; delete ch;
} }
TEST(Channel, SendOnClosedChannelPanics) { // This tests that a channel must return false
const size_t buffer_size = 10; // on send and receive performed after closing the channel.
auto ch = MakeChannel<size_t>(buffer_size); // Receive will only return false after close when queue is empty.
size_t i = 5; // By creating separate threads for sending and receiving, we make this
EXPECT_EQ(ch->Send(&i), true); // should not block or panic // function able to test both buffered and unbuffered channels.
void SendReceiveWithACloseChannelShouldPanic(Channel<size_t> *ch) {
const size_t data = 5;
std::thread send_thread{[&]() {
size_t i = data;
EXPECT_EQ(ch->Send(&i), true); // should not block
}};
std::thread recv_thread{[&]() {
size_t i;
EXPECT_EQ(ch->Receive(&i), true); // should not block
EXPECT_EQ(i, data);
}};
send_thread.join();
recv_thread.join();
// After closing send should return false. Receive should
// also return false as there is no data in queue.
CloseChannel(ch); CloseChannel(ch);
EXPECT_EQ(ch->Send(&i), false); // should panic send_thread = std::thread{[&]() {
size_t i = data;
EXPECT_EQ(ch->Send(&i), false); // should return false
}};
recv_thread = std::thread{[&]() {
size_t i;
// should return false because channel is closed and queue is empty
EXPECT_EQ(ch->Receive(&i), false);
}};
send_thread.join();
recv_thread.join();
}
TEST(Channel, SendReceiveClosedBufferedChannelPanics) {
size_t buffer_size = 10;
auto ch = MakeChannel<size_t>(buffer_size);
SendReceiveWithACloseChannelShouldPanic(ch);
delete ch;
}
TEST(Channel, SendReceiveClosedUnBufferedChannelPanics) {
auto ch = MakeChannel<size_t>(0);
SendReceiveWithACloseChannelShouldPanic(ch);
delete ch; delete ch;
} }
...@@ -94,9 +157,7 @@ TEST(Channel, ReceiveFromBufferedChannelReturnResidualValuesTest) { ...@@ -94,9 +157,7 @@ TEST(Channel, ReceiveFromBufferedChannelReturnResidualValuesTest) {
for (size_t i = 0; i < buffer_size; ++i) { for (size_t i = 0; i < buffer_size; ++i) {
EXPECT_EQ(ch->Receive(&out), EXPECT_EQ(ch->Receive(&out),
false); // after receiving residual values, return zeros. false); // receiving on closed channel should return false
// Note: we cannot check EXPECT_EQ(out, 0), because C++ doesn't
// define zero values like Go does.
} }
delete ch; delete ch;
} }
...@@ -115,7 +176,7 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { ...@@ -115,7 +176,7 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
sum += i; sum += i;
} }
}); });
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.5 sec std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
EXPECT_EQ(sum, 45U); EXPECT_EQ(sum, 45U);
CloseChannel(ch); CloseChannel(ch);
...@@ -123,31 +184,17 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { ...@@ -123,31 +184,17 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
delete ch; delete ch;
} }
TEST(Channel, SimpleUnbufferedChannelTest) { TEST(Channel, RecevingOrderEqualToSendingOrderWithUnBufferedChannel) {
auto ch = MakeChannel<int>(0); auto ch = MakeChannel<int>(0);
unsigned sum_send = 0; RecevingOrderEqualToSendingOrder(ch);
std::thread t([&]() { }
for (int i = 0; i < 5; i++) {
EXPECT_EQ(ch->Send(&i), true);
sum_send += i;
}
});
for (int i = 0; i < 5; i++) {
int recv;
EXPECT_EQ(ch->Receive(&recv), true);
EXPECT_EQ(recv, i);
}
CloseChannel(ch); TEST(Channel, RecevingOrderEqualToSendingOrderWithBufferedChannel) {
t.join(); auto ch = MakeChannel<int>(10);
EXPECT_EQ(sum_send, 10U); RecevingOrderEqualToSendingOrder(ch);
delete ch;
} }
// This tests that closing a buffered channel also unblocks void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) {
// any receivers waiting on the channel
TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
auto ch = MakeChannel<int>(1);
size_t num_threads = 5; size_t num_threads = 5;
std::thread t[num_threads]; std::thread t[num_threads];
bool thread_ended[num_threads]; bool thread_ended[num_threads];
...@@ -158,15 +205,14 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) { ...@@ -158,15 +205,14 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
t[i] = std::thread( t[i] = std::thread(
[&](bool *p) { [&](bool *p) {
int data; int data;
// All reads should return false
EXPECT_EQ(ch->Receive(&data), false); EXPECT_EQ(ch->Receive(&data), false);
*p = true; *p = true;
}, },
&thread_ended[i]); &thread_ended[i]);
} }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
// Verify that all threads are blocked // Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false); EXPECT_EQ(thread_ended[i], false);
} }
...@@ -175,7 +221,7 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) { ...@@ -175,7 +221,7 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
// This should unblock all receivers // This should unblock all receivers
CloseChannel(ch); CloseChannel(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < num_threads; i++) {
...@@ -183,13 +229,12 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) { ...@@ -183,13 +229,12 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
} }
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch;
} }
// This tests that closing a buffered channel also unblocks void ChannelCloseUnblocksSendersTest(Channel<int> *ch) {
// any senders waiting for channel to have write space using paddle::framework::details::Buffered;
TEST(Channel, BufferedChannelCloseUnblocksSendersTest) { using paddle::framework::details::UnBuffered;
auto ch = MakeChannel<int>(1);
size_t num_threads = 5; size_t num_threads = 5;
std::thread t[num_threads]; std::thread t[num_threads];
bool thread_ended[num_threads]; bool thread_ended[num_threads];
...@@ -209,34 +254,56 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) { ...@@ -209,34 +254,56 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
} }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that atleast 4 threads are blocked if (dynamic_cast<Buffered<int> *>(ch)) {
int ct = 0; // If ch is Buffered, atleast 4 threads must be blocked.
for (size_t i = 0; i < num_threads; i++) { int ct = 0;
if (thread_ended[i] == false) ct++; for (size_t i = 0; i < num_threads; i++) {
if (!thread_ended[i]) ct++;
}
EXPECT_GE(ct, 4);
} else {
// If ch is UnBuffered, all the threads should be blocked.
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
} }
// Atleast 4 threads must be blocked
EXPECT_GE(ct, 4);
// Explicitly close the thread // Explicitly close the thread
// This should unblock all senders // This should unblock all senders
CloseChannel(ch); CloseChannel(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that all threads got unblocked // Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true); EXPECT_EQ(thread_ended[i], true);
} }
// Verify that only 1 send was successful if (dynamic_cast<Buffered<int> *>(ch)) {
ct = 0; // Verify that only 1 send was successful
for (size_t i = 0; i < num_threads; i++) { int ct = 0;
if (send_success[i]) ct++; for (size_t i = 0; i < num_threads; i++) {
if (send_success[i]) ct++;
}
// Only 1 send must be successful
EXPECT_EQ(ct, 1);
} }
// Only 1 send must be successful
EXPECT_EQ(ct, 1);
for (size_t i = 0; i < num_threads; i++) t[i].join(); for (size_t i = 0; i < num_threads; i++) t[i].join();
}
// This tests that closing a buffered channel also unblocks
// any receivers waiting on the channel
TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
auto ch = MakeChannel<int>(1);
ChannelCloseUnblocksReceiversTest(ch);
delete ch;
}
// This tests that closing a buffered channel also unblocks
// any senders waiting for channel to have write space
TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
auto ch = MakeChannel<int>(1);
ChannelCloseUnblocksSendersTest(ch);
delete ch; delete ch;
} }
...@@ -244,40 +311,7 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) { ...@@ -244,40 +311,7 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
// unblocks any receivers waiting for senders // unblocks any receivers waiting for senders
TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) { TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
auto ch = MakeChannel<int>(0); auto ch = MakeChannel<int>(0);
size_t num_threads = 5; ChannelCloseUnblocksReceiversTest(ch);
std::thread t[num_threads];
bool thread_ended[num_threads];
// Launches threads that try to read and are blocked becausew of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
EXPECT_EQ(ch->Receive(&data), false);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
// Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
// Explicitly close the thread
// This should unblock all receivers
CloseChannel(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch; delete ch;
} }
...@@ -285,40 +319,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) { ...@@ -285,40 +319,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
// unblocks any senders waiting for senders // unblocks any senders waiting for senders
TEST(Channel, UnbufferedChannelCloseUnblocksSendersTest) { TEST(Channel, UnbufferedChannelCloseUnblocksSendersTest) {
auto ch = MakeChannel<int>(0); auto ch = MakeChannel<int>(0);
size_t num_threads = 5; ChannelCloseUnblocksReceiversTest(ch);
std::thread t[num_threads];
bool thread_ended[num_threads];
// Launches threads that try to read and are blocked becausew of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data = 10;
EXPECT_EQ(ch->Send(&data), false);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
// Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
// Explicitly close the thread
// This should unblock all receivers
CloseChannel(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch; delete ch;
} }
...@@ -381,3 +382,129 @@ TEST(Channel, UnbufferedMoreReceiveLessSendTest) { ...@@ -381,3 +382,129 @@ TEST(Channel, UnbufferedMoreReceiveLessSendTest) {
EXPECT_EQ(sum_receive, 28U); EXPECT_EQ(sum_receive, 28U);
delete ch; delete ch;
} }
// This tests that destroying a channel unblocks
// any senders waiting for channel to have write space
void ChannelDestroyUnblockSenders(Channel<int> *ch) {
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
bool send_success[num_threads];
// Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
send_success[i] = false;
t[i] = std::thread(
[&](bool *ended, bool *success) {
int data = 10;
*success = ch->Send(&data);
*ended = true;
},
&thread_ended[i], &send_success[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
bool is_buffered_channel = false;
if (dynamic_cast<Buffered<int> *>(ch)) is_buffered_channel = true;
if (is_buffered_channel) {
// If channel is buffered, verify that atleast 4 threads are blocked
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (thread_ended[i] == false) ct++;
}
// Atleast 4 threads must be blocked
EXPECT_GE(ct, 4);
} else {
// Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
}
// Explicitly destroy the channel
delete ch;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
// Count number of successfuld sends
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (send_success[i]) ct++;
}
if (is_buffered_channel) {
// Only 1 send must be successful
EXPECT_EQ(ct, 1);
} else {
// In unbuffered channel, no send should be successful
EXPECT_EQ(ct, 0);
}
// Join all threads
for (size_t i = 0; i < num_threads; i++) t[i].join();
}
// This tests that destroying a channel also unblocks
// any receivers waiting on the channel
void ChannelDestroyUnblockReceivers(Channel<int> *ch) {
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
// Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
// All reads should return false
EXPECT_EQ(ch->Receive(&data), false);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that all threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
// delete the channel
delete ch;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
for (size_t i = 0; i < num_threads; i++) t[i].join();
}
TEST(Channel, BufferedChannelDestroyUnblocksReceiversTest) {
size_t buffer_size = 1;
auto ch = MakeChannel<int>(buffer_size);
ChannelDestroyUnblockReceivers(ch);
}
TEST(Channel, BufferedChannelDestroyUnblocksSendersTest) {
size_t buffer_size = 1;
auto ch = MakeChannel<int>(buffer_size);
ChannelDestroyUnblockSenders(ch);
}
// This tests that destroying an unbuffered channel also unblocks
// unblocks any receivers waiting for senders
TEST(Channel, UnbufferedChannelDestroyUnblocksReceiversTest) {
auto ch = MakeChannel<int>(0);
ChannelDestroyUnblockReceivers(ch);
}
TEST(Channel, UnbufferedChannelDestroyUnblocksSendersTest) {
auto ch = MakeChannel<int>(0);
ChannelDestroyUnblockSenders(ch);
}
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. /* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
...@@ -25,6 +25,14 @@ namespace paddle { ...@@ -25,6 +25,14 @@ namespace paddle {
namespace framework { namespace framework {
namespace details { namespace details {
// Four of the properties of Buffered Channel:
// - A send to a full channel blocks temporarily until a receive from the
// channel or the channel is closed.
// - A receive from an empty channel blocks temporarily until a send to the
// channel or the channel is closed.
// - A send to a closed channel returns false immediately.
// - A receive from a closed channel returns false immediately.
template <typename T> template <typename T>
class Buffered : public paddle::framework::Channel<T> { class Buffered : public paddle::framework::Channel<T> {
friend Channel<T>* paddle::framework::MakeChannel<T>(size_t); friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);
...@@ -42,8 +50,11 @@ class Buffered : public paddle::framework::Channel<T> { ...@@ -42,8 +50,11 @@ class Buffered : public paddle::framework::Channel<T> {
std::mutex mu_; std::mutex mu_;
std::condition_variable empty_cond_var_; std::condition_variable empty_cond_var_;
std::condition_variable full_cond_var_; std::condition_variable full_cond_var_;
std::condition_variable destructor_cond_var_;
std::deque<T> channel_; std::deque<T> channel_;
std::atomic<bool> closed_{false}; std::atomic<bool> closed_{false};
std::atomic<unsigned> send_ctr{0};
std::atomic<unsigned> recv_ctr{0};
Buffered(size_t cap) : cap_(cap), closed_(false) { Buffered(size_t cap) : cap_(cap), closed_(false) {
PADDLE_ENFORCE_GT(cap, 0); PADDLE_ENFORCE_GT(cap, 0);
...@@ -58,6 +69,7 @@ bool Buffered<T>::Send(T* item) { ...@@ -58,6 +69,7 @@ bool Buffered<T>::Send(T* item) {
if (closed_) { if (closed_) {
return ret; return ret;
} }
send_ctr++;
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
full_cond_var_.wait(lock, full_cond_var_.wait(lock,
[this]() { return channel_.size() < cap_ || closed_; }); [this]() { return channel_.size() < cap_ || closed_; });
...@@ -67,20 +79,30 @@ bool Buffered<T>::Send(T* item) { ...@@ -67,20 +79,30 @@ bool Buffered<T>::Send(T* item) {
empty_cond_var_.notify_one(); empty_cond_var_.notify_one();
ret = true; ret = true;
} }
send_ctr--;
destructor_cond_var_.notify_one();
return ret; return ret;
} }
template <typename T> template <typename T>
bool Buffered<T>::Receive(T* item) { bool Buffered<T>::Receive(T* item) {
bool ret = false;
// Once the channel has been closed and all data has been consumed,
// just return false. Don't even try acquiring the mutex.
if (closed_ && channel_.empty()) {
return false;
}
recv_ctr++;
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
empty_cond_var_.wait(lock, [this]() { return !channel_.empty() || closed_; }); empty_cond_var_.wait(lock, [this]() { return !channel_.empty() || closed_; });
bool ret = false;
if (!channel_.empty()) { if (!channel_.empty()) {
*item = std::move(channel_.front()); *item = std::move(channel_.front());
channel_.pop_front(); channel_.pop_front();
full_cond_var_.notify_one(); full_cond_var_.notify_one();
ret = true; ret = true;
} }
recv_ctr--;
destructor_cond_var_.notify_one();
return ret; return ret;
} }
...@@ -100,6 +122,12 @@ Buffered<T>::~Buffered() { ...@@ -100,6 +122,12 @@ Buffered<T>::~Buffered() {
closed_ = true; closed_ = true;
channel_.clear(); channel_.clear();
NotifyAllParticipants(&lock); NotifyAllParticipants(&lock);
// The destructor must wait for all readers and writers to complete their task
// The channel has been closed, so we will not accept new readers and writers
lock.lock();
destructor_cond_var_.wait(
lock, [this]() { return send_ctr == 0 && recv_ctr == 0; });
} }
template <typename T> template <typename T>
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. /* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. /* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. /* Copyright (c) 2017 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
......
...@@ -23,6 +23,13 @@ namespace paddle { ...@@ -23,6 +23,13 @@ namespace paddle {
namespace framework { namespace framework {
namespace details { namespace details {
// Four of the properties of UnBuffered Channel:
// - A send to a channel blocks temporarily until a receive from the
// channel or the channel is closed.
// - A receive from a channel blocks temporarily until a send to the
// channel or the channel is closed.
// - A send to a closed channel returns false immediately.
// - A receive from a closed channel returns false immediately.
template <typename T> template <typename T>
class UnBuffered : public paddle::framework::Channel<T> { class UnBuffered : public paddle::framework::Channel<T> {
friend Channel<T>* paddle::framework::MakeChannel<T>(size_t); friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);
...@@ -45,9 +52,11 @@ class UnBuffered : public paddle::framework::Channel<T> { ...@@ -45,9 +52,11 @@ class UnBuffered : public paddle::framework::Channel<T> {
// A transaction occurs only when both are true // A transaction occurs only when both are true
std::atomic<bool> reader_found_{false}, writer_found_{false}; std::atomic<bool> reader_found_{false}, writer_found_{false};
std::condition_variable cv_channel_; std::condition_variable cv_channel_;
std::condition_variable_any cv_reader_, cv_writer_; std::condition_variable_any cv_reader_, cv_writer_, cv_destructor_;
T* item{nullptr}; T* item{nullptr};
std::atomic<bool> closed_{false}; std::atomic<bool> closed_{false};
std::atomic<unsigned> send_ctr{0};
std::atomic<unsigned> recv_ctr{0};
UnBuffered() : closed_(false) {} UnBuffered() : closed_(false) {}
...@@ -62,6 +71,7 @@ bool UnBuffered<T>::Send(T* data) { ...@@ -62,6 +71,7 @@ bool UnBuffered<T>::Send(T* data) {
if (closed_) { if (closed_) {
return ret; return ret;
} }
send_ctr++;
// Prevent other writers from entering // Prevent other writers from entering
std::unique_lock<std::recursive_mutex> writer_lock(mu_write_); std::unique_lock<std::recursive_mutex> writer_lock(mu_write_);
writer_found_ = true; writer_found_ = true;
...@@ -81,6 +91,8 @@ bool UnBuffered<T>::Send(T* data) { ...@@ -81,6 +91,8 @@ bool UnBuffered<T>::Send(T* data) {
ret = true; ret = true;
} }
writer_found_ = false; writer_found_ = false;
send_ctr--;
cv_destructor_.notify_one();
return ret; return ret;
} }
...@@ -88,6 +100,12 @@ bool UnBuffered<T>::Send(T* data) { ...@@ -88,6 +100,12 @@ bool UnBuffered<T>::Send(T* data) {
// data that was sent by a writer is read from a reader. // data that was sent by a writer is read from a reader.
template <typename T> template <typename T>
bool UnBuffered<T>::Receive(T* data) { bool UnBuffered<T>::Receive(T* data) {
bool ret = false;
// If channel is closed, we don't even want any reader to enter.
// Unlike a buffered channel, an unbuffered channel does not allow
// readers to read after closing because there is no buffer to be consumed.
if (closed_) return ret;
recv_ctr++;
// Prevent other readers from entering // Prevent other readers from entering
std::unique_lock<std::recursive_mutex> read_lock{mu_read_}; std::unique_lock<std::recursive_mutex> read_lock{mu_read_};
reader_found_ = true; reader_found_ = true;
...@@ -96,7 +114,6 @@ bool UnBuffered<T>::Receive(T* data) { ...@@ -96,7 +114,6 @@ bool UnBuffered<T>::Receive(T* data) {
cv_reader_.wait(cv_lock, cv_reader_.wait(cv_lock,
[this]() { return writer_found_ == true || closed_; }); [this]() { return writer_found_ == true || closed_; });
cv_writer_.notify_one(); cv_writer_.notify_one();
bool ret = false;
if (!closed_) { if (!closed_) {
std::unique_lock<std::mutex> lock_ch{mu_ch_}; std::unique_lock<std::mutex> lock_ch{mu_ch_};
// Reader should wait for the writer to first write its data // Reader should wait for the writer to first write its data
...@@ -110,6 +127,8 @@ bool UnBuffered<T>::Receive(T* data) { ...@@ -110,6 +127,8 @@ bool UnBuffered<T>::Receive(T* data) {
cv_channel_.notify_one(); cv_channel_.notify_one();
} }
reader_found_ = false; reader_found_ = false;
recv_ctr--;
cv_destructor_.notify_one();
return ret; return ret;
} }
...@@ -135,6 +154,9 @@ UnBuffered<T>::~UnBuffered() { ...@@ -135,6 +154,9 @@ UnBuffered<T>::~UnBuffered() {
item = nullptr; item = nullptr;
closed_ = true; closed_ = true;
NotifyAllParticipants(&lock); NotifyAllParticipants(&lock);
lock.lock();
cv_destructor_.wait(lock,
[this]() { return send_ctr == 0 && recv_ctr == 0; });
} }
// This function notifies all the readers, writers and // This function notifies all the readers, writers and
......
...@@ -125,11 +125,10 @@ OpDesc::OpDesc(const proto::OpDesc &desc, ProgramDesc *prog, BlockDesc *block) ...@@ -125,11 +125,10 @@ OpDesc::OpDesc(const proto::OpDesc &desc, ProgramDesc *prog, BlockDesc *block)
// restore attrs_ // restore attrs_
for (const proto::OpDesc::Attr &attr : desc_.attrs()) { for (const proto::OpDesc::Attr &attr : desc_.attrs()) {
std::string attr_name = attr.name(); std::string attr_name = attr.name();
// The sub_block referred to by the BLOCK attr hasn't been added
// to ProgramDesc class yet, we skip setting BLOCK attr here.
if (attr.type() != proto::AttrType::BLOCK) { if (attr.type() != proto::AttrType::BLOCK) {
attrs_[attr_name] = GetAttrValue(attr); attrs_[attr_name] = GetAttrValue(attr);
} else {
auto bid = attr.block_idx();
attrs_[attr_name] = prog->MutableBlock(bid);
} }
} }
this->block_ = block; this->block_ = block;
......
...@@ -43,11 +43,20 @@ ProgramDesc::ProgramDesc() { ...@@ -43,11 +43,20 @@ ProgramDesc::ProgramDesc() {
ProgramDesc::ProgramDesc(const ProgramDesc &o) { ProgramDesc::ProgramDesc(const ProgramDesc &o) {
desc_ = o.desc_; desc_ = o.desc_;
for (int i = 0; i < desc_.blocks_size(); ++i) { for (int i = 0; i < desc_.blocks_size(); ++i) {
auto *block = desc_.mutable_blocks(i); auto *block = desc_.mutable_blocks(i);
blocks_.emplace_back(new BlockDesc(*o.blocks_[i], block, this)); blocks_.emplace_back(new BlockDesc(*o.blocks_[i], block, this));
} }
for (auto &block : blocks_) {
for (auto *op : block->AllOps()) {
for (const auto &attr : op->Proto()->attrs()) {
if (attr.type() == proto::AttrType::BLOCK) {
size_t blk_idx = attr.block_idx();
op->SetBlockAttr(attr.name(), *this->MutableBlock(blk_idx));
}
}
}
}
} }
ProgramDesc::ProgramDesc(const proto::ProgramDesc &desc) { ProgramDesc::ProgramDesc(const proto::ProgramDesc &desc) {
...@@ -55,6 +64,16 @@ ProgramDesc::ProgramDesc(const proto::ProgramDesc &desc) { ...@@ -55,6 +64,16 @@ ProgramDesc::ProgramDesc(const proto::ProgramDesc &desc) {
for (auto &block_desc : *desc_.mutable_blocks()) { for (auto &block_desc : *desc_.mutable_blocks()) {
blocks_.emplace_back(new BlockDesc(this, &block_desc)); blocks_.emplace_back(new BlockDesc(this, &block_desc));
} }
for (auto &block : blocks_) {
for (auto *op : block->AllOps()) {
for (const auto &attr : op->Proto()->attrs()) {
if (attr.type() == proto::AttrType::BLOCK) {
size_t blk_idx = attr.block_idx();
op->SetBlockAttr(attr.name(), *this->MutableBlock(blk_idx));
}
}
}
}
} }
ProgramDesc::ProgramDesc(const std::string &binary_str) { ProgramDesc::ProgramDesc(const std::string &binary_str) {
......
...@@ -49,11 +49,28 @@ bool IsTarget(const proto::OpDesc& op_desc) { ...@@ -49,11 +49,28 @@ bool IsTarget(const proto::OpDesc& op_desc) {
return false; return false;
} }
void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, int GetSubBlockIndex(const proto::OpDesc& op_desc) {
int block_id) { for (auto& attr : op_desc.attrs()) {
// TODO(tonyyang-svail): if (attr.type() == proto::AttrType::BLOCK) {
// - will change to use multiple blocks for RNN op and Cond Op PADDLE_ENFORCE(attr.has_block_idx());
return attr.block_idx();
}
}
return -1;
}
bool HasSubBlock(const proto::OpDesc& op_desc) {
return GetSubBlockIndex(op_desc) > 0;
}
// block_id is the idx of the current block in the input desc
// parent_block_id is the idx of the parent of the current block
// in the output desc, -1 means the current block is global block
// dependent_vars is passed recursively from the parent block to
// the child block to help pruning
void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output,
int block_id, int parent_block_id,
std::set<std::string>& dependent_vars) {
auto& block = input.blocks(block_id); auto& block = input.blocks(block_id);
auto& ops = block.ops(); auto& ops = block.ops();
...@@ -72,11 +89,9 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, ...@@ -72,11 +89,9 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output,
expect_fetch = (op_desc.type() == kFetchOpType); expect_fetch = (op_desc.type() == kFetchOpType);
} }
std::set<std::string> dependent_vars;
std::vector<bool> should_run; std::vector<bool> should_run;
for (auto op_iter = ops.rbegin(); op_iter != ops.rend(); ++op_iter) { for (auto op_iter = ops.rbegin(); op_iter != ops.rend(); ++op_iter) {
auto& op_desc = *op_iter; auto& op_desc = *op_iter;
if (IsTarget(op_desc) || HasDependentVar(op_desc, dependent_vars)) { if (IsTarget(op_desc) || HasDependentVar(op_desc, dependent_vars)) {
// insert its input to the dependency graph // insert its input to the dependency graph
for (auto& var : op_desc.inputs()) { for (auto& var : op_desc.inputs()) {
...@@ -84,7 +99,6 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, ...@@ -84,7 +99,6 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output,
dependent_vars.insert(argu); dependent_vars.insert(argu);
} }
} }
should_run.push_back(true); should_run.push_back(true);
} else { } else {
should_run.push_back(false); should_run.push_back(false);
...@@ -95,45 +109,81 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, ...@@ -95,45 +109,81 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output,
// we reverse the should_run vector // we reverse the should_run vector
std::reverse(should_run.begin(), should_run.end()); std::reverse(should_run.begin(), should_run.end());
*output = input; // copy the current block from input to output
auto* op_field = output->mutable_blocks(block_id)->mutable_ops(); auto* block_field = output->mutable_blocks();
*block_field->Add() = input.blocks(block_id);
int output_block_id = output->blocks_size() - 1;
auto* output_block = output->mutable_blocks(output_block_id);
output_block->set_idx(output_block_id);
output_block->set_parent_idx(parent_block_id);
auto* op_field = output_block->mutable_ops();
op_field->Clear(); op_field->Clear();
for (size_t i = 0; i < should_run.size(); ++i) { for (size_t i = 0; i < should_run.size(); ++i) {
if (should_run[i]) { if (should_run[i]) {
*op_field->Add() = input.blocks(block_id).ops(i); auto* op = op_field->Add();
*op = input.blocks(block_id).ops(i);
if (HasSubBlock(*op)) {
// create sub_block_dependent_vars here to help prune the sub block
std::set<std::string> sub_block_dependent_vars;
for (auto& var : op->inputs()) {
for (auto& argu : var.arguments()) {
sub_block_dependent_vars.insert(argu);
}
}
for (auto& var : op->outputs()) {
for (auto& argu : var.arguments()) {
sub_block_dependent_vars.insert(argu);
}
}
// GetSubBlockIndex(*op) is the idx of the sub_block in the input desc
// output_block_id is the idx of the current block in the output desc
prune_impl(input, output, GetSubBlockIndex(*op), output_block_id,
sub_block_dependent_vars);
}
} }
} }
// remove the VarDescs in BlockDesc that are not referenced in // remove the VarDescs in BlockDesc that are not referenced in
// the pruned OpDescs // the pruned OpDescs
std::unordered_map<std::string, proto::VarDesc> var_map; std::unordered_map<std::string, proto::VarDesc> var_map;
auto* var_field = output->mutable_blocks(block_id)->mutable_vars(); auto* var_field = output->mutable_blocks(output_block_id)->mutable_vars();
for (const auto& var : *var_field) { for (const auto& var : *var_field) {
var_map[var.name()] = var; var_map[var.name()] = var;
} }
var_field->Clear(); std::set<std::string> var_names;
for (const auto& op : *op_field) { for (const auto& op : *op_field) {
// add VarDescs of all input arguments for each OpDesc
auto& input_field = op.inputs(); auto& input_field = op.inputs();
for (auto& input_var : input_field) { for (auto& input_var : input_field) {
for (auto& arg : input_var.arguments()) { for (auto& arg : input_var.arguments()) {
*var_field->Add() = var_map[arg]; if (var_map.count(arg) != 0) {
var_names.insert(arg);
}
} }
} }
// add VarDescs of all output arguments for each OpDesc
auto& output_field = op.outputs(); auto& output_field = op.outputs();
for (auto& output_var : output_field) { for (auto& output_var : output_field) {
for (auto& arg : output_var.arguments()) { for (auto& arg : output_var.arguments()) {
*var_field->Add() = var_map[arg]; if (var_map.count(arg) != 0) {
var_names.insert(arg);
}
} }
} }
} }
var_field->Clear();
for (const auto& name : var_names) {
*var_field->Add() = var_map[name];
}
} }
// TODO(fengjiayi): Prune() could be inplaced to avoid unnecessary copies // TODO(fengjiayi): Prune() could be inplaced to avoid unnecessary copies
void Prune(const proto::ProgramDesc& input, proto::ProgramDesc* output) { void Prune(const proto::ProgramDesc& input, proto::ProgramDesc* output) {
prune_impl(input, output, 0); std::set<std::string> dependent_vars;
output->clear_blocks();
prune_impl(input, output, 0, -1, dependent_vars);
} }
void inference_optimize_impl(const proto::ProgramDesc& input, void inference_optimize_impl(const proto::ProgramDesc& input,
......
...@@ -21,6 +21,17 @@ limitations under the License. */ ...@@ -21,6 +21,17 @@ limitations under the License. */
namespace paddle { namespace paddle {
namespace inference { namespace inference {
void ReadBinaryFile(const std::string& filename, std::string& contents) {
VLOG(3) << "loading model from " << filename;
std::ifstream inputfs(filename, std::ios::in | std::ios::binary);
inputfs.seekg(0, std::ios::end);
contents.clear();
contents.resize(inputfs.tellg());
inputfs.seekg(0, std::ios::beg);
inputfs.read(&contents[0], contents.size());
inputfs.close();
}
bool IsParameter(const framework::VarDesc* var, bool IsParameter(const framework::VarDesc* var,
const framework::ProgramDesc& main_program) { const framework::ProgramDesc& main_program) {
if (var->Persistable()) { if (var->Persistable()) {
...@@ -44,12 +55,15 @@ bool IsParameter(const framework::VarDesc* var, ...@@ -44,12 +55,15 @@ bool IsParameter(const framework::VarDesc* var,
void LoadPersistables(framework::Executor& executor, void LoadPersistables(framework::Executor& executor,
framework::Scope& scope, framework::Scope& scope,
const framework::ProgramDesc& main_program,
const std::string& dirname, const std::string& dirname,
const framework::ProgramDesc& main_program) { const std::string& param_filename) {
const framework::BlockDesc& global_block = main_program.Block(0); const framework::BlockDesc& global_block = main_program.Block(0);
framework::ProgramDesc* load_program = new framework::ProgramDesc(); framework::ProgramDesc* load_program = new framework::ProgramDesc();
framework::BlockDesc* load_block = load_program->MutableBlock(0); framework::BlockDesc* load_block = load_program->MutableBlock(0);
std::vector<std::string> paramlist;
for (auto* var : global_block.AllVars()) { for (auto* var : global_block.AllVars()) {
if (IsParameter(var, main_program)) { if (IsParameter(var, main_program)) {
VLOG(3) << "parameter's name: " << var->Name(); VLOG(3) << "parameter's name: " << var->Name();
...@@ -61,15 +75,33 @@ void LoadPersistables(framework::Executor& executor, ...@@ -61,15 +75,33 @@ void LoadPersistables(framework::Executor& executor,
new_var->SetLoDLevel(var->GetLoDLevel()); new_var->SetLoDLevel(var->GetLoDLevel());
new_var->SetPersistable(true); new_var->SetPersistable(true);
// append_op if (!param_filename.empty()) {
framework::OpDesc* op = load_block->AppendOp(); paramlist.push_back(new_var->Name());
op->SetType("load"); } else {
op->SetOutput("Out", {new_var->Name()}); // append_op
op->SetAttr("file_path", {dirname + "/" + new_var->Name()}); framework::OpDesc* op = load_block->AppendOp();
op->CheckAttrs(); op->SetType("load");
op->SetOutput("Out", {new_var->Name()});
op->SetAttr("file_path", {dirname + "/" + new_var->Name()});
op->CheckAttrs();
}
} }
} }
if (!param_filename.empty()) {
// sort paramlist to have consistent ordering
std::sort(paramlist.begin(), paramlist.end());
// append just the load_combine op
framework::OpDesc* op = load_block->AppendOp();
op->SetType("load_combine");
op->SetOutput("Out", paramlist);
op->SetAttr("file_path", {param_filename});
op->CheckAttrs();
}
executor.Run(*load_program, &scope, 0, true, true); executor.Run(*load_program, &scope, 0, true, true);
VLOG(3) << "Ran loading successfully";
delete load_program; delete load_program;
} }
...@@ -77,20 +109,29 @@ std::unique_ptr<framework::ProgramDesc> Load(framework::Executor& executor, ...@@ -77,20 +109,29 @@ std::unique_ptr<framework::ProgramDesc> Load(framework::Executor& executor,
framework::Scope& scope, framework::Scope& scope,
const std::string& dirname) { const std::string& dirname) {
std::string model_filename = dirname + "/__model__"; std::string model_filename = dirname + "/__model__";
LOG(INFO) << "loading model from " << model_filename;
std::ifstream inputfs(model_filename, std::ios::in | std::ios::binary);
std::string program_desc_str; std::string program_desc_str;
inputfs.seekg(0, std::ios::end); ReadBinaryFile(model_filename, program_desc_str);
program_desc_str.resize(inputfs.tellg());
inputfs.seekg(0, std::ios::beg); std::unique_ptr<framework::ProgramDesc> main_program(
LOG(INFO) << "program_desc_str's size: " << program_desc_str.size(); new framework::ProgramDesc(program_desc_str));
inputfs.read(&program_desc_str[0], program_desc_str.size());
inputfs.close(); LoadPersistables(executor, scope, *main_program, dirname, "");
return main_program;
}
std::unique_ptr<framework::ProgramDesc> Load(
framework::Executor& executor,
framework::Scope& scope,
const std::string& prog_filename,
const std::string& param_filename) {
std::string model_filename = prog_filename;
std::string program_desc_str;
ReadBinaryFile(model_filename, program_desc_str);
std::unique_ptr<framework::ProgramDesc> main_program( std::unique_ptr<framework::ProgramDesc> main_program(
new framework::ProgramDesc(program_desc_str)); new framework::ProgramDesc(program_desc_str));
LoadPersistables(executor, scope, dirname, *main_program); LoadPersistables(executor, scope, *main_program, "", param_filename);
return main_program; return main_program;
} }
......
...@@ -26,12 +26,18 @@ namespace inference { ...@@ -26,12 +26,18 @@ namespace inference {
void LoadPersistables(framework::Executor& executor, void LoadPersistables(framework::Executor& executor,
framework::Scope& scope, framework::Scope& scope,
const framework::ProgramDesc& main_program,
const std::string& dirname, const std::string& dirname,
const framework::ProgramDesc& main_program); const std::string& param_filename);
std::unique_ptr<framework::ProgramDesc> Load(framework::Executor& executor, std::unique_ptr<framework::ProgramDesc> Load(framework::Executor& executor,
framework::Scope& scope, framework::Scope& scope,
const std::string& dirname); const std::string& dirname);
std::unique_ptr<framework::ProgramDesc> Load(framework::Executor& executor,
framework::Scope& scope,
const std::string& prog_filename,
const std::string& param_filename);
} // namespace inference } // namespace inference
} // namespace paddle } // namespace paddle
...@@ -27,3 +27,4 @@ endfunction(inference_test) ...@@ -27,3 +27,4 @@ endfunction(inference_test)
inference_test(recognize_digits ARGS mlp) inference_test(recognize_digits ARGS mlp)
inference_test(image_classification ARGS vgg resnet) inference_test(image_classification ARGS vgg resnet)
inference_test(label_semantic_roles) inference_test(label_semantic_roles)
inference_test(rnn_encoder_decoder)
...@@ -67,17 +67,28 @@ void CheckError(paddle::framework::LoDTensor& output1, ...@@ -67,17 +67,28 @@ void CheckError(paddle::framework::LoDTensor& output1,
EXPECT_EQ(count, 0) << "There are " << count << " different elements."; EXPECT_EQ(count, 0) << "There are " << count << " different elements.";
} }
template <typename Place, typename T> template <typename Place, typename T, bool IsCombined = false>
void TestInference(const std::string& dirname, void TestInference(const std::string& dirname,
const std::vector<paddle::framework::LoDTensor*>& cpu_feeds, const std::vector<paddle::framework::LoDTensor*>& cpu_feeds,
std::vector<paddle::framework::LoDTensor*>& cpu_fetchs) { std::vector<paddle::framework::LoDTensor*>& cpu_fetchs) {
// 1. Define place, executor and scope // 1. Define place, executor, scope and inference_program
auto place = Place(); auto place = Place();
auto executor = paddle::framework::Executor(place); auto executor = paddle::framework::Executor(place);
auto* scope = new paddle::framework::Scope(); auto* scope = new paddle::framework::Scope();
std::unique_ptr<paddle::framework::ProgramDesc> inference_program;
// 2. Initialize the inference_program and load all parameters from file // 2. Initialize the inference_program and load all parameters from file
auto inference_program = paddle::inference::Load(executor, *scope, dirname); if (IsCombined) {
// Hard-coding the names for combined params case
std::string prog_filename = "__model_combined__";
std::string param_filename = "__params_combined__";
inference_program = paddle::inference::Load(executor,
*scope,
dirname + "/" + prog_filename,
dirname + "/" + param_filename);
} else {
inference_program = paddle::inference::Load(executor, *scope, dirname);
}
// 3. Get the feed_target_names and fetch_target_names // 3. Get the feed_target_names and fetch_target_names
const std::vector<std::string>& feed_target_names = const std::vector<std::string>& feed_target_names =
......
...@@ -59,3 +59,45 @@ TEST(inference, recognize_digits) { ...@@ -59,3 +59,45 @@ TEST(inference, recognize_digits) {
CheckError<float>(output1, output2); CheckError<float>(output1, output2);
#endif #endif
} }
TEST(inference, recognize_digits_combine) {
if (FLAGS_dirname.empty()) {
LOG(FATAL) << "Usage: ./example --dirname=path/to/your/model";
}
LOG(INFO) << "FLAGS_dirname: " << FLAGS_dirname << std::endl;
std::string dirname = FLAGS_dirname;
// 0. Call `paddle::framework::InitDevices()` initialize all the devices
// In unittests, this is done in paddle/testing/paddle_gtest_main.cc
paddle::framework::LoDTensor input;
// Use normilized image pixels as input data,
// which should be in the range [-1.0, 1.0].
SetupTensor<float>(
input, {1, 28, 28}, static_cast<float>(-1), static_cast<float>(1));
std::vector<paddle::framework::LoDTensor*> cpu_feeds;
cpu_feeds.push_back(&input);
paddle::framework::LoDTensor output1;
std::vector<paddle::framework::LoDTensor*> cpu_fetchs1;
cpu_fetchs1.push_back(&output1);
// Run inference on CPU
TestInference<paddle::platform::CPUPlace, float, true>(
dirname, cpu_feeds, cpu_fetchs1);
LOG(INFO) << output1.dims();
#ifdef PADDLE_WITH_CUDA
paddle::framework::LoDTensor output2;
std::vector<paddle::framework::LoDTensor*> cpu_fetchs2;
cpu_fetchs2.push_back(&output2);
// Run inference on CUDA GPU
TestInference<paddle::platform::CUDAPlace, float, true>(
dirname, cpu_feeds, cpu_fetchs2);
LOG(INFO) << output2.dims();
CheckError<float>(output1, output2);
#endif
}
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <gtest/gtest.h>
#include "gflags/gflags.h"
#include "test_helper.h"
DEFINE_string(dirname, "", "Directory of the inference model.");
TEST(inference, rnn_encoder_decoder) {
if (FLAGS_dirname.empty()) {
LOG(FATAL) << "Usage: ./example --dirname=path/to/your/model";
}
LOG(INFO) << "FLAGS_dirname: " << FLAGS_dirname << std::endl;
std::string dirname = FLAGS_dirname;
// 0. Call `paddle::framework::InitDevices()` initialize all the devices
// In unittests, this is done in paddle/testing/paddle_gtest_main.cc
paddle::framework::LoDTensor word_data, trg_word;
paddle::framework::LoD lod{{0, 4, 10}};
SetupLoDTensor(
word_data, lod, static_cast<int64_t>(0), static_cast<int64_t>(1));
SetupLoDTensor(
trg_word, lod, static_cast<int64_t>(0), static_cast<int64_t>(1));
std::vector<paddle::framework::LoDTensor*> cpu_feeds;
cpu_feeds.push_back(&word_data);
cpu_feeds.push_back(&trg_word);
paddle::framework::LoDTensor output1;
std::vector<paddle::framework::LoDTensor*> cpu_fetchs1;
cpu_fetchs1.push_back(&output1);
// Run inference on CPU
TestInference<paddle::platform::CPUPlace, float>(
dirname, cpu_feeds, cpu_fetchs1);
LOG(INFO) << output1.lod();
LOG(INFO) << output1.dims();
#ifdef PADDLE_WITH_CUDA
paddle::framework::LoDTensor output2;
std::vector<paddle::framework::LoDTensor*> cpu_fetchs2;
cpu_fetchs2.push_back(&output2);
// Run inference on CUDA GPU
TestInference<paddle::platform::CUDAPlace, float>(
dirname, cpu_feeds, cpu_fetchs2);
LOG(INFO) << output2.lod();
LOG(INFO) << output2.dims();
CheckError<float>(output1, output2);
#endif
}
...@@ -62,7 +62,7 @@ class CompareOpKernel ...@@ -62,7 +62,7 @@ class CompareOpKernel
z->mutable_data<T>(context.GetPlace()); z->mutable_data<T>(context.GetPlace());
int axis = context.Attr<int>("axis"); int axis = context.Attr<int>("axis");
ElementwiseComputeEx<Functor, DeviceContext, T, bool>(context, x, y, axis, ElementwiseComputeEx<Functor, DeviceContext, T, bool>(context, x, y, axis,
z); Functor(), z);
} }
}; };
......
...@@ -35,7 +35,8 @@ class ElementwiseAddKernel : public framework::OpKernel<T> { ...@@ -35,7 +35,8 @@ class ElementwiseAddKernel : public framework::OpKernel<T> {
auto* z = ctx.Output<Tensor>("Out"); auto* z = ctx.Output<Tensor>("Out");
z->mutable_data<T>(ctx.GetPlace()); z->mutable_data<T>(ctx.GetPlace());
int axis = ctx.Attr<int>("axis"); int axis = ctx.Attr<int>("axis");
ElementwiseComputeEx<AddFunctor<T>, DeviceContext, T>(ctx, x, y, axis, z); ElementwiseComputeEx<AddFunctor<T>, DeviceContext, T>(ctx, x, y, axis,
AddFunctor<T>(), z);
} }
}; };
......
...@@ -35,7 +35,8 @@ class ElementwiseDivKernel : public framework::OpKernel<T> { ...@@ -35,7 +35,8 @@ class ElementwiseDivKernel : public framework::OpKernel<T> {
auto* z = ctx.Output<Tensor>("Out"); auto* z = ctx.Output<Tensor>("Out");
z->mutable_data<T>(ctx.GetPlace()); z->mutable_data<T>(ctx.GetPlace());
int axis = ctx.Attr<int>("axis"); int axis = ctx.Attr<int>("axis");
ElementwiseComputeEx<DivFunctor<T>, DeviceContext, T>(ctx, x, y, axis, z); ElementwiseComputeEx<DivFunctor<T>, DeviceContext, T>(ctx, x, y, axis,
DivFunctor<T>(), z);
} }
}; };
......
...@@ -35,7 +35,8 @@ class ElementwiseMaxKernel : public framework::OpKernel<T> { ...@@ -35,7 +35,8 @@ class ElementwiseMaxKernel : public framework::OpKernel<T> {
auto* z = ctx.Output<Tensor>("Out"); auto* z = ctx.Output<Tensor>("Out");
z->mutable_data<T>(ctx.GetPlace()); z->mutable_data<T>(ctx.GetPlace());
int axis = ctx.Attr<int>("axis"); int axis = ctx.Attr<int>("axis");
ElementwiseComputeEx<MaxFunctor<T>, DeviceContext, T>(ctx, x, y, axis, z); ElementwiseComputeEx<MaxFunctor<T>, DeviceContext, T>(ctx, x, y, axis,
MaxFunctor<T>(), z);
} }
}; };
......
...@@ -35,7 +35,8 @@ class ElementwiseMinKernel : public framework::OpKernel<T> { ...@@ -35,7 +35,8 @@ class ElementwiseMinKernel : public framework::OpKernel<T> {
auto* z = ctx.Output<Tensor>("Out"); auto* z = ctx.Output<Tensor>("Out");
z->mutable_data<T>(ctx.GetPlace()); z->mutable_data<T>(ctx.GetPlace());
int axis = ctx.Attr<int>("axis"); int axis = ctx.Attr<int>("axis");
ElementwiseComputeEx<MinFunctor<T>, DeviceContext, T>(ctx, x, y, axis, z); ElementwiseComputeEx<MinFunctor<T>, DeviceContext, T>(ctx, x, y, axis,
MinFunctor<T>(), z);
} }
}; };
......
...@@ -34,7 +34,8 @@ class ElementwiseMulKernel : public framework::OpKernel<T> { ...@@ -34,7 +34,8 @@ class ElementwiseMulKernel : public framework::OpKernel<T> {
auto* z = ctx.Output<Tensor>("Out"); auto* z = ctx.Output<Tensor>("Out");
z->mutable_data<T>(ctx.GetPlace()); z->mutable_data<T>(ctx.GetPlace());
int axis = ctx.Attr<int>("axis"); int axis = ctx.Attr<int>("axis");
ElementwiseComputeEx<MulFunctor<T>, DeviceContext, T>(ctx, x, y, axis, z); ElementwiseComputeEx<MulFunctor<T>, DeviceContext, T>(ctx, x, y, axis,
MulFunctor<T>(), z);
} }
}; };
......
...@@ -365,10 +365,10 @@ template <typename Functor, typename DeviceContext, typename T, ...@@ -365,10 +365,10 @@ template <typename Functor, typename DeviceContext, typename T,
typename OutType = T> typename OutType = T>
void ElementwiseComputeEx(const framework::ExecutionContext& ctx, void ElementwiseComputeEx(const framework::ExecutionContext& ctx,
const framework::Tensor* x, const framework::Tensor* x,
const framework::Tensor* y, int axis, const framework::Tensor* y, int axis, Functor func,
framework::Tensor* z) { framework::Tensor* z) {
TransformFunctor<Functor, T, DeviceContext, OutType> functor( TransformFunctor<Functor, T, DeviceContext, OutType> functor(
x, y, z, ctx.template device_context<DeviceContext>(), Functor()); x, y, z, ctx.template device_context<DeviceContext>(), func);
auto x_dims = x->dims(); auto x_dims = x->dims();
auto y_dims = y->dims(); auto y_dims = y->dims();
......
...@@ -36,7 +36,8 @@ class ElementwisePowKernel : public framework::OpKernel<T> { ...@@ -36,7 +36,8 @@ class ElementwisePowKernel : public framework::OpKernel<T> {
auto* z = ctx.Output<Tensor>("Out"); auto* z = ctx.Output<Tensor>("Out");
z->mutable_data<T>(ctx.GetPlace()); z->mutable_data<T>(ctx.GetPlace());
int axis = ctx.Attr<int>("axis"); int axis = ctx.Attr<int>("axis");
ElementwiseComputeEx<PowFunctor<T>, DeviceContext, T>(ctx, x, y, axis, z); ElementwiseComputeEx<PowFunctor<T>, DeviceContext, T>(ctx, x, y, axis,
PowFunctor<T>(), z);
} }
}; };
......
...@@ -34,7 +34,8 @@ class ElementwiseSubKernel : public framework::OpKernel<T> { ...@@ -34,7 +34,8 @@ class ElementwiseSubKernel : public framework::OpKernel<T> {
auto* z = ctx.Output<Tensor>("Out"); auto* z = ctx.Output<Tensor>("Out");
z->mutable_data<T>(ctx.GetPlace()); z->mutable_data<T>(ctx.GetPlace());
int axis = ctx.Attr<int>("axis"); int axis = ctx.Attr<int>("axis");
ElementwiseComputeEx<SubFunctor<T>, DeviceContext, T>(ctx, x, y, axis, z); ElementwiseComputeEx<SubFunctor<T>, DeviceContext, T>(ctx, x, y, axis,
SubFunctor<T>(), z);
} }
}; };
......
...@@ -21,13 +21,6 @@ using Tensor = framework::Tensor; ...@@ -21,13 +21,6 @@ using Tensor = framework::Tensor;
using LoDTensor = framework::LoDTensor; using LoDTensor = framework::LoDTensor;
using DataLayout = framework::DataLayout; using DataLayout = framework::DataLayout;
template <typename T>
using EigenMatrixMapRowMajor = Eigen::Map<
Eigen::Matrix<T, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>;
template <typename T>
using ConstEigenMatrixMapRowMajor = Eigen::Map<
const Eigen::Matrix<T, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>;
class LayerNormOp : public framework::OperatorWithKernel { class LayerNormOp : public framework::OperatorWithKernel {
public: public:
using framework::OperatorWithKernel::OperatorWithKernel; using framework::OperatorWithKernel::OperatorWithKernel;
...@@ -108,7 +101,6 @@ class LayerNormOpMaker : public framework::OpProtoAndCheckerMaker { ...@@ -108,7 +101,6 @@ class LayerNormOpMaker : public framework::OpProtoAndCheckerMaker {
AddComment(R"DOC( AddComment(R"DOC(
Layer Normalization. Layer Normalization.
Layer Norm has been implemented as discussed in the paper: Layer Norm has been implemented as discussed in the paper:
https://arxiv.org/abs/1607.06450 https://arxiv.org/abs/1607.06450
... ...
...@@ -116,75 +108,6 @@ https://arxiv.org/abs/1607.06450 ...@@ -116,75 +108,6 @@ https://arxiv.org/abs/1607.06450
} }
}; };
template <typename T>
class LayerNormKernel<platform::CPUDeviceContext, T>
: public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &ctx) const override {
const float epsilon = ctx.Attr<float>("epsilon");
const auto *scale = ctx.Input<Tensor>("Scale");
const auto *bias = ctx.Input<Tensor>("Bias");
const auto *x = ctx.Input<Tensor>("X");
const auto &x_dims = x->dims();
const auto begin_norm_axis = ctx.Attr<int>("begin_norm_axis");
auto *output = ctx.Output<Tensor>("Y");
auto *mean = ctx.Output<Tensor>("Mean");
auto *var = ctx.Output<Tensor>("Variance");
output->mutable_data<T>(ctx.GetPlace());
mean->mutable_data<T>(ctx.GetPlace());
var->mutable_data<T>(ctx.GetPlace());
auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis);
int left = static_cast<int>(matrix_dim[0]);
int right = static_cast<int>(matrix_dim[1]);
auto input_map = ConstEigenMatrixMapRowMajor<T>(x->data<T>(), left, right);
auto mean_map = EigenMatrixMapRowMajor<T>(mean->data<T>(), left, 1);
auto var_map = EigenMatrixMapRowMajor<T>(var->data<T>(), left, 1);
auto output_map = EigenMatrixMapRowMajor<T>(output->data<T>(), left, right);
auto squre = [](T ele) { return ele * ele; };
auto add_epslion = [epsilon](T ele) { return ele + epsilon; };
mean_map = input_map.rowwise().mean();
var_map = (input_map - mean_map.replicate(1, right))
.unaryExpr(squre)
.rowwise()
.mean()
.unaryExpr(add_epslion);
auto inv_std_func = [](T ele) { return std::sqrt(1 / ele); };
// TODO(zcd): Some thinking about output_map, is it appropriate that
// `output_map` and `input_map` point to the same memory.
auto inv_std = var_map.unaryExpr(inv_std_func);
if (scale && bias) {
auto scale_map =
ConstEigenMatrixMapRowMajor<T>(scale->data<T>(), 1, right);
auto bias_map = ConstEigenMatrixMapRowMajor<T>(bias->data<T>(), 1, right);
output_map = (input_map - mean_map.replicate(1, right))
.cwiseProduct(inv_std.replicate(1, right))
.cwiseProduct(scale_map.replicate(left, 1)) +
bias_map.replicate(left, 1);
} else if (scale) {
auto scale_map =
ConstEigenMatrixMapRowMajor<T>(scale->data<T>(), 1, right);
output_map = (input_map - mean_map.replicate(1, right))
.cwiseProduct(inv_std.replicate(1, right))
.cwiseProduct(scale_map.replicate(left, 1));
} else if (bias) {
auto bias_map = ConstEigenMatrixMapRowMajor<T>(bias->data<T>(), 1, right);
output_map = (input_map - mean_map.replicate(1, right))
.cwiseProduct(inv_std.replicate(1, right)) +
bias_map.replicate(left, 1);
} else {
output_map = (input_map - mean_map.replicate(1, right))
.cwiseProduct(inv_std.replicate(1, right));
}
}
};
class LayerNormGradOp : public framework::OperatorWithKernel { class LayerNormGradOp : public framework::OperatorWithKernel {
public: public:
using framework::OperatorWithKernel::OperatorWithKernel; using framework::OperatorWithKernel::OperatorWithKernel;
...@@ -237,125 +160,6 @@ class LayerNormGradOp : public framework::OperatorWithKernel { ...@@ -237,125 +160,6 @@ class LayerNormGradOp : public framework::OperatorWithKernel {
} }
}; };
template <typename T>
class LayerNormGradKernel<platform::CPUDeviceContext, T>
: public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &ctx) const override {
const auto *x = ctx.Input<Tensor>("X");
const auto *mean = ctx.Input<Tensor>("Mean");
const auto *var = ctx.Input<Tensor>("Variance");
const auto *scale = ctx.Input<Tensor>("Scale");
const auto *d_y = ctx.Input<Tensor>(framework::GradVarName("Y"));
const auto &x_dims = x->dims();
const auto begin_norm_axis = ctx.Attr<int>("begin_norm_axis");
auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis);
int left = static_cast<int>(matrix_dim[0]);
int right = static_cast<int>(matrix_dim[1]);
// init output
auto *d_x = ctx.Output<Tensor>(framework::GradVarName("X"));
auto *d_scale = ctx.Output<Tensor>(framework::GradVarName("Scale"));
auto *d_bias = ctx.Output<Tensor>(framework::GradVarName("Bias"));
auto x_map = ConstEigenMatrixMapRowMajor<T>(x->data<T>(), left, right);
auto d_y_map = ConstEigenMatrixMapRowMajor<T>(d_y->data<T>(), left, right);
auto mean_map = ConstEigenMatrixMapRowMajor<T>(mean->data<T>(), left, 1);
auto var_map = ConstEigenMatrixMapRowMajor<T>(var->data<T>(), left, 1);
if (d_bias) {
d_bias->mutable_data<T>(ctx.GetPlace());
auto d_bias_map = EigenMatrixMapRowMajor<T>(d_bias->data<T>(), 1, right);
d_bias_map = d_y_map.colwise().sum();
}
if (d_scale) {
d_scale->mutable_data<T>(ctx.GetPlace());
auto d_scale_map =
EigenMatrixMapRowMajor<T>(d_scale->data<T>(), 1, right);
auto inv_std_func = [](T ele) { return std::sqrt(1 / ele); };
// There are two equation to compute d_scale. One uses "Y" and the other
// does not use "Y"
d_scale_map =
((x_map - mean_map.replicate(1, right))
.cwiseProduct(
var_map.unaryExpr(inv_std_func).replicate(1, right))
.cwiseProduct(d_y_map))
.colwise()
.sum();
}
if (d_x) {
d_x->mutable_data<T>(ctx.GetPlace());
auto d_x_map = EigenMatrixMapRowMajor<T>(d_x->data<T>(), left, right);
auto triple_product_func = [](T ele) { return ele * ele * ele; };
auto inv_std_func = [](T ele) { return std::sqrt(1 / ele); };
// TODO(zcd): these code can be refined
if (d_scale) {
auto scale_map =
ConstEigenMatrixMapRowMajor<T>(scale->data<T>(), 1, right);
// dy_dx
auto dx_end = var_map.unaryExpr(inv_std_func)
.replicate(1, right)
.cwiseProduct(d_y_map)
.cwiseProduct(scale_map.replicate(left, 1));
// dy_dmean_dx
auto dx_mean = (T(-1.0) / right) *
var_map.unaryExpr(inv_std_func)
.replicate(1, right)
.cwiseProduct(d_y_map)
.cwiseProduct(scale_map.replicate(left, 1))
.rowwise()
.sum()
.replicate(1, right);
// dy_var_dx
auto dvar_end_part = (x_map - mean_map.replicate(1, right))
.cwiseProduct(scale_map.replicate(left, 1))
.cwiseProduct(d_y_map)
.rowwise()
.sum();
auto dvar_end = var_map.unaryExpr(inv_std_func)
.unaryExpr(triple_product_func)
.cwiseProduct(dvar_end_part)
.replicate(1, right);
auto dx_var =
(T(-1.0) / right) *
(x_map - mean_map.replicate(1, right)).cwiseProduct(dvar_end);
d_x_map = dx_end + dx_mean + dx_var;
} else {
// dy_dx
auto dx_end = var_map.unaryExpr(inv_std_func)
.replicate(1, right)
.cwiseProduct(d_y_map);
// dy_dmean_dx
auto dx_mean = (T(-1.0) / right) *
var_map.unaryExpr(inv_std_func)
.replicate(1, right)
.cwiseProduct(d_y_map)
.rowwise()
.sum()
.replicate(1, right);
// dy_var_dx
auto dvar_end_part = (x_map - mean_map.replicate(1, right))
.cwiseProduct(d_y_map)
.rowwise()
.sum();
auto dvar_end = var_map.unaryExpr(inv_std_func)
.unaryExpr(triple_product_func)
.cwiseProduct(dvar_end_part)
.replicate(1, right);
auto dx_var =
(T(-1.0) / right) *
(x_map - mean_map.replicate(1, right)).cwiseProduct(dvar_end);
d_x_map = dx_end + dx_mean + dx_var;
}
}
}
};
} // namespace operators } // namespace operators
} // namespace paddle } // namespace paddle
...@@ -363,8 +167,9 @@ namespace ops = paddle::operators; ...@@ -363,8 +167,9 @@ namespace ops = paddle::operators;
REGISTER_OP(layer_norm, ops::LayerNormOp, ops::LayerNormOpMaker, REGISTER_OP(layer_norm, ops::LayerNormOp, ops::LayerNormOpMaker,
layer_norm_grad, ops::LayerNormGradOp); layer_norm_grad, ops::LayerNormGradOp);
REGISTER_OP_CPU_KERNEL( REGISTER_OP_CPU_KERNEL(
layer_norm, layer_norm, ops::LayerNormKernel<paddle::platform::CPUDeviceContext, float>,
ops::LayerNormKernel<paddle::platform::CPUDeviceContext, float>); ops::LayerNormKernel<paddle::platform::CPUDeviceContext, double>);
REGISTER_OP_CPU_KERNEL( REGISTER_OP_CPU_KERNEL(
layer_norm_grad, layer_norm_grad,
ops::LayerNormGradKernel<paddle::platform::CPUDeviceContext, float>); ops::LayerNormGradKernel<paddle::platform::CPUDeviceContext, float>,
ops::LayerNormGradKernel<paddle::platform::CPUDeviceContext, double>);
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/operators/layer_norm_op.h"
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(
layer_norm,
ops::LayerNormKernel<paddle::platform::CUDADeviceContext, float>,
ops::LayerNormKernel<paddle::platform::CUDADeviceContext, double>);
REGISTER_OP_CUDA_KERNEL(
layer_norm_grad,
ops::LayerNormGradKernel<paddle::platform::CUDADeviceContext, float>,
ops::LayerNormGradKernel<paddle::platform::CUDADeviceContext, double>);
...@@ -16,19 +16,222 @@ limitations under the License. */ ...@@ -16,19 +16,222 @@ limitations under the License. */
#include "paddle/framework/eigen.h" #include "paddle/framework/eigen.h"
#include "paddle/framework/op_registry.h" #include "paddle/framework/op_registry.h"
#include "paddle/operators/elementwise_op_function.h"
#include "paddle/operators/math/math_function.h"
namespace paddle { namespace paddle {
namespace operators { namespace operators {
template <typename T>
struct SubAndSquareFunctor {
inline HOSTDEVICE T operator()(T a, T b) const { return (a - b) * (a - b); }
};
template <typename T>
struct DivAndSqrtFunctor {
explicit DivAndSqrtFunctor(T epsilon) { epsilon_ = epsilon; }
inline HOSTDEVICE T operator()(T a, T b) const {
return a / (sqrt(b + epsilon_));
}
private:
T epsilon_;
};
template <typename T>
struct MulFunctor {
inline HOSTDEVICE T operator()(T a, T b) const { return a * b; }
};
template <typename T>
struct AddFunctor {
inline HOSTDEVICE T operator()(T a, T b) const { return a + b; }
};
template <typename T>
struct SubFunctor {
inline HOSTDEVICE T operator()(T a, T b) const { return a - b; }
};
template <typename T>
struct MulInvVarFunctor {
inline HOSTDEVICE T operator()(T a, T b) const {
return a * std::sqrt(1.0 / b);
}
};
using Tensor = framework::Tensor;
using LoDTensor = framework::LoDTensor;
using DataLayout = framework::DataLayout;
template <typename DeviceContext, typename T> template <typename DeviceContext, typename T>
class LayerNormKernel : public framework::OpKernel<T> { class LayerNormKernel : public framework::OpKernel<T> {
public: public:
void Compute(const framework::ExecutionContext& ctx) const override; void Compute(const framework::ExecutionContext &ctx) const override {
const float epsilon = ctx.Attr<float>("epsilon");
auto *scale = ctx.Input<Tensor>("Scale");
auto *bias = ctx.Input<Tensor>("Bias");
auto x = *ctx.Input<Tensor>("X");
auto *y = ctx.Output<Tensor>("Y");
auto *mean = ctx.Output<Tensor>("Mean");
auto *var = ctx.Output<Tensor>("Variance");
const auto begin_norm_axis = ctx.Attr<int>("begin_norm_axis");
const auto x_dims = x.dims();
y->mutable_data<T>(ctx.GetPlace());
mean->mutable_data<T>(ctx.GetPlace());
var->mutable_data<T>(ctx.GetPlace());
auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis);
int left = static_cast<int>(matrix_dim[0]);
int right = static_cast<int>(matrix_dim[1]);
framework::DDim matrix_shape({left, right});
x.Resize(matrix_shape);
Tensor out;
out.ShareDataWith(*y);
out.Resize(matrix_shape);
auto &dev_ctx = ctx.template device_context<DeviceContext>();
math::RowwiseMean<DeviceContext, T> row_mean;
// get mean
row_mean(dev_ctx, x, mean);
// get variance
ElementwiseComputeEx<SubAndSquareFunctor<T>, DeviceContext, T>(
ctx, &x, mean, /*axis*/ 0, SubAndSquareFunctor<T>(), &out);
row_mean(dev_ctx, out, var);
// get x_norm
ElementwiseComputeEx<SubFunctor<T>, DeviceContext, T>(
ctx, &x, mean, /*axis*/ 0, SubFunctor<T>(), &out);
ElementwiseComputeEx<DivAndSqrtFunctor<T>, DeviceContext, T>(
ctx, &out, var, /*axis*/ 0,
DivAndSqrtFunctor<T>(static_cast<T>(epsilon)), &out);
if (scale) {
ElementwiseComputeEx<MulFunctor<T>, DeviceContext, T>(
ctx, &out, scale, /*axis*/ 1, MulFunctor<T>(), &out);
}
if (bias) {
ElementwiseComputeEx<AddFunctor<T>, DeviceContext, T>(
ctx, &out, bias, /*axis*/ 1, AddFunctor<T>(), &out);
}
}
}; };
template <typename DeviceContext, typename T> template <typename DeviceContext, typename T>
class LayerNormGradKernel : public framework::OpKernel<T> { class LayerNormGradKernel : public framework::OpKernel<T> {
public: public:
void Compute(const framework::ExecutionContext& ctx) const override; void Compute(const framework::ExecutionContext &ctx) const override {
const float epsilon = ctx.Attr<float>("epsilon");
auto x = *ctx.Input<Tensor>("X");
auto *y = ctx.Input<Tensor>("Y");
auto *mean = ctx.Input<Tensor>("Mean");
auto *var = ctx.Input<Tensor>("Variance");
auto *scale = ctx.Input<Tensor>("Scale");
auto *bias = ctx.Input<Tensor>("Bias");
auto d_y = *ctx.Input<Tensor>(framework::GradVarName("Y"));
const auto begin_norm_axis = ctx.Attr<int>("begin_norm_axis");
// init output
auto *d_x = ctx.Output<Tensor>(framework::GradVarName("X"));
auto *d_scale = ctx.Output<Tensor>(framework::GradVarName("Scale"));
auto *d_bias = ctx.Output<Tensor>(framework::GradVarName("Bias"));
const auto &x_dims = x.dims();
auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis);
int left = static_cast<int>(matrix_dim[0]);
int right = static_cast<int>(matrix_dim[1]);
framework::DDim matrix_shape({left, right});
d_y.Resize(matrix_shape);
auto &dev_ctx = ctx.template device_context<DeviceContext>();
math::ColwiseSum<DeviceContext, T> colwise_sum;
Tensor temp;
Tensor temp_norm;
if (d_scale || d_x) {
x.Resize(matrix_shape);
temp.mutable_data<T>(matrix_shape, ctx.GetPlace());
if (!(bias && scale)) {
temp_norm.ShareDataWith(*y);
temp_norm.Resize(matrix_shape);
} else {
temp_norm.mutable_data<T>(matrix_shape, ctx.GetPlace());
// get x_norm
ElementwiseComputeEx<SubFunctor<T>, DeviceContext, T>(
ctx, &x, mean, /*axis*/ 0, SubFunctor<T>(), &temp_norm);
ElementwiseComputeEx<DivAndSqrtFunctor<T>, DeviceContext, T>(
ctx, &temp_norm, var, /*axis*/ 0,
DivAndSqrtFunctor<T>(static_cast<T>(epsilon)), &temp_norm);
}
}
if (d_bias) {
d_bias->mutable_data<T>(ctx.GetPlace());
colwise_sum(dev_ctx, d_y, d_bias);
}
if (d_scale) {
d_scale->mutable_data<T>(ctx.GetPlace());
ElementwiseComputeEx<MulFunctor<T>, DeviceContext, T>(
ctx, &temp_norm, &d_y, /*axis*/ 0, MulFunctor<T>(), &temp);
colwise_sum(dev_ctx, temp, d_scale);
}
if (d_x) {
framework::DDim vec_shape({left});
d_x->mutable_data<T>(ctx.GetPlace());
auto dx_dim = d_x->dims();
Tensor temp_vec;
temp_vec.mutable_data<T>(vec_shape, ctx.GetPlace());
math::RowwiseMean<DeviceContext, T> row_mean;
if (d_scale) {
// dy_dx
ElementwiseComputeEx<MulFunctor<T>, DeviceContext, T>(
ctx, &d_y, scale, /*axis*/ 1, MulFunctor<T>(), &temp);
framework::Copy(temp, ctx.GetPlace(), ctx.device_context(), d_x);
// dy_dmean_dx
row_mean(dev_ctx, temp, &temp_vec);
ElementwiseComputeEx<SubFunctor<T>, DeviceContext, T>(
ctx, d_x, &temp_vec, /*axis*/ 0, SubFunctor<T>(), d_x);
// dy_var_dx
ElementwiseComputeEx<MulFunctor<T>, DeviceContext, T>(
ctx, &temp, &temp_norm, /*axis*/ 0, MulFunctor<T>(), &temp);
} else {
// dy_dx
framework::Copy(d_y, ctx.GetPlace(), ctx.device_context(), d_x);
// dy_dmean_dx
row_mean(dev_ctx, d_y, &temp_vec);
ElementwiseComputeEx<SubFunctor<T>, DeviceContext, T>(
ctx, d_x, &temp_vec, /*axis*/ 0, SubFunctor<T>(), d_x);
// dy_var_dx
ElementwiseComputeEx<MulFunctor<T>, DeviceContext, T>(
ctx, &d_y, &temp_norm, /*axis*/ 0, MulFunctor<T>(), &temp);
}
// dy_var_dx
row_mean(dev_ctx, temp, &temp_vec);
ElementwiseComputeEx<MulFunctor<T>, DeviceContext, T>(
ctx, &temp_norm, &temp_vec, /*axis*/ 0, MulFunctor<T>(), &temp);
ElementwiseComputeEx<SubFunctor<T>, DeviceContext, T>(
ctx, d_x, &temp, /*axis*/ 0, SubFunctor<T>(), d_x);
ElementwiseComputeEx<DivAndSqrtFunctor<T>, DeviceContext, T>(
ctx, d_x, var, /*axis*/ 0,
DivAndSqrtFunctor<T>(static_cast<T>(epsilon)), d_x);
d_x->Resize(dx_dim);
}
}
}; };
} // namespace operators } // namespace operators
......
...@@ -331,6 +331,12 @@ template struct RowwiseAdd<platform::CPUDeviceContext, double>; ...@@ -331,6 +331,12 @@ template struct RowwiseAdd<platform::CPUDeviceContext, double>;
template struct ColwiseSum<platform::CPUDeviceContext, float>; template struct ColwiseSum<platform::CPUDeviceContext, float>;
template struct ColwiseSum<platform::CPUDeviceContext, double>; template struct ColwiseSum<platform::CPUDeviceContext, double>;
template struct RowwiseSum<platform::CPUDeviceContext, float>;
template struct RowwiseSum<platform::CPUDeviceContext, double>;
template struct RowwiseMean<platform::CPUDeviceContext, float>;
template struct RowwiseMean<platform::CPUDeviceContext, double>;
} // namespace math } // namespace math
} // namespace operators } // namespace operators
} // namespace paddle } // namespace paddle
...@@ -325,6 +325,31 @@ void ColwiseSum<platform::CUDADeviceContext, double>::operator()( ...@@ -325,6 +325,31 @@ void ColwiseSum<platform::CUDADeviceContext, double>::operator()(
vector->data<double>()); vector->data<double>());
} }
template struct RowwiseSum<platform::CUDADeviceContext, float>;
// template struct RowwiseSum<platform::CUDADeviceContext, double>;
// TODO(zcd): Following ColwiseSum format, need to confirm.
// The RowwiseSum<platform::CUDADeviceContext, double> failed in debug mode,
// and only failed for this case. So reimplemented it.
template <>
void RowwiseSum<platform::CUDADeviceContext, double>::operator()(
const platform::CUDADeviceContext& context, const framework::Tensor& input,
framework::Tensor* vector) {
auto in_dims = input.dims();
auto size = input.numel() / in_dims[0];
PADDLE_ENFORCE_EQ(vector->numel(), in_dims[0]);
framework::Tensor one;
one.mutable_data<double>({size}, context.GetPlace());
SetConstant<platform::CUDADeviceContext, double> set;
set(context, &one, static_cast<double>(1.0));
gemv<platform::CUDADeviceContext, double>(
context, true, static_cast<int>(in_dims[1]), static_cast<int>(in_dims[0]),
1.0, one.data<double>(), input.data<double>(), 0.0,
vector->data<double>());
}
template struct RowwiseMean<platform::CUDADeviceContext, float>;
template struct RowwiseMean<platform::CUDADeviceContext, double>;
} // namespace math } // namespace math
} // namespace operators } // namespace operators
} // namespace paddle } // namespace paddle
...@@ -128,6 +128,18 @@ struct ColwiseSum { ...@@ -128,6 +128,18 @@ struct ColwiseSum {
framework::Tensor* vec); framework::Tensor* vec);
}; };
template <typename DeviceContext, typename T>
struct RowwiseSum {
void operator()(const DeviceContext& context, const framework::Tensor& input,
framework::Tensor* vec);
};
template <typename DeviceContext, typename T>
struct RowwiseMean {
void operator()(const DeviceContext& context, const framework::Tensor& input,
framework::Tensor* vec);
};
} // namespace math } // namespace math
} // namespace operators } // namespace operators
} // namespace paddle } // namespace paddle
...@@ -87,6 +87,88 @@ class ColwiseSum<platform::CPUDeviceContext, T> { ...@@ -87,6 +87,88 @@ class ColwiseSum<platform::CPUDeviceContext, T> {
} }
}; };
template <typename DeviceContext, typename T>
void RowwiseMean<DeviceContext, T>::operator()(const DeviceContext& context,
const framework::Tensor& input,
framework::Tensor* out) {
auto in_dims = input.dims();
PADDLE_ENFORCE_EQ(in_dims.size(), 2U);
PADDLE_ENFORCE_EQ(out->numel(), in_dims[0]);
auto in = framework::EigenMatrix<T>::From(input);
auto vec = framework::EigenVector<T>::Flatten(*out);
vec.device(*context.eigen_device()) = in.mean(Eigen::array<int, 1>({{1}}));
}
// TODO(zcd): Following ColwiseSum format, need to confirm.
// Specialize for CPU, since Eigen implement a general reduce. However,
// rowwise-sum can be easily implemented. General reduce has a huge overhead in
// CPU
template <typename T>
class RowwiseMean<platform::CPUDeviceContext, T> {
public:
void operator()(const platform::CPUDeviceContext& context,
const framework::Tensor& input, framework::Tensor* out) {
auto& in_dims = input.dims();
PADDLE_ENFORCE_EQ(in_dims.size(), 2U);
auto height = in_dims[0];
auto size = in_dims[1];
PADDLE_ENFORCE_EQ(out->numel(), height);
auto inv_size = 1.0 / size;
T* out_buf = out->mutable_data<T>(out->place());
const T* in_buf = input.data<T>();
for (size_t i = 0; i < static_cast<size_t>(height); ++i) {
T sum = 0;
for (size_t j = 0; j < static_cast<size_t>(size); ++j) {
sum += in_buf[i * size + j];
}
out_buf[i] = sum * inv_size;
}
}
};
template <typename DeviceContext, typename T>
void RowwiseSum<DeviceContext, T>::operator()(const DeviceContext& context,
const framework::Tensor& input,
framework::Tensor* out) {
auto in_dims = input.dims();
PADDLE_ENFORCE_EQ(in_dims.size(), 2U);
PADDLE_ENFORCE_EQ(out->numel(), in_dims[0]);
auto in = framework::EigenMatrix<T>::From(input);
auto vec = framework::EigenVector<T>::Flatten(*out);
vec.device(*context.eigen_device()) = in.sum(Eigen::array<int, 1>({{1}}));
}
// TODO(zcd): Following ColwiseSum format, need to confirm.
// Specialize for CPU, since Eigen implement a general reduce. However,
// rowwise-sum can be easily implemented. General reduce has a huge overhead in
// CPU
template <typename T>
class RowwiseSum<platform::CPUDeviceContext, T> {
public:
void operator()(const platform::CPUDeviceContext& context,
const framework::Tensor& input, framework::Tensor* out) {
auto& in_dims = input.dims();
PADDLE_ENFORCE_EQ(in_dims.size(), 2U);
auto height = in_dims[0];
auto size = in_dims[1];
PADDLE_ENFORCE_EQ(out->numel(), size);
T* out_buf = out->mutable_data<T>(out->place());
const T* in_buf = input.data<T>();
for (size_t i = 0; i < static_cast<size_t>(height); ++i) {
T sum = 0;
for (size_t j = 0; j < static_cast<size_t>(size); ++j) {
sum += in_buf[i * size + j];
}
out_buf[i] = sum;
}
}
};
} // namespace math } // namespace math
} // namespace operators } // namespace operators
} // namespace paddle } // namespace paddle
...@@ -300,6 +300,9 @@ class DistributeTranspiler: ...@@ -300,6 +300,9 @@ class DistributeTranspiler:
pass pass
return orig_shape return orig_shape
def _op_input_var(self, op, varname):
pass
def _is_op_on_pserver(self, endpoint, all_ops, idx): def _is_op_on_pserver(self, endpoint, all_ops, idx):
""" """
Recursively check if the op need to run on current server. Recursively check if the op need to run on current server.
...@@ -309,29 +312,35 @@ class DistributeTranspiler: ...@@ -309,29 +312,35 @@ class DistributeTranspiler:
p.name for p in self.param_grad_ep_mapping[endpoint]["params"] p.name for p in self.param_grad_ep_mapping[endpoint]["params"]
] ]
op = all_ops[idx] op = all_ops[idx]
if op.inputs.has_key("Param"): input_names = set(op.input_names)
if op.inputs["Param"].name in param_names: # TODO(typhoonzero): using Param and Grad input name to identify
# that the operator is an optimization operator, need a better way.
if "Param" in input_names:
if op.input("Param")[0] in param_names:
return True return True
else: else:
for n in param_names: for n in param_names:
if same_or_split_var(n, op.inputs[ if same_or_split_var(n, op.input("Param")[0]) \
"Param"].name) and n != op.inputs["Param"].name: and n != op.input("Param")[0]:
return True return True
return False return False
else: else:
j = idx - 1 j = idx - 1
while j >= 0: while j >= 0:
prev_op = all_ops[j] prev_op = all_ops[j]
prev_output_names = [o.name for o in prev_op.outputs.values()] # prev_output_names = [o.name for o in prev_op.outputs.values()]
prev_input_names = [o.name for o in prev_op.inputs.values()] # prev_input_names = [o.name for o in prev_op.inputs.values()]
# NOTE(typhoonzero): consider list input/output
prev_output_names = prev_op.desc.output_arg_names()
prev_input_names = prev_op.desc.input_arg_names()
found1 = False found1 = False
found2 = False found2 = False
for _, v in op.inputs.iteritems(): for varname in op.desc.input_arg_names():
if v.name in prev_output_names: if varname in prev_output_names:
found1 = self._is_op_on_pserver(endpoint, all_ops, j) found1 = self._is_op_on_pserver(endpoint, all_ops, j)
# later ops may produce output for prev op's next batch use. # later ops may produce output for prev op's next batch use.
for _, v in op.outputs.iteritems(): for varname in op.desc.output_arg_names():
if v.name in prev_input_names: if varname in prev_input_names:
found2 = self._is_op_on_pserver(endpoint, all_ops, j) found2 = self._is_op_on_pserver(endpoint, all_ops, j)
if found1 or found2: if found1 or found2:
return True return True
...@@ -342,11 +351,11 @@ class DistributeTranspiler: ...@@ -342,11 +351,11 @@ class DistributeTranspiler:
new_inputs = dict() new_inputs = dict()
# update param/grad shape first, then other inputs like # update param/grad shape first, then other inputs like
# moment can use the updated shape # moment can use the updated shape
for key, var in opt_op.inputs.iteritems(): for key in opt_op.input_names:
if key == "Grad": if key == "Grad":
grad_block = None grad_block = None
for g in self.param_grad_ep_mapping[endpoint]["grads"]: for g in self.param_grad_ep_mapping[endpoint]["grads"]:
if same_or_split_var(g.name, var.name): if same_or_split_var(g.name, opt_op.input(key)[0]):
grad_block = g grad_block = g
break break
if not grad_block: if not grad_block:
...@@ -376,7 +385,7 @@ class DistributeTranspiler: ...@@ -376,7 +385,7 @@ class DistributeTranspiler:
# param is already created on global program # param is already created on global program
param_block = None param_block = None
for p in self.param_grad_ep_mapping[endpoint]["params"]: for p in self.param_grad_ep_mapping[endpoint]["params"]:
if same_or_split_var(p.name, var.name): if same_or_split_var(p.name, opt_op.input(key)[0]):
param_block = p param_block = p
break break
if not param_block: if not param_block:
...@@ -389,11 +398,12 @@ class DistributeTranspiler: ...@@ -389,11 +398,12 @@ class DistributeTranspiler:
new_inputs[key] = tmpvar new_inputs[key] = tmpvar
for key, var in opt_op.inputs.iteritems(): for key in opt_op.input_names:
if key in ["Param", "Grad"]: if key in ["Param", "Grad"]:
continue continue
# update accumulator variable shape # update accumulator variable shape
param_shape = new_inputs["Param"].shape param_shape = new_inputs["Param"].shape
var = program.global_block().vars[opt_op.input(key)[0]]
new_shape = self._get_optimizer_input_shape(opt_op.type, key, new_shape = self._get_optimizer_input_shape(opt_op.type, key,
var.shape, param_shape) var.shape, param_shape)
tmpvar = program.global_block().create_var( tmpvar = program.global_block().create_var(
...@@ -412,30 +422,44 @@ class DistributeTranspiler: ...@@ -412,30 +422,44 @@ class DistributeTranspiler:
shape=new_shape) shape=new_shape)
# change output's ParamOut variable # change output's ParamOut variable
opt_op.outputs["ParamOut"] = new_inputs["Param"] outputs = self._get_output_map_from_op(program.global_block(), opt_op)
outputs["ParamOut"] = new_inputs["Param"]
program.global_block().append_op( program.global_block().append_op(
type=opt_op.type, type=opt_op.type,
inputs=new_inputs, inputs=new_inputs,
outputs=opt_op.outputs, outputs=outputs,
attrs=opt_op.attrs) attrs=opt_op.attrs)
def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op): def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op):
# Append the ops for parameters that do not need to be optimized/updated # Append the ops for parameters that do not need to be optimized/updated
for _, var in opt_op.inputs.iteritems(): inputs = self._get_input_map_from_op(self.program.global_block().vars,
program.global_block().create_var( opt_op)
name=var.name, for var in inputs.itervalues():
persistable=var.persistable, if type(var) == list:
dtype=var.dtype, varlist = var
shape=var.shape) else:
pserver_program.global_block().create_var( varlist = [var]
name=var.name, for var in varlist:
persistable=var.persistable, # TODO(typhoonzero): will remove below line later.
dtype=var.dtype, program.global_block().create_var(
shape=var.shape) name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
if not pserver_program.global_block().vars.has_key(var.name):
pserver_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
outputs = self._get_output_map_from_op(self.program.global_block().vars,
opt_op)
program.global_block().append_op( program.global_block().append_op(
type=opt_op.type, type=opt_op.type,
inputs=opt_op.inputs, inputs=inputs,
outputs=opt_op.outputs, outputs=outputs,
attrs=opt_op.attrs) attrs=opt_op.attrs)
def get_pserver_program(self, endpoint): def get_pserver_program(self, endpoint):
...@@ -472,7 +496,7 @@ class DistributeTranspiler: ...@@ -472,7 +496,7 @@ class DistributeTranspiler:
self.optimize_ops, idx) self.optimize_ops, idx)
if not is_op_on_pserver: if not is_op_on_pserver:
continue continue
if opt_op.inputs.has_key("Grad"): if "Grad" in opt_op.desc.input_arg_names():
self._append_pserver_ops(optimize_sub_program, pserver_program, self._append_pserver_ops(optimize_sub_program, pserver_program,
opt_op, endpoint) opt_op, endpoint)
else: else:
...@@ -499,6 +523,30 @@ class DistributeTranspiler: ...@@ -499,6 +523,30 @@ class DistributeTranspiler:
pserver_program.sync_with_cpp() pserver_program.sync_with_cpp()
return pserver_program return pserver_program
def _get_input_map_from_op(self, varmap, op):
iomap = dict()
for key in op.input_names:
vars = []
for varname in op.input(key):
vars.append(varmap[varname])
if len(vars) == 1:
iomap[key] = vars[0]
else:
iomap[key] = vars
return iomap
def _get_output_map_from_op(self, varmap, op):
iomap = dict()
for key in op.output_names:
vars = []
for varname in op.output(key):
vars.append(varmap[varname])
if len(vars) == 1:
iomap[key] = vars[0]
else:
iomap[key] = vars
return iomap
def get_startup_program(self, endpoint, pserver_program): def get_startup_program(self, endpoint, pserver_program):
""" """
Get startup program for current parameter server. Get startup program for current parameter server.
...@@ -529,17 +577,21 @@ class DistributeTranspiler: ...@@ -529,17 +577,21 @@ class DistributeTranspiler:
# 2. rename op outputs # 2. rename op outputs
for op in orig_s_prog.global_block().ops: for op in orig_s_prog.global_block().ops:
new_inputs = dict()
new_outputs = dict() new_outputs = dict()
# do not append startup op if var is not on this pserver # do not append startup op if var is not on this pserver
op_on_pserver = False op_on_pserver = False
for key, var in op.outputs.iteritems(): for key in op.output_names:
newname, _ = _get_splited_name_and_shape(var.name) newname, _ = _get_splited_name_and_shape(op.output(key)[0])
if newname: if newname:
op_on_pserver = True op_on_pserver = True
new_outputs[key] = created_var_map[newname] new_outputs[key] = created_var_map[newname]
elif var.name in pserver_vars: elif op.output(key)[0] in pserver_vars:
op_on_pserver = True op_on_pserver = True
new_outputs[key] = pserver_vars[var.name] new_outputs[key] = pserver_vars[op.output(key)[0]]
# most startup program ops have no inputs
new_inputs = self._get_input_map_from_op(pserver_vars, op)
if op_on_pserver: if op_on_pserver:
if op.type in [ if op.type in [
...@@ -548,7 +600,7 @@ class DistributeTranspiler: ...@@ -548,7 +600,7 @@ class DistributeTranspiler:
op.attrs["shape"] = new_outputs["Out"].shape op.attrs["shape"] = new_outputs["Out"].shape
s_prog.global_block().append_op( s_prog.global_block().append_op(
type=op.type, type=op.type,
inputs=op.inputs, inputs=new_inputs,
outputs=new_outputs, outputs=new_outputs,
attrs=op.attrs) attrs=op.attrs)
return s_prog return s_prog
...@@ -740,6 +740,9 @@ class Block(object): ...@@ -740,6 +740,9 @@ class Block(object):
raise e raise e
self.desc.remove_op(start, end + 1) self.desc.remove_op(start, end + 1)
def slice_ops(self, start, end):
return list(self.ops)[start:end]
def prepend_op(self, *args, **kwargs): def prepend_op(self, *args, **kwargs):
op_desc = self.desc.prepend_op() op_desc = self.desc.prepend_op()
op = Operator(self, op_desc, *args, **kwargs) op = Operator(self, op_desc, *args, **kwargs)
......
...@@ -342,7 +342,11 @@ def save_inference_model(dirname, ...@@ -342,7 +342,11 @@ def save_inference_model(dirname,
prepend_feed_ops(inference_program, feeded_var_names) prepend_feed_ops(inference_program, feeded_var_names)
append_fetch_ops(inference_program, fetch_var_names) append_fetch_ops(inference_program, fetch_var_names)
model_file_name = dirname + "/__model__" if save_file_name == None:
model_file_name = dirname + "/__model__"
else:
model_file_name = dirname + "/__model_combined__"
with open(model_file_name, "wb") as f: with open(model_file_name, "wb") as f:
f.write(inference_program.desc.serialize_to_string()) f.write(inference_program.desc.serialize_to_string())
...@@ -384,7 +388,11 @@ def load_inference_model(dirname, executor, load_file_name=None): ...@@ -384,7 +388,11 @@ def load_inference_model(dirname, executor, load_file_name=None):
if not os.path.isdir(dirname): if not os.path.isdir(dirname):
raise ValueError("There is no directory named '%s'", dirname) raise ValueError("There is no directory named '%s'", dirname)
model_file_name = dirname + "/__model__" if load_file_name == None:
model_file_name = dirname + "/__model__"
else:
model_file_name = dirname + "/__model_combined__"
with open(model_file_name, "rb") as f: with open(model_file_name, "rb") as f:
program_desc_str = f.read() program_desc_str = f.read()
......
...@@ -92,14 +92,13 @@ class ControlFlowGraph(object): ...@@ -92,14 +92,13 @@ class ControlFlowGraph(object):
live_in = defaultdict(set) live_in = defaultdict(set)
live_out = defaultdict(set) live_out = defaultdict(set)
while True: while True:
for i in range(self.op_size): for i in range(self.op_size, 0, -1):
live_in[i] = set(self._live_in[i]) live_in[i] = set(self._live_in[i])
live_out[i] = set(self._live_out[i]) live_out[i] = set(self._live_out[i])
self._live_in[i] = self._uses[i] | (
self._live_out[i] - self._defs[i])
for s in self._successors[i]: for s in self._successors[i]:
self._live_out[i] |= self._live_in[s] self._live_out[i] |= self._live_in[s]
self._live_in[i] = self._uses[i] | (
self._live_out[i] - self._defs[i])
if self._reach_fixed_point(live_in, live_out): if self._reach_fixed_point(live_in, live_out):
break break
......
...@@ -190,6 +190,8 @@ class Optimizer(object): ...@@ -190,6 +190,8 @@ class Optimizer(object):
# Create any accumulators # Create any accumulators
program = loss.block.program program = loss.block.program
with program_guard(program, startup_program): with program_guard(program, startup_program):
global_block = framework.default_main_program().global_block()
start = len(global_block.ops)
self.helper = LayerHelper(self.__class__.__name__) self.helper = LayerHelper(self.__class__.__name__)
self._create_accumulators(loss.block, self._create_accumulators(loss.block,
[p[0] for p in parameters_and_grads]) [p[0] for p in parameters_and_grads])
...@@ -203,19 +205,14 @@ class Optimizer(object): ...@@ -203,19 +205,14 @@ class Optimizer(object):
param_and_grad) param_and_grad)
optimize_ops.append(optimize_op) optimize_ops.append(optimize_op)
# Returned list of ops can include more ops in addition
# to optimization ops
return_ops = optimize_ops
# Get custom finish ops for subclasses # Get custom finish ops for subclasses
# FIXME: Need to fix this once we figure out how to handle dependencies # FIXME: Need to fix this once we figure out how to handle dependencies
finish_ops = self._finish_update(loss.block) self._finish_update(loss.block)
if finish_ops is not None:
return_ops += finish_ops
if self._global_step is not None: if self._global_step is not None:
return_ops.append(self._increment_global_step(loss.block)) self._increment_global_step(loss.block)
return return_ops end = len(global_block.ops)
return global_block.slice_ops(start, end)
def minimize(self, def minimize(self,
loss, loss,
......
...@@ -78,7 +78,7 @@ def conv_net(img, label): ...@@ -78,7 +78,7 @@ def conv_net(img, label):
return loss_net(conv_pool_2, label) return loss_net(conv_pool_2, label)
def train(nn_type, use_cuda, parallel, save_dirname): def train(nn_type, use_cuda, parallel, save_dirname, save_param_filename):
if use_cuda and not fluid.core.is_compiled_with_cuda(): if use_cuda and not fluid.core.is_compiled_with_cuda():
return return
img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
...@@ -143,8 +143,10 @@ def train(nn_type, use_cuda, parallel, save_dirname): ...@@ -143,8 +143,10 @@ def train(nn_type, use_cuda, parallel, save_dirname):
avg_loss_val = numpy.array(avg_loss_set).mean() avg_loss_val = numpy.array(avg_loss_set).mean()
if float(acc_val) > 0.85: # test acc > 85% if float(acc_val) > 0.85: # test acc > 85%
if save_dirname is not None: if save_dirname is not None:
fluid.io.save_inference_model(save_dirname, ["img"], fluid.io.save_inference_model(
[prediction], exe) save_dirname, ["img"], [prediction],
exe,
save_file_name=save_param_filename)
return return
else: else:
print( print(
...@@ -156,7 +158,7 @@ def train(nn_type, use_cuda, parallel, save_dirname): ...@@ -156,7 +158,7 @@ def train(nn_type, use_cuda, parallel, save_dirname):
raise AssertionError("Loss of recognize digits is too large") raise AssertionError("Loss of recognize digits is too large")
def infer(use_cuda, save_dirname=None): def infer(use_cuda, save_dirname=None, param_filename=None):
if save_dirname is None: if save_dirname is None:
return return
...@@ -167,8 +169,8 @@ def infer(use_cuda, save_dirname=None): ...@@ -167,8 +169,8 @@ def infer(use_cuda, save_dirname=None):
# the feed_target_names (the names of variables that will be feeded # the feed_target_names (the names of variables that will be feeded
# data using feed operators), and the fetch_targets (variables that # data using feed operators), and the fetch_targets (variables that
# we want to obtain data from using fetch operators). # we want to obtain data from using fetch operators).
[inference_program, feed_target_names, [inference_program, feed_target_names, fetch_targets
fetch_targets] = fluid.io.load_inference_model(save_dirname, exe) ] = fluid.io.load_inference_model(save_dirname, exe, param_filename)
# The input's dimension of conv should be 4-D or 5-D. # The input's dimension of conv should be 4-D or 5-D.
# Use normilized image pixels as input data, which should be in the range [-1.0, 1.0]. # Use normilized image pixels as input data, which should be in the range [-1.0, 1.0].
...@@ -183,36 +185,45 @@ def infer(use_cuda, save_dirname=None): ...@@ -183,36 +185,45 @@ def infer(use_cuda, save_dirname=None):
print("infer results: ", results[0]) print("infer results: ", results[0])
def main(use_cuda, parallel, nn_type): def main(use_cuda, parallel, nn_type, combine):
if not use_cuda and not parallel: if not use_cuda and not parallel:
save_dirname = "recognize_digits_" + nn_type + ".inference.model" save_dirname = "recognize_digits_" + nn_type + ".inference.model"
save_filename = None
if combine == True:
save_filename = "__params_combined__"
else: else:
save_dirname = None save_dirname = None
save_filename = None
train( train(
nn_type=nn_type, nn_type=nn_type,
use_cuda=use_cuda, use_cuda=use_cuda,
parallel=parallel, parallel=parallel,
save_dirname=save_dirname) save_dirname=save_dirname,
infer(use_cuda=use_cuda, save_dirname=save_dirname) save_param_filename=save_filename)
infer(
use_cuda=use_cuda,
save_dirname=save_dirname,
param_filename=save_filename)
class TestRecognizeDigits(unittest.TestCase): class TestRecognizeDigits(unittest.TestCase):
pass pass
def inject_test_method(use_cuda, parallel, nn_type): def inject_test_method(use_cuda, parallel, nn_type, combine):
def __impl__(self): def __impl__(self):
prog = fluid.Program() prog = fluid.Program()
startup_prog = fluid.Program() startup_prog = fluid.Program()
scope = fluid.core.Scope() scope = fluid.core.Scope()
with fluid.scope_guard(scope): with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog): with fluid.program_guard(prog, startup_prog):
main(use_cuda, parallel, nn_type) main(use_cuda, parallel, nn_type, combine)
fn = 'test_{0}_{1}_{2}'.format(nn_type, 'cuda' fn = 'test_{0}_{1}_{2}_{3}'.format(nn_type, 'cuda'
if use_cuda else 'cpu', 'parallel' if use_cuda else 'cpu', 'parallel'
if parallel else 'normal') if parallel else 'normal', 'combine'
if combine else 'separate')
setattr(TestRecognizeDigits, fn, __impl__) setattr(TestRecognizeDigits, fn, __impl__)
...@@ -221,7 +232,10 @@ def inject_all_tests(): ...@@ -221,7 +232,10 @@ def inject_all_tests():
for use_cuda in (False, True): for use_cuda in (False, True):
for parallel in (False, True): for parallel in (False, True):
for nn_type in ('mlp', 'conv'): for nn_type in ('mlp', 'conv'):
inject_test_method(use_cuda, parallel, nn_type) inject_test_method(use_cuda, parallel, nn_type, True)
# One unit-test for saving parameters as separate files
inject_test_method(False, False, 'mlp', False)
inject_all_tests() inject_all_tests()
......
...@@ -18,6 +18,10 @@ import paddle.v2.fluid as fluid ...@@ -18,6 +18,10 @@ import paddle.v2.fluid as fluid
import paddle.v2.fluid.core as core import paddle.v2.fluid.core as core
import paddle.v2.fluid.framework as framework import paddle.v2.fluid.framework as framework
import paddle.v2.fluid.layers as layers import paddle.v2.fluid.layers as layers
import contextlib
import math
import sys
import unittest
from paddle.v2.fluid.executor import Executor from paddle.v2.fluid.executor import Executor
dict_size = 30000 dict_size = 30000
...@@ -145,7 +149,7 @@ def seq_to_seq_net(): ...@@ -145,7 +149,7 @@ def seq_to_seq_net():
cost = fluid.layers.cross_entropy(input=prediction, label=label) cost = fluid.layers.cross_entropy(input=prediction, label=label)
avg_cost = fluid.layers.mean(x=cost) avg_cost = fluid.layers.mean(x=cost)
return avg_cost return avg_cost, prediction
def to_lodtensor(data, place): def to_lodtensor(data, place):
...@@ -163,8 +167,16 @@ def to_lodtensor(data, place): ...@@ -163,8 +167,16 @@ def to_lodtensor(data, place):
return res return res
def main(): def create_random_lodtensor(lod, place, low, high):
avg_cost = seq_to_seq_net() data = np.random.random_integers(low, high, [lod[-1], 1]).astype("int64")
res = fluid.LoDTensor()
res.set(data, place)
res.set_lod([lod])
return res
def train(use_cuda, save_dirname=None):
[avg_cost, prediction] = seq_to_seq_net()
optimizer = fluid.optimizer.Adagrad(learning_rate=1e-4) optimizer = fluid.optimizer.Adagrad(learning_rate=1e-4)
optimizer.minimize(avg_cost) optimizer.minimize(avg_cost)
...@@ -174,7 +186,7 @@ def main(): ...@@ -174,7 +186,7 @@ def main():
paddle.dataset.wmt14.train(dict_size), buf_size=1000), paddle.dataset.wmt14.train(dict_size), buf_size=1000),
batch_size=batch_size) batch_size=batch_size)
place = core.CPUPlace() place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = Executor(place) exe = Executor(place)
exe.run(framework.default_startup_program()) exe.run(framework.default_startup_program())
...@@ -185,6 +197,7 @@ def main(): ...@@ -185,6 +197,7 @@ def main():
word_data = to_lodtensor(map(lambda x: x[0], data), place) word_data = to_lodtensor(map(lambda x: x[0], data), place)
trg_word = to_lodtensor(map(lambda x: x[1], data), place) trg_word = to_lodtensor(map(lambda x: x[1], data), place)
trg_word_next = to_lodtensor(map(lambda x: x[2], data), place) trg_word_next = to_lodtensor(map(lambda x: x[2], data), place)
outs = exe.run(framework.default_main_program(), outs = exe.run(framework.default_main_program(),
feed={ feed={
'source_sequence': word_data, 'source_sequence': word_data,
...@@ -192,13 +205,86 @@ def main(): ...@@ -192,13 +205,86 @@ def main():
'label_sequence': trg_word_next 'label_sequence': trg_word_next
}, },
fetch_list=[avg_cost]) fetch_list=[avg_cost])
avg_cost_val = np.array(outs[0]) avg_cost_val = np.array(outs[0])
print('pass_id=' + str(pass_id) + ' batch=' + str(batch_id) + print('pass_id=' + str(pass_id) + ' batch=' + str(batch_id) +
" avg_cost=" + str(avg_cost_val)) " avg_cost=" + str(avg_cost_val))
if math.isnan(float(avg_cost_val[0])):
sys.exit("got NaN loss, training failed.")
if batch_id > 3: if batch_id > 3:
exit(0) if save_dirname is not None:
fluid.io.save_inference_model(
save_dirname, ['source_sequence',
'target_sequence'], [prediction], exe)
return
batch_id += 1 batch_id += 1
def infer(use_cuda, save_dirname=None):
if save_dirname is None:
return
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
# Use fluid.io.load_inference_model to obtain the inference program desc,
# the feed_target_names (the names of variables that will be feeded
# data using feed operators), and the fetch_targets (variables that
# we want to obtain data from using fetch operators).
[inference_program, feed_target_names,
fetch_targets] = fluid.io.load_inference_model(save_dirname, exe)
lod = [0, 4, 10]
word_data = create_random_lodtensor(lod, place, low=0, high=1)
trg_word = create_random_lodtensor(lod, place, low=0, high=1)
# Construct feed as a dictionary of {feed_target_name: feed_target_data}
# and results will contain a list of data corresponding to fetch_targets.
assert feed_target_names[0] == 'source_sequence'
assert feed_target_names[1] == 'target_sequence'
results = exe.run(inference_program,
feed={
feed_target_names[0]: word_data,
feed_target_names[1]: trg_word,
},
fetch_list=fetch_targets,
return_numpy=False)
print(results[0].lod())
np_data = np.array(results[0])
print("Inference shape: ", np_data.shape)
print("Inference results: ", np_data)
def main(use_cuda):
if use_cuda and not fluid.core.is_compiled_with_cuda():
return
# Directory for saving the trained model
save_dirname = "rnn_encoder_decoder.inference.model"
train(use_cuda, save_dirname)
infer(use_cuda, save_dirname)
class TestRnnEncoderDecoder(unittest.TestCase):
def test_cuda(self):
with self.scope_prog_guard():
main(use_cuda=True)
def test_cpu(self):
with self.scope_prog_guard():
main(use_cuda=False)
@contextlib.contextmanager
def scope_prog_guard(self):
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
yield
if __name__ == '__main__': if __name__ == '__main__':
main() unittest.main()
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
import numpy as np import numpy as np
import paddle.v2 as paddle import paddle.v2 as paddle
import paddle.v2.fluid as fluid import paddle.v2.fluid as fluid
import math
import sys
# need to fix random seed and training data to compare the loss # need to fix random seed and training data to compare the loss
# value accurately calculated by the default and the memory optimization # value accurately calculated by the default and the memory optimization
...@@ -63,4 +65,6 @@ for pass_id in range(PASS_NUM): ...@@ -63,4 +65,6 @@ for pass_id in range(PASS_NUM):
if avg_loss_value[0] < 10.0: if avg_loss_value[0] < 10.0:
exit(0) # if avg cost less than 10.0, we think our code is good. exit(0) # if avg cost less than 10.0, we think our code is good.
if math.isnan(float(avg_loss_value)):
sys.exit("got NaN loss, training failed.")
exit(1) exit(1)
...@@ -18,6 +18,8 @@ import sys ...@@ -18,6 +18,8 @@ import sys
import paddle.v2 as paddle import paddle.v2 as paddle
import paddle.v2.fluid as fluid import paddle.v2.fluid as fluid
import math
import sys
# need to fix random seed and training data to compare the loss # need to fix random seed and training data to compare the loss
# value accurately calculated by the default and the memory optimization # value accurately calculated by the default and the memory optimization
...@@ -152,7 +154,10 @@ for pass_id in range(PASS_NUM): ...@@ -152,7 +154,10 @@ for pass_id in range(PASS_NUM):
print("loss:" + str(loss) + " acc:" + str(acc) + " pass_acc:" + str( print("loss:" + str(loss) + " acc:" + str(acc) + " pass_acc:" + str(
pass_acc)) pass_acc))
# this model is slow, so if we can train two mini batch, we think it works properly. # this model is slow, so if we can train two mini batch, we think it works properly.
if i > 2: if i > 2:
exit(0) exit(0)
if math.isnan(float(loss)):
sys.exit("got NaN loss, training failed.")
i += 1 i += 1
exit(1) exit(1)
...@@ -19,6 +19,8 @@ import paddle.v2.fluid.core as core ...@@ -19,6 +19,8 @@ import paddle.v2.fluid.core as core
import paddle.v2.fluid.framework as framework import paddle.v2.fluid.framework as framework
import paddle.v2.fluid.layers as layers import paddle.v2.fluid.layers as layers
from paddle.v2.fluid.executor import Executor from paddle.v2.fluid.executor import Executor
import math
import sys
dict_size = 30000 dict_size = 30000
source_dict_dim = target_dict_dim = dict_size source_dict_dim = target_dict_dim = dict_size
...@@ -137,6 +139,8 @@ def main(): ...@@ -137,6 +139,8 @@ def main():
" avg_cost=" + str(avg_cost_val)) " avg_cost=" + str(avg_cost_val))
if batch_id > 2: if batch_id > 2:
exit(0) exit(0)
if math.isnan(float(avg_cost_val)):
sys.exit("got NaN loss, training failed.")
batch_id += 1 batch_id += 1
......
...@@ -20,6 +20,8 @@ import paddle.v2.fluid.core as core ...@@ -20,6 +20,8 @@ import paddle.v2.fluid.core as core
from paddle.v2.fluid.op import Operator from paddle.v2.fluid.op import Operator
from paddle.v2.fluid.framework import grad_var_name from paddle.v2.fluid.framework import grad_var_name
np.random.random(123)
def _reference_layer_norm_naive(x, scale, beta, epsilon, begin_norm_axis=1): def _reference_layer_norm_naive(x, scale, beta, epsilon, begin_norm_axis=1):
x_shape = x.shape x_shape = x.shape
...@@ -62,9 +64,9 @@ def _reference_layer_norm_grad(x, grad_y, scale, mean, var, begin_norm_axis=1): ...@@ -62,9 +64,9 @@ def _reference_layer_norm_grad(x, grad_y, scale, mean, var, begin_norm_axis=1):
grad_x = dx_end + d_mean + d_std grad_x = dx_end + d_mean + d_std
grad_y.shape = x_shape grad_x.shape, x.shape, grad_y.shape = x_shape, x_shape, x_shape
x.shape = x_shape
scale.shape = scale_shape scale.shape = scale_shape
var.shape, mean.shape = [N, ], [N, ]
return grad_x, d_scale, d_bias return grad_x, d_scale, d_bias
...@@ -112,10 +114,7 @@ def set_output_grad(scope, outputs, place, feed_dict=None): ...@@ -112,10 +114,7 @@ def set_output_grad(scope, outputs, place, feed_dict=None):
class TestLayerNormdOp(OpTest): class TestLayerNormdOp(OpTest):
def __assert_close(self, tensor, np_array, msg, atol=1e-4): def __assert_close(self, tensor, np_array, msg, atol=1e-4):
self.assertTrue( self.assertTrue(np.allclose(np.array(tensor), np_array, atol=atol), msg)
np.allclose(
np.array(tensor).reshape(np_array.shape), np_array, atol=atol),
msg)
def __assert_grad_close(self, def __assert_grad_close(self,
tensor, tensor,
...@@ -123,7 +122,7 @@ class TestLayerNormdOp(OpTest): ...@@ -123,7 +122,7 @@ class TestLayerNormdOp(OpTest):
name, name,
place, place,
max_relative_error=0.02): max_relative_error=0.02):
a = np.array(tensor).reshape(np_array.shape) a = np.array(tensor)
b = np_array b = np_array
abs_a = np.abs(a) abs_a = np.abs(a)
abs_a[abs_a < 1e-5] = 1 abs_a[abs_a < 1e-5] = 1
...@@ -151,7 +150,7 @@ class TestLayerNormdOp(OpTest): ...@@ -151,7 +150,7 @@ class TestLayerNormdOp(OpTest):
x_shape = shape x_shape = shape
D = reduce(mul, x_shape[begin_norm_axis:len(x_shape)], 1) D = reduce(mul, x_shape[begin_norm_axis:len(x_shape)], 1)
scale_shape = [D] scale_shape = [D]
np.random.random(123)
x_val = np.random.random_sample(x_shape).astype(np.float32) x_val = np.random.random_sample(x_shape).astype(np.float32)
scale_val = np.random.random_sample(scale_shape).astype(np.float32) scale_val = np.random.random_sample(scale_shape).astype(np.float32)
bias_val = np.random.random_sample(scale_shape).astype(np.float32) bias_val = np.random.random_sample(scale_shape).astype(np.float32)
......
...@@ -42,9 +42,9 @@ class TestOptimizer(unittest.TestCase): ...@@ -42,9 +42,9 @@ class TestOptimizer(unittest.TestCase):
type="mean", inputs={"X": mul_out}, outputs={"Out": mean_out}) type="mean", inputs={"X": mul_out}, outputs={"Out": mean_out})
sgd_optimizer = optimizer.SGDOptimizer(learning_rate=0.01) sgd_optimizer = optimizer.SGDOptimizer(learning_rate=0.01)
opts, _ = sgd_optimizer.minimize(mean_out, init_program) opts, _ = sgd_optimizer.minimize(mean_out, init_program)
self.assertEqual(len(opts), 1) self.assertEqual(len(opts), 3)
sgd_op = opts[0] self.assertEqual([op.type for op in opts],
self.assertEqual(sgd_op.type, "sgd") ["fill_constant", "elementwise_mul", "sgd"])
def test_sgd_optimizer_with_global_step(self): def test_sgd_optimizer_with_global_step(self):
init_program = framework.Program() init_program = framework.Program()
...@@ -72,11 +72,10 @@ class TestOptimizer(unittest.TestCase): ...@@ -72,11 +72,10 @@ class TestOptimizer(unittest.TestCase):
sgd_optimizer = optimizer.SGDOptimizer( sgd_optimizer = optimizer.SGDOptimizer(
learning_rate=learning_rate, global_step=global_step) learning_rate=learning_rate, global_step=global_step)
opts, _ = sgd_optimizer.minimize(mean_out, init_program) opts, _ = sgd_optimizer.minimize(mean_out, init_program)
self.assertEqual(len(opts), 2) self.assertEqual(len(opts), 4)
sgd_op = opts[0] self.assertEqual(
self.assertEqual(sgd_op.type, "sgd") [op.type for op in opts],
increment_op = opts[1] ["fill_constant", "elementwise_mul", "sgd", "increment"])
self.assertEqual(increment_op.type, "increment")
# Check init_program # Check init_program
init_ops = init_program.global_block().ops init_ops = init_program.global_block().ops
...@@ -121,9 +120,10 @@ class TestMomentumOptimizer(unittest.TestCase): ...@@ -121,9 +120,10 @@ class TestMomentumOptimizer(unittest.TestCase):
self.assertEqual(len(momentum_optimizer.get_accumulators()), 0) self.assertEqual(len(momentum_optimizer.get_accumulators()), 0)
opts = momentum_optimizer.create_optimization_pass( opts = momentum_optimizer.create_optimization_pass(
params_grads, mul_out, init_program) params_grads, mul_out, init_program)
self.assertEqual(len(opts), 1) self.assertEqual(len(opts), 3)
sgd_op = opts[0] sgd_op = opts[-1]
self.assertEqual(sgd_op.type, "momentum") self.assertEqual([op.type for op in opts],
["fill_constant", "elementwise_mul", "momentum"])
self.assertFalse(sgd_op.attr('use_nesterov')) self.assertFalse(sgd_op.attr('use_nesterov'))
# Check accumulators # Check accumulators
...@@ -170,9 +170,10 @@ class TestMomentumOptimizer(unittest.TestCase): ...@@ -170,9 +170,10 @@ class TestMomentumOptimizer(unittest.TestCase):
self.assertEqual(len(momentum_optimizer.get_accumulators()), 0) self.assertEqual(len(momentum_optimizer.get_accumulators()), 0)
opts = momentum_optimizer.create_optimization_pass( opts = momentum_optimizer.create_optimization_pass(
params_grads, mul_out, init_program) params_grads, mul_out, init_program)
self.assertEqual(len(opts), 1) self.assertEqual(len(opts), 3)
sgd_op = opts[0] sgd_op = opts[-1]
self.assertEqual(sgd_op.type, "momentum") self.assertEqual([op.type for op in opts],
["fill_constant", "elementwise_mul", "momentum"])
self.assertTrue(sgd_op.attr('use_nesterov')) self.assertTrue(sgd_op.attr('use_nesterov'))
# Check accumulators # Check accumulators
...@@ -228,9 +229,9 @@ class TestAdagradOptimizer(unittest.TestCase): ...@@ -228,9 +229,9 @@ class TestAdagradOptimizer(unittest.TestCase):
self.assertEqual(len(adagrad_optimizer.get_accumulators()), 0) self.assertEqual(len(adagrad_optimizer.get_accumulators()), 0)
opts = adagrad_optimizer.create_optimization_pass(params_grads, mul_out, opts = adagrad_optimizer.create_optimization_pass(params_grads, mul_out,
init_program) init_program)
self.assertEqual(len(opts), 1) self.assertEqual(len(opts), 3)
adagrad_op = opts[0] self.assertEqual([op.type for op in opts],
self.assertEqual(adagrad_op.type, "adagrad") ["fill_constant", "elementwise_mul", "adagrad"])
# Check accumulators # Check accumulators
accumulators = adagrad_optimizer.get_accumulators() accumulators = adagrad_optimizer.get_accumulators()
...@@ -288,9 +289,10 @@ class TestAdamOptimizer(unittest.TestCase): ...@@ -288,9 +289,10 @@ class TestAdamOptimizer(unittest.TestCase):
self.assertEqual(len(adam_optimizer.get_accumulators()), 0) self.assertEqual(len(adam_optimizer.get_accumulators()), 0)
opts = adam_optimizer.create_optimization_pass(params_grads, mul_out, opts = adam_optimizer.create_optimization_pass(params_grads, mul_out,
init_program) init_program)
self.assertEqual(len(opts), 3) self.assertEqual(len(opts), 5)
adam_op = opts[0] self.assertEqual(
self.assertEqual(adam_op.type, "adam") [op.type for op in opts],
["fill_constant", "elementwise_mul", "adam", "scale", "scale"])
# Check accumulators # Check accumulators
accumulators = adam_optimizer.get_accumulators() accumulators = adam_optimizer.get_accumulators()
...@@ -350,9 +352,10 @@ class TestAdamaxOptimizer(unittest.TestCase): ...@@ -350,9 +352,10 @@ class TestAdamaxOptimizer(unittest.TestCase):
self.assertEqual(len(adamax_optimizer.get_accumulators()), 0) self.assertEqual(len(adamax_optimizer.get_accumulators()), 0)
opts = adamax_optimizer.create_optimization_pass(params_grads, mul_out, opts = adamax_optimizer.create_optimization_pass(params_grads, mul_out,
init_program) init_program)
self.assertEqual(len(opts), 2) self.assertEqual(len(opts), 4)
adam_op = opts[0] self.assertEqual(
self.assertEqual(adam_op.type, "adamax") [op.type for op in opts],
["fill_constant", "elementwise_mul", "adamax", "scale"])
# Check accumulators # Check accumulators
accumulators = adamax_optimizer.get_accumulators() accumulators = adamax_optimizer.get_accumulators()
...@@ -409,9 +412,10 @@ class TestDecayedAdagradOptimizer(unittest.TestCase): ...@@ -409,9 +412,10 @@ class TestDecayedAdagradOptimizer(unittest.TestCase):
self.assertEqual(len(decayed_adagrad_optimizer.get_accumulators()), 0) self.assertEqual(len(decayed_adagrad_optimizer.get_accumulators()), 0)
opts = decayed_adagrad_optimizer.create_optimization_pass( opts = decayed_adagrad_optimizer.create_optimization_pass(
params_grads, mul_out, init_program) params_grads, mul_out, init_program)
self.assertEqual(len(opts), 1) self.assertEqual(len(opts), 3)
decayed_adagrad_op = opts[0] self.assertEqual(
self.assertEqual(decayed_adagrad_op.type, "decayed_adagrad") [op.type for op in opts],
["fill_constant", "elementwise_mul", "decayed_adagrad"])
# Check accumulators # Check accumulators
accumulators = decayed_adagrad_optimizer.get_accumulators() accumulators = decayed_adagrad_optimizer.get_accumulators()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册