Commit ce2f0963 authored by Yu Yang

Merge branch 'cpp_parallel_executor' of github.com:reyoung/Paddle into cpp_parallel_executor

...@@ -34,7 +34,7 @@ SET(MKLML_DOWNLOAD_DIR "${MKLML_SOURCE_DIR}/src/${MKLML_PROJECT}") ...@@ -34,7 +34,7 @@ SET(MKLML_DOWNLOAD_DIR "${MKLML_SOURCE_DIR}/src/${MKLML_PROJECT}")
SET(MKLML_DST_DIR "mklml") SET(MKLML_DST_DIR "mklml")
SET(MKLML_INSTALL_ROOT "${THIRD_PARTY_PATH}/install") SET(MKLML_INSTALL_ROOT "${THIRD_PARTY_PATH}/install")
SET(MKLML_INSTALL_DIR ${MKLML_INSTALL_ROOT}/${MKLML_DST_DIR}) SET(MKLML_INSTALL_DIR ${MKLML_INSTALL_ROOT}/${MKLML_DST_DIR})
SET(MKLML_ROOT ${MKLML_INSTALL_DIR}/${MKLML_VER}) SET(MKLML_ROOT ${MKLML_INSTALL_DIR})
SET(MKLML_INC_DIR ${MKLML_ROOT}/include) SET(MKLML_INC_DIR ${MKLML_ROOT}/include)
SET(MKLML_LIB_DIR ${MKLML_ROOT}/lib) SET(MKLML_LIB_DIR ${MKLML_ROOT}/lib)
SET(MKLML_LIB ${MKLML_LIB_DIR}/libmklml_intel.so) SET(MKLML_LIB ${MKLML_LIB_DIR}/libmklml_intel.so)
...@@ -46,7 +46,7 @@ INCLUDE_DIRECTORIES(${MKLML_INC_DIR}) ...@@ -46,7 +46,7 @@ INCLUDE_DIRECTORIES(${MKLML_INC_DIR})
FILE(WRITE ${MKLML_DOWNLOAD_DIR}/CMakeLists.txt FILE(WRITE ${MKLML_DOWNLOAD_DIR}/CMakeLists.txt
"PROJECT(MKLML)\n" "PROJECT(MKLML)\n"
"cmake_minimum_required(VERSION 3.0)\n" "cmake_minimum_required(VERSION 3.0)\n"
"install(DIRECTORY ${MKLML_VER}\n" "install(DIRECTORY ${MKLML_VER}/include ${MKLML_VER}/lib \n"
" DESTINATION ${MKLML_DST_DIR})\n") " DESTINATION ${MKLML_DST_DIR})\n")
ExternalProject_Add( ExternalProject_Add(
......
...@@ -69,6 +69,12 @@ if(NOT CBLAS_FOUND) ...@@ -69,6 +69,12 @@ if(NOT CBLAS_FOUND)
SRCS ${CBLAS_INSTALL_DIR}/lib ${CBLAS_INSTALL_DIR}/include SRCS ${CBLAS_INSTALL_DIR}/lib ${CBLAS_INSTALL_DIR}/include
DSTS ${dst_dir} ${dst_dir} DSTS ${dst_dir} ${dst_dir}
) )
elseif (WITH_MKLML)
set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/install/mklml")
copy(mklml_lib
SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_INC_DIR}
DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir}
)
endif() endif()
# paddle fluid module # paddle fluid module
......
...@@ -8,7 +8,7 @@ The executor is a very naive interpreter. It runs operators one by one. We can u ...@@ -8,7 +8,7 @@ The executor is a very naive interpreter. It runs operators one by one. We can u
We want a `ProgramDesc` to be runnable on different nodes. It is better not to contain device information in `ProgramDesc`. However, we can write a high-performance interpreter, which can hold an alternative intermediate representation of `ProgramDesc`, to take full advantage of multiple GPUs.
ParallelExecutor is an interpreter of `ProgramDesc` which will [out-of-order execute](Out-of-order execution) `Program` in data parallelism mode and maximise the utility of Multi-GPUs. ParallelExecutor is an interpreter of `ProgramDesc` which will [out-of-order execute](https://en.wikipedia.org/wiki/Out-of-order_execution) `Program` in data parallelism mode and maximise the utility of Multi-GPUs.
## Overview of MultiGPUs logic
......
# go_op Design
## Introduction
The **go_op** allows users of PaddlePaddle to run program blocks on a detached
thread. It works in conjunction with CSP operators (channel_send,
channel_receive, channel_open, channel_close, and select) to allow users to
concurrently process data and communicate easily between different threads.
## How to use it
```
channel = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
with fluid.Go():
    # Send a tensor of value 99 to "channel" on a detached thread
    tensor = fill_constant(shape=[1], dtype='int', value=99)
    tensor.stop_gradient = True
    fluid.channel_send(channel, tensor)

# Receive sent tensor from "channel" on the main thread
result = fill_constant(shape=[1], dtype='int', value=-1)
fluid.channel_recv(channel, result)
```
The go operator can be accessed by using the fluid.Go() control flow. This
will create a new sub block, where the user can add additional operators
to be run on the thread.
**Note:** Since backpropagation is currently not supported in the go_op, users
should ensure that operators in the go block do not require gradient
calculations.
## How it Works
Similar to other control blocks, go_op will create a sub block and add it
as a child to the current block. Operators and variables defined in this
block will be added to the go sub_block.
In addition, the go operator will create a new child scope whose parent is
the global scope. Please refer to [block captures](#block-captures) for more
information.
When the Paddle executor runs go_op, go_op will take the sub_block and pass it to
the executor.run method (along with a newly created local scope) on a detached
thread.
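Conceptually, this amounts to running the sub-block, with its own local scope, on a detached thread. Below is a minimal C++ sketch of that pattern; `Scope` and `Executor` here are simplified stand-ins for illustration only, not the actual Paddle classes, whose interfaces are richer.
```cpp
#include <thread>

// Illustrative stand-ins only; the real framework::Scope / framework::Executor
// interfaces are richer than this sketch.
struct Scope { /* owns the variables of one block */ };
struct Executor {
  void Run(int block_idx, Scope* scope) { /* execute the block's operators */ }
};

// Rough shape of what go_op does when the executor reaches it:
void RunGoOp(Executor* executor, Scope* go_scope, int sub_block_idx) {
  std::thread([=]() {
    // The go block's operators execute here, concurrently with the parent
    // block, using the newly created local scope.
    executor->Run(sub_block_idx, go_scope);
  }).detach();  // go_op itself returns immediately
}
```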
An example of the generated program description is shown below. Take note of
the **go_op** in particular. It is added as an operator in the current
block (in this example, block0). The **go_op** contains a `sub_block`
attribute, which points to the id of the block that will be executed in a
detached thread.
```
blocks {
idx: 0
parent_idx: -1
vars {
name: "return_value"
type {
type: LOD_TENSOR
lod_tensor {
tensor {
data_type: INT64
}
}
}
}
vars {
name: "status_recv"
type {
type: LOD_TENSOR
lod_tensor {
tensor {
data_type: BOOL
}
}
}
}
...
ops {
outputs {
parameter: "Out"
arguments: "channel"
}
type: "channel_create"
attrs {
name: "data_type"
type: INT
i: 7
}
attrs {
name: "capacity"
type: INT
i: 0
}
}
ops {
inputs {
parameter: "X"
arguments: "channel"
}
type: "go"
attrs {
name: "sub_block"
type: BLOCK
block_idx: 1
}
}
ops {
inputs {
parameter: "Channel"
arguments: "channel"
}
outputs {
parameter: "Out"
arguments: "return_value"
}
outputs {
parameter: "Status"
arguments: "status_recv"
}
type: "channel_recv"
}
...
}
blocks {
idx: 1
parent_idx: 0
vars {
name: "status"
type {
type: LOD_TENSOR
lod_tensor {
tensor {
data_type: BOOL
}
}
}
}
...
ops {
outputs {
parameter: "Out"
arguments: "fill_constant_1.tmp_0"
}
type: "fill_constant"
attrs {
name: "force_cpu"
type: BOOLEAN
b: false
}
attrs {
name: "value"
type: FLOAT
f: 99.0
}
attrs {
name: "shape"
type: INTS
ints: 1
}
attrs {
name: "dtype"
type: INT
i: 3
}
}
ops {
inputs {
parameter: "Channel"
arguments: "channel"
}
inputs {
parameter: "X"
arguments: "fill_constant_1.tmp_0"
}
outputs {
parameter: "Status"
arguments: "status"
}
type: "channel_send"
attrs {
name: "copy"
type: BOOLEAN
b: false
}
}
```
## Current Limitations
#### <a name="block-captures"></a>Scopes and block captures:
Paddle utilizes [scopes](./../concepts/scope.md) to store the variables used in a
block. When a block is executed, a new local scope is created from the parent
scope (i.e. the scope derived from the parent block) and associated with the new
child block. After the block finishes executing, the local scope and
all associated variables in the scope are deleted.
This works well in a single-threaded scenario. However, with the introduction of
go_op, a child block may continue to execute even after the parent block has
exited. If the go_op tries to access variables located in the parent block's
scope, it may trigger a segmentation fault because the parent scope may have
been deleted.
We need to implement block closures in order to prevent access to parent
scope variables from causing a segmentation fault. As a temporary workaround,
please ensure that all variables accessed in the go block are not destructed
before they are accessed. Currently, the go_op will explicitly enforce
this requirement and raise an exception if a variable cannot be found in
the scope.
Please refer to [Closure issue](https://github.com/PaddlePaddle/Paddle/issues/8502)
for more details.
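To make the hazard concrete, here is a plain C++ sketch (not Paddle code) of the failure mode: a detached thread keeps a pointer into a scope that the parent has already destroyed.
```cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>

// Simplified stand-in for a scope that owns named variables.
struct Scope {
  std::unordered_map<std::string, int> vars;
};

int main() {
  auto parent_scope = std::make_unique<Scope>();
  parent_scope->vars["x"] = 42;

  Scope* captured = parent_scope.get();  // what a detached go block would hold
  std::thread([captured]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    // By now the parent scope may already be gone: this read is undefined
    // behavior and may segfault. Block closures would keep it alive instead.
    std::cout << captured->vars["x"] << "\n";
  }).detach();

  parent_scope.reset();  // the parent block exits; its scope is destroyed
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  return 0;
}
```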
#### Green Threads
Golang utilizes `green threads`, which is a mechanism for the runtime library to
manage multiple threads (instead of natively by the OS). Green threads usually
allow for faster thread creation and switching, as there is less overhead
when spawning these threads. For the first version of CSP, we only support
OS threads.
#### Backward Propagation:
go_op currently does not support backward propagation. Please use go_op with
non-training operators.
Development
------------
PaddlePaddle adheres to the code and documentation specifications described in the following three sections.
PaddlePaddle uses Git for version control, and Docker provides the build and test environment. The code base includes CUDA, C++, Python, Shell, and other programming languages, which must comply with the Google C++ Style Guide and PEP 8; style is checked automatically by an inspection tool, and code comments need to follow the Doxygen specification. Code that does not meet the style requirements will fail to compile. We provide the following guidelines for using Git, running build tests, and developing code.
.. toctree::
   :maxdepth: 1

   contribute_to_paddle_en.md
PaddlePaddle is documented in both English and Chinese. We recommend using the English version of the documents and problem descriptions. Design documents focus on the problem description and background, followed by the proposed solution. As documents are generated by Sphinx, code comments should comply with the Sphinx documentation standard. We recommend using the paddlepaddle.org tool to build and preview documents locally. Please refer to:
.. toctree::
   :maxdepth: 1

   write_docs_en.rst
PaddlePaddle V2 defines new operations by adding new Layers. You can implement various complex layers by combining basic APIs, which satisfies most applications. If you want to customize a layer, please refer to the following, and you are welcome to propose a patch.
.. toctree::
   :maxdepth: 1

   new_layer_en.rst
...@@ -2,4 +2,15 @@ ...@@ -2,4 +2,15 @@
Cluster Training and Prediction
###############################
TBD .. contents::
1. Network connection errors in the log during multi-node cluster training
------------------------------------------------
During multi-node cluster training, the log may contain errors related to network connection problems, for example :code:`Connection reset by peer`.
This kind of error is usually caused by the abnormal exit of a training process on some node, after which the other nodes can no longer connect to it. Steps to troubleshoot the problem are as follows:
* Find the first error in :code:`train.log` and :code:`server.log`, and check whether another fault caused the problem, such as an FPE, or running out of memory or disk space.
* If the first error in :code:`server.log` says "Address already used", it may be caused by a port conflict from non-exclusive execution. Contact the sys-admin to check whether the current MPI cluster supports jobs submitted with the parameter :code:`resource=full`. If the cluster does not support this parameter, change the server port and try again.
* If the current MPI cluster does not support an exclusive mode that allows a process to occupy a whole node, ask the administrator to replace or upgrade the cluster.
...@@ -34,7 +34,7 @@ class Channel { ...@@ -34,7 +34,7 @@ class Channel {
public: public:
virtual bool CanSend() = 0; virtual bool CanSend() = 0;
virtual bool CanReceive() = 0; virtual bool CanReceive() = 0;
virtual bool Send(T*) = 0; virtual void Send(T*) = 0;
virtual bool Receive(T*) = 0; virtual bool Receive(T*) = 0;
virtual size_t Cap() = 0; virtual size_t Cap() = 0;
virtual void Lock() = 0; virtual void Lock() = 0;
...@@ -84,69 +84,81 @@ class ChannelHolder { ...@@ -84,69 +84,81 @@ class ChannelHolder {
} }
template <typename T> template <typename T>
bool Send(T* data) { void Send(T* data) {
if (!IsInitialized()) return false; PADDLE_ENFORCE_EQ(IsInitialized(), true,
PADDLE_ENFORCE_EQ(holder_->Type(), std::type_index(typeid(T))); "The Channel hasn't been initialized");
PADDLE_ENFORCE_EQ(
holder_->Type(), std::type_index(typeid(T)),
"Channel type is not same as the type of the data being sent");
// Static cast should be safe because we have ensured that types are same // Static cast should be safe because we have ensured that types are same
Channel<T>* channel = static_cast<Channel<T>*>(holder_->Ptr()); Channel<T>* channel = static_cast<Channel<T>*>(holder_->Ptr());
return channel != nullptr ? channel->Send(data) : false; PADDLE_ENFORCE_EQ(channel != nullptr, true, "Channel should not be null.");
channel->Send(data);
} }
template <typename T> template <typename T>
bool Receive(T* data) { bool Receive(T* data) {
if (!IsInitialized()) return false; PADDLE_ENFORCE_EQ(IsInitialized(), true,
PADDLE_ENFORCE_EQ(holder_->Type(), std::type_index(typeid(T))); "The Channel hasn't been initialized");
PADDLE_ENFORCE_EQ(
holder_->Type(), std::type_index(typeid(T)),
"Channel type is not same as the type of the data being sent");
Channel<T>* channel = static_cast<Channel<T>*>(holder_->Ptr()); Channel<T>* channel = static_cast<Channel<T>*>(holder_->Ptr());
return channel != nullptr ? channel->Receive(data) : false; PADDLE_ENFORCE_EQ(channel != nullptr, true, "Channel should not be null.");
return channel->Receive(data);
} }
bool IsClosed() { bool IsClosed() {
if (IsInitialized()) { PADDLE_ENFORCE_EQ(IsInitialized(), true,
return holder_->IsClosed(); "The Channel hasn't been initialized");
} return holder_->IsClosed();
return false;
} }
bool CanSend() { bool CanSend() {
if (IsInitialized()) { PADDLE_ENFORCE_EQ(IsInitialized(), true,
return holder_->CanSend(); "The Channel hasn't been initialized");
} return holder_->CanSend();
return false;
} }
bool CanReceive() { bool CanReceive() {
if (IsInitialized()) { PADDLE_ENFORCE_EQ(IsInitialized(), true,
return holder_->CanReceive(); "The Channel hasn't been initialized");
} return holder_->CanReceive();
return false;
} }
void close() { void close() {
if (IsInitialized()) holder_->Close(); PADDLE_ENFORCE_EQ(IsInitialized(), true,
"The Channel hasn't been initialized");
holder_->Close();
} }
size_t Cap() { size_t Cap() {
if (IsInitialized()) return holder_->Cap(); PADDLE_ENFORCE_EQ(IsInitialized(), true,
return -1; "The Channel hasn't been initialized");
return holder_->Cap();
} }
void Lock() { void Lock() {
if (IsInitialized()) holder_->Lock(); PADDLE_ENFORCE_EQ(IsInitialized(), true,
"The Channel hasn't been initialized");
holder_->Lock();
} }
void Unlock() { void Unlock() {
if (IsInitialized()) holder_->Unlock(); PADDLE_ENFORCE_EQ(IsInitialized(), true,
"The Channel hasn't been initialized");
holder_->Unlock();
} }
template <typename T> template <typename T>
void AddToSendQ(const void* referrer, T* data, void AddToSendQ(const void* referrer, T* data,
std::shared_ptr<std::condition_variable_any> cond, std::shared_ptr<std::condition_variable_any> cond,
std::function<bool(ChannelAction)> cb) { std::function<bool(ChannelAction)> cb) {
if (IsInitialized()) { PADDLE_ENFORCE_EQ(IsInitialized(), true,
Channel<T>* channel = static_cast<Channel<T>*>(holder_->Ptr()); "The Channel hasn't been initialized");
if (channel != nullptr) { Channel<T>* channel = static_cast<Channel<T>*>(holder_->Ptr());
channel->AddToSendQ(referrer, data, cond, cb); if (channel != nullptr) {
} channel->AddToSendQ(referrer, data, cond, cb);
} }
} }
...@@ -154,26 +166,31 @@ class ChannelHolder { ...@@ -154,26 +166,31 @@ class ChannelHolder {
void AddToReceiveQ(const void* referrer, T* data, void AddToReceiveQ(const void* referrer, T* data,
std::shared_ptr<std::condition_variable_any> cond, std::shared_ptr<std::condition_variable_any> cond,
std::function<bool(ChannelAction)> cb) { std::function<bool(ChannelAction)> cb) {
if (IsInitialized()) { PADDLE_ENFORCE_EQ(IsInitialized(), true,
Channel<T>* channel = static_cast<Channel<T>*>(holder_->Ptr()); "The Channel hasn't been initialized");
if (channel != nullptr) { Channel<T>* channel = static_cast<Channel<T>*>(holder_->Ptr());
channel->AddToReceiveQ(referrer, data, cond, cb); if (channel != nullptr) {
} channel->AddToReceiveQ(referrer, data, cond, cb);
} }
} }
void RemoveFromSendQ(const void* referrer) { void RemoveFromSendQ(const void* referrer) {
if (IsInitialized()) holder_->RemoveFromSendQ(referrer); PADDLE_ENFORCE_EQ(IsInitialized(), true,
"The Channel hasn't been initialized");
holder_->RemoveFromSendQ(referrer);
} }
void RemoveFromReceiveQ(const void* referrer) { void RemoveFromReceiveQ(const void* referrer) {
if (IsInitialized()) holder_->RemoveFromReceiveQ(referrer); PADDLE_ENFORCE_EQ(IsInitialized(), true,
"The Channel hasn't been initialized");
holder_->RemoveFromReceiveQ(referrer);
} }
inline bool IsInitialized() const { return holder_ != nullptr; } inline bool IsInitialized() const { return holder_ != nullptr; }
inline const std::type_index Type() { inline const std::type_index Type() {
PADDLE_ENFORCE_EQ(IsInitialized(), true); PADDLE_ENFORCE_EQ(IsInitialized(), true,
"The Channel hasn't been initialized");
return holder_->Type(); return holder_->Type();
} }
......
...@@ -31,7 +31,7 @@ class ChannelImpl : public paddle::framework::Channel<T> { ...@@ -31,7 +31,7 @@ class ChannelImpl : public paddle::framework::Channel<T> {
public: public:
virtual bool CanSend(); virtual bool CanSend();
virtual bool CanReceive(); virtual bool CanReceive();
virtual bool Send(T *); virtual void Send(T *);
virtual bool Receive(T *); virtual bool Receive(T *);
virtual size_t Cap() { return cap_; } virtual size_t Cap() { return cap_; }
virtual void Lock(); virtual void Lock();
...@@ -76,10 +76,9 @@ class ChannelImpl : public paddle::framework::Channel<T> { ...@@ -76,10 +76,9 @@ class ChannelImpl : public paddle::framework::Channel<T> {
} }
}; };
bool send_return(bool value) { void send_return() {
send_ctr--; send_ctr--;
destructor_cond_.notify_all(); destructor_cond_.notify_all();
return value;
} }
bool recv_return(bool value) { bool recv_return(bool value) {
...@@ -118,15 +117,15 @@ bool ChannelImpl<T>::CanReceive() { ...@@ -118,15 +117,15 @@ bool ChannelImpl<T>::CanReceive() {
} }
template <typename T> template <typename T>
bool ChannelImpl<T>::Send(T *item) { void ChannelImpl<T>::Send(T *item) {
send_ctr++; send_ctr++;
std::unique_lock<std::recursive_mutex> lock{mu_}; std::unique_lock<std::recursive_mutex> lock{mu_};
// If channel is closed, do nothing // If channel is closed, throw exception
if (closed_) { if (closed_) {
lock.unlock(); lock.unlock();
// TODO(abhinavarora) Should panic on closed channel send_return();
return send_return(false); PADDLE_THROW("Cannot send on closed channel");
} }
// If there is a receiver, directly pass the value we want // If there is a receiver, directly pass the value we want
...@@ -143,7 +142,7 @@ bool ChannelImpl<T>::Send(T *item) { ...@@ -143,7 +142,7 @@ bool ChannelImpl<T>::Send(T *item) {
if (m->callback != nullptr) do_send = m->callback(ChannelAction::SEND); if (m->callback != nullptr) do_send = m->callback(ChannelAction::SEND);
if (do_send) if (do_send)
*(m->data) = std::move(*item); *(m->data) = std::move(*item);
else else {
// We cannot do the data transfer because // We cannot do the data transfer because
// this QueueMessage was added by Select // this QueueMessage was added by Select
// and some other case was executed. // and some other case was executed.
...@@ -151,12 +150,17 @@ bool ChannelImpl<T>::Send(T *item) { ...@@ -151,12 +150,17 @@ bool ChannelImpl<T>::Send(T *item) {
// We do not care about notifying other // We do not care about notifying other
// because they would have been notified // because they would have been notified
// by the executed select case. // by the executed select case.
return send_return(Send(item)); lock.unlock();
Send(item);
send_return();
return;
}
// Wake up the blocked process and unlock // Wake up the blocked process and unlock
m->Notify(); m->Notify();
lock.unlock(); lock.unlock();
return send_return(true); send_return();
return;
} }
// Unbuffered channel will always bypass this // Unbuffered channel will always bypass this
...@@ -167,7 +171,8 @@ bool ChannelImpl<T>::Send(T *item) { ...@@ -167,7 +171,8 @@ bool ChannelImpl<T>::Send(T *item) {
buf_.push_back(std::move(*item)); buf_.push_back(std::move(*item));
// Release lock and return true // Release lock and return true
lock.unlock(); lock.unlock();
return send_return(true); send_return();
return;
} }
// Block on channel, because some receiver will complete // Block on channel, because some receiver will complete
...@@ -175,8 +180,12 @@ bool ChannelImpl<T>::Send(T *item) { ...@@ -175,8 +180,12 @@ bool ChannelImpl<T>::Send(T *item) {
auto m = std::make_shared<QueueMessage>(item); auto m = std::make_shared<QueueMessage>(item);
sendq.push_back(m); sendq.push_back(m);
m->Wait(lock); m->Wait(lock);
// TODO(abhinavarora) Should panic on closed channel if (m->chan_closed) {
return send_return(!m->chan_closed); lock.unlock();
send_return();
PADDLE_THROW("Cannot send on closed channel");
}
send_return();
} }
template <typename T> template <typename T>
......
...@@ -16,7 +16,6 @@ limitations under the License. */ ...@@ -16,7 +16,6 @@ limitations under the License. */
#include <chrono> #include <chrono>
#include <thread> #include <thread>
#include "gtest/gtest.h" #include "gtest/gtest.h"
using paddle::framework::Channel; using paddle::framework::Channel;
...@@ -41,7 +40,7 @@ void RecevingOrderEqualToSendingOrder(Channel<int> *ch) { ...@@ -41,7 +40,7 @@ void RecevingOrderEqualToSendingOrder(Channel<int> *ch) {
unsigned sum_send = 0; unsigned sum_send = 0;
std::thread t([&]() { std::thread t([&]() {
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
EXPECT_EQ(ch->Send(&i), true); ch->Send(&i);
sum_send += i; sum_send += i;
} }
}); });
...@@ -61,7 +60,7 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) { ...@@ -61,7 +60,7 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) {
const size_t buffer_size = 10; const size_t buffer_size = 10;
auto ch = MakeChannel<size_t>(buffer_size); auto ch = MakeChannel<size_t>(buffer_size);
for (size_t i = 0; i < buffer_size; ++i) { for (size_t i = 0; i < buffer_size; ++i) {
EXPECT_EQ(ch->Send(&i), true); // should not block ch->Send(&i);
} }
size_t out; size_t out;
...@@ -82,7 +81,7 @@ void SendReceiveWithACloseChannelShouldPanic(Channel<size_t> *ch) { ...@@ -82,7 +81,7 @@ void SendReceiveWithACloseChannelShouldPanic(Channel<size_t> *ch) {
const size_t data = 5; const size_t data = 5;
std::thread send_thread{[&]() { std::thread send_thread{[&]() {
size_t i = data; size_t i = data;
EXPECT_EQ(ch->Send(&i), true); // should not block ch->Send(&i); // should not block
}}; }};
std::thread recv_thread{[&]() { std::thread recv_thread{[&]() {
...@@ -94,12 +93,18 @@ void SendReceiveWithACloseChannelShouldPanic(Channel<size_t> *ch) { ...@@ -94,12 +93,18 @@ void SendReceiveWithACloseChannelShouldPanic(Channel<size_t> *ch) {
send_thread.join(); send_thread.join();
recv_thread.join(); recv_thread.join();
// After closing send should return false. Receive should // After closing send should panic. Receive should
// also return false as there is no data in queue. // also false as there is no data in queue.
CloseChannel(ch); CloseChannel(ch);
send_thread = std::thread{[&]() { send_thread = std::thread{[&]() {
size_t i = data; size_t i = data;
EXPECT_EQ(ch->Send(&i), false); // should return false bool is_exception = false;
try {
ch->Send(&i);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
EXPECT_EQ(is_exception, true);
}}; }};
recv_thread = std::thread{[&]() { recv_thread = std::thread{[&]() {
size_t i; size_t i;
...@@ -129,7 +134,7 @@ TEST(Channel, ReceiveFromBufferedChannelReturnResidualValuesTest) { ...@@ -129,7 +134,7 @@ TEST(Channel, ReceiveFromBufferedChannelReturnResidualValuesTest) {
auto ch = MakeChannel<size_t>(buffer_size); auto ch = MakeChannel<size_t>(buffer_size);
for (size_t i = 0; i < buffer_size; ++i) { for (size_t i = 0; i < buffer_size; ++i) {
EXPECT_EQ(ch->Send(&i), true); // sending should not block ch->Send(&i); // sending should not block
} }
size_t out; size_t out;
...@@ -160,9 +165,16 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { ...@@ -160,9 +165,16 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
// Try to write more than buffer size. // Try to write more than buffer size.
for (size_t i = 0; i < 2 * buffer_size; ++i) { for (size_t i = 0; i < 2 * buffer_size; ++i) {
if (i < buffer_size) if (i < buffer_size)
EXPECT_EQ(ch->Send(&i), true); // should block after 10 iterations ch->Send(&i); // should block after 10 iterations
else else {
EXPECT_EQ(ch->Send(&i), false); bool is_exception = false;
try {
ch->Send(&i);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
EXPECT_EQ(is_exception, true);
}
} }
}); });
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec
...@@ -231,7 +243,13 @@ void ChannelCloseUnblocksSendersTest(Channel<int> *ch, bool isBuffered) { ...@@ -231,7 +243,13 @@ void ChannelCloseUnblocksSendersTest(Channel<int> *ch, bool isBuffered) {
t[i] = std::thread( t[i] = std::thread(
[&](bool *ended, bool *success) { [&](bool *ended, bool *success) {
int data = 10; int data = 10;
*success = ch->Send(&data); bool is_exception = false;
try {
ch->Send(&data);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
*success = !is_exception;
*ended = true; *ended = true;
}, },
&thread_ended[i], &send_success[i]); &thread_ended[i], &send_success[i]);
...@@ -316,8 +334,11 @@ TEST(Channel, UnbufferedLessReceiveMoreSendTest) { ...@@ -316,8 +334,11 @@ TEST(Channel, UnbufferedLessReceiveMoreSendTest) {
// Try to send more number of times // Try to send more number of times
// than receivers // than receivers
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
ch->Send(&i); try {
sum_send += i; ch->Send(&i);
sum_send += i;
} catch (paddle::platform::EnforceNotMet e) {
}
} }
}); });
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
...@@ -382,7 +403,13 @@ void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) { ...@@ -382,7 +403,13 @@ void ChannelDestroyUnblockSenders(Channel<int> *ch, bool isBuffered) {
t[i] = std::thread( t[i] = std::thread(
[&](bool *ended, bool *success) { [&](bool *ended, bool *success) {
int data = 10; int data = 10;
*success = ch->Send(&data); bool is_exception = false;
try {
ch->Send(&data);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
*success = !is_exception;
*ended = true; *ended = true;
}, },
&thread_ended[i], &send_success[i]); &thread_ended[i], &send_success[i]);
...@@ -508,7 +535,7 @@ void ChannelHolderSendReceive(ChannelHolder *ch) { ...@@ -508,7 +535,7 @@ void ChannelHolderSendReceive(ChannelHolder *ch) {
unsigned sum_send = 0; unsigned sum_send = 0;
std::thread t([&]() { std::thread t([&]() {
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
EXPECT_EQ(ch->Send(&i), true); ch->Send(&i);
sum_send += i; sum_send += i;
} }
}); });
...@@ -541,8 +568,22 @@ TEST(ChannelHolder, ChannelUninitializedTest) { ...@@ -541,8 +568,22 @@ TEST(ChannelHolder, ChannelUninitializedTest) {
ChannelHolder *ch = new ChannelHolder(); ChannelHolder *ch = new ChannelHolder();
EXPECT_EQ(ch->IsInitialized(), false); EXPECT_EQ(ch->IsInitialized(), false);
int i = 10; int i = 10;
EXPECT_EQ(ch->Send(&i), false); bool send_exception = false;
EXPECT_EQ(ch->Receive(&i), false); try {
ch->Send(&i);
} catch (paddle::platform::EnforceNotMet e) {
send_exception = true;
}
EXPECT_EQ(send_exception, true);
bool recv_exception = false;
try {
ch->Receive(&i);
} catch (paddle::platform::EnforceNotMet e) {
recv_exception = true;
}
EXPECT_EQ(recv_exception, true);
bool is_exception = false; bool is_exception = false;
try { try {
ch->Type(); ch->Type();
...@@ -669,7 +710,13 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { ...@@ -669,7 +710,13 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) {
t[i] = std::thread( t[i] = std::thread(
[&](bool *ended, bool *success) { [&](bool *ended, bool *success) {
int data = 10; int data = 10;
*success = ch->Send(&data); bool is_exception = false;
try {
ch->Send(&data);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
*success = !is_exception;
*ended = true; *ended = true;
}, },
&thread_ended[i], &send_success[i]); &thread_ended[i], &send_success[i]);
...@@ -760,7 +807,13 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { ...@@ -760,7 +807,13 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) {
t[i] = std::thread( t[i] = std::thread(
[&](bool *ended, bool *success) { [&](bool *ended, bool *success) {
int data = 10; int data = 10;
*success = ch->Send(&data); bool is_exception = false;
try {
ch->Send(&data);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
*success = !is_exception;
*ended = true; *ended = true;
}, },
&thread_ended[i], &send_success[i]); &thread_ended[i], &send_success[i]);
......
...@@ -45,10 +45,11 @@ class Tensor { ...@@ -45,10 +45,11 @@ class Tensor {
friend struct EigenVector; friend struct EigenVector;
public: public:
Tensor() : offset_(0) {} Tensor() : offset_(0), is_pinned_(false) {}
/*! Constructor with place should only be used in pybind. */ /*! Constructor with place should only be used in pybind. */
explicit Tensor(const platform::Place& place) : offset_(0) { explicit Tensor(const platform::Place& place)
: offset_(0), is_pinned_(false) {
holder_->set_place(place); holder_->set_place(place);
} }
...@@ -69,11 +70,12 @@ class Tensor { ...@@ -69,11 +70,12 @@ class Tensor {
* @note If not exist, then allocation. * @note If not exist, then allocation.
*/ */
template <typename T> template <typename T>
inline T* mutable_data(platform::Place place); inline T* mutable_data(platform::Place place, bool is_pinned = false);
inline void* mutable_data(platform::Place place, std::type_index type); inline void* mutable_data(platform::Place place, std::type_index type,
bool is_pinned = false);
inline void* mutable_data(platform::Place place); inline void* mutable_data(platform::Place place, bool is_pinned = false);
/** /**
* @brief Return a pointer to mutable memory block. * @brief Return a pointer to mutable memory block.
...@@ -84,7 +86,8 @@ class Tensor { ...@@ -84,7 +86,8 @@ class Tensor {
* @note If not exist, then allocation. * @note If not exist, then allocation.
*/ */
template <typename T> template <typename T>
inline T* mutable_data(DDim dims, platform::Place place); inline T* mutable_data(DDim dims, platform::Place place,
bool is_pinned = false);
/*! Return the dimensions of the memory block. */ /*! Return the dimensions of the memory block. */
inline const DDim& dims() const; inline const DDim& dims() const;
...@@ -92,6 +95,9 @@ class Tensor { ...@@ -92,6 +95,9 @@ class Tensor {
/*! Return the numel of the memory block. */ /*! Return the numel of the memory block. */
inline int64_t numel() const; inline int64_t numel() const;
/*! Return the numel of the memory block. */
inline bool isPinned() const;
/*! Resize the dimensions of the memory block. */ /*! Resize the dimensions of the memory block. */
inline Tensor& Resize(const DDim& dims); inline Tensor& Resize(const DDim& dims);
...@@ -146,12 +152,14 @@ class Tensor { ...@@ -146,12 +152,14 @@ class Tensor {
template <typename Place> template <typename Place>
struct PlaceholderImpl : public Placeholder { struct PlaceholderImpl : public Placeholder {
PlaceholderImpl(Place place, size_t size, std::type_index type) PlaceholderImpl(Place place, size_t size, std::type_index type,
: ptr_(static_cast<uint8_t*>(memory::Alloc(place, size)), bool is_pinned = false)
memory::PODDeleter<uint8_t, Place>(place)), : ptr_(static_cast<uint8_t*>(memory::Alloc(place, size, is_pinned)),
memory::PODDeleter<uint8_t, Place>(place, is_pinned)),
place_(place), place_(place),
size_(size), size_(size),
type_(type) { type_(type),
is_pinned_(is_pinned) {
PADDLE_ENFORCE_NOT_NULL(ptr_, "Insufficient %s memory to allocation.", PADDLE_ENFORCE_NOT_NULL(ptr_, "Insufficient %s memory to allocation.",
(is_cpu_place(place_) ? "CPU" : "GPU")); (is_cpu_place(place_) ? "CPU" : "GPU"));
} }
...@@ -174,6 +182,9 @@ class Tensor { ...@@ -174,6 +182,9 @@ class Tensor {
/* the current type of memory */ /* the current type of memory */
std::type_index type_; std::type_index type_;
/*! use pinned memory or not. */
bool is_pinned_;
}; };
/*! holds the memory block if allocated. */ /*! holds the memory block if allocated. */
...@@ -208,6 +219,7 @@ class Tensor { ...@@ -208,6 +219,7 @@ class Tensor {
* PlaceHolder::ptr_ and where the tensor data really begins. * PlaceHolder::ptr_ and where the tensor data really begins.
*/ */
size_t offset_; size_t offset_;
bool is_pinned_;
}; };
inline void Tensor::switch_place(platform::Place new_place) { inline void Tensor::switch_place(platform::Place new_place) {
......
...@@ -101,19 +101,21 @@ inline T* Tensor::data() { ...@@ -101,19 +101,21 @@ inline T* Tensor::data() {
} }
template <typename T> template <typename T>
inline T* Tensor::mutable_data(DDim dims, platform::Place place) { inline T* Tensor::mutable_data(DDim dims, platform::Place place,
bool is_pinned) {
static_assert(std::is_pod<T>::value, "T must be POD"); static_assert(std::is_pod<T>::value, "T must be POD");
Resize(dims); Resize(dims);
return mutable_data<T>(place); return mutable_data<T>(place, is_pinned);
} }
template <typename T> template <typename T>
inline T* Tensor::mutable_data(platform::Place place) { inline T* Tensor::mutable_data(platform::Place place, bool is_pinned) {
static_assert(std::is_pod<T>::value, "T must be POD"); static_assert(std::is_pod<T>::value, "T must be POD");
return reinterpret_cast<T*>(mutable_data(place, typeid(T))); return reinterpret_cast<T*>(mutable_data(place, typeid(T), is_pinned));
} }
inline void* Tensor::mutable_data(platform::Place place, std::type_index type) { inline void* Tensor::mutable_data(platform::Place place, std::type_index type,
bool is_pinned) {
if (holder_ != nullptr) { if (holder_ != nullptr) {
holder_->set_type(type); holder_->set_type(type);
} }
...@@ -127,26 +129,27 @@ inline void* Tensor::mutable_data(platform::Place place, std::type_index type) { ...@@ -127,26 +129,27 @@ inline void* Tensor::mutable_data(platform::Place place, std::type_index type) {
holder_->size() < size + offset_) { holder_->size() < size + offset_) {
if (platform::is_cpu_place(place)) { if (platform::is_cpu_place(place)) {
holder_.reset(new PlaceholderImpl<platform::CPUPlace>( holder_.reset(new PlaceholderImpl<platform::CPUPlace>(
boost::get<platform::CPUPlace>(place), size, type)); boost::get<platform::CPUPlace>(place), size, type, is_pinned));
} else if (platform::is_gpu_place(place)) { } else if (platform::is_gpu_place(place)) {
#ifndef PADDLE_WITH_CUDA #ifndef PADDLE_WITH_CUDA
PADDLE_THROW("'CUDAPlace' is not supported in CPU only device."); PADDLE_THROW("'CUDAPlace' is not supported in CPU only device.");
} }
#else #else
holder_.reset(new PlaceholderImpl<platform::CUDAPlace>( holder_.reset(new PlaceholderImpl<platform::CUDAPlace>(
boost::get<platform::CUDAPlace>(place), size, type)); boost::get<platform::CUDAPlace>(place), size, type, is_pinned));
} }
#endif #endif
offset_ = 0; offset_ = 0;
is_pinned_ = is_pinned;
} }
return reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(holder_->ptr()) + return reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(holder_->ptr()) +
offset_); offset_);
} }
inline void* Tensor::mutable_data(platform::Place place) { inline void* Tensor::mutable_data(platform::Place place, bool is_pinned) {
PADDLE_ENFORCE(this->holder_ != nullptr, PADDLE_ENFORCE(this->holder_ != nullptr,
"Cannot invoke mutable data if current hold nothing"); "Cannot invoke mutable data if current hold nothing");
return mutable_data(place, holder_->type()); return mutable_data(place, holder_->type(), is_pinned);
} }
inline Tensor& Tensor::ShareDataWith(const Tensor& src) { inline Tensor& Tensor::ShareDataWith(const Tensor& src) {
...@@ -188,6 +191,8 @@ inline const DDim& Tensor::dims() const { return dims_; } ...@@ -188,6 +191,8 @@ inline const DDim& Tensor::dims() const { return dims_; }
inline int64_t Tensor::numel() const { return product(dims_); } inline int64_t Tensor::numel() const { return product(dims_); }
inline bool Tensor::isPinned() const { return is_pinned_; }
inline Tensor ReshapeToMatrix(const Tensor& src, int num_col_dims) { inline Tensor ReshapeToMatrix(const Tensor& src, int num_col_dims) {
Tensor res; Tensor res;
res.ShareDataWith(src); res.ShareDataWith(src);
......
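For reference, here is a minimal usage sketch of the new `is_pinned` flag at the Tensor level, assuming a CUDA-enabled build; the names follow the signatures introduced in this diff, but the snippet itself is not part of the patch.
```cpp
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/place.h"

namespace f = paddle::framework;
namespace p = paddle::platform;

void AllocatePinnedTensorExample() {
  f::Tensor t;
  // Request page-locked (pinned) host memory for this tensor so that GPU DMA
  // transfers can skip a staging copy; the trailing bool maps to the new
  // `is_pinned` parameter of mutable_data.
  float* data = t.mutable_data<float>(f::make_ddim({2, 3}), p::CUDAPlace(0),
                                      /*is_pinned=*/true);
  (void)data;
  // The new accessor added above should now report the pinned state:
  bool pinned = t.isPinned();
  (void)pinned;
}
```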
...@@ -130,6 +130,50 @@ void GPUAllocator::Free(void* p, size_t size, size_t index) { ...@@ -130,6 +130,50 @@ void GPUAllocator::Free(void* p, size_t size, size_t index) {
bool GPUAllocator::UseGpu() const { return true; } bool GPUAllocator::UseGpu() const { return true; }
// PINNED memory allows direct DMA transfers by the GPU to and from system
// memory. It’s locked to a physical address.
void* CUDAPinnedAllocator::Alloc(size_t& index, size_t size) {
if (size <= 0) return nullptr;
void* p;
// NOTE: here, we use GpuMaxAllocSize() as the maximum memory size
// of host pinned allocation. Allocating too much would reduce
// the amount of memory available to the underlying system for paging.
size_t usable = paddle::platform::GpuMaxAllocSize() - fallback_alloc_size_;
if (size > usable) return nullptr;
// PINNED memory is visible to all CUDA contexts.
cudaError_t result = cudaMallocHost(&p, size);
if (result == cudaSuccess) {
index = 1;
fallback_alloc_size_ += size;
return p;
}
return nullptr;
}
void CUDAPinnedAllocator::Free(void* p, size_t size, size_t index) {
cudaError_t err;
PADDLE_ASSERT(index == 1);
PADDLE_ASSERT(fallback_alloc_size_ >= size);
fallback_alloc_size_ -= size;
err = cudaFreeHost(p);
// Purposefully allow cudaErrorCudartUnloading, because
// that is returned if you ever call cudaFreeHost after the
// driver has already shutdown. This happens only if the
// process is terminating, in which case we don't care if
// cudaFreeHost succeeds.
if (err != cudaErrorCudartUnloading) {
PADDLE_ENFORCE(err, "cudaFreeHost failed in GPUPinnedAllocator::Free.");
}
}
bool CUDAPinnedAllocator::UseGpu() const { return true; }
#endif #endif
} // namespace detail } // namespace detail
......
...@@ -54,6 +54,18 @@ class GPUAllocator : public SystemAllocator { ...@@ -54,6 +54,18 @@ class GPUAllocator : public SystemAllocator {
size_t fallback_alloc_size_ = 0; size_t fallback_alloc_size_ = 0;
int gpu_id_; int gpu_id_;
}; };
class CUDAPinnedAllocator : public SystemAllocator {
public:
virtual void* Alloc(size_t& index, size_t size);
virtual void Free(void* p, size_t size, size_t index);
virtual bool UseGpu() const;
private:
size_t gpu_alloc_size_ =
0; // TODO(zcd): how to define the upper limit of CUDAPinnedMemory?
size_t fallback_alloc_size_ = 0;
};
#endif #endif
} // namespace detail } // namespace detail
......
...@@ -38,7 +38,8 @@ BuddyAllocator* GetCPUBuddyAllocator() { ...@@ -38,7 +38,8 @@ BuddyAllocator* GetCPUBuddyAllocator() {
} }
template <> template <>
void* Alloc<platform::CPUPlace>(platform::CPUPlace place, size_t size) { void* Alloc<platform::CPUPlace>(platform::CPUPlace place, size_t size,
bool is_pinned) {
VLOG(10) << "Allocate " << size << " bytes on " << platform::Place(place); VLOG(10) << "Allocate " << size << " bytes on " << platform::Place(place);
void* p = GetCPUBuddyAllocator()->Alloc(size); void* p = GetCPUBuddyAllocator()->Alloc(size);
VLOG(10) << " pointer=" << p; VLOG(10) << " pointer=" << p;
...@@ -46,7 +47,8 @@ void* Alloc<platform::CPUPlace>(platform::CPUPlace place, size_t size) { ...@@ -46,7 +47,8 @@ void* Alloc<platform::CPUPlace>(platform::CPUPlace place, size_t size) {
} }
template <> template <>
void Free<platform::CPUPlace>(platform::CPUPlace place, void* p) { void Free<platform::CPUPlace>(platform::CPUPlace place, void* p,
bool is_pinned) {
VLOG(10) << "Free pointer=" << p << " on " << platform::Place(place); VLOG(10) << "Free pointer=" << p << " on " << platform::Place(place);
GetCPUBuddyAllocator()->Free(p); GetCPUBuddyAllocator()->Free(p);
} }
...@@ -82,15 +84,47 @@ BuddyAllocator* GetGPUBuddyAllocator(int gpu_id) { ...@@ -82,15 +84,47 @@ BuddyAllocator* GetGPUBuddyAllocator(int gpu_id) {
return as[gpu_id]; return as[gpu_id];
} }
BuddyAllocator* GetCUDAPinnedBuddyAllocator(int gpu_id) {
static BuddyAllocator** as = NULL;
if (as == NULL) {
int gpu_num = platform::GetCUDADeviceCount();
as = new BuddyAllocator*[gpu_num];
for (int gpu = 0; gpu < gpu_num; gpu++) {
as[gpu] = nullptr;
}
}
platform::SetDeviceId(gpu_id);
if (!as[gpu_id]) {
as[gpu_id] = new BuddyAllocator(new detail::CUDAPinnedAllocator,
platform::GpuMinChunkSize(),
platform::GpuMaxChunkSize());
VLOG(10) << "\n\nNOTE: each GPU device use "
<< FLAGS_fraction_of_gpu_memory_to_use * 100
<< "% of GPU memory.\n"
<< "You can set GFlags environment variable '"
<< "FLAGS_fraction_of_gpu_memory_to_use"
<< "' to change the fraction of GPU usage.\n\n";
}
return as[gpu_id];
}
template <> template <>
size_t Used<platform::CUDAPlace>(platform::CUDAPlace place) { size_t Used<platform::CUDAPlace>(platform::CUDAPlace place) {
return GetGPUBuddyAllocator(place.device)->Used(); return GetGPUBuddyAllocator(place.device)->Used();
} }
template <> template <>
void* Alloc<platform::CUDAPlace>(platform::CUDAPlace place, size_t size) { void* Alloc<platform::CUDAPlace>(platform::CUDAPlace place, size_t size,
auto* buddy_allocator = GetGPUBuddyAllocator(place.device); bool is_pinned) {
auto* ptr = buddy_allocator->Alloc(size); void* ptr;
if (is_pinned) {
auto* buddy_allocator = GetCUDAPinnedBuddyAllocator(place.device);
ptr = buddy_allocator->Alloc(size);
} else {
auto* buddy_allocator = GetGPUBuddyAllocator(place.device);
ptr = buddy_allocator->Alloc(size);
}
if (ptr == nullptr) { if (ptr == nullptr) {
int cur_dev = platform::GetCurrentDeviceId(); int cur_dev = platform::GetCurrentDeviceId();
platform::SetDeviceId(place.device); platform::SetDeviceId(place.device);
...@@ -108,8 +142,13 @@ void* Alloc<platform::CUDAPlace>(platform::CUDAPlace place, size_t size) { ...@@ -108,8 +142,13 @@ void* Alloc<platform::CUDAPlace>(platform::CUDAPlace place, size_t size) {
} }
template <> template <>
void Free<platform::CUDAPlace>(platform::CUDAPlace place, void* p) { void Free<platform::CUDAPlace>(platform::CUDAPlace place, void* p,
GetGPUBuddyAllocator(place.device)->Free(p); bool is_pinned) {
if (is_pinned) {
GetCUDAPinnedBuddyAllocator(place.device)->Free(p);
} else {
GetGPUBuddyAllocator(place.device)->Free(p);
}
} }
#endif #endif
......
...@@ -33,7 +33,7 @@ namespace memory { ...@@ -33,7 +33,7 @@ namespace memory {
* address is valid or not. * address is valid or not.
*/ */
template <typename Place> template <typename Place>
void* Alloc(Place place, size_t size); void* Alloc(Place place, size_t size, bool is_pinned = false);
/** /**
* \brief Free memory block in one place. * \brief Free memory block in one place.
...@@ -43,7 +43,7 @@ void* Alloc(Place place, size_t size); ...@@ -43,7 +43,7 @@ void* Alloc(Place place, size_t size);
* *
*/ */
template <typename Place> template <typename Place>
void Free(Place place, void* ptr); void Free(Place place, void* ptr, bool is_pinned = false);
/** /**
* \brief Total size of used memory in one place. * \brief Total size of used memory in one place.
...@@ -74,11 +74,13 @@ class PODDeleter { ...@@ -74,11 +74,13 @@ class PODDeleter {
static_assert(std::is_pod<T>::value, "T must be POD"); static_assert(std::is_pod<T>::value, "T must be POD");
public: public:
explicit PODDeleter(Place place) : place_(place) {} explicit PODDeleter(Place place, bool is_pinned = false)
void operator()(T* ptr) { Free(place_, static_cast<void*>(ptr)); } : place_(place), is_pinned_(is_pinned) {}
void operator()(T* ptr) { Free(place_, static_cast<void*>(ptr), is_pinned_); }
private: private:
Place place_; Place place_;
bool is_pinned_;
}; };
/** /**
......
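At the lower level, `memory::Alloc`, `memory::Free`, and `PODDeleter` simply gain a defaulted `is_pinned` argument. A hedged sketch of calling them directly, assuming the declarations shown above:
```cpp
#include "paddle/fluid/memory/memory.h"
#include "paddle/fluid/platform/place.h"

namespace m = paddle::memory;
namespace p = paddle::platform;

void RawPinnedAllocExample() {
  p::CUDAPlace place(0);
  const size_t bytes = 1024;

  // Allocate from the CUDA-pinned buddy allocator rather than the device
  // allocator by passing is_pinned = true.
  void* buf = m::Alloc(place, bytes, /*is_pinned=*/true);

  // ... use buf as a DMA-friendly host staging buffer ...

  // The same flag must be passed to Free so the matching allocator
  // releases the block.
  m::Free(place, buf, /*is_pinned=*/true);
}
```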
...@@ -264,3 +264,4 @@ cc_test(strided_memcpy_test SRCS strided_memcpy_test.cc DEPS tensor paddle_memor ...@@ -264,3 +264,4 @@ cc_test(strided_memcpy_test SRCS strided_memcpy_test.cc DEPS tensor paddle_memor
cc_test(save_load_op_test SRCS save_load_op_test.cc DEPS save_op load_op) cc_test(save_load_op_test SRCS save_load_op_test.cc DEPS save_op load_op)
cc_test(save_load_combine_op_test SRCS save_load_combine_op_test.cc DEPS save_combine_op load_combine_op) cc_test(save_load_combine_op_test SRCS save_load_combine_op_test.cc DEPS save_combine_op load_combine_op)
nv_test(nccl_op_test SRCS nccl_op_test.cu.cc DEPS nccl_op gpu_info device_context) nv_test(nccl_op_test SRCS nccl_op_test.cu.cc DEPS nccl_op gpu_info device_context)
nv_test(dropout_op_test SRCS dropout_op_test.cc DEPS dropout_op tensor)
...@@ -23,21 +23,10 @@ limitations under the License. */ ...@@ -23,21 +23,10 @@ limitations under the License. */
static constexpr char Channel[] = "Channel"; static constexpr char Channel[] = "Channel";
static constexpr char X[] = "X"; static constexpr char X[] = "X";
static constexpr char Status[] = "Status";
static constexpr char copy[] = "copy";
namespace paddle { namespace paddle {
namespace operators { namespace operators {
void SetSendStatus(const platform::Place &dev_place,
framework::Variable &status_var, bool status) {
auto cpu = platform::CPUPlace();
auto status_tensor =
status_var.GetMutable<framework::LoDTensor>()->mutable_data<bool>({1},
cpu);
status_tensor[0] = status;
}
class ChannelSendOp : public framework::OperatorBase { class ChannelSendOp : public framework::OperatorBase {
public: public:
ChannelSendOp(const std::string &type, ChannelSendOp(const std::string &type,
...@@ -51,9 +40,6 @@ class ChannelSendOp : public framework::OperatorBase { ...@@ -51,9 +40,6 @@ class ChannelSendOp : public framework::OperatorBase {
"Input(Channel) of ChannelSendOp should not be null."); "Input(Channel) of ChannelSendOp should not be null.");
PADDLE_ENFORCE(ctx->HasInput(X), PADDLE_ENFORCE(ctx->HasInput(X),
"Input(X) of ChannelSendOp should not be null."); "Input(X) of ChannelSendOp should not be null.");
PADDLE_ENFORCE(ctx->HasOutput(Status),
"Output(Status) of ChannelSendOp should not be null.");
ctx->SetOutputDim("Status", {1});
} }
private: private:
...@@ -65,10 +51,7 @@ class ChannelSendOp : public framework::OperatorBase { ...@@ -65,10 +51,7 @@ class ChannelSendOp : public framework::OperatorBase {
auto input_var = scope.FindVar(Input(X)); auto input_var = scope.FindVar(Input(X));
// Send the input data through the channel. // Send the input data through the channel.
bool ok = concurrency::ChannelSend(ch, input_var); concurrency::ChannelSend(ch, input_var);
// Set the status output of the `ChannelSend` call.
SetSendStatus(dev_place, *scope.FindVar(Output(Status)), ok);
} }
}; };
...@@ -82,12 +65,6 @@ class ChannelSendOpMaker : public framework::OpProtoAndCheckerMaker { ...@@ -82,12 +65,6 @@ class ChannelSendOpMaker : public framework::OpProtoAndCheckerMaker {
.AsDuplicable(); .AsDuplicable();
AddInput(X, "(Variable) The value which gets sent by the channel.") AddInput(X, "(Variable) The value which gets sent by the channel.")
.AsDuplicable(); .AsDuplicable();
AddOutput(Status,
"(Tensor) An LoD Tensor that returns a boolean status of the"
"result of the send operation.")
.AsDuplicable();
AddAttr<bool>(copy, "(bool, default false) Should copy before send")
.SetDefault(false);
AddComment(R"DOC( AddComment(R"DOC(
)DOC"); )DOC");
} }
......
...@@ -17,20 +17,20 @@ limitations under the License. */ ...@@ -17,20 +17,20 @@ limitations under the License. */
namespace poc = paddle::operators::concurrency; namespace poc = paddle::operators::concurrency;
bool poc::ChannelSend(framework::ChannelHolder *ch, framework::Variable *var) { void poc::ChannelSend(framework::ChannelHolder *ch, framework::Variable *var) {
auto type = framework::ToVarType(var->Type()); auto type = framework::ToVarType(var->Type());
if (type == framework::proto::VarType_Type_LOD_TENSOR) if (type == framework::proto::VarType_Type_LOD_TENSOR)
return ch->Send(var->GetMutable<framework::LoDTensor>()); ch->Send(var->GetMutable<framework::LoDTensor>());
else if (type == framework::proto::VarType_Type_LOD_RANK_TABLE) else if (type == framework::proto::VarType_Type_LOD_RANK_TABLE)
return ch->Send(var->GetMutable<framework::LoDRankTable>()); ch->Send(var->GetMutable<framework::LoDRankTable>());
else if (type == framework::proto::VarType_Type_LOD_TENSOR_ARRAY) else if (type == framework::proto::VarType_Type_LOD_TENSOR_ARRAY)
return ch->Send(var->GetMutable<framework::LoDTensorArray>()); ch->Send(var->GetMutable<framework::LoDTensorArray>());
else if (type == framework::proto::VarType_Type_SELECTED_ROWS) else if (type == framework::proto::VarType_Type_SELECTED_ROWS)
return ch->Send(var->GetMutable<framework::SelectedRows>()); ch->Send(var->GetMutable<framework::SelectedRows>());
else if (type == framework::proto::VarType_Type_READER) else if (type == framework::proto::VarType_Type_READER)
return ch->Send(var->GetMutable<framework::ReaderHolder>()); ch->Send(var->GetMutable<framework::ReaderHolder>());
else if (type == framework::proto::VarType_Type_CHANNEL) else if (type == framework::proto::VarType_Type_CHANNEL)
return ch->Send(var->GetMutable<framework::ChannelHolder>()); ch->Send(var->GetMutable<framework::ChannelHolder>());
else else
PADDLE_THROW("ChannelSend:Unsupported type"); PADDLE_THROW("ChannelSend:Unsupported type");
} }
......
...@@ -21,7 +21,7 @@ namespace paddle { ...@@ -21,7 +21,7 @@ namespace paddle {
namespace operators { namespace operators {
namespace concurrency { namespace concurrency {
bool ChannelSend(framework::ChannelHolder *ch, framework::Variable *var); void ChannelSend(framework::ChannelHolder *ch, framework::Variable *var);
bool ChannelReceive(framework::ChannelHolder *ch, framework::Variable *var); bool ChannelReceive(framework::ChannelHolder *ch, framework::Variable *var);
void ChannelAddToSendQ(framework::ChannelHolder *ch, const void *referrer, void ChannelAddToSendQ(framework::ChannelHolder *ch, const void *referrer,
......
...@@ -48,6 +48,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, ...@@ -48,6 +48,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
void* dest, int size) { void* dest, int size) {
const void* data = NULL; const void* data = NULL;
int size_to_write = 0; int size_to_write = 0;
int length = size;
int total_written = 0;
if (platform::is_gpu_place(place)) { if (platform::is_gpu_place(place)) {
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
...@@ -56,16 +58,21 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, ...@@ -56,16 +58,21 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
platform::CPUPlace cpu; platform::CPUPlace cpu;
char* p = reinterpret_cast<char*>(dest); char* p = reinterpret_cast<char*>(dest);
while (size > 0) { while (total_written < length) {
if (!input->GetDirectBufferPointer(&data, &size_to_write)) { if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
return false; return false;
} }
// NOTE: if the raw buffer is large and has two neighboring fields of raw
// buffers, GetDirectBufferPointer can get all of them; use length to
// truncate it.
if (total_written + size_to_write > length) {
size_to_write = length - total_written;
}
memory::Copy(boost::get<platform::CUDAPlace>(place), memory::Copy(boost::get<platform::CUDAPlace>(place),
reinterpret_cast<void*>(p), cpu, data, size_to_write, reinterpret_cast<void*>(p), cpu, data, size_to_write,
gpu_dev_ctx.stream()); gpu_dev_ctx.stream());
p += size_to_write; p += size_to_write;
size -= size_to_write; total_written += size_to_write;
input->Skip(size_to_write); input->Skip(size_to_write);
} }
...@@ -77,16 +84,21 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, ...@@ -77,16 +84,21 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
} }
char* p = reinterpret_cast<char*>(dest); char* p = reinterpret_cast<char*>(dest);
while (size > 0) { while (total_written < length) {
if (!input->GetDirectBufferPointer(&data, &size_to_write)) { if (!input->GetDirectBufferPointer(&data, &size_to_write)) {
return false; return false;
} }
// NOTE: if the raw buffer is large and has two neighboring fields of raw buffers,
// GetDirectBufferPointer can get all of them; use length to truncate it.
if (total_written + size_to_write > length) {
size_to_write = length - total_written;
}
// TODO(gongwb): can we avoid copy? // TODO(gongwb): can we avoid copy?
platform::CPUPlace cpu; platform::CPUPlace cpu;
memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write); memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write);
p += size_to_write; p += size_to_write;
size -= size_to_write; total_written += size_to_write;
input->Skip(size_to_write); input->Skip(size_to_write);
} }
...@@ -153,6 +165,7 @@ bool VariableResponse::CopySelectRowsData( ...@@ -153,6 +165,7 @@ bool VariableResponse::CopySelectRowsData(
const platform::DeviceContext& ctx, int length) { const platform::DeviceContext& ctx, int length) {
auto var = scope_->FindVar(meta_.varname()); auto var = scope_->FindVar(meta_.varname());
auto* slr = var->GetMutable<framework::SelectedRows>(); auto* slr = var->GetMutable<framework::SelectedRows>();
slr->mutable_rows()->resize(length / 8); // int64
int64_t* rows_data = slr->mutable_rows()->data(); int64_t* rows_data = slr->mutable_rows()->data();
// copy rows CPU data, GPU data will be copied lazily. // copy rows CPU data, GPU data will be copied lazily.
...@@ -233,7 +246,6 @@ int VariableResponse::Parse(Source* source) { ...@@ -233,7 +246,6 @@ int VariableResponse::Parse(Source* source) {
if (tag != 0) { if (tag != 0) {
return -1; return -1;
} }
return 0; return 0;
} }
......
...@@ -55,9 +55,6 @@ class GPUDropoutKernel : public framework::OpKernel<T> { ...@@ -55,9 +55,6 @@ class GPUDropoutKernel : public framework::OpKernel<T> {
y->mutable_data<T>(context.GetPlace()); y->mutable_data<T>(context.GetPlace());
float dropout_prob = context.Attr<float>("dropout_prob"); float dropout_prob = context.Attr<float>("dropout_prob");
auto X = EigenMatrix<T>::Reshape(*x, 1);
auto Y = EigenMatrix<T>::Reshape(*y, 1);
auto& place = *context.template device_context<Place>().eigen_device(); auto& place = *context.template device_context<Place>().eigen_device();
if (!context.Attr<bool>("is_test")) { if (!context.Attr<bool>("is_test")) {
auto* mask = context.Output<Tensor>("Mask"); auto* mask = context.Output<Tensor>("Mask");
...@@ -76,6 +73,8 @@ class GPUDropoutKernel : public framework::OpKernel<T> { ...@@ -76,6 +73,8 @@ class GPUDropoutKernel : public framework::OpKernel<T> {
T><<<grid, threads, 0, context.cuda_device_context().stream()>>>( T><<<grid, threads, 0, context.cuda_device_context().stream()>>>(
size, seed, dropout_prob, x_data, mask_data, y_data); size, seed, dropout_prob, x_data, mask_data, y_data);
} else { } else {
auto X = EigenMatrix<T>::Reshape(*x, 1);
auto Y = EigenMatrix<T>::Reshape(*y, 1);
Y.device(place) = X * static_cast<T>(1.0f - dropout_prob); Y.device(place) = X * static_cast<T>(1.0f - dropout_prob);
} }
} }
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <unistd.h>
#include <string>
#include <thread>
#include "gtest/gtest.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/operators/dropout_op.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/string/printf.h"
namespace f = paddle::framework;
namespace p = paddle::platform;
namespace m = paddle::operators::math;
USE_OP(dropout);
void Compare(f::Scope& scope, p::DeviceContext& ctx) {
// init
auto var = scope.Var("X");
auto tensor = var->GetMutable<f::LoDTensor>();
tensor->Resize({10, 10});
std::vector<float> init;
for (int64_t i = 0; i < 10 * 10; ++i) {
init.push_back(1.0);
}
TensorFromVector(init, ctx, tensor);
auto place = ctx.GetPlace();
auto out_var = scope.Var("Out");
auto out_tensor = out_var->GetMutable<f::LoDTensor>();
out_tensor->Resize({10, 10});
out_tensor->mutable_data<float>(place); // allocate
auto mask_var = scope.Var("Mask");
auto mask_tensor = mask_var->GetMutable<f::LoDTensor>();
mask_tensor->Resize({10, 10});
mask_tensor->mutable_data<float>(place); // allocate
// run
f::AttributeMap attrs;
float dropout_prob = 0.5;
attrs.insert({"fix_seed", 1});
attrs.insert({"seed", 3});
attrs.insert({"dropout_prob", dropout_prob});
auto dropout_op = f::OpRegistry::CreateOp(
"dropout", {{"X", {"X"}}}, {{"Out", {"Out"}}, {"Mask", {"Mask"}}}, attrs);
dropout_op->Run(scope, place);
std::vector<float> out_vec;
TensorToVector(*out_tensor, ctx, &out_vec);
std::vector<float> std_out = {
0, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1,
1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0,
1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 0, 1, 1,
1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0,
1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1};
EXPECT_EQ(out_vec.size(), std_out.size());
for (uint32_t i = 0; i < out_vec.size(); i++) {
EXPECT_EQ(out_vec[i], std_out[i]);
}
}
TEST(Dropout, CPUDense) {
f::Scope scope;
p::CPUPlace place;
p::CPUDeviceContext ctx(place);
Compare(scope, ctx);
}
TEST(Dropout, GPUDense) {
f::Scope scope;
p::CUDAPlace place;
p::CUDADeviceContext ctx(place);
Compare(scope, ctx);
}
...@@ -22,6 +22,103 @@ limitations under the License. */ ...@@ -22,6 +22,103 @@ limitations under the License. */
namespace paddle { namespace paddle {
namespace operators { namespace operators {
// Wrap RowwiseMean and ColwiseMean.
// Reuse the CPU code and replace the GPU code with cublas_gemv, which is
// significantly faster. Unlike RowwiseMean and ColwiseMean, these wrappers
// only handle 2-D input.
template <typename DeviceContext, typename T>
struct RowwiseMean2D {
RowwiseMean2D(int left, int right, const platform::DeviceContext& dev_ctx);
void operator()(const platform::DeviceContext& context,
const framework::Tensor& input, framework::Tensor* vec);
};
#ifdef PADDLE_WITH_CUDA
template <typename T>
class RowwiseMean2D<platform::CUDADeviceContext, T> {
public:
RowwiseMean2D(int left, int right, const platform::DeviceContext& dev_ctx)
: left_(left), right_(right) {
framework::DDim ones_dim({right_});
divisor_.mutable_data<T>(ones_dim, dev_ctx.GetPlace());
math::set_constant(dev_ctx, &divisor_, 1.0 / right);
}
void operator()(const platform::CUDADeviceContext& context,
const framework::Tensor& input, framework::Tensor* out) {
math::gemv<platform::CUDADeviceContext, T>(
context, false, left_, right_, 1., input.data<T>(), divisor_.data<T>(),
0., out->data<T>());
}
private:
int left_;
int right_;
framework::Tensor divisor_;
};
#endif
template <typename T>
class RowwiseMean2D<platform::CPUDeviceContext, T> {
public:
RowwiseMean2D(int left, int right, const platform::DeviceContext& dev_ctx) {}
void operator()(const platform::CPUDeviceContext& context,
const framework::Tensor& input, framework::Tensor* out) {
row_mean_(context, input, out);
}
private:
math::RowwiseMean<platform::CPUDeviceContext, T> row_mean_;
};
template <typename DeviceContext, typename T>
struct ColwiseSum2D {
ColwiseSum2D(int left, int right, const platform::DeviceContext& dev_ctx);
void operator()(const platform::DeviceContext& context,
const framework::Tensor& input, framework::Tensor* vec);
};
#ifdef PADDLE_WITH_CUDA
template <typename T>
class ColwiseSum2D<platform::CUDADeviceContext, T> {
public:
ColwiseSum2D(int left, int right, const platform::DeviceContext& dev_ctx)
: left_(left), right_(right) {
framework::DDim ones_dim({left_});
divisor_.mutable_data<T>(ones_dim, dev_ctx.GetPlace());
math::set_constant(dev_ctx, &divisor_, 1.0);
}
void operator()(const platform::CUDADeviceContext& context,
const framework::Tensor& input, framework::Tensor* out) {
math::gemv<platform::CUDADeviceContext, T>(
context, true, left_, right_, 1., input.data<T>(), divisor_.data<T>(),
0., out->data<T>());
}
private:
int left_;
int right_;
framework::Tensor divisor_;
};
#endif
template <typename T>
class ColwiseSum2D<platform::CPUDeviceContext, T> {
public:
ColwiseSum2D(int left, int right, const platform::DeviceContext& dev_ctx) {}
void operator()(const platform::CPUDeviceContext& context,
const framework::Tensor& input, framework::Tensor* out) {
col_wise_(context, input, out);
}
private:
math::ColwiseSum<platform::CPUDeviceContext, T> col_wise_;
};
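The comment above describes the trick these wrappers rely on: a row-wise mean of a [left x right] matrix is a matrix-vector product against a constant vector filled with 1/right, and a column-wise sum is the transposed product against a vector of ones, which is why both classes pre-fill divisor_ and dispatch to math::gemv. A minimal NumPy sketch of that identity (illustration only, not the Paddle math API):

import numpy as np

# Sketch of the gemv trick used by RowwiseMean2D / ColwiseSum2D above.
left, right = 4, 3
x = np.arange(left * right, dtype=np.float64).reshape(left, right)

# Row-wise mean: multiply x (left x right) by a divisor vector filled with 1/right.
row_mean = x @ np.full(right, 1.0 / right)   # shape (left,)

# Column-wise sum: multiply x^T by a divisor vector filled with 1.0.
col_sum = x.T @ np.ones(left)                # shape (right,)

assert np.allclose(row_mean, x.mean(axis=1))
assert np.allclose(col_sum, x.sum(axis=0))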
template <typename T> template <typename T>
struct SubAndSquareFunctor { struct SubAndSquareFunctor {
inline HOSTDEVICE T operator()(T a, T b) const { return (a - b) * (a - b); } inline HOSTDEVICE T operator()(T a, T b) const { return (a - b) * (a - b); }
...@@ -67,15 +164,15 @@ using DataLayout = framework::DataLayout; ...@@ -67,15 +164,15 @@ using DataLayout = framework::DataLayout;
template <typename DeviceContext, typename T> template <typename DeviceContext, typename T>
class LayerNormKernel : public framework::OpKernel<T> { class LayerNormKernel : public framework::OpKernel<T> {
public: public:
void Compute(const framework::ExecutionContext &ctx) const override { void Compute(const framework::ExecutionContext& ctx) const override {
const float epsilon = ctx.Attr<float>("epsilon"); const float epsilon = ctx.Attr<float>("epsilon");
auto *scale = ctx.Input<Tensor>("Scale"); auto* scale = ctx.Input<Tensor>("Scale");
auto *bias = ctx.Input<Tensor>("Bias"); auto* bias = ctx.Input<Tensor>("Bias");
auto x = *ctx.Input<Tensor>("X"); auto x = *ctx.Input<Tensor>("X");
auto *y = ctx.Output<Tensor>("Y"); auto* y = ctx.Output<Tensor>("Y");
auto *mean = ctx.Output<Tensor>("Mean"); auto* mean = ctx.Output<Tensor>("Mean");
auto *var = ctx.Output<Tensor>("Variance"); auto* var = ctx.Output<Tensor>("Variance");
const auto begin_norm_axis = ctx.Attr<int>("begin_norm_axis"); const auto begin_norm_axis = ctx.Attr<int>("begin_norm_axis");
const auto x_dims = x.dims(); const auto x_dims = x.dims();
...@@ -94,8 +191,8 @@ class LayerNormKernel : public framework::OpKernel<T> { ...@@ -94,8 +191,8 @@ class LayerNormKernel : public framework::OpKernel<T> {
out.ShareDataWith(*y); out.ShareDataWith(*y);
out.Resize(matrix_shape); out.Resize(matrix_shape);
auto &dev_ctx = ctx.template device_context<DeviceContext>(); auto& dev_ctx = ctx.template device_context<DeviceContext>();
math::RowwiseMean<DeviceContext, T> row_mean; RowwiseMean2D<DeviceContext, T> row_mean(left, right, ctx.device_context());
// get mean // get mean
row_mean(dev_ctx, x, mean); row_mean(dev_ctx, x, mean);
...@@ -126,31 +223,32 @@ class LayerNormKernel : public framework::OpKernel<T> { ...@@ -126,31 +223,32 @@ class LayerNormKernel : public framework::OpKernel<T> {
template <typename DeviceContext, typename T> template <typename DeviceContext, typename T>
class LayerNormGradKernel : public framework::OpKernel<T> { class LayerNormGradKernel : public framework::OpKernel<T> {
public: public:
void Compute(const framework::ExecutionContext &ctx) const override { void Compute(const framework::ExecutionContext& ctx) const override {
const float epsilon = ctx.Attr<float>("epsilon"); const float epsilon = ctx.Attr<float>("epsilon");
auto x = *ctx.Input<Tensor>("X"); auto x = *ctx.Input<Tensor>("X");
auto *y = ctx.Input<Tensor>("Y"); auto* y = ctx.Input<Tensor>("Y");
auto *mean = ctx.Input<Tensor>("Mean"); auto* mean = ctx.Input<Tensor>("Mean");
auto *var = ctx.Input<Tensor>("Variance"); auto* var = ctx.Input<Tensor>("Variance");
auto *scale = ctx.Input<Tensor>("Scale"); auto* scale = ctx.Input<Tensor>("Scale");
auto *bias = ctx.Input<Tensor>("Bias"); auto* bias = ctx.Input<Tensor>("Bias");
auto d_y = *ctx.Input<Tensor>(framework::GradVarName("Y")); auto d_y = *ctx.Input<Tensor>(framework::GradVarName("Y"));
const auto begin_norm_axis = ctx.Attr<int>("begin_norm_axis"); const auto begin_norm_axis = ctx.Attr<int>("begin_norm_axis");
// init output // init output
auto *d_x = ctx.Output<Tensor>(framework::GradVarName("X")); auto* d_x = ctx.Output<Tensor>(framework::GradVarName("X"));
auto *d_scale = ctx.Output<Tensor>(framework::GradVarName("Scale")); auto* d_scale = ctx.Output<Tensor>(framework::GradVarName("Scale"));
auto *d_bias = ctx.Output<Tensor>(framework::GradVarName("Bias")); auto* d_bias = ctx.Output<Tensor>(framework::GradVarName("Bias"));
const auto &x_dims = x.dims(); const auto& x_dims = x.dims();
auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis); auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis);
int left = static_cast<int>(matrix_dim[0]); int left = static_cast<int>(matrix_dim[0]);
int right = static_cast<int>(matrix_dim[1]); int right = static_cast<int>(matrix_dim[1]);
framework::DDim matrix_shape({left, right}); framework::DDim matrix_shape({left, right});
d_y.Resize(matrix_shape); d_y.Resize(matrix_shape);
auto &dev_ctx = ctx.template device_context<DeviceContext>(); auto& dev_ctx = ctx.template device_context<DeviceContext>();
math::ColwiseSum<DeviceContext, T> colwise_sum; ColwiseSum2D<DeviceContext, T> colwise_sum(left, right,
ctx.device_context());
Tensor temp; Tensor temp;
Tensor temp_norm; Tensor temp_norm;
...@@ -190,7 +288,8 @@ class LayerNormGradKernel : public framework::OpKernel<T> { ...@@ -190,7 +288,8 @@ class LayerNormGradKernel : public framework::OpKernel<T> {
Tensor temp_vec; Tensor temp_vec;
temp_vec.mutable_data<T>(vec_shape, ctx.GetPlace()); temp_vec.mutable_data<T>(vec_shape, ctx.GetPlace());
math::RowwiseMean<DeviceContext, T> row_mean; RowwiseMean2D<DeviceContext, T> row_mean(left, right,
ctx.device_context());
if (d_scale) { if (d_scale) {
// dy_dx // dy_dx
......
...@@ -36,6 +36,14 @@ std::shared_ptr<T> insert_to_context(const std::string& key, ...@@ -36,6 +36,14 @@ std::shared_ptr<T> insert_to_context(const std::string& key,
return p; return p;
} }
template <typename... Args>
void run_primitive(Args&&... args) {
auto forward_op = mkldnn::lrn_forward{args...};
std::vector<mkldnn::primitive> pipeline = {forward_op};
mkldnn::stream(mkldnn::stream::kind::eager).submit(pipeline).wait();
}
} // namespace } // namespace
template <typename T> template <typename T>
...@@ -87,8 +95,6 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel<T> { ...@@ -87,8 +95,6 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
auto dst_memory = mkldnn::memory{{dst_md, mkldnn_engine}, auto dst_memory = mkldnn::memory{{dst_md, mkldnn_engine},
static_cast<void*>(output_data)}; static_cast<void*>(output_data)};
std::unique_ptr<mkldnn::lrn_forward> forward_op = nullptr;
if (!is_test) { if (!is_test) {
const std::string key = ctx.op().Output("Out"); const std::string key = ctx.op().Output("Out");
const std::string key_src_memory = key + "@lrn_src_memory"; const std::string key_src_memory = key + "@lrn_src_memory";
...@@ -108,9 +114,7 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel<T> { ...@@ -108,9 +114,7 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
key_workspace_memory, dev_ctx, key_workspace_memory, dev_ctx,
forward_pd->workspace_primitive_desc()); forward_pd->workspace_primitive_desc());
forward_op.reset(new mkldnn::lrn_forward{*forward_pd, *src_memory, run_primitive(*forward_pd, *src_memory, *workspace_memory, dst_memory);
*workspace_memory, dst_memory});
} else { } else {
auto forward_pd = auto forward_pd =
mkldnn::lrn_forward::primitive_desc{forward_desc, mkldnn_engine}; mkldnn::lrn_forward::primitive_desc{forward_desc, mkldnn_engine};
...@@ -119,12 +123,8 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel<T> { ...@@ -119,12 +123,8 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
auto workspace_memory = auto workspace_memory =
mkldnn::memory{forward_pd.workspace_primitive_desc()}; mkldnn::memory{forward_pd.workspace_primitive_desc()};
forward_op.reset(new mkldnn::lrn_forward{forward_pd, src_memory, run_primitive(forward_pd, src_memory, workspace_memory, dst_memory);
workspace_memory, dst_memory});
} }
std::vector<mkldnn::primitive> pipeline = {*forward_op};
mkldnn::stream(mkldnn::stream::kind::eager).submit(pipeline).wait();
} }
}; };
...@@ -136,6 +136,9 @@ class LRNMKLDNNGradOpKernel : public paddle::framework::OpKernel<T> { ...@@ -136,6 +136,9 @@ class LRNMKLDNNGradOpKernel : public paddle::framework::OpKernel<T> {
"MKLDNN LRN must use float data."); "MKLDNN LRN must use float data.");
PADDLE_ENFORCE(paddle::platform::is_cpu_place(ctx.GetPlace()), PADDLE_ENFORCE(paddle::platform::is_cpu_place(ctx.GetPlace()),
"MKLDNN LRN must use CPUPlace."); "MKLDNN LRN must use CPUPlace.");
PADDLE_ENFORCE(
!ctx.Attr<bool>("is_test"),
"is_test attribute should be set to False in training phase.");
auto x = ctx.Input<Tensor>("X"); auto x = ctx.Input<Tensor>("X");
......
...@@ -155,8 +155,8 @@ class LRNOp : public framework::OperatorWithKernel { ...@@ -155,8 +155,8 @@ class LRNOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE_EQ(x_dim.size(), 4, "Input(X)'rank of LRNOp should be 4."); PADDLE_ENFORCE_EQ(x_dim.size(), 4, "Input(X)'rank of LRNOp should be 4.");
ctx->SetOutputDim("Out", x_dim); ctx->SetOutputDim("Out", x_dim);
ctx->SetOutputDim("MidOut", x_dim);
ctx->ShareLoD("X", /*->*/ "Out"); ctx->ShareLoD("X", /*->*/ "Out");
ctx->SetOutputDim("MidOut", x_dim);
} }
framework::OpKernelType GetExpectedKernelType( framework::OpKernelType GetExpectedKernelType(
......
...@@ -19,13 +19,6 @@ limitations under the License. */ ...@@ -19,13 +19,6 @@ limitations under the License. */
#include <mkl_vml_functions.h> #include <mkl_vml_functions.h>
#endif #endif
#ifdef PADDLE_USE_ATLAS
extern "C" {
#include <cblas.h>
#include <clapack.h>
}
#endif
#ifdef PADDLE_USE_OPENBLAS #ifdef PADDLE_USE_OPENBLAS
#include <cblas.h> #include <cblas.h>
#include <lapacke.h> #include <lapacke.h>
......
...@@ -144,7 +144,12 @@ class ParallelDoOp : public framework::OperatorBase { ...@@ -144,7 +144,12 @@ class ParallelDoOp : public framework::OperatorBase {
PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(), PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
"Only support parameter type as LoDTensor"); "Only support parameter type as LoDTensor");
auto &src = scope.FindVar(param)->Get<LoDTensor>(); auto &src = scope.FindVar(param)->Get<LoDTensor>();
for (size_t i = 0; i < sub_scopes.size(); ++i) {
auto *sub_scope0 = sub_scopes[0];
auto *dst0 = sub_scope0->Var(param)->GetMutable<LoDTensor>();
dst0->ShareDataWith(src);
for (size_t i = 1; i < sub_scopes.size(); ++i) {
auto &place = places[i]; auto &place = places[i];
auto *sub_scope = sub_scopes[i]; auto *sub_scope = sub_scopes[i];
auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>(); auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
......
...@@ -166,7 +166,9 @@ void DoubleBufferReader::PrefetchThreadFunc() { ...@@ -166,7 +166,9 @@ void DoubleBufferReader::PrefetchThreadFunc() {
std::swap(gpu_batch, batch.payloads_); std::swap(gpu_batch, batch.payloads_);
} }
if (!buffer_->Send(&batch)) { try {
buffer_->Send(&batch);
} catch (paddle::platform::EnforceNotMet e) {
VLOG(5) << "WARNING: The double buffer channel has been closed. The " VLOG(5) << "WARNING: The double buffer channel has been closed. The "
"prefetch thread will terminate."; "prefetch thread will terminate.";
break; break;
......
...@@ -146,14 +146,19 @@ void MultipleReader::PrefetchThreadFunc(std::string file_name, ...@@ -146,14 +146,19 @@ void MultipleReader::PrefetchThreadFunc(std::string file_name,
while (reader->HasNext()) { while (reader->HasNext()) {
std::vector<framework::LoDTensor> ins; std::vector<framework::LoDTensor> ins;
reader->ReadNext(&ins); reader->ReadNext(&ins);
if (!buffer_->Send(&ins)) { try {
buffer_->Send(&ins);
} catch (paddle::platform::EnforceNotMet e) {
VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch " VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch "
"thread of file '" "thread of file '"
<< file_name << "' will terminate."; << file_name << "' will terminate.";
break; break;
} }
} }
if (!available_thread_idx_->Send(&thread_idx)) {
try {
available_thread_idx_->Send(&thread_idx);
} catch (paddle::platform::EnforceNotMet e) {
VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. " VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. "
"Fail to send thread_idx."; "Fail to send thread_idx.";
} }
......
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/split_ids_op.h"
namespace paddle {
namespace operators {
class SplitIdsOpMaker : public framework::OpProtoAndCheckerMaker {
public:
SplitIdsOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("Ids", "(LoDTensor) the input ids with shape{batch_num, 1}");
AddOutput("Out", "(LoDTensor) The outputs of the input Ids.")
.AsDuplicable();
AddComment(R"DOC(
Split a LoDTensor of Ids into multiple LoDTensors; the number of outputs equals the number of pservers.
Example:
Input:
X = [1,2,3,4,5,6]
Out (3 outputs):
out0 = [3, 6]
out1 = [1, 4]
out2 = [2, 5]
)DOC");
}
};
class SplitIdsOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("Ids"), "SplitIdsOp must have input Ids.");
PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitIdsOp must have output Out.");
auto ids_var_type = ctx->GetInputsVarType("Ids").front();
PADDLE_ENFORCE_EQ(ids_var_type, framework::proto::VarType::LOD_TENSOR);
auto ids_dims = ctx->GetInputDim("Ids");
PADDLE_ENFORCE_EQ(ids_dims.size(), 2);
PADDLE_ENFORCE_EQ(ids_dims[1], 1);
}
};
class SplitIdsOpInferVarType : public framework::VarTypeInference {
public:
void operator()(const framework::OpDesc &op_desc,
framework::BlockDesc *block) const override {
for (auto &out_var : op_desc.Output("Out")) {
block->Var(out_var)->SetType(framework::proto::VarType::LOD_TENSOR);
}
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(split_ids, ops::SplitIdsOp, ops::SplitIdsOpMaker,
ops::SplitIdsOpInferVarType);
REGISTER_OP_CPU_KERNEL(
split_ids, ops::SplitIdsOpKernel<paddle::platform::CPUPlace, int64_t>);
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
namespace paddle {
namespace operators {
template <typename DeviceContext, typename T>
class SplitIdsOpKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto place = ctx.GetPlace();
if (!platform::is_cpu_place(place)) {
PADDLE_THROW("SplitIds does not support GPU kernel");
}
const auto* ids_t = ctx.Input<framework::LoDTensor>("Ids");
auto& ids_dims = ids_t->dims();
auto outs = ctx.MultiOutput<framework::LoDTensor>("Out");
const T* ids = ids_t->data<T>();
const size_t shard_num = outs.size();
std::vector<std::vector<T>> out_ids;
out_ids.resize(outs.size());
// Split ids across shards: shard_id = id % shard_num.
for (size_t i = 0; i < ids_dims[0]; ++i) {
T id = ids[i];
size_t shard_id = static_cast<size_t>(id) % shard_num;
out_ids[shard_id].push_back(id);
}
// create tensor for each shard and send to parameter server
for (size_t i = 0; i < out_ids.size(); ++i) {
auto* shard_t = outs[i];
std::vector<T> ids = out_ids[i];
auto* shard_data = shard_t->mutable_data<T>(
framework::make_ddim({static_cast<int64_t>(ids.size()), 1}), place);
for (size_t i = 0; i < ids.size(); ++i) {
shard_data[i] = ids[i];
}
}
}
};
} // namespace operators
} // namespace paddle
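The kernel above shards ids with shard_id = id % shard_num, matching the example in SplitIdsOpMaker. A small Python sketch of the same rule (for illustration; not the registered kernel):

def split_ids(ids, shard_num):
    # shard_id = id % shard_num, as in SplitIdsOpKernel::Compute above.
    shards = [[] for _ in range(shard_num)]
    for i in ids:
        shards[i % shard_num].append(i)
    return shards

print(split_ids([1, 2, 3, 4, 5, 6], 3))  # [[3, 6], [1, 4], [2, 5]]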
...@@ -59,17 +59,10 @@ void* lapack_dso_handle = nullptr; ...@@ -59,17 +59,10 @@ void* lapack_dso_handle = nullptr;
} __name; // struct DynLoad__##__name } __name; // struct DynLoad__##__name
#endif #endif
#ifdef PADDLE_USE_ATLAS #define PADDLE_SGETRF LAPACKE_sgetrf
#define PADDLE_SGETRF clapack_sgetrf #define PADDLE_DGETRF LAPACKE_dgetrf
#define PADDLE_DGETRF clapack_dgetrf #define PADDLE_SGETRI LAPACKE_sgetri
#define PADDLE_SGETRI clapack_sgetri #define PADDLE_DGETRI LAPACKE_dgetri
#define PADDLE_DGETRI clapack_dgetri
#else
#define PADDLE_SGETRF LAPACKE_sgetrf
#define PADDLE_DGETRF LAPACKE_dgetrf
#define PADDLE_SGETRI LAPACKE_sgetri
#define PADDLE_DGETRI LAPACKE_dgetri
#endif
#define LAPACK_ROUTINE_EACH(__macro) \ #define LAPACK_ROUTINE_EACH(__macro) \
__macro(PADDLE_SGETRF) \ __macro(PADDLE_SGETRF) \
......
...@@ -21,7 +21,7 @@ limitations under the License. */ ...@@ -21,7 +21,7 @@ limitations under the License. */
#include <mkl_vml_functions.h> #include <mkl_vml_functions.h>
#endif #endif
#if defined(PADDLE_USE_ATLAS) || defined(PADDLE_USE_VECLIB) #if defined(PADDLE_USE_VECLIB)
extern "C" { extern "C" {
#include <cblas.h> #include <cblas.h>
#include <clapack.h> #include <clapack.h>
......
...@@ -153,9 +153,15 @@ if [ $? -ne 0 ]; then ...@@ -153,9 +153,15 @@ if [ $? -ne 0 ]; then
exit 1 exit 1
fi fi
INSTALLED_VERSION=`pip freeze 2>/dev/null | grep '^paddle' | sed 's/.*==//g'` if [ "@WITH_GPU@" == "ON" ]; then
PADDLE_NAME="paddlepaddle-gpu"
else
PADDLE_NAME="paddlepaddle"
fi
INSTALLED_VERSION=`pip freeze 2>/dev/null | grep "^${PADDLE_NAME}==" | sed 's/.*==//g'`
if [ -z ${INSTALLED_VERSION} ]; then if [ -z "${INSTALLED_VERSION}" ]; then
INSTALLED_VERSION="0.0.0" # not installed INSTALLED_VERSION="0.0.0" # not installed
fi fi
cat <<EOF | python - cat <<EOF | python -
......
...@@ -82,11 +82,14 @@ class SelectCase(object): ...@@ -82,11 +82,14 @@ class SelectCase(object):
RECEIVE = 2 RECEIVE = 2
def __init__(self, def __init__(self,
select,
case_idx, case_idx,
case_to_execute, case_to_execute,
channel_action_fn=None, channel_action_fn=None,
channel=None, channel=None,
value=None): value=None,
is_copy=False):
self.select = select
self.helper = LayerHelper('conditional_block') self.helper = LayerHelper('conditional_block')
self.main_program = self.helper.main_program self.main_program = self.helper.main_program
self.is_scalar_condition = True self.is_scalar_condition = True
...@@ -99,7 +102,24 @@ class SelectCase(object): ...@@ -99,7 +102,24 @@ class SelectCase(object):
self.action = (self.SEND self.action = (self.SEND
if channel_action_fn.__name__ == ('channel_send') else if channel_action_fn.__name__ == ('channel_send') else
self.RECEIVE) if channel_action_fn else self.DEFAULT self.RECEIVE) if channel_action_fn else self.DEFAULT
self.value = value
X = value
if self.action == self.SEND and is_copy:
# We create of copy of the data we want to send
copied_X = self.select.parent_block.create_var(
name=unique_name.generate(value.name + '_copy'),
type=value.type,
dtype=value.dtype,
shape=value.shape,
lod_level=value.lod_level,
capacity=value.capacity
if hasattr(value, 'capacity') else None, )
self.select.parent_block.append_op(
type="assign", inputs={"X": value}, outputs={"Out": copied_X})
X = copied_X
self.value = X
self.channel = channel self.channel = channel
def __enter__(self): def __enter__(self):
...@@ -173,6 +193,7 @@ class SelectCase(object): ...@@ -173,6 +193,7 @@ class SelectCase(object):
class Select(BlockGuard): class Select(BlockGuard):
def __init__(self, name=None): def __init__(self, name=None):
self.helper = LayerHelper('select', name=name) self.helper = LayerHelper('select', name=name)
self.parent_block = self.helper.main_program.current_block()
self.cases = [] self.cases = []
super(Select, self).__init__(self.helper.main_program) super(Select, self).__init__(self.helper.main_program)
...@@ -183,12 +204,12 @@ class Select(BlockGuard): ...@@ -183,12 +204,12 @@ class Select(BlockGuard):
super(Select, self).__enter__() super(Select, self).__enter__()
return self return self
def case(self, channel_action_fn, channel, value): def case(self, channel_action_fn, channel, value, is_copy=False):
"""Create a new block for this condition. """Create a new block for this condition.
""" """
select_case = SelectCase( select_case = SelectCase(self,
len(self.cases), self.case_to_execute, channel_action_fn, channel, len(self.cases), self.case_to_execute,
value) channel_action_fn, channel, value, is_copy)
self.cases.append(select_case) self.cases.append(select_case)
...@@ -197,7 +218,7 @@ class Select(BlockGuard): ...@@ -197,7 +218,7 @@ class Select(BlockGuard):
def default(self): def default(self):
"""Create a default case block for this condition. """Create a default case block for this condition.
""" """
default_case = SelectCase(len(self.cases), self.case_to_execute) default_case = SelectCase(self, len(self.cases), self.case_to_execute)
self.cases.append(default_case) self.cases.append(default_case)
...@@ -339,35 +360,26 @@ def channel_send(channel, value, is_copy=False): ...@@ -339,35 +360,26 @@ def channel_send(channel, value, is_copy=False):
main_program = helper.main_program main_program = helper.main_program
channel_send_block = main_program.current_block() channel_send_block = main_program.current_block()
status = helper.create_variable(
name=unique_name.generate('status'),
type=core.VarDesc.VarType.LOD_TENSOR,
dtype=core.VarDesc.VarType.BOOL)
X = value X = value
if is_copy is True: if is_copy:
copied_X = helper.create_variable( copied_X = helper.create_variable(
name=unique_name.generate(value.name + '_copy'), name=unique_name.generate(value.name + '_copy'),
type=value.type, type=value.type,
dtype=value.dtype, dtype=value.dtype,
shape=value.shape, shape=value.shape,
lod_level=value.lod_level, lod_level=value.lod_level,
capacity=value.capacity) capacity=value.capacity if hasattr(value, 'capacity') else None)
assign_op = channel_send_block.append_op( assign_op = channel_send_block.append_op(
type="assign_op", inputs={"X": value}, outputs={"Out": copied_X}) type="assign", inputs={"X": value}, outputs={"Out": copied_X})
X = copied_X X = copied_X
channel_send_op = channel_send_block.append_op( channel_send_block.append_op(
type="channel_send", type="channel_send", inputs={
inputs={
"Channel": channel, "Channel": channel,
"X": X, "X": X,
}, })
outputs={"Status": status})
return status
def channel_recv(channel, return_value): def channel_recv(channel, return_value):
......
...@@ -134,6 +134,7 @@ def detection_output(loc, ...@@ -134,6 +134,7 @@ def detection_output(loc,
scores = nn.softmax(input=scores) scores = nn.softmax(input=scores)
scores = ops.reshape(x=scores, shape=old_shape) scores = ops.reshape(x=scores, shape=old_shape)
scores = nn.transpose(scores, perm=[0, 2, 1]) scores = nn.transpose(scores, perm=[0, 2, 1])
scores.stop_gradient = True
nmsed_outs = helper.create_tmp_variable(dtype=decoded_box.dtype) nmsed_outs = helper.create_tmp_variable(dtype=decoded_box.dtype)
helper.append_op( helper.append_op(
type="multiclass_nms", type="multiclass_nms",
...@@ -148,6 +149,7 @@ def detection_output(loc, ...@@ -148,6 +149,7 @@ def detection_output(loc,
'score_threshold': score_threshold, 'score_threshold': score_threshold,
'nms_eta': 1.0 'nms_eta': 1.0
}) })
nmsed_outs.stop_gradient = True
return nmsed_outs return nmsed_outs
...@@ -837,4 +839,6 @@ def multi_box_head(inputs, ...@@ -837,4 +839,6 @@ def multi_box_head(inputs,
mbox_locs_concat = tensor.concat(mbox_locs, axis=1) mbox_locs_concat = tensor.concat(mbox_locs, axis=1)
mbox_confs_concat = tensor.concat(mbox_confs, axis=1) mbox_confs_concat = tensor.concat(mbox_confs, axis=1)
box.stop_gradient = True
var.stop_gradient = True
return mbox_locs_concat, mbox_confs_concat, box, var return mbox_locs_concat, mbox_confs_concat, box, var
...@@ -113,9 +113,9 @@ class ListenAndServ(object): ...@@ -113,9 +113,9 @@ class ListenAndServ(object):
which can receive variables from clients and run a block. which can receive variables from clients and run a block.
""" """
def __init__(self, endpoint, fan_in=1, optimizer_mode=True): def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True):
self.helper = LayerHelper("listen_and_serv") self.helper = LayerHelper("listen_and_serv")
self.inputs = [] self.inputs = inputs
self.outputs = [] self.outputs = []
self.endpoint = endpoint self.endpoint = endpoint
self.fan_in = fan_in self.fan_in = fan_in
...@@ -160,18 +160,13 @@ class ListenAndServ(object): ...@@ -160,18 +160,13 @@ class ListenAndServ(object):
current_block = main_program.current_block() current_block = main_program.current_block()
parent_block = self.parent_block() parent_block = self.parent_block()
params, grads = self.get_params_and_grads()
param_names = [p.name for p in params]
grad_names = [g.name for g in grads]
parent_block.append_op( parent_block.append_op(
type='listen_and_serv', type='listen_and_serv',
inputs={}, inputs={"X": self.inputs},
outputs={}, outputs={},
attrs={ attrs={
'endpoint': self.endpoint, 'endpoint': self.endpoint,
'Fanin': self.fan_in, 'Fanin': self.fan_in,
'ParamList': param_names,
'GradList': grad_names,
'OptimizeBlock': current_block 'OptimizeBlock': current_block
}) })
...@@ -196,10 +191,14 @@ def Send(endpoints, send_vars, get_vars): ...@@ -196,10 +191,14 @@ def Send(endpoints, send_vars, get_vars):
endpoints = list(set(epmap)) endpoints = list(set(epmap))
helper = LayerHelper("Send", **locals()) helper = LayerHelper("Send", **locals())
rpc_client_var = default_main_program().global_block().create_var(
name="RPC_CLIENT_VAR", persistable=True, type=core.VarDesc.VarType.RAW)
helper.append_op( helper.append_op(
type="send", type="send",
inputs={"X": send_vars}, inputs={"X": send_vars},
outputs={"Out": get_vars}, outputs={"Out": get_vars,
"RPCClient": rpc_client_var},
attrs={"endpoints": endpoints, attrs={"endpoints": endpoints,
"epmap": epmap}) "epmap": epmap})
......
...@@ -74,6 +74,7 @@ __all__ = [ ...@@ -74,6 +74,7 @@ __all__ = [
'one_hot', 'one_hot',
'autoincreased_step_counter', 'autoincreased_step_counter',
'lod_reset', 'lod_reset',
'lrn',
] ]
...@@ -1482,6 +1483,7 @@ def batch_norm(input, ...@@ -1482,6 +1483,7 @@ def batch_norm(input,
param_attr=None, param_attr=None,
bias_attr=None, bias_attr=None,
data_layout='NCHW', data_layout='NCHW',
in_place=False,
name=None, name=None,
moving_mean_name=None, moving_mean_name=None,
moving_variance_name=None): moving_variance_name=None):
...@@ -1537,7 +1539,7 @@ def batch_norm(input, ...@@ -1537,7 +1539,7 @@ def batch_norm(input,
saved_mean = helper.create_tmp_variable(dtype=dtype, stop_gradient=True) saved_mean = helper.create_tmp_variable(dtype=dtype, stop_gradient=True)
saved_variance = helper.create_tmp_variable(dtype=dtype, stop_gradient=True) saved_variance = helper.create_tmp_variable(dtype=dtype, stop_gradient=True)
batch_norm_out = helper.create_tmp_variable(dtype) batch_norm_out = input if in_place else helper.create_tmp_variable(dtype)
helper.append_op( helper.append_op(
type="batch_norm", type="batch_norm",
...@@ -3410,3 +3412,73 @@ def lod_reset(x, y=None, target_lod=None): ...@@ -3410,3 +3412,73 @@ def lod_reset(x, y=None, target_lod=None):
raise ValueError("y and target_lod should not be both None.") raise ValueError("y and target_lod should not be both None.")
return out return out
def lrn(input, n=5, k=1.0, alpha=1e-4, beta=0.75, name=None):
"""
Local Response Normalization Layer. This layer performs a type of
"lateral inhibition" by normalizing over local input regions.
The formula is as follows:
.. math::
Output(i, x, y) = Input(i, x, y) / \left(
k + \alpha \sum\limits^{\min(C - 1, i + n/2)}_{j = \max(0, i - n/2)}
(Input(j, x, y))^2 \right)^{\beta}
In the above equation:
* :math:`n`: The number of channels to sum over.
* :math:`k`: The offset (to avoid division by zero).
* :math:`\alpha`: The scaling parameter.
* :math:`\beta`: The exponent parameter.
Refer to `ImageNet Classification with Deep Convolutional Neural Networks
<https://papers.nips.cc/paper/4824-imagenet-classification-with-deep-convolutional-neural-networks.pdf>`_
Args:
input (Variable): The input tensor of this layer. The input tensor must be 4-D.
n (int, default 5): The number of channels to sum over.
k (float, default 1.0): An offset (usually positive to avoid dividing by 0).
alpha (float, default 1e-4): The scaling parameter.
beta (float, default 0.75): The exponent.
name (str, default None): A name for this operation.
Raises:
ValueError: If rank of the input tensor is not 4.
Returns:
A tensor variable storing the transformation result.
Examples:
.. code-block:: python
data = fluid.layers.data(name="data", shape=[3, 112, 112], dtype="float32")
lrn = fluid.layers.lrn(input=data)
"""
helper = LayerHelper('lrn', **locals())
dtype = helper.input_dtype()
input_shape = input.shape
dims = len(input_shape)
if dims != 4:
raise ValueError(
"dims of input must be 4 (not %d), and its order must be NCHW" %
(dims))
mid_out = helper.create_tmp_variable(dtype=dtype, stop_gradient=True)
lrn_out = helper.create_tmp_variable(dtype)
helper.append_op(
type="lrn",
inputs={"X": input},
outputs={
"Out": lrn_out,
"MidOut": mid_out,
},
attrs={"n": n,
"k": k,
"alpha": alpha,
"beta": beta})
return lrn_out
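For readers of the formula in the docstring above, here is a NumPy reference of the across-channel normalization on an NCHW input. This is a readability sketch, not the registered lrn kernel; lrn_reference is a hypothetical helper name:

import numpy as np

def lrn_reference(x, n=5, k=1.0, alpha=1e-4, beta=0.75):
    # Across-channel LRN on an NCHW array, following the docstring formula.
    N, C, H, W = x.shape
    out = np.empty_like(x)
    for i in range(C):
        lo, hi = max(0, i - n // 2), min(C - 1, i + n // 2)
        square_sum = (x[:, lo:hi + 1] ** 2).sum(axis=1)
        out[:, i] = x[:, i] / (k + alpha * square_sum) ** beta
    return out

y = lrn_reference(np.random.rand(2, 6, 4, 4).astype('float32'))
print(y.shape)  # (2, 6, 4, 4)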
...@@ -98,7 +98,7 @@ def img_conv_group(input, ...@@ -98,7 +98,7 @@ def img_conv_group(input,
use_mkldnn=use_mkldnn) use_mkldnn=use_mkldnn)
if conv_with_batchnorm[i]: if conv_with_batchnorm[i]:
tmp = layers.batch_norm(input=tmp, act=conv_act) tmp = layers.batch_norm(input=tmp, act=conv_act, in_place=True)
drop_rate = conv_batchnorm_drop_rate[i] drop_rate = conv_batchnorm_drop_rate[i]
if abs(drop_rate) > 1e-5: if abs(drop_rate) > 1e-5:
tmp = layers.dropout(x=tmp, dropout_prob=drop_rate) tmp = layers.dropout(x=tmp, dropout_prob=drop_rate)
......
...@@ -173,16 +173,10 @@ class TestRoutineOp(unittest.TestCase): ...@@ -173,16 +173,10 @@ class TestRoutineOp(unittest.TestCase):
with while_op.block(): with while_op.block():
result2 = fill_constant( result2 = fill_constant(
shape=[1], dtype=core.VarDesc.VarType.INT32, value=0) shape=[1], dtype=core.VarDesc.VarType.INT32, value=0)
x_to_send_tmp = fill_constant(
shape=[1], dtype=core.VarDesc.VarType.INT32, value=0)
# TODO(abhinav): Need to perform copy when doing a channel send.
# Once this is complete, we can remove these lines
assign(input=x, output=x_to_send_tmp)
with fluid.Select() as select: with fluid.Select() as select:
with select.case(fluid.channel_send, channel, with select.case(
x_to_send_tmp): fluid.channel_send, channel, x, is_copy=True):
assign(input=x, output=x_tmp) assign(input=x, output=x_tmp)
assign(input=y, output=x) assign(input=y, output=x)
assign(elementwise_add(x=x_tmp, y=y), output=y) assign(elementwise_add(x=x_tmp, y=y), output=y)
...@@ -230,21 +224,12 @@ class TestRoutineOp(unittest.TestCase): ...@@ -230,21 +224,12 @@ class TestRoutineOp(unittest.TestCase):
core.VarDesc.VarType.LOD_TENSOR, core.VarDesc.VarType.LOD_TENSOR,
core.VarDesc.VarType.FP64) core.VarDesc.VarType.FP64)
pong_result = self._create_tensor('pong_return_value',
core.VarDesc.VarType.LOD_TENSOR,
core.VarDesc.VarType.FP64)
def ping(ch, message): def ping(ch, message):
message_to_send_tmp = fill_constant( fluid.channel_send(ch, message, is_copy=True)
shape=[1], dtype=core.VarDesc.VarType.FP64, value=0)
assign(input=message, output=message_to_send_tmp)
fluid.channel_send(ch, message_to_send_tmp)
def pong(ch1, ch2): def pong(ch1, ch2):
fluid.channel_recv(ch1, ping_result) fluid.channel_recv(ch1, ping_result)
assign(input=ping_result, output=pong_result) fluid.channel_send(ch2, ping_result, is_copy=True)
fluid.channel_send(ch2, pong_result)
pings = fluid.make_channel( pings = fluid.make_channel(
dtype=core.VarDesc.VarType.LOD_TENSOR, capacity=1) dtype=core.VarDesc.VarType.LOD_TENSOR, capacity=1)
......
...@@ -231,6 +231,13 @@ class TestBook(unittest.TestCase): ...@@ -231,6 +231,13 @@ class TestBook(unittest.TestCase):
self.assertIsNotNone(layers.softmax(hid)) self.assertIsNotNone(layers.softmax(hid))
print(str(program)) print(str(program))
def test_lrn(self):
program = Program()
with program_guard(program):
data = layers.data(name='data', shape=[6, 2, 2], dtype='float32')
self.assertIsNotNone(layers.lrn(data))
print(str(program))
def test_get_places(self): def test_get_places(self):
program = Program() program = Program()
with program_guard(program): with program_guard(program):
......
...@@ -97,5 +97,24 @@ class TestLRNMKLDNNOp(TestLRNOp): ...@@ -97,5 +97,24 @@ class TestLRNMKLDNNOp(TestLRNOp):
self.check_output(atol=0.002) self.check_output(atol=0.002)
class TestLRNMKLDNNOpWithIsTest(TestLRNMKLDNNOp):
def get_attrs(self):
attrs = TestLRNMKLDNNOp.get_attrs(self)
attrs['is_test'] = True
return attrs
def test_check_grad_normal(self):
def check_raise_is_test():
try:
self.check_grad(['X'], 'Out', max_relative_error=0.01)
except Exception as e:
t = \
"is_test attribute should be set to False in training phase."
if t in str(e):
raise AttributeError
self.assertRaises(AttributeError, check_raise_is_test)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -38,14 +38,15 @@ class TestRecvOp(unittest.TestCase): ...@@ -38,14 +38,15 @@ class TestRecvOp(unittest.TestCase):
def init_serv(self, place): def init_serv(self, place):
main = fluid.Program() main = fluid.Program()
with fluid.program_guard(main): with fluid.program_guard(main):
x = layers.data( serv = layers.ListenAndServ(
shape=[32, 32], "127.0.0.1:6174", ["X"], optimizer_mode=False)
dtype='float32',
name="X",
append_batch_size=False)
fluid.initializer.Constant(value=1.0)(x, main.global_block())
serv = layers.ListenAndServ("127.0.0.1:6174", optimizer_mode=False)
with serv.do(): with serv.do():
x = layers.data(
shape=[32, 32],
dtype='float32',
name="X",
append_batch_size=False)
fluid.initializer.Constant(value=1.0)(x, main.global_block())
o = layers.scale(x=x, scale=10.0) o = layers.scale(x=x, scale=10.0)
main.global_block().create_var( main.global_block().create_var(
name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
from op_test import OpTest
class TestSplitIdsOp(OpTest):
def setUp(self):
self.op_type = "split_ids"
ids = np.array([[0], [2], [2], [3], [5], [5], [6]]).astype('int64')
out0 = np.array([[0], [3], [6]]).astype('int64')
out1 = np.array([[]]).astype('int64')
out2 = np.array([[2], [2], [5], [5]]).astype('int64')
self.inputs = {'Ids': ids}
self.outputs = {'Out': [('out0', out0), ('out1', out1), ('out2', out2)]}
def test_check_output(self):
self.check_output()
if __name__ == '__main__':
unittest.main()
...@@ -77,7 +77,7 @@ class SoftmaxActivation(BaseActivation): ...@@ -77,7 +77,7 @@ class SoftmaxActivation(BaseActivation):
.. math:: .. math::
P(y=j|x) = \\frac{e^{x_j}} {\\sum^K_{k=1} e^{x_j} } P(y=j|x) = \\frac{e^{x_j}} {\\sum^K_{k=1} e^{x_k} }
""" """
def __init__(self): def __init__(self):
......
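The corrected docstring above normalizes each exponential by the sum over all classes k. A quick NumPy check of that formula (illustration only), using the usual max-subtraction for numerical stability:

import numpy as np

def softmax(x):
    # P(y=j|x) = exp(x_j) / sum_k exp(x_k); subtract max(x) for stability.
    e = np.exp(x - np.max(x))
    return e / e.sum()

print(softmax(np.array([1.0, 2.0, 3.0])).sum())  # 1.0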