提交 64800cfe 编写于 作者: K Kexin Zhao

merge update

# C++ Data Feeding
In training with Paddle V2 API, data feeding wholly dependents on Python code. To get rid of the Python environment and achieve the goal of "wrapping the whole training by a while loop op" in Paddle Fluid, a C++ data feeding mechanism is required.
In this document we show the fundamental design of C++ data feeding process, which includes the data reading, shuffling and batching.
## Reader
A new concept named 'Reader' is introduced. `Reader` is a series of inherited classes which can be hold by our `Variable` and they are used to read or process file data.
### `ReaderBase`
`ReaderBase` is the abstract base class of all readers. It defines the all readers' interfaces.
class ReaderBase {
explicit ReaderBase(const std::vector<DDim>& shapes) : shapes_(shapes) {
// Read the next batch of data. (A 'batch' can be only one instance)
virtual void ReadNext(std::vector<LoDTensor>* out) = 0;
// Show whether the next bacth exists.
virtual bool HasNext() const = 0;
// Reinitialize the reader and read the file from the begin.
virtual void ReInit() = 0;
// Get a certain read in data's shape.
DDim shape(size_t idx) const;
// Get shapes of all read in data.
std::vector<DDim> shapes() const { return shapes_; }
// Set shapes of read in data.
void set_shapes(const std::vector<DDim>& shapes) { shapes_ = shapes; }
virtual ~ReaderBase() {}
std::vector<DDim> shapes_;
### `FileReader` and `DecoratedReader`
These two classes are derived from the `ReaderBase` and will further be derived by respective specific readers. That is to say, in our design, there are two kinds of readers: file readers and decorated readers. A file reader reads from a file of some specific format, and yield only one instance of data at a time. e.g. RecordIO reader, jpg reader, .... A decorated reader takes another reader(both file reader and decorated reader are OK) as its 'underlying reader'. It gets data from its underlying reader, does some process on them(shuffling, or batching), then yields processed data. The output data of a decorated reader can be a single instance or a batch. `ShuffleReader` and `BatchReader` are both decorated readers.
All the readers share exactly the same interfaces defined in `ReaderBase`. So they can be decorated for more than one time: We can **shuffle** a reader's outputs and then **batch** the shuffle outputs. The interface consistency also allows related ops use readers without knowing what they are exactly.
### `ReaderHolder`
Different readers belong to different class types. It leads to a problem: How can we drop them into `Variable`s and fetch them out by a unified method? For example, if a Variable holds a `BatchReader`, we can not get it by the following code:
we have to write:
This requires each time getting a reader from a variable we must know the reader's type exactly. It is nearly impossible.
To solve this problem, we introduce `ReaderHolder` as a wrapper. It acts as an empty decorator of `ReaderBase`, which erases reader's type. With `ReaderHolder` we are able to fetch all types of readers by `var->Get<ReaderHolder>("...")` and regard the obtained object as a reader.
## Related Operators
To create and invoke readers, some now ops are introduced:
### `CreateReaderOp`
Each reader has its creating op. File readers' creating ops have no input and yield the created file reader as its output. Decorated readers' creating ops take the underlying readers as inputs and then yield new decorated readers.
### `ReadOp`
A reader is only a Variable. It cannot trigger the reading process by itself. So we add the `ReadOp` to execute it. A `ReadOp` takes a reader Variable as its input. Each time it runs, it invokes the reader‘s `ReadNext()` function and gets a new batch of data(or only one instance of data, if we use file reader directly). The output data of a reader are in the form of `std::vector<LoDTenosr>`, so the `ReadOp` also needs to split the vector and move LoDTensors to their respective output Variables.
快速开始 快速开始
======== ========
PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14.04以及MacOS 10.12,并安装有Python2.7。 PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14.04以及MacOS 10.12,并安装有Python2.7。
执行下面的命令完成快速安装,版本为cpu_avx_openblas: 执行下面的命令完成快速安装,版本为cpu_avx_openblas:
...@@ -16,6 +19,9 @@ PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14. ...@@ -16,6 +19,9 @@ PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14.
更详细的安装和编译方法参考::ref:`install_steps` 。 更详细的安装和编译方法参考::ref:`install_steps` 。
创建一个 housing.py 并粘贴此Python代码: 创建一个 housing.py 并粘贴此Python代码:
.. code-block:: python .. code-block:: python
Quick Start Quick Start
============ ============
Quick Install
You can use pip to install PaddlePaddle with a single command, supports You can use pip to install PaddlePaddle with a single command, supports
CentOS 6 above, Ubuntu 14.04 above or MacOS 10.12, with Python 2.7 installed. CentOS 6 above, Ubuntu 14.04 above or MacOS 10.12, with Python 2.7 installed.
Simply run the following command to install, the version is cpu_avx_openblas: Simply run the following command to install, the version is cpu_avx_openblas:
...@@ -17,6 +20,9 @@ If you need to install GPU version (cuda7.5_cudnn5_avx_openblas), run: ...@@ -17,6 +20,9 @@ If you need to install GPU version (cuda7.5_cudnn5_avx_openblas), run:
For more details about installation and build: :ref:`install_steps` . For more details about installation and build: :ref:`install_steps` .
Quick Use
Create a new file called housing.py, and paste this Python Create a new file called housing.py, and paste this Python
code: code:
分布式训练 分布式训练
========== ==========
.. image:: src/ps_cn.png
:width: 500
- 数据分片(Data shard): 用于训练神经网络的数据,被切分成多个部分,每个部分分别给每个trainer使用。
- 计算节点(Trainer): 每个trainer启动后读取切分好的一部分数据,开始神经网络的“前馈”和“后馈”计算,并和参数服务器通信。在完成一定量数据的训练后,上传计算得出的梯度(gradients),然后下载优化更新后的神经网络参数(parameters)。
- 参数服务器(Parameter server):每个参数服务器只保存整个神经网络所有参数的一部分。参数服务器接收从计算节点上传的梯度,并完成参数优化更新,再将更新后的参数下发到每个计算节点。
在使用同步SGD训练神经网络时,PaddlePaddle使用同步屏障(barrier),使梯度的提交和参数的更新按照顺序方式执行。在异步SGD中,则并不会等待所有trainer提交梯度才更新参数,这样极大地提高了计算的并行性:参数服务器之间不相互依赖,并行地接收梯度和更新参数,参数服务器也不会等待计算节点全部都提交梯度之后才开始下一步,计算节点之间也不会相互依赖,并行地执行模型的训练。可以看出,虽然异步SGD方式会提高参数更新并行度, 但是并不能保证参数同步更新,在任意时间某一台参数服务器上保存的参数可能比另一台要更新,与同步SGD相比,梯度会有噪声。
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
preparations_cn.md preparations_cn.md
cmd_argument_cn.md cmd_argument_cn.md
multi_cluster/index_cn.rst multi_cluster/index_cn.rst
Distributed Training Distributed Training
==================== ====================
In this section, we'll explain how to run distributed training jobs with PaddlePaddle on different types of clusters. The diagram below shows the main architecture of a distributed trainning job:
.. image:: src/ps_en.png
:width: 500
- Data shard: training data will be split into multiple partitions, trainers use the partitions of the whole dataset to do the training job.
- Trainer: each trainer reads the data shard, and train the neural network. Then the trainer will upload calculated "gradients" to parameter servers, and wait for parameters to be optimized on the parameter server side. When that finishes, the trainer download optimized parameters and continues its training.
- Parameter server: every parameter server stores part of the whole neural network model data. They will do optimization calculations when gradients are uploaded from trainers, and then send updated parameters to trainers.
PaddlePaddle can support both synchronize stochastic gradient descent (SGD) and asynchronous SGD.
When training with synchronize SGD, PaddlePaddle uses an internal "synchronize barrier" which makes gradients update and parameter download in strict order. On the other hand, asynchronous SGD won't wait for all trainers to finish upload at a single step, this will increase the parallelism of distributed training: parameter servers do not depend on each other, they'll do parameter optimization concurrently. Parameter servers will not wait for trainers, so trainers will also do their work concurrently. But asynchronous SGD will introduce more randomness and noises in the gradient.
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
preparations_en.md preparations_en.md
cmd_argument_en.md cmd_argument_en.md
multi_cluster/index_en.rst multi_cluster/index_en.rst
## 概述
<img src="https://user-images.githubusercontent.com/13348433/31772175-5f419eca-b511-11e7-9db7-5231fe3d9ccb.png" width="500">
- 数据分片(Data shard): 用于训练神经网络的数据,被切分成多个部分,每个部分分别给每个trainer使用。
- 计算节点(Trainer): 每个trainer启动后读取切分好的一部分数据,开始神经网络的“前馈”和“后馈”计算,并和参数服务器通信。在完成一定量数据的训练后,上传计算得出的梯度(gradients),然后下载优化更新后的神经网络参数(parameters)。
- 参数服务器(Parameter server):每个参数服务器只保存整个神经网络所有参数的一部分。参数服务器接收从计算节点上传的梯度,并完成参数优化更新,再将更新后的参数下发到每个计算节点。
在使用同步SGD训练神经网络时,PaddlePaddle使用同步屏障(barrier),使梯度的提交和参数的更新按照顺序方式执行。在异步SGD中,则并不会等待所有trainer提交梯度才更新参数,这样极大地提高了计算的并行性:参数服务器之间不相互依赖,并行地接收梯度和更新参数,参数服务器也不会等待计算节点全部都提交梯度之后才开始下一步,计算节点之间也不会相互依赖,并行地执行模型的训练。可以看出,虽然异步SGD方式会提高参数更新并行度, 但是并不能保证参数同步更新,在任意时间某一台参数服务器上保存的参数可能比另一台要更新,与同步SGD相比,梯度会有噪声。
## Introduction
In this section, we'll explain how to run distributed training jobs with PaddlePaddle on different types of clusters. The diagram below shows the main architecture of a distributed trainning job:
<img src="https://user-images.githubusercontent.com/13348433/31772146-41523d84-b511-11e7-8a12-a69fd136c283.png" width="500">
- Data shard: training data will be split into multiple partitions, trainers use the partitions of the whole dataset to do the training job.
- Trainer: each trainer reads the data shard, and train the neural network. Then the trainer will upload calculated "gradients" to parameter servers, and wait for parameters to be optimized on the parameter server side. When that finishes, the trainer download optimized parameters and continues its training.
- Parameter server: every parameter server stores part of the whole neural network model data. They will do optimization calculations when gradients are uploaded from trainers, and then send updated parameters to trainers.
PaddlePaddle can support both synchronize stochastic gradient descent (SGD) and asynchronous SGD.
When training with synchronize SGD, PaddlePaddle uses an internal "synchronize barrier" which makes gradients update and parameter download in strict order. On the other hand, asynchronous SGD won't wait for all trainers to finish upload at a single step, this will increase the parallelism of distributed training: parameter servers do not depend on each other, they'll do parameter optimization concurrently. Parameter servers will not wait for trainers, so trainers will also do their work concurrently. But asynchronous SGD will introduce more randomness and noises in the gradient.
RNN相关模型 RNN模型
=========== ===========
.. toctree:: .. toctree::
...@@ -20,6 +20,7 @@ endif() ...@@ -20,6 +20,7 @@ endif()
cc_test(eigen_test SRCS eigen_test.cc DEPS tensor) cc_test(eigen_test SRCS eigen_test.cc DEPS tensor)
nv_test(mixed_vector_test SRCS mixed_vector_test.cu DEPS place paddle_memory device_context init)
cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto) cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto)
cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor paddle_memory) cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor paddle_memory)
nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor init) nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor init)
...@@ -22,6 +22,8 @@ limitations under the License. */ ...@@ -22,6 +22,8 @@ limitations under the License. */
using paddle::framework::Channel; using paddle::framework::Channel;
using paddle::framework::MakeChannel; using paddle::framework::MakeChannel;
using paddle::framework::CloseChannel; using paddle::framework::CloseChannel;
using paddle::framework::details::Buffered;
using paddle::framework::details::UnBuffered;
TEST(Channel, MakeAndClose) { TEST(Channel, MakeAndClose) {
using paddle::framework::details::Buffered; using paddle::framework::details::Buffered;
...@@ -60,13 +62,54 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) { ...@@ -60,13 +62,54 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) {
delete ch; delete ch;
} }
TEST(Channel, SendOnClosedChannelPanics) { // This tests that a channel must return false
const size_t buffer_size = 10; // on send and receive performed after closing the channel.
auto ch = MakeChannel<size_t>(buffer_size); // Receive will only return false after close when queue is empty.
size_t i = 5; // By creating separate threads for sending and receiving, we make this
EXPECT_EQ(ch->Send(&i), true); // should not block or panic // function able to test both buffered and unbuffered channels.
void SendReceiveWithACloseChannelShouldPanic(Channel<size_t> *ch) {
const size_t data = 5;
std::thread send_thread{[&]() {
size_t i = data;
EXPECT_EQ(ch->Send(&i), true); // should not block
std::thread recv_thread{[&]() {
size_t i;
EXPECT_EQ(ch->Receive(&i), true); // should not block
EXPECT_EQ(i, data);
// After closing send should return false. Receive should
// also return false as there is no data in queue.
CloseChannel(ch); CloseChannel(ch);
EXPECT_EQ(ch->Send(&i), false); // should panic send_thread = std::thread{[&]() {
size_t i = data;
EXPECT_EQ(ch->Send(&i), false); // should return false
recv_thread = std::thread{[&]() {
size_t i;
// should return false because channel is closed and queue is empty
EXPECT_EQ(ch->Receive(&i), false);
TEST(Channel, SendReceiveClosedBufferedChannelPanics) {
size_t buffer_size = 10;
auto ch = MakeChannel<size_t>(buffer_size);
delete ch;
TEST(Channel, SendReceiveClosedUnBufferedChannelPanics) {
auto ch = MakeChannel<size_t>(0);
delete ch; delete ch;
} }
...@@ -381,3 +424,129 @@ TEST(Channel, UnbufferedMoreReceiveLessSendTest) { ...@@ -381,3 +424,129 @@ TEST(Channel, UnbufferedMoreReceiveLessSendTest) {
EXPECT_EQ(sum_receive, 28U); EXPECT_EQ(sum_receive, 28U);
delete ch; delete ch;
} }
// This tests that destroying a channel unblocks
// any senders waiting for channel to have write space
void ChannelDestroyUnblockSenders(Channel<int> *ch) {
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
bool send_success[num_threads];
// Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
send_success[i] = false;
t[i] = std::thread(
[&](bool *ended, bool *success) {
int data = 10;
*success = ch->Send(&data);
*ended = true;
&thread_ended[i], &send_success[i]);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
bool is_buffered_channel = false;
if (dynamic_cast<Buffered<int> *>(ch)) is_buffered_channel = true;
if (is_buffered_channel) {
// If channel is buffered, verify that atleast 4 threads are blocked
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (thread_ended[i] == false) ct++;
// Atleast 4 threads must be blocked
EXPECT_GE(ct, 4);
} else {
// Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
// Explicitly destroy the channel
delete ch;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
// Count number of successfuld sends
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (send_success[i]) ct++;
if (is_buffered_channel) {
// Only 1 send must be successful
EXPECT_EQ(ct, 1);
} else {
// In unbuffered channel, no send should be successful
EXPECT_EQ(ct, 0);
// Join all threads
for (size_t i = 0; i < num_threads; i++) t[i].join();
// This tests that destroying a channel also unblocks
// any receivers waiting on the channel
void ChannelDestroyUnblockReceivers(Channel<int> *ch) {
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
// Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
// All reads should return false
EXPECT_EQ(ch->Receive(&data), false);
*p = true;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that all threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
// delete the channel
delete ch;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
for (size_t i = 0; i < num_threads; i++) t[i].join();
TEST(Channel, BufferedChannelDestroyUnblocksReceiversTest) {
size_t buffer_size = 1;
auto ch = MakeChannel<int>(buffer_size);
TEST(Channel, BufferedChannelDestroyUnblocksSendersTest) {
size_t buffer_size = 1;
auto ch = MakeChannel<int>(buffer_size);
// This tests that destroying an unbuffered channel also unblocks
// unblocks any receivers waiting for senders
TEST(Channel, UnbufferedChannelDestroyUnblocksReceiversTest) {
auto ch = MakeChannel<int>(0);
TEST(Channel, UnbufferedChannelDestroyUnblocksSendersTest) {
auto ch = MakeChannel<int>(0);
...@@ -42,8 +42,11 @@ class Buffered : public paddle::framework::Channel<T> { ...@@ -42,8 +42,11 @@ class Buffered : public paddle::framework::Channel<T> {
std::mutex mu_; std::mutex mu_;
std::condition_variable empty_cond_var_; std::condition_variable empty_cond_var_;
std::condition_variable full_cond_var_; std::condition_variable full_cond_var_;
std::condition_variable destructor_cond_var_;
std::deque<T> channel_; std::deque<T> channel_;
std::atomic<bool> closed_{false}; std::atomic<bool> closed_{false};
std::atomic<unsigned> send_ctr{0};
std::atomic<unsigned> recv_ctr{0};
Buffered(size_t cap) : cap_(cap), closed_(false) { Buffered(size_t cap) : cap_(cap), closed_(false) {
...@@ -58,6 +61,7 @@ bool Buffered<T>::Send(T* item) { ...@@ -58,6 +61,7 @@ bool Buffered<T>::Send(T* item) {
if (closed_) { if (closed_) {
return ret; return ret;
} }
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
full_cond_var_.wait(lock, full_cond_var_.wait(lock,
[this]() { return channel_.size() < cap_ || closed_; }); [this]() { return channel_.size() < cap_ || closed_; });
...@@ -67,20 +71,30 @@ bool Buffered<T>::Send(T* item) { ...@@ -67,20 +71,30 @@ bool Buffered<T>::Send(T* item) {
empty_cond_var_.notify_one(); empty_cond_var_.notify_one();
ret = true; ret = true;
} }
return ret; return ret;
} }
template <typename T> template <typename T>
bool Buffered<T>::Receive(T* item) { bool Buffered<T>::Receive(T* item) {
bool ret = false;
// Once the channel has been closed and all data has been consumed,
// just return false. Don't even try acquiring the mutex.
if (closed_ && channel_.empty()) {
return false;
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
empty_cond_var_.wait(lock, [this]() { return !channel_.empty() || closed_; }); empty_cond_var_.wait(lock, [this]() { return !channel_.empty() || closed_; });
bool ret = false;
if (!channel_.empty()) { if (!channel_.empty()) {
*item = std::move(channel_.front()); *item = std::move(channel_.front());
channel_.pop_front(); channel_.pop_front();
full_cond_var_.notify_one(); full_cond_var_.notify_one();
ret = true; ret = true;
} }
return ret; return ret;
} }
...@@ -100,6 +114,12 @@ Buffered<T>::~Buffered() { ...@@ -100,6 +114,12 @@ Buffered<T>::~Buffered() {
closed_ = true; closed_ = true;
channel_.clear(); channel_.clear();
NotifyAllParticipants(&lock); NotifyAllParticipants(&lock);
// The destructor must wait for all readers and writers to complete their task
// The channel has been closed, so we will not accept new readers and writers
lock, [this]() { return send_ctr == 0 && recv_ctr == 0; });
} }
template <typename T> template <typename T>
...@@ -45,9 +45,11 @@ class UnBuffered : public paddle::framework::Channel<T> { ...@@ -45,9 +45,11 @@ class UnBuffered : public paddle::framework::Channel<T> {
// A transaction occurs only when both are true // A transaction occurs only when both are true
std::atomic<bool> reader_found_{false}, writer_found_{false}; std::atomic<bool> reader_found_{false}, writer_found_{false};
std::condition_variable cv_channel_; std::condition_variable cv_channel_;
std::condition_variable_any cv_reader_, cv_writer_; std::condition_variable_any cv_reader_, cv_writer_, cv_destructor_;
T* item{nullptr}; T* item{nullptr};
std::atomic<bool> closed_{false}; std::atomic<bool> closed_{false};
std::atomic<unsigned> send_ctr{0};
std::atomic<unsigned> recv_ctr{0};
UnBuffered() : closed_(false) {} UnBuffered() : closed_(false) {}
...@@ -62,6 +64,7 @@ bool UnBuffered<T>::Send(T* data) { ...@@ -62,6 +64,7 @@ bool UnBuffered<T>::Send(T* data) {
if (closed_) { if (closed_) {
return ret; return ret;
} }
// Prevent other writers from entering // Prevent other writers from entering
std::unique_lock<std::recursive_mutex> writer_lock(mu_write_); std::unique_lock<std::recursive_mutex> writer_lock(mu_write_);
writer_found_ = true; writer_found_ = true;
...@@ -81,6 +84,8 @@ bool UnBuffered<T>::Send(T* data) { ...@@ -81,6 +84,8 @@ bool UnBuffered<T>::Send(T* data) {
ret = true; ret = true;
} }
writer_found_ = false; writer_found_ = false;
return ret; return ret;
} }
...@@ -88,6 +93,12 @@ bool UnBuffered<T>::Send(T* data) { ...@@ -88,6 +93,12 @@ bool UnBuffered<T>::Send(T* data) {
// data that was sent by a writer is read from a reader. // data that was sent by a writer is read from a reader.
template <typename T> template <typename T>
bool UnBuffered<T>::Receive(T* data) { bool UnBuffered<T>::Receive(T* data) {
bool ret = false;
// If channel is closed, we don't even want any reader to enter.
// Unlike a buffered channel, an unbuffered channel does not allow
// readers to read after closing because there is no buffer to be consumed.
if (closed_) return ret;
// Prevent other readers from entering // Prevent other readers from entering
std::unique_lock<std::recursive_mutex> read_lock{mu_read_}; std::unique_lock<std::recursive_mutex> read_lock{mu_read_};
reader_found_ = true; reader_found_ = true;
...@@ -96,7 +107,6 @@ bool UnBuffered<T>::Receive(T* data) { ...@@ -96,7 +107,6 @@ bool UnBuffered<T>::Receive(T* data) {
cv_reader_.wait(cv_lock, cv_reader_.wait(cv_lock,
[this]() { return writer_found_ == true || closed_; }); [this]() { return writer_found_ == true || closed_; });
cv_writer_.notify_one(); cv_writer_.notify_one();
bool ret = false;
if (!closed_) { if (!closed_) {
std::unique_lock<std::mutex> lock_ch{mu_ch_}; std::unique_lock<std::mutex> lock_ch{mu_ch_};
// Reader should wait for the writer to first write its data // Reader should wait for the writer to first write its data
...@@ -110,6 +120,8 @@ bool UnBuffered<T>::Receive(T* data) { ...@@ -110,6 +120,8 @@ bool UnBuffered<T>::Receive(T* data) {
cv_channel_.notify_one(); cv_channel_.notify_one();
} }
reader_found_ = false; reader_found_ = false;
return ret; return ret;
} }
...@@ -135,6 +147,9 @@ UnBuffered<T>::~UnBuffered() { ...@@ -135,6 +147,9 @@ UnBuffered<T>::~UnBuffered() {
item = nullptr; item = nullptr;
closed_ = true; closed_ = true;
NotifyAllParticipants(&lock); NotifyAllParticipants(&lock);
[this]() { return send_ctr == 0 && recv_ctr == 0; });
} }
// This function notifies all the readers, writers and // This function notifies all the readers, writers and
...@@ -48,12 +48,26 @@ namespace framework { ...@@ -48,12 +48,26 @@ namespace framework {
*/ */
struct LoD : public std::vector<Vector<size_t>> { struct LoD : public std::vector<Vector<size_t>> {
using std::vector<Vector<size_t>>::vector; using std::vector<Vector<size_t>>::vector;
platform::Place place() const {
if (this->size() == 0) {
// Not Initialze Yet.
return platform::CPUPlace();
} else {
return this->front().place();
void CopyFromCUDA() { void CopyFromCUDA() {
for (auto it = this->begin(); it != this->end(); ++it) { for (auto it = this->begin(); it != this->end(); ++it) {
it->CopyFromCUDA(); it->CopyFromCUDA();
} }
} }
void CopyToPeer(platform::Place place) {
for (auto it = this->begin(); it != this->end(); ++it) {
}; };
std::ostream& operator<<(std::ostream& os, const LoD& lod); std::ostream& operator<<(std::ostream& os, const LoD& lod);
...@@ -28,28 +28,6 @@ __global__ void test(size_t* a, int size) { ...@@ -28,28 +28,6 @@ __global__ void test(size_t* a, int size) {
} }
} }
TEST(Vector, Normal) {
using namespace paddle::framework;
using namespace paddle::platform;
using namespace paddle::memory;
paddle::framework::Vector<size_t> vec({1, 2, 3});
size_t* ptr = vec.data();
for (size_t i = 0; i < vec.size(); ++i) {
EXPECT_EQ(vec[i], *(ptr + i));
std::vector<size_t> v = {1, 2, 3};
for (size_t i = 0; i < v.size(); ++i) {
EXPECT_EQ(v[i], vec[i]);
TEST(LoD, data) { TEST(LoD, data) {
paddle::framework::InitDevices(); paddle::framework::InitDevices();
...@@ -40,20 +40,21 @@ class Vector : public std::vector<T> { ...@@ -40,20 +40,21 @@ class Vector : public std::vector<T> {
Vector() {} Vector() {}
Vector(const std::vector<T> &v) : std::vector<T>(v) {} // NOLINT Vector(const std::vector<T> &v) : std::vector<T>(v) {} // NOLINT
virtual ~Vector() { inline platform::Place place() const { return place_; }
if (cuda_ptr_ != nullptr) {
memory::Free<platform::CUDAPlace>(place_, cuda_ptr_);
/*! Return a pointer to constant memory block. */
inline const T *data(platform::Place place) const;
/*! Return a pointer to mutable memory block. */
inline T *mutable_data(platform::Place place);
// TODO(dzhwinter): below interfaces should be removed
/* Get device vector */ /* Get device vector */
T *cuda_data() { T *cuda_data() {
CopyToCUDA(); CopyToCUDA();
cuda_ptr_, "No data or Insufficient CUDA memory to allocation"); cuda_ptr_, "No data or Insufficient CUDA memory to allocation");
return static_cast<T *>(cuda_ptr_); return static_cast<T *>(cuda_ptr_.get());
} }
/* Get host vector */ /* Get host vector */
...@@ -76,25 +77,73 @@ class Vector : public std::vector<T> { ...@@ -76,25 +77,73 @@ class Vector : public std::vector<T> {
void CopyToPeer(platform::Place); void CopyToPeer(platform::Place);
private: private:
void *cuda_ptr_ = nullptr; std::shared_ptr<void> cuda_ptr_;
size_t cuda_size_ = 0; // device vector numel size_t cuda_size_ = 0; // device vector numel
platform::CUDAPlace place_; platform::CUDAPlace place_;
}; };
template <typename T> template <typename T>
void Vector<T>::CopyToCUDA() { inline const T *Vector<T>::data(platform::Place place) const {
if (platform::is_cpu_place(place)) {
return std::vector<T>::data();
} else if (platform::is_gpu_place(place)) {
if (cuda_ptr_ == nullptr) {
return nullptr;
if (boost::get<platform::CUDAPlace>(place) == place_) {
return static_cast<const T *>(cuda_ptr_.get());
} else {
"Unmatched place. Please use `mutable_data` copy lod to the target "
"Place first.");
} else {
PADDLE_THROW("Unsupport Place.");
template <typename T>
inline T *Vector<T>::mutable_data(platform::Place place) {
if (platform::is_cpu_place(place)) {
return std::vector<T>::data();
} else if (platform::is_gpu_place(place)) {
if (boost::get<platform::CUDAPlace>(place) != place_) {
place_ = boost::get<platform::CUDAPlace>(place);
if (cuda_size_ < this->size()) { if (cuda_size_ < this->size() || cuda_ptr_ == nullptr) {
if (cuda_ptr_ != nullptr) { cuda_ptr_.reset(
memory::Free<platform::CUDAPlace>(place_, cuda_ptr_); memory::Alloc<platform::CUDAPlace>(place_, this->size() * sizeof(T)),
memory::PlainDeleter<void, platform::CUDAPlace>(place_));
} }
cuda_ptr_ = cuda_size_ = this->size();
memory::Alloc<platform::CUDAPlace>(place_, this->size() * sizeof(T)); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto *ctx = pool.GetByPlace(place_);
memory::Copy(place_, cuda_ptr_.get(), platform::CPUPlace(),
static_cast<const void *>(this->data()),
this->size() * sizeof(T), ctx->stream());
return static_cast<T *>(cuda_ptr_.get());
return nullptr;
} else {
PADDLE_THROW("Unsupport Place.");
template <typename T>
void Vector<T>::CopyToCUDA() {
if (cuda_size_ < this->size() || cuda_ptr_ == nullptr) {
memory::Alloc<platform::CUDAPlace>(place_, this->size() * sizeof(T)),
memory::PlainDeleter<void, platform::CUDAPlace>(place_));
} }
cuda_size_ = this->size(); cuda_size_ = this->size();
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto *ctx = pool.GetByPlace(place_); auto *ctx = pool.GetByPlace(place_);
memory::Copy(place_, cuda_ptr_, platform::CPUPlace(), memory::Copy(place_, cuda_ptr_.get(), platform::CPUPlace(),
static_cast<const void *>(this->data()), static_cast<const void *>(this->data()),
this->size() * sizeof(T), ctx->stream()); this->size() * sizeof(T), ctx->stream());
ctx->Wait(); ctx->Wait();
...@@ -112,32 +161,32 @@ void Vector<T>::CopyFromCUDA() { ...@@ -112,32 +161,32 @@ void Vector<T>::CopyFromCUDA() {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto *ctx = pool.GetByPlace(place_); auto *ctx = pool.GetByPlace(place_);
memory::Copy(platform::CPUPlace(), static_cast<void *>(this->data()), place_, memory::Copy(platform::CPUPlace(), static_cast<void *>(this->data()), place_,
static_cast<const void *>(cuda_ptr_), this->size() * sizeof(T), static_cast<const void *>(cuda_ptr_.get()),
ctx->stream()); this->size() * sizeof(T), ctx->stream());
ctx->Wait(); ctx->Wait();
#endif #endif
} }
template <typename T> template <typename T>
void Vector<T>::CopyToPeer(platform::Place peer_place) { void Vector<T>::CopyToPeer(platform::Place place) {
auto *ctx = platform::DeviceContextPool::Instance().GetByPlace(place_); if (boost::get<platform::CUDAPlace>(place) != place_) {
void *peer_cuda_ptr = memory::Alloc<platform::CUDAPlace>( place_ = boost::get<platform::CUDAPlace>(place);
boost::get<platform::CUDAPlace>(peer_place), this->size() * sizeof(T)); }
memory::Copy(boost::get<platform::CUDAPlace>(peer_place), peer_cuda_ptr, if (cuda_size_ < this->size() || cuda_ptr_ == nullptr) {
place_, cuda_ptr_, this->size() * sizeof(T), ctx->stream()); cuda_ptr_.reset(
memory::Alloc<platform::CUDAPlace>(place_, this->size() * sizeof(T)),
memory::PlainDeleter<void, platform::CUDAPlace>(place_));
cuda_size_ = this->size();
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto *ctx = pool.GetByPlace(place_);
memory::Copy(place_, cuda_ptr_.get(), platform::CPUPlace(),
static_cast<const void *>(this->data()),
this->size() * sizeof(T), ctx->stream());
ctx->Wait(); ctx->Wait();
memory::Free<platform::CUDAPlace>(place_, cuda_ptr_);
place_ = boost::get<platform::CUDAPlace>(peer_place);
cuda_ptr_ = peer_cuda_ptr;
#endif #endif
} }
template class Vector<int>;
template class Vector<unsigned>;
template class Vector<size_t>;
template class Vector<int64_t>;
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License. */
#include <cuda.h>
#include <cuda_runtime.h>
#include "gtest/gtest.h"
#include "paddle/framework/init.h"
#include "paddle/framework/mixed_vector.h"
using namespace paddle::framework;
using namespace paddle::platform;
using namespace paddle::memory;
template <typename T>
__global__ void test(T* data, int size) {
for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < size;
i += blockDim.x * gridDim.x) {
data[i] *= 2;
TEST(Vector, Normal) {
// fill the device context pool.
Vector<size_t> vec({1, 2, 3});
size_t* ptr = vec.data();
for (size_t i = 0; i < vec.size(); ++i) {
EXPECT_EQ(vec[i], *(ptr + i));
std::vector<size_t> v = {1, 2, 3};
for (size_t i = 0; i < v.size(); ++i) {
EXPECT_EQ(v[i], vec[i]);
TEST(Vector, MultipleCopy) {
Vector<size_t> vec({1, 2, 3});
CUDAPlace place(0);
auto vec2 = Vector<size_t>(vec);
const size_t* ptr = vec2.data(CPUPlace());
for (size_t i = 0; i < vec2.size(); ++i) {
EXPECT_EQ(*(ptr + i), vec[i]);
test<size_t><<<3, 3>>>(vec2.mutable_data(place), vec2.size());
const size_t* ptr = vec2.data(CPUPlace());
for (size_t i = 0; i < vec2.size(); ++i) {
EXPECT_EQ(*(ptr + i), vec[i] * 2);
set(PYTHON_TESTS_DIR ${PADDLE_SOURCE_DIR}/python/paddle/v2/fluid/tests) function(inference_test TARGET_NAME)
cc_test(test_inference_recognize_digits_mlp set(options "")
SRCS test_inference_recognize_digits.cc set(oneValueArgs "")
DEPS ARCHIVE_START paddle_fluid ARCHIVE_END set(multiValueArgs ARGS)
ARGS --dirname=${PYTHON_TESTS_DIR}/book/recognize_digits_mlp.inference.model) cmake_parse_arguments(inference_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
SRCS test_inference_image_classification.cc set(PYTHON_TESTS_DIR ${PADDLE_SOURCE_DIR}/python/paddle/v2/fluid/tests)
DEPS ARCHIVE_START paddle_fluid ARCHIVE_END if(inference_test_ARGS)
ARGS --dirname=${PYTHON_TESTS_DIR}/book/image_classification_vgg.inference.model) foreach(arg ${inference_test_ARGS})
cc_test(test_inference_image_classification_resnet cc_test(test_inference_${TARGET_NAME}_${arg}
SRCS test_inference_image_classification.cc SRCS test_inference_${TARGET_NAME}.cc
ARGS --dirname=${PYTHON_TESTS_DIR}/book/image_classification_resnet.inference.model) ARGS --dirname=${PYTHON_TESTS_DIR}/book/${TARGET_NAME}_${arg}.inference.model)
cc_test(test_inference_label_semantic_roles set_tests_properties(test_inference_${TARGET_NAME}_${arg}
SRCS test_inference_label_semantic_roles.cc PROPERTIES DEPENDS test_${TARGET_NAME})
DEPS ARCHIVE_START paddle_fluid ARCHIVE_END endforeach()
ARGS --dirname=${PYTHON_TESTS_DIR}/book/label_semantic_roles.inference.model) else()
cc_test(test_inference_rnn_encoder_decoder cc_test(test_inference_${TARGET_NAME}
SRCS test_inference_rnn_encoder_decoder.cc SRCS test_inference_${TARGET_NAME}.cc
ARGS --dirname=${PYTHON_TESTS_DIR}/book/rnn_encoder_decoder.inference.model) ARGS --dirname=${PYTHON_TESTS_DIR}/book/${TARGET_NAME}.inference.model)
set_tests_properties(test_inference_recognize_digits_mlp set_tests_properties(test_inference_${TARGET_NAME}
set_tests_properties(test_inference_image_classification_vgg endif()
PROPERTIES DEPENDS test_image_classification_train) endfunction(inference_test)
PROPERTIES DEPENDS test_image_classification_train) inference_test(recognize_digits ARGS mlp)
set_tests_properties(test_inference_label_semantic_roles inference_test(image_classification ARGS vgg resnet)
PROPERTIES DEPENDS test_label_semantic_roles) inference_test(label_semantic_roles)
set_tests_properties(test_inference_rnn_encoder_decoder inference_test(rnn_encoder_decoder)
PROPERTIES DEPENDS test_rnn_encoder_decoder)
...@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include <time.h>
#include "paddle/framework/lod_tensor.h" #include "paddle/framework/lod_tensor.h"
#include "paddle/inference/io.h" #include "paddle/inference/io.h"
...@@ -13,51 +13,11 @@ See the License for the specific language governing permissions and ...@@ -13,51 +13,11 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <time.h>
#include <sstream>
#include "gflags/gflags.h" #include "gflags/gflags.h"
#include "paddle/framework/lod_tensor.h" #include "test_helper.h"
#include "paddle/inference/io.h"
DEFINE_string(dirname, "", "Directory of the inference model."); DEFINE_string(dirname, "", "Directory of the inference model.");
template <typename Place, typename T>
void TestInference(const std::string& dirname,
const std::vector<paddle::framework::LoDTensor*>& cpu_feeds,
std::vector<paddle::framework::LoDTensor*>& cpu_fetchs) {
// 1. Define place, executor and scope
auto place = Place();
auto executor = paddle::framework::Executor(place);
auto* scope = new paddle::framework::Scope();
// 2. Initialize the inference_program and load all parameters from file
auto inference_program = paddle::inference::Load(executor, *scope, dirname);
// 3. Get the feed_target_names and fetch_target_names
const std::vector<std::string>& feed_target_names =
const std::vector<std::string>& fetch_target_names =
// 4. Prepare inputs: set up maps for feed targets
std::map<std::string, const paddle::framework::LoDTensor*> feed_targets;
for (size_t i = 0; i < feed_target_names.size(); ++i) {
// Please make sure that cpu_feeds[i] is right for feed_target_names[i]
feed_targets[feed_target_names[i]] = cpu_feeds[i];
// 5. Define Tensor to get the outputs: set up maps for fetch targets
std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
for (size_t i = 0; i < fetch_target_names.size(); ++i) {
fetch_targets[fetch_target_names[i]] = cpu_fetchs[i];
// 6. Run the inference program
executor.Run(*inference_program, scope, feed_targets, fetch_targets);
delete scope;
TEST(inference, image_classification) { TEST(inference, image_classification) {
if (FLAGS_dirname.empty()) { if (FLAGS_dirname.empty()) {
LOG(FATAL) << "Usage: ./example --dirname=path/to/your/model"; LOG(FATAL) << "Usage: ./example --dirname=path/to/your/model";
...@@ -70,12 +30,10 @@ TEST(inference, image_classification) { ...@@ -70,12 +30,10 @@ TEST(inference, image_classification) {
// In unittests, this is done in paddle/testing/paddle_gtest_main.cc // In unittests, this is done in paddle/testing/paddle_gtest_main.cc
paddle::framework::LoDTensor input; paddle::framework::LoDTensor input;
srand(time(0)); // Use normilized image pixels as input data,
float* input_ptr = // which should be in the range [0.0, 1.0].
input.mutable_data<float>({1, 3, 32, 32}, paddle::platform::CPUPlace()); SetupTensor<float>(
for (int i = 0; i < 3072; ++i) { input, {1, 3, 32, 32}, static_cast<float>(0), static_cast<float>(1));
input_ptr[i] = rand() / (static_cast<float>(RAND_MAX));
std::vector<paddle::framework::LoDTensor*> cpu_feeds; std::vector<paddle::framework::LoDTensor*> cpu_feeds;
cpu_feeds.push_back(&input); cpu_feeds.push_back(&input);
...@@ -98,16 +56,6 @@ TEST(inference, image_classification) { ...@@ -98,16 +56,6 @@ TEST(inference, image_classification) {
dirname, cpu_feeds, cpu_fetchs2); dirname, cpu_feeds, cpu_fetchs2);
LOG(INFO) << output2.dims(); LOG(INFO) << output2.dims();
EXPECT_EQ(output1.dims(), output2.dims()); CheckError<float>(output1, output2);
EXPECT_EQ(output1.numel(), output2.numel());
float err = 1E-3;
int count = 0;
for (int64_t i = 0; i < output1.numel(); ++i) {
if (fabs(output1.data<float>()[i] - output2.data<float>()[i]) > err) {
EXPECT_EQ(count, 0) << "There are " << count << " different elements.";
#endif #endif
} }
...@@ -13,8 +13,6 @@ See the License for the specific language governing permissions and ...@@ -13,8 +13,6 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <time.h>
#include <sstream>
#include "gflags/gflags.h" #include "gflags/gflags.h"
#include "test_helper.h" #include "test_helper.h"
...@@ -13,8 +13,6 @@ See the License for the specific language governing permissions and ...@@ -13,8 +13,6 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <time.h>
#include <sstream>
#include "gflags/gflags.h" #include "gflags/gflags.h"
#include "test_helper.h" #include "test_helper.h"
...@@ -81,5 +81,23 @@ class PODDeleter { ...@@ -81,5 +81,23 @@ class PODDeleter {
Place place_; Place place_;
}; };
* \brief Free memory block in one place does not meet POD
* \note In some cases, custom deleter is used to
* deallocate the memory automatically for
* std::unique_ptr<T> in tensor.h.
template <typename T, typename Place>
class PlainDeleter {
explicit PlainDeleter(Place place) : place_(place) {}
void operator()(T* ptr) { Free(place_, reinterpret_cast<void*>(ptr)); }
Place place_;
} // namespace memory } // namespace memory
} // namespace paddle } // namespace paddle
...@@ -80,6 +80,14 @@ class CTCAlignOpCUDAKernel : public framework::OpKernel<T> { ...@@ -80,6 +80,14 @@ class CTCAlignOpCUDAKernel : public framework::OpKernel<T> {
// resize output dims // resize output dims
output->Resize({static_cast<int64_t>(host_out_lod0.back()), 1}); output->Resize({static_cast<int64_t>(host_out_lod0.back()), 1});
if (host_out_lod0.back() == 0) {
output->Resize({1, 1});
math::SetConstant<platform::CUDADeviceContext, T> set_constant;
set_constant(ctx.template device_context<platform::CUDADeviceContext>(),
output, -1);
} }
}; };
...@@ -16,6 +16,8 @@ limitations under the License. */ ...@@ -16,6 +16,8 @@ limitations under the License. */
#include <string.h> #include <string.h>
#include "paddle/framework/op_registry.h" #include "paddle/framework/op_registry.h"
#include "paddle/operators/math/math_function.h"
namespace paddle { namespace paddle {
namespace operators { namespace operators {
...@@ -65,9 +67,14 @@ class CTCAlignKernel : public framework::OpKernel<T> { ...@@ -65,9 +67,14 @@ class CTCAlignKernel : public framework::OpKernel<T> {
framework::LoD output_lod; framework::LoD output_lod;
output_lod.push_back(output_lod0); output_lod.push_back(output_lod0);
output->set_lod(output_lod); output->set_lod(output_lod);
// resize output dims // resize output dims
output->Resize({static_cast<int64_t>(output_lod0.back()), 1}); output->Resize({static_cast<int64_t>(output_lod0.back()), 1});
// for empty sequence
if (output_lod0.back() == 0) {
output->Resize({1, 1});
output_data = output->mutable_data<T>(ctx.GetPlace());
output_data[0] = -1;
} }
}; };
...@@ -76,18 +76,25 @@ inline void CopyOrShare(const framework::Variable &src, ...@@ -76,18 +76,25 @@ inline void CopyOrShare(const framework::Variable &src,
if (src.IsType<LoDTensor>()) { if (src.IsType<LoDTensor>()) {
if (src.Get<LoDTensor>().place() == dst_place) { if (src.Get<LoDTensor>().place() == dst_place) {
dst->GetMutable<LoDTensor>()->ShareDataWith(src.Get<LoDTensor>()); dst->GetMutable<LoDTensor>()->ShareDataWith(src.Get<LoDTensor>());
} else { } else {
Copy(src.Get<LoDTensor>(), dst_place, dst->GetMutable<LoDTensor>()); Copy(src.Get<LoDTensor>(), dst_place, dst->GetMutable<LoDTensor>());
framework::LoD lod(src.Get<LoDTensor>().lod());
} }
} else if (src.IsType<SelectedRows>()) { } else if (src.IsType<SelectedRows>()) {
auto &src_sr = src.Get<SelectedRows>(); auto &src_sr = src.Get<SelectedRows>();
auto *dst_sr = dst->GetMutable<SelectedRows>(); auto *dst_sr = dst->GetMutable<SelectedRows>();
dst_sr->set_height(src_sr.height()); dst_sr->set_height(src_sr.height());
if (src_sr.value().place() == dst_place) { if (src_sr.value().place() == dst_place) {
dst_sr->mutable_value()->ShareDataWith(src_sr.value()); dst_sr->mutable_value()->ShareDataWith(src_sr.value());
} else { } else {
Copy(src_sr.value(), dst_place, dst_sr->mutable_value()); Copy(src_sr.value(), dst_place, dst_sr->mutable_value());
framework::Vector<int64_t> lod(src_sr.rows());
} }
} else { } else {
PADDLE_THROW("Expect LoDTensor/SelectedRows, get %s", src.Type().name()); PADDLE_THROW("Expect LoDTensor/SelectedRows, get %s", src.Type().name());
...@@ -145,6 +152,9 @@ class ParallelDoOp : public framework::OperatorBase { ...@@ -145,6 +152,9 @@ class ParallelDoOp : public framework::OperatorBase {
auto *sub_scope = sub_scopes[i]; auto *sub_scope = sub_scopes[i];
auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>(); auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
framework::Copy(src, place, dst); framework::Copy(src, place, dst);
framework::LoD lod(src.lod());
} }
} }
WaitOnPlaces(places); WaitOnPlaces(places);
...@@ -248,17 +258,19 @@ class ParallelDoGradOp : public framework::OperatorBase { ...@@ -248,17 +258,19 @@ class ParallelDoGradOp : public framework::OperatorBase {
const std::vector<framework::Scope *> &sub_scopes, const std::vector<framework::Scope *> &sub_scopes,
const platform::PlaceList &places) const { const platform::PlaceList &places) const {
for (auto &s : Outputs(framework::GradVarName(kParameters))) { for (auto &s : Outputs(framework::GradVarName(kParameters))) {
VLOG(3) << "Accumulating " << s;
if (s == framework::kEmptyVarName) continue;
std::string tmp_name; std::string tmp_name;
auto *tmp = sub_scopes[0]->Var(&tmp_name); auto *tmp = sub_scopes[0]->Var(&tmp_name);
for (size_t i = 1; i < sub_scopes.size(); ++i) { for (size_t i = 1; i < sub_scopes.size(); ++i) {
CopyOrShare(*sub_scopes[i]->FindVar(s), places[0], tmp); CopyOrShare(*sub_scopes[i]->FindVar(s), places[0], tmp);
WaitOnPlace(places[0]); WaitOnPlaces(places);
auto sum_op = framework::OpRegistry::CreateOp( auto sum_op = framework::OpRegistry::CreateOp(
"sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}}, "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}},
framework::AttributeMap{}); framework::AttributeMap{});
VLOG(3) << sum_op->DebugStringEx(sub_scopes[0]); VLOG(10) << sum_op->DebugStringEx(sub_scopes[0]);
sum_op->Run(*sub_scopes[0], places[0]); sum_op->Run(*sub_scopes[0], places[0]);
WaitOnPlace(places[0]); WaitOnPlace(places[0]);
} }
...@@ -334,16 +346,9 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker { ...@@ -334,16 +346,9 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
class ParallelDoGradOpShapeInference : public framework::InferShapeBase { class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
public: public:
void operator()(framework::InferShapeContext *ctx) const override { void operator()(framework::InferShapeContext *ctx) const override {
std::vector<std::string> input{kParameters, kInputs};
std::vector<std::string> output{kOutputs};
PADDLE_ENFORCE(ctx->HasInputs(kParameters)); PADDLE_ENFORCE(ctx->HasInputs(kParameters));
PADDLE_ENFORCE(ctx->HasInputs(kInputs)); PADDLE_ENFORCE(ctx->HasInputs(kInputs));
for (auto &s : output) {
ctx->SetOutputsDim(framework::GradVarName(kParameters), ctx->SetOutputsDim(framework::GradVarName(kParameters),
ctx->GetInputsDim(kParameters)); ctx->GetInputsDim(kParameters));
...@@ -360,10 +365,14 @@ class ParallelDoGradOpShapeInference : public framework::InferShapeBase { ...@@ -360,10 +365,14 @@ class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
ctx->SetDims({ig_name}, {i_dims[i]}); ctx->SetDims({ig_name}, {i_dims[i]});
} }
if (ctx->HasInputs(kParameters)) { auto p_dims = ctx->GetInputsDim(kParameters);
PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters))); auto pg_names = ctx->Outputs(framework::GradVarName(kParameters));
ctx->SetOutputsDim(framework::GradVarName(kParameters), for (size_t i = 0; i < pg_names.size(); ++i) {
ctx->GetInputsDim(kParameters)); auto &pg_name = pg_names[i];
if (pg_name == framework::kEmptyVarName) {
ctx->SetDims({pg_name}, {p_dims[i]});
} }
} }
}; };
...@@ -38,6 +38,7 @@ __all__ = [ ...@@ -38,6 +38,7 @@ __all__ = [
'array_write', 'array_write',
'create_array', 'create_array',
'less_than', 'less_than',
'array_read', 'array_read',
'shrink_memory', 'shrink_memory',
'array_length', 'array_length',
...@@ -276,21 +277,20 @@ class ParallelDo(object): ...@@ -276,21 +277,20 @@ class ParallelDo(object):
parent_block = self.parent_block() parent_block = self.parent_block()
local_inputs = set() local_inputs = set()
params = list()
for op in current_block.ops:
for oname in op.output_names:
for out_var_name in op.output(oname):
for var in self.inputs: for var in self.inputs:
local_inputs.add(var.name) local_inputs.add(var.name)
params = list()
for op in current_block.ops: for op in current_block.ops:
for iname in op.input_names: for iname in op.input_names:
for in_var_name in op.input(iname): for in_var_name in op.input(iname):
if in_var_name not in local_inputs: if in_var_name not in local_inputs:
params.append(in_var_name) params.append(in_var_name)
for oname in op.output_names:
for out_var_name in op.output(oname):
params = list(set(params)) params = list(set(params))
return [parent_block.var(name) for name in params] return [parent_block.var(name) for name in params]
...@@ -975,6 +975,36 @@ def less_than(x, y, cond=None, **ignored): ...@@ -975,6 +975,36 @@ def less_than(x, y, cond=None, **ignored):
return cond return cond
def equal(x, y, cond=None, **ignored):
This layer returns the truth value of :math:`x == y` elementwise.
x(Variable): First operand of *equal*
y(Variable): Second operand of *equal*
cond(Variable|None): Optional output variable to store the result of *equal*
Variable: The tensor variable storing the output of *equal*.
.. code-block:: python
less = fluid.layers.equal(x=label, y=limit)
helper = LayerHelper("equal", **locals())
if cond is None:
cond = helper.create_tmp_variable(dtype='bool')
cond.stop_gradient = True
type='equal', inputs={'X': [x],
'Y': [y]}, outputs={'Out': [cond]})
return cond
def array_read(array, i): def array_read(array, i):
"""This function performs the operation to read the data in as an """This function performs the operation to read the data in as an
...@@ -410,12 +410,12 @@ def dynamic_lstmp(input, ...@@ -410,12 +410,12 @@ def dynamic_lstmp(input,
""" """
**Dynamic LSTMP Layer** **Dynamic LSTMP Layer**
LSTMP (LSTM with recurrent projection) layer has a separate projection LSTMP (LSTM with recurrent projection) layer has a separate projection
layer after the LSTM layer, projecting the original hidden state to a layer after the LSTM layer, projecting the original hidden state to a
lower-dimensional one, which is proposed to reduce the number of total lower-dimensional one, which is proposed to reduce the number of total
parameters and furthermore computational complexity for the LSTM, parameters and furthermore computational complexity for the LSTM,
espeacially for the case that the size of output units is relative espeacially for the case that the size of output units is relative
large (https://research.google.com/pubs/archive/43905.pdf). large (https://research.google.com/pubs/archive/43905.pdf).
The formula is as follows: The formula is as follows:
...@@ -441,27 +441,27 @@ def dynamic_lstmp(input, ...@@ -441,27 +441,27 @@ def dynamic_lstmp(input,
the matrix of weights from the input gate to the input). the matrix of weights from the input gate to the input).
* :math:`W_{ic}`, :math:`W_{fc}`, :math:`W_{oc}`: Diagonal weight \ * :math:`W_{ic}`, :math:`W_{fc}`, :math:`W_{oc}`: Diagonal weight \
matrices for peephole connections. In our implementation, \ matrices for peephole connections. In our implementation, \
we use vectors to reprenset these diagonal weight matrices. we use vectors to reprenset these diagonal weight matrices.
* :math:`b`: Denotes bias vectors (e.g. :math:`b_i` is the input gate \ * :math:`b`: Denotes bias vectors (e.g. :math:`b_i` is the input gate \
bias vector). bias vector).
* :math:`\sigma`: The activation, such as logistic sigmoid function. * :math:`\sigma`: The activation, such as logistic sigmoid function.
* :math:`i, f, o` and :math:`c`: The input gate, forget gate, output \ * :math:`i, f, o` and :math:`c`: The input gate, forget gate, output \
gate, and cell activation vectors, respectively, all of which have \ gate, and cell activation vectors, respectively, all of which have \
the same size as the cell output activation vector :math:`h`. the same size as the cell output activation vector :math:`h`.
* :math:`h`: The hidden state. * :math:`h`: The hidden state.
* :math:`r`: The recurrent projection of the hidden state. * :math:`r`: The recurrent projection of the hidden state.
* :math:`\\tilde{c_t}`: The candidate hidden state, whose \ * :math:`\\tilde{c_t}`: The candidate hidden state, whose \
computation is based on the current input and previous hidden state. computation is based on the current input and previous hidden state.
* :math:`\odot`: The element-wise product of the vectors. * :math:`\odot`: The element-wise product of the vectors.
* :math:`act_g` and :math:`act_h`: The cell input and cell output \ * :math:`act_g` and :math:`act_h`: The cell input and cell output \
activation functions and `tanh` is usually used for them. activation functions and `tanh` is usually used for them.
* :math:`\overline{act_h}`: The activation function for the projection \ * :math:`\overline{act_h}`: The activation function for the projection \
output, usually using `identity` or same as :math:`act_h`. output, usually using `identity` or same as :math:`act_h`.
Set `use_peepholes` to `False` to disable peephole connection. The formula Set `use_peepholes` to `False` to disable peephole connection. The formula
is omitted here, please refer to the paper is omitted here, please refer to the paper
http://www.bioinf.jku.at/publications/older/2604.pdf for details. http://www.bioinf.jku.at/publications/older/2604.pdf for details.
Note that these :math:`W_{xi}x_{t}, W_{xf}x_{t}, W_{xc}x_{t}, W_{xo}x_{t}` Note that these :math:`W_{xi}x_{t}, W_{xf}x_{t}, W_{xc}x_{t}, W_{xo}x_{t}`
operations on the input :math:`x_{t}` are NOT included in this operator. operations on the input :math:`x_{t}` are NOT included in this operator.
Users can choose to use fully-connected layer before LSTMP layer. Users can choose to use fully-connected layer before LSTMP layer.
...@@ -479,8 +479,8 @@ def dynamic_lstmp(input, ...@@ -479,8 +479,8 @@ def dynamic_lstmp(input,
- Hidden-hidden weight = {:math:`W_{ch}, W_{ih}, \ - Hidden-hidden weight = {:math:`W_{ch}, W_{ih}, \
W_{fh}, W_{oh}`}. W_{fh}, W_{oh}`}.
- The shape of hidden-hidden weight is (P x 4D), - The shape of hidden-hidden weight is (P x 4D),
where P is the projection size and D the hidden where P is the projection size and D the hidden
size. size.
- Projection weight = {:math:`W_{rh}`}. - Projection weight = {:math:`W_{rh}`}.
- The shape of projection weight is (D x P). - The shape of projection weight is (D x P).
...@@ -525,9 +525,9 @@ def dynamic_lstmp(input, ...@@ -525,9 +525,9 @@ def dynamic_lstmp(input,
hidden_dim, proj_dim = 512, 256 hidden_dim, proj_dim = 512, 256
fc_out = fluid.layers.fc(input=input_seq, size=hidden_dim * 4, fc_out = fluid.layers.fc(input=input_seq, size=hidden_dim * 4,
act=None, bias_attr=None) act=None, bias_attr=None)
proj_out, _ = fluid.layers.dynamic_lstmp(input=fc_out, proj_out, _ = fluid.layers.dynamic_lstmp(input=fc_out,
size=hidden_dim * 4, size=hidden_dim * 4,
proj_size=proj_dim, proj_size=proj_dim,
use_peepholes=False, use_peepholes=False,
is_reverse=True, is_reverse=True,
cell_activation="tanh", cell_activation="tanh",
...@@ -2525,7 +2525,8 @@ def ctc_greedy_decoder(input, blank, name=None): ...@@ -2525,7 +2525,8 @@ def ctc_greedy_decoder(input, blank, name=None):
interval [0, num_classes + 1). interval [0, num_classes + 1).
Returns: Returns:
Variable: CTC greedy decode result. Variable: CTC greedy decode result. If all the sequences in result were
empty, the result LoDTensor will be [-1] with LoD [[0]] and dims [1, 1].
Examples: Examples:
.. code-block:: python .. code-block:: python
...@@ -15,7 +15,10 @@ ...@@ -15,7 +15,10 @@
import layers import layers
from framework import Variable from framework import Variable
__all__ = ['exponential_decay', 'natural_exp_decay', 'inverse_time_decay'] __all__ = [
'exponential_decay', 'natural_exp_decay', 'inverse_time_decay',
'polynomial_decay', 'piecewise_decay'
""" """
When training a model, it's often useful to decay the When training a model, it's often useful to decay the
learning rate during training process, this is called learning rate during training process, this is called
...@@ -101,7 +104,7 @@ def inverse_time_decay(learning_rate, ...@@ -101,7 +104,7 @@ def inverse_time_decay(learning_rate,
```python ```python
if staircase: if staircase:
decayed_learning_rate = learning_rate / (1 + decay_rate * floor(global_step / decay_step)) decayed_learning_rate = learning_rate / (1 + decay_rate * floor(global_step / decay_step))
else else:
decayed_learning_rate = learning_rate / (1 + decay_rate * global_step / decay_step) decayed_learning_rate = learning_rate / (1 + decay_rate * global_step / decay_step)
``` ```
Args: Args:
...@@ -123,3 +126,98 @@ def inverse_time_decay(learning_rate, ...@@ -123,3 +126,98 @@ def inverse_time_decay(learning_rate,
div_res = layers.floor(x=div_res) div_res = layers.floor(x=div_res)
return learning_rate / (1 + decay_rate * div_res) return learning_rate / (1 + decay_rate * div_res)
def polynomial_decay(learning_rate,
"""Applies polynomial decay to the initial learning rate.
if cycle:
decay_steps = decay_steps * ceil(global_step / decay_steps)
global_step = min(global_step, decay_steps)
decayed_learning_rate = (learning_rate - end_learning_rate) *
(1 - global_step / decay_steps) ^ power +
learning_rate: A scalar float32 value or a Variable. This
will be the initial learning rate during training
global_step: A Variable that record the training step.
decay_steps: A Python `int32` number.
end_learning_rate: A Python `float` number.
power: A Python `float` number
cycle: Boolean. If set true, decay the learning rate every decay_steps.
The decayed learning rate
if not isinstance(global_step, Variable):
raise ValueError("global_step is required for inverse_time_decay.")
if cycle:
div_res = layers.ceil(x=(global_step / decay_steps))
zero_var = layers.fill_constant(shape=[1], dtype='float32', value=0.0)
one_var = layers.fill_constant(shape=[1], dtype='float32', value=1.0)
with layers.Switch() as switch:
with switch.case(layers.equal(x=global_step, y=zero_var)):
layers.assign(input=one_var, output=div_res)
decay_steps = decay_steps * div_res
decay_steps_var = layers.fill_constant(
shape=[1], dtype='float32', value=float(decay_steps))
global_step = layers.elementwise_min(x=global_step, y=decay_steps_var)
return (learning_rate - end_learning_rate) * \
((1 - global_step / decay_steps) ** power) + end_learning_rate
def piecewise_decay(global_step, boundaries, values):
"""Applies piecewise decay to the initial learning rate.
boundaries = [10000, 20000]
values = [1.0, 0.5, 0.1]
if step < 10000:
learning_rate = 1.0
elif step >= 10000 and step < 20000:
learning_rate = 0.5
learning_rate = 0.1
if len(values) - len(boundaries) != 1:
raise ValueError("len(values) - len(boundaries) should be 1")
if not isinstance(global_step, Variable):
raise ValueError("global_step is required for piecewise_decay.")
lr = layers.create_global_var(
with layers.Switch() as switch:
for i in range(len(boundaries)):
boundary_val = layers.fill_constant(
shape=[1], dtype='float32', value=float(boundaries[i]))
value_var = layers.fill_constant(
shape=[1], dtype='float32', value=float(values[i]))
with switch.case(layers.less_than(global_step, boundary_val)):
layers.assign(value_var, lr)
last_value_var = layers.fill_constant(
shape=[1], dtype='float32', value=float(values[len(values) - 1]))
with switch.default():
layers.assign(last_value_var, lr)
return lr
...@@ -67,6 +67,7 @@ def conv_net(img, label): ...@@ -67,6 +67,7 @@ def conv_net(img, label):
pool_size=2, pool_size=2,
pool_stride=2, pool_stride=2,
act="relu") act="relu")
conv_pool_1 = fluid.layers.batch_norm(conv_pool_1)
conv_pool_2 = fluid.nets.simple_img_conv_pool( conv_pool_2 = fluid.nets.simple_img_conv_pool(
input=conv_pool_1, input=conv_pool_1,
filter_size=5, filter_size=5,
...@@ -158,6 +158,4 @@ for use_cuda in (False, True): ...@@ -158,6 +158,4 @@ for use_cuda in (False, True):
inject_test_method(use_cuda, is_sparse, parallel) inject_test_method(use_cuda, is_sparse, parallel)
if __name__ == '__main__': if __name__ == '__main__':
# FIXME(tonyyang-svail):
# This test always fail on MultiGPU CI
unittest.main() unittest.main()
...@@ -31,6 +31,8 @@ def CTCAlign(input, lod, blank, merge_repeated): ...@@ -31,6 +31,8 @@ def CTCAlign(input, lod, blank, merge_repeated):
result.append(token) result.append(token)
prev_token = token prev_token = token
result = np.array(result).reshape([len(result), 1]).astype("int32") result = np.array(result).reshape([len(result), 1]).astype("int32")
if len(result) == 0:
result = np.array([-1])
return result return result
...@@ -72,5 +74,14 @@ class TestCTCAlignOpCase1(TestCTCAlignOp): ...@@ -72,5 +74,14 @@ class TestCTCAlignOpCase1(TestCTCAlignOp):
[19, 1]).astype("int32") [19, 1]).astype("int32")
class TestCTCAlignOpCase2(TestCTCAlignOp):
def config(self):
self.op_type = "ctc_align"
self.input_lod = [[0, 4]]
self.blank = 0
self.merge_repeated = True
self.input = np.array([0, 0, 0, 0]).reshape([4, 1]).astype("int32")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
import unittest import unittest
import math import math
import copy
import paddle.v2.fluid.framework as framework import paddle.v2.fluid.framework as framework
import paddle.v2.fluid as fluid import paddle.v2.fluid as fluid
import paddle.v2.fluid.layers as layers import paddle.v2.fluid.layers as layers
...@@ -54,21 +56,37 @@ def inverse_time_decay(learning_rate, ...@@ -54,21 +56,37 @@ def inverse_time_decay(learning_rate,
return learning_rate / (1 + decay_rate * temp) return learning_rate / (1 + decay_rate * temp)
class TestLearningRateDecay(unittest.TestCase): def polynomial_decay(learning_rate,
def check_decay(self, python_decay_fn, fluid_decay_fn, staircase): global_step,
init_lr = 1.0 decay_steps,
decay_steps = 5 end_learning_rate=0.0001,
decay_rate = 0.5 power=1.0,
if cycle:
div = math.ceil(global_step / float(decay_steps))
if div == 0:
div = 1
decay_steps = decay_steps * div
global_step = min(global_step, decay_steps)
return (learning_rate - end_learning_rate) * \
((1 - float(global_step) / float(decay_steps)) ** power) + end_learning_rate
def piecewise_decay(global_step, boundaries, values):
assert len(boundaries) + 1 == len(values)
for i in range(len(boundaries)):
if global_step < boundaries[i]:
return values[i]
return values[len(values) - 1]
class TestLearningRateDecay(unittest.TestCase):
def check_decay(self, python_decay_fn, fluid_decay_fn, kwargs):
global_step = layers.create_global_var( global_step = layers.create_global_var(
shape=[1], value=0.0, dtype='float32', persistable=True) shape=[1], value=0.0, dtype='float32', persistable=True)
decayed_lr = fluid_decay_fn( decayed_lr = fluid_decay_fn(global_step=global_step, **kwargs)
layers.increment(global_step, 1.0) layers.increment(global_step, 1.0)
place = fluid.CPUPlace() place = fluid.CPUPlace()
...@@ -79,31 +97,52 @@ class TestLearningRateDecay(unittest.TestCase): ...@@ -79,31 +97,52 @@ class TestLearningRateDecay(unittest.TestCase):
step_val, lr_val = exe.run(fluid.default_main_program(), step_val, lr_val = exe.run(fluid.default_main_program(),
feed=[], feed=[],
fetch_list=[global_step, decayed_lr]) fetch_list=[global_step, decayed_lr])
python_decayed_lr = python_decay_fn( python_decayed_lr = python_decay_fn(global_step=step, **kwargs)
self.assertAlmostEqual(python_decayed_lr, lr_val[0]) self.assertAlmostEqual(python_decayed_lr, lr_val[0])
def test_decay(self): def test_decay(self):
common_kwargs_true = {
"learning_rate": 1.0,
"decay_steps": 5,
"decay_rate": 0.5,
"staircase": True
common_kwargs_false = copy.deepcopy(common_kwargs_true)
common_kwargs_false["staircase"] = False
decay_fns = [ decay_fns = [
(exponential_decay, lr_decay.exponential_decay, True), (exponential_decay, lr_decay.exponential_decay, common_kwargs_true),
(exponential_decay, lr_decay.exponential_decay, False), (exponential_decay, lr_decay.exponential_decay,
(natural_exp_decay, lr_decay.natural_exp_decay, True), common_kwargs_false),
(natural_exp_decay, lr_decay.natural_exp_decay, False), (natural_exp_decay, lr_decay.natural_exp_decay, common_kwargs_true),
(inverse_time_decay, lr_decay.inverse_time_decay, True), (natural_exp_decay, lr_decay.natural_exp_decay,
(inverse_time_decay, lr_decay.inverse_time_decay, False), common_kwargs_false),
(inverse_time_decay, lr_decay.inverse_time_decay,
(inverse_time_decay, lr_decay.inverse_time_decay,
(polynomial_decay, lr_decay.polynomial_decay, {
"learning_rate": 1.0,
"decay_steps": 5,
"cycle": True
(polynomial_decay, lr_decay.polynomial_decay, {
"learning_rate": 1.0,
"decay_steps": 5,
"cycle": False
(piecewise_decay, lr_decay.piecewise_decay, {
"boundaries": [3, 6, 9],
"values": [0.1, 0.2, 0.3, 0.4]
] ]
for py_decay_fn, fluid_decay_fn, staircase in decay_fns: for py_decay_fn, fluid_decay_fn, kwargs in decay_fns:
print("decay_fn=" + str(py_decay_fn) + " staircase=" + str( print("decay_fn=" + py_decay_fn.__name__ + " kwargs=" + str(kwargs))
main_program = framework.Program() main_program = framework.Program()
startup_program = framework.Program() startup_program = framework.Program()
with framework.program_guard(main_program, startup_program): with framework.program_guard(main_program, startup_program):
self.check_decay(py_decay_fn, fluid_decay_fn, staircase) self.check_decay(py_decay_fn, fluid_decay_fn, kwargs)
if __name__ == '__main__': if __name__ == '__main__':
...@@ -198,7 +198,4 @@ class ParallelOpTestMultipleInput(BaseParallelForTest): ...@@ -198,7 +198,4 @@ class ParallelOpTestMultipleInput(BaseParallelForTest):
if __name__ == '__main__': if __name__ == '__main__':
# FIXME(tonyyang-svail):
# This test always fail on MultiGPU CI
unittest.main() unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册