Commit 62dc593e authored by wanghaox

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into hard_example

...@@ -186,6 +186,11 @@ function(cc_library TARGET_NAME)
add_library(${TARGET_NAME} STATIC ${cc_library_SRCS})
endif()
if (cc_library_DEPS)
# No need to link libwarpctc.so
if ("${cc_library_DEPS};" MATCHES "warpctc;")
list(REMOVE_ITEM cc_library_DEPS warpctc)
add_dependencies(${TARGET_NAME} warpctc)
endif()
add_dependencies(${TARGET_NAME} ${cc_library_DEPS})
target_link_libraries(${TARGET_NAME} ${cc_library_DEPS})
endif()
...@@ -465,10 +470,10 @@ function(py_test TARGET_NAME)
if(WITH_TESTING)
set(options "")
set(oneValueArgs "")
set(multiValueArgs SRCS DEPS ARGS ENVS)
cmake_parse_arguments(py_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
add_test(NAME ${TARGET_NAME}
COMMAND env PYTHONPATH=${PADDLE_PYTHON_BUILD_DIR}/lib-python ${py_test_ENVS}
${PYTHON_EXECUTABLE} -u ${py_test_SRCS} ${py_test_ARGS}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
endif()
......
### Design Doc: Switch
### Background
Many programming languages provide `switch` as a generalization of `if-elif-else`. We want to add it to Fluid.
The following example shows the usage of `fluid.switch`.
```python
a = fluid.Var(10)
b = fluid.Var(0)
switch = fluid.switch()
with switch.block():
with switch.case(fluid.less_equal(a, 10)):
fluid.print("Case 1")
with switch.case(fluid.larger(a, 0)):
fluid.print("Case 2")
with switch.default():
fluid.print("Case 3")
```
### The Semantics
1. A `switch` control-flow checks cases one-by-one.
1. The condition of each case is a boolean value, which is a scalar, unlike the `fluid.if_else` control-flow, whose condition could be a vector of boolean values.
1. It runs the first matched case, or the default case if there is one.
1. Once it matches a case, it runs the corresponding branch and only that branch. It's as if there were a C-style `break` keyword at the end of each case.
The above program should print, and only print, "Case 1".
The implementation of the backward pass of the `switch` control-flow is easier than that of `if_else`, because `switch` runs at most one branch, whereas `if-else` could run more than one branch.
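These semantics are exactly those of a chained conditional that stops at the first match. A minimal plain-Python sketch of the same program (ordinary `if`/`elif`/`else`, not the Fluid API, shown only to illustrate the first-match-wins rule):
```python
a = 10

if a <= 10:          # case 1: the first matching case wins
    print("Case 1")
elif a > 0:          # case 2: not evaluated once a case has matched
    print("Case 2")
else:                # default case
    print("Case 3")
```
Running this prints "Case 1" exactly once, matching the semantics listed above.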
...@@ -92,11 +92,11 @@ paddle.init(
Parameter description
- use_gpu: **optional, default False**, whether to enable GPU training
- trainer_count: **required, default 1**, number of threads in the current trainer
- port: **required, default 7164**, port used to connect to the pserver
- ports_num: **required, default 1**, number of ports used to connect to the pserver
- ports_num_for_sparse: **required, default 0**, number of ports used for sparse-type parameter communication with the pserver
- num_gradient_servers: **required, default 1**, total number of trainers in the current job
- trainer_id: **required, default 0**, unique ID of each trainer, an integer starting from 0
- pservers: **required, default 127.0.0.1**, list of IPs of the pservers started for the current job, separated by ","
......
...@@ -95,11 +95,11 @@ paddle.init(
Parameter Description
- use_gpu: **optional, default False**, set to "True" to enable GPU training.
- trainer_count: **required, default 1**, number of threads in the current trainer.
- port: **required, default 7164**, port to connect to the parameter server.
- ports_num: **required, default 1**, number of ports for communication.
- ports_num_for_sparse: **required, default 0**, number of ports for sparse type calculation.
- num_gradient_servers: **required, default 1**, number of trainers in the current job.
- trainer_id: **required, default 0**, ID for every trainer, starting from 0.
- pservers: **required, default 127.0.0.1**, list of IPs of parameter servers, separated by ",".
......
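Putting the arguments above together, a minimal invocation for a one-trainer, one-pserver job could look like the sketch below (a hedged example: the `paddle.v2` import path is an assumption, and the values simply echo the documented defaults):
```python
import paddle.v2 as paddle  # assumed v2 API entry point

paddle.init(
    use_gpu=False,            # optional: train on CPU
    trainer_count=1,          # threads in this trainer
    port=7164,                # base port for connecting to the pserver
    ports_num=1,              # ports for dense parameter communication
    ports_num_for_sparse=0,   # ports for sparse parameter communication
    num_gradient_servers=1,   # total trainers in the job
    trainer_id=0,             # unique, zero-based ID of this trainer
    pservers="127.0.0.1")     # comma-separated pserver IP list
```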
...@@ -29,16 +29,16 @@ TEST(Channel, MakeAndClose) {
{
// MakeChannel should return a buffered channel if buffer_size > 0.
auto ch = MakeChannel<int>(10);
EXPECT_NE(dynamic_cast<Buffered<int> *>(ch), nullptr);
EXPECT_EQ(dynamic_cast<UnBuffered<int> *>(ch), nullptr);
CloseChannel(ch);
delete ch;
}
{
// MakeChannel should return an un-buffered channel if buffer_size is 0.
auto ch = MakeChannel<int>(0);
EXPECT_EQ(dynamic_cast<Buffered<int> *>(ch), nullptr);
EXPECT_NE(dynamic_cast<UnBuffered<int> *>(ch), nullptr);
CloseChannel(ch);
delete ch;
}
...@@ -78,3 +78,132 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
t.join();
delete ch;
}
TEST(Channel, SimpleUnbufferedChannelTest) {
auto ch = MakeChannel<int>(0);
unsigned sum_send = 0;
std::thread t([&]() {
for (int i = 0; i < 5; i++) {
ch->Send(&i);
sum_send += i;
}
});
for (int i = 0; i < 5; i++) {
int recv;
ch->Receive(&recv);
EXPECT_EQ(recv, i);
}
CloseChannel(ch);
t.join();
EXPECT_EQ(sum_send, 10U);
delete ch;
}
// This tests that closing an unbuffered channel also
// unblocks any receivers waiting for senders
TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
auto ch = MakeChannel<int>(0);
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
// Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
ch->Receive(&data);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
// Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
// Explicitly close the channel
// This should unblock all receivers
CloseChannel(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch;
}
// This tests that closing an unbuffered channel also
// unblocks any senders waiting for receivers
TEST(Channel, UnbufferedChannelCloseUnblocksSendersTest) {
auto ch = MakeChannel<int>(0);
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
// Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data = 10;
ch->Send(&data);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
// Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
// Explicitly close the channel
// This should unblock all senders
CloseChannel(ch);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch;
}
TEST(Channel, UnbufferedLessReceiveMoreSendTest) {
auto ch = MakeChannel<int>(0);
unsigned sum_send = 0;
// Send should block after three iterations
// since we only have three receivers.
std::thread t([&]() {
// Try to send more times than
// there are receivers
for (int i = 0; i < 4; i++) {
ch->Send(&i);
sum_send += i;
}
});
for (int i = 0; i < 3; i++) {
int recv;
ch->Receive(&recv);
EXPECT_EQ(recv, i);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
EXPECT_EQ(sum_send, 3U);
CloseChannel(ch);
t.join();
delete ch;
}
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
...@@ -13,8 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include "paddle/framework/channel.h"
...@@ -36,20 +36,104 @@ class UnBuffered : public paddle::framework::Channel<T> {
virtual ~UnBuffered();
private:
std::mutex mu_ch_;
// Mutex for readers and writers who are waiting for other reader
// and writer to complete execution
std::recursive_mutex mu_read_, mu_write_;
// reader_found_ is set true when a reader is ready to accept data
// writer_found_ is set true when a writer is ready to send data
// A transaction occurs only when both are true
std::atomic<bool> reader_found_{false}, writer_found_{false};
std::condition_variable cv_channel_;
std::condition_variable_any cv_reader_, cv_writer_;
T* item{nullptr};
std::atomic<bool> closed_{false};
UnBuffered() : closed_(false) {}
void NotifyAllParticipants(std::unique_lock<std::mutex>*);
};
// This function implements the concept of how data should
// be sent from a writer to a reader.
template <typename T>
void UnBuffered<T>::Send(T* data) {
// Prevent other writers from entering
std::unique_lock<std::recursive_mutex> writer_lock(mu_write_);
writer_found_ = true;
std::unique_lock<std::recursive_mutex> cv_lock(mu_write_);
// If writer comes first, it should wait till a reader arrives
cv_writer_.wait(cv_lock,
[this]() { return reader_found_ == true || closed_; });
cv_reader_.notify_one();
if (!closed_) {
std::unique_lock<std::mutex> channel_lock(mu_ch_);
item = data;
channel_lock.unlock();
cv_channel_.notify_one();
channel_lock.lock();
cv_channel_.wait(channel_lock,
[this]() { return item == nullptr || closed_; });
}
writer_found_ = false;
}
// This function implements the concept of how
// data that was sent by a writer is read by a reader.
template <typename T>
void UnBuffered<T>::Receive(T* data) {
// Prevent other readers from entering
std::unique_lock<std::recursive_mutex> read_lock{mu_read_};
reader_found_ = true;
std::unique_lock<std::recursive_mutex> cv_lock{mu_read_};
// If reader comes first, it should wait till a writer arrives
cv_reader_.wait(cv_lock,
[this]() { return writer_found_ == true || closed_; });
cv_writer_.notify_one();
if (!closed_) {
std::unique_lock<std::mutex> lock_ch{mu_ch_};
// Reader should wait for the writer to first write its data
cv_channel_.wait(lock_ch, [this]() { return item != nullptr || closed_; });
if (!closed_) {
*data = std::move(*item);
item = nullptr;
lock_ch.unlock();
}
cv_channel_.notify_one();
}
reader_found_ = false;
}
// This function implements the sequence of events
// that take place once the channel is closed.
template <typename T>
void UnBuffered<T>::Close() {
std::unique_lock<std::mutex> lock(mu_ch_);
item = nullptr;
closed_ = true;
NotifyAllParticipants(&lock);
}
// This function implements the sequence of events
// that are executed once the object of an UnBuffered
// channel is destroyed.
template <typename T>
UnBuffered<T>::~UnBuffered() {
std::unique_lock<std::mutex> lock(mu_ch_);
item = nullptr;
closed_ = true;
NotifyAllParticipants(&lock);
}
// This function notifies all the readers, writers and
// the channel condition variables.
template <typename T>
void UnBuffered<T>::NotifyAllParticipants(std::unique_lock<std::mutex>* lock) {
lock->unlock();
cv_writer_.notify_all();
cv_channel_.notify_all();
cv_reader_.notify_all();
}
} // namespace details
} // namespace framework
......
...@@ -34,18 +34,6 @@ namespace framework {
template <typename T>
class Vector : public std::vector<T> {
public:
using std::vector<T>::vector;
...@@ -55,11 +43,12 @@ class Vector : public std::vector<T> {
virtual ~Vector() {
#ifdef PADDLE_WITH_CUDA
if (cuda_ptr_ != nullptr) {
memory::Free<platform::CUDAPlace>(place_, cuda_ptr_);
}
#endif
}
/* Get device vector */
T *cuda_data() {
CopyToCUDA();
PADDLE_ENFORCE_NOT_NULL(
...@@ -67,81 +56,73 @@ class Vector : public std::vector<T> {
return static_cast<T *>(cuda_ptr_);
}
/* Get host vector */
T *data() { return std::vector<T>::data(); }
const T *data() const { return std::vector<T>::data(); }
/* Synchronize host vector to device vector */
void CopyToCUDA();
/* Synchronize device vector to host vector */
void CopyFromCUDA();
/* Switch device vector location */
void CopyToPeer(platform::Place);
private:
void *cuda_ptr_ = nullptr;
size_t cuda_size_ = 0;  // device vector numel
platform::CUDAPlace place_;
};
template <typename T>
void Vector<T>::CopyToCUDA() {
#ifdef PADDLE_WITH_CUDA
if (cuda_size_ < this->size()) {
if (cuda_ptr_ != nullptr) {
memory::Free<platform::CUDAPlace>(place_, cuda_ptr_);
}
cuda_ptr_ =
memory::Alloc<platform::CUDAPlace>(place_, this->size() * sizeof(T));
}
cuda_size_ = this->size();
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto *ctx = pool.GetByPlace(place_);
memory::Copy(place_, cuda_ptr_, platform::CPUPlace(),
static_cast<const void *>(this->data()),
this->size() * sizeof(T), ctx->stream());
ctx->Wait();
#endif
}
template <typename T>
void Vector<T>::CopyFromCUDA() {
#ifdef PADDLE_WITH_CUDA
if (cuda_ptr_ == nullptr) {
LOG(WARNING) << "No uncommitted cuda data.";
return;
}
this->resize(cuda_size_);
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto *ctx = pool.GetByPlace(place_);
memory::Copy(platform::CPUPlace(), static_cast<void *>(this->data()), place_,
static_cast<const void *>(cuda_ptr_), this->size() * sizeof(T),
ctx->stream());
ctx->Wait();
#endif
}
template <typename T>
void Vector<T>::CopyToPeer(platform::Place peer_place) {
if (platform::is_cpu_place(peer_place)) {
return;
}
#ifdef PADDLE_WITH_CUDA
auto *ctx = platform::DeviceContextPool::Instance().GetByPlace(place_);
void *peer_cuda_ptr = memory::Alloc<platform::CUDAPlace>(
boost::get<platform::CUDAPlace>(peer_place), this->size() * sizeof(T));
memory::Copy(boost::get<platform::CUDAPlace>(peer_place), peer_cuda_ptr,
place_, cuda_ptr_, this->size() * sizeof(T), ctx->stream());
ctx->Wait();
memory::Free<platform::CUDAPlace>(place_, cuda_ptr_);
place_ = boost::get<platform::CUDAPlace>(peer_place);
cuda_ptr_ = peer_cuda_ptr;
#endif
}
......
...@@ -178,19 +178,22 @@ public:
real* inputData = inputs[0].data<real>();
real* filterData = inputs[1].data<real>();
real* outputData = outputs[0].data<real>();
bool needIm2col = isNeedIm2col(filter);
TensorShape imShape =
TensorShape({inputChannels / groups_, inputHeight, inputWidth});
TensorShape colShape;
real* colData = NULL;
// Max col matrix width 4096, Max col matrix size 4M.
size_t outputHeightSteps =
std::min(std::max(4096 / outputWidth, (size_t)1), outputHeight);
size_t maxColWidth = outputHeightSteps * outputWidth;
size_t channelSteps =
std::min(std::max((1048576 / maxColWidth) / filterHeight * filterWidth,
(size_t)1),
inputChannels / groups_);
size_t maxColHeight = channelSteps * filterHeight * filterWidth;
if (needIm2col) {
colShape = TensorShape({inputChannels / groups_,
...@@ -199,7 +202,7 @@ public:
outputHeight,
outputWidth});
resizeBuffer<Device>(maxColHeight * maxColWidth * sizeof(real));
colData = reinterpret_cast<real*>(memory_->getBuf());
}
...@@ -209,20 +212,24 @@ public:
(outputChannels / groups_) * outputHeight * outputWidth;
size_t filterOffset = filter.getElements() / groups_;
int nStride = outputHeight * outputWidth;
int kStride = inputChannels / groups_ * filterHeight * filterWidth;
for (size_t i = 0; i < batchSize; i++) {
filterData = inputs[1].data<real>();
for (size_t g = 0; g < groups_; g++) {
if (needIm2col) {
real beta_ = beta;
for (size_t ic = 0; ic < inputChannels / groups_;
ic += channelSteps) {
int channels = std::min(inputChannels / groups_ - ic, channelSteps);
for (size_t oh = 0; oh < outputHeight; oh += outputHeightSteps) {
int height = std::min(outputHeight - oh, outputHeightSteps);
int M = outputChannels / groups_;
int N = height * outputWidth;
int K = channels * filterHeight * filterWidth;
// im2col
im2col(inputData,
imShape,
colData,
colShape,
...@@ -232,13 +239,12 @@ public:
paddingW(),
dilationH(),
dilationW(),
channels,
oh,
height,
N);
// gemm
BlasGemm<Device, real>::compute(
false,
false,
...@@ -246,12 +252,12 @@ public:
N,
K,
1.0f,
filterData + ic * filterHeight * filterWidth,
kStride,
colData,
N,
beta_,
outputData + oh * outputWidth,
nStride);
}
beta_ = 1.0;
...@@ -266,17 +272,18 @@ public:
N,
K,
1.0f,
filterData,
K,
inputData,
N,
beta,
outputData,
N);
}
inputData += inputOffset;
outputData += outputOffset;
filterData += filterOffset;
}
}
memory_.reset();
......
...@@ -111,39 +111,42 @@ public:
int paddingWidth,
int dilationHeight,
int dilationWidth,
int inputChannels,
int colOffset,
int colOutputHeight,
int colWidth) {
int inputHeight = imShape[1];
int inputWidth = imShape[2];
int filterHeight = colShape[1];
int filterWidth = colShape[2];
int outputWidth = colShape[4];
for (int ic = 0; ic < inputChannels; ic++) {
for (int oh = 0; oh < colOutputHeight; oh++) {
T* dstData = colData + oh * outputWidth;
for (int fh = 0; fh < filterHeight; fh++) {
for (int fw = 0; fw < filterWidth; fw++) {
int imRowIdx = (oh + colOffset) * strideHeight +
fh * dilationHeight - paddingHeight;
if (imRowIdx < 0 || imRowIdx >= inputHeight) {
memset(dstData, 0, outputWidth * sizeof(T));
} else {
for (int ow = 0; ow < outputWidth; ow++) {
int imColIdx =
ow * strideWidth + fw * dilationWidth - paddingWidth;
if (imColIdx < 0 || imColIdx >= inputWidth) {
dstData[ow] = T(0);
} else {
dstData[ow] = imData[imRowIdx * inputWidth + imColIdx];
}
}
}
dstData += colWidth;
}
}
}
colData += filterHeight * filterWidth * colWidth;
imData += inputHeight * inputWidth;
}
}
};
......
...@@ -202,10 +202,10 @@ void TestIm2ColMobileFunctor() {
padding,
dilation,
dilation,
channels,
0,
outputHeight,
outputHeight * outputWidth);
autotest::TensorCheckEqual(*output1, *output2);
}
......
...@@ -4,4 +4,4 @@ cc_test(test_inference_recognize_digits_mlp
DEPS ARCHIVE_START paddle_fluid ARCHIVE_END
ARGS --dirname=${PYTHON_TESTS_DIR}/book/recognize_digits_mlp.inference.model)
set_tests_properties(test_inference_recognize_digits_mlp
PROPERTIES DEPENDS test_recognize_digits)
...@@ -2015,13 +2015,6 @@ void CpuMatrix::maxPoolForward(Matrix& inputMat,
CHECK_EQ(channels * outLength, maskMatP->getWidth());
}
/* pool max one by one */
for (size_t n = 0; n < num; ++n) { // frame by frame
if (!isContiguous()) {
...@@ -2030,19 +2023,24 @@ void CpuMatrix::maxPoolForward(Matrix& inputMat,
for (size_t c = 0; c < channels; ++c) { // channel by channel
for (size_t ph = 0; ph < outputH; ++ph) {
int hstart = ph * strideH - paddingH;
int hend = hstart + sizeY;
hstart = hstart < 0 ? 0 : hstart;
hend = hend < (int)imgSizeH ? hend : (int)imgSizeH;
for (size_t pw = 0; pw < outputW; ++pw) {
int wstart = pw * strideW - paddingW;
int wend = wstart + sizeX;
wstart = wstart < 0 ? 0 : wstart;
wend = wend < (int)imgSizeW ? wend : (int)imgSizeW;
if (maskData == NULL) {
real tmp = -(real)FLT_MAX;
for (int h = hstart; h < hend; ++h) {
for (int w = wstart; w < wend; ++w) {
tmp = tmp < inputData[h * imgSizeW + w]
? inputData[h * imgSizeW + w]
: tmp;
}
}
outData[ph * outputW + pw] = tmp;
} else {
for (int h = hstart; h < hend; ++h) {
for (int w = wstart; w < wend; ++w) {
......
...@@ -122,9 +122,11 @@ if(WITH_DISTRIBUTE)
set_source_files_properties(send_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(recv_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(recv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(listen_and_serv_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(listen_and_serv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op listen_and_serv_op sum_op executor)
else()
set(DEPS_OPS ${DEPS_OPS} send_op recv_op listen_and_serv_op)
endif()
op_library(cond_op DEPS framework_proto tensor net_op)
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <stdint.h>
#include <sys/stat.h>
#include <ostream>
#include <thread>
#include <unistd.h>
#include "paddle/framework/executor.h"
#include "paddle/framework/framework.pb.h"
#include "paddle/framework/lod_tensor.h"
#include "paddle/framework/op_registry.h"
#include "paddle/framework/proto_desc.h"
#include "paddle/operators/detail/grpc_server.h"
#include "paddle/operators/detail/sendrecvop_utils.h"
#include "paddle/operators/detail/simple_block_queue.h"
#include "paddle/string/printf.h"
namespace paddle {
namespace operators {
constexpr char kOptimizeBlock[] = "OptimizeBlock";
void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service) {
service->RunSyncUpdate();
VLOG(4) << "RunServer thread end";
}
static void CreateTensorFromMessageType(framework::Variable *var,
sendrecv::VarType var_type) {
if (var_type == sendrecv::VarType::LOD_TENSOR) {
var->GetMutable<framework::LoDTensor>();
} else if (var_type == sendrecv::VarType::SELECTED_ROWS) {
var->GetMutable<framework::SelectedRows>();
} else {
PADDLE_THROW(
"VariableMessage type %d is not in "
"[LoDTensor, SelectedRows]",
var_type);
}
}
class ListenAndServOp : public framework::OperatorBase {
public:
ListenAndServOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {
if (!rpc_service_) {
std::string endpoint = Attr<std::string>("endpoint");
rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
server_thread_.reset(new std::thread(RunServer, rpc_service_));
}
}
void Stop() override {
detail::MessageWithName term_msg;
term_msg.first = LISTEN_TERMINATE_MESSAGE;
rpc_service_->Push(term_msg);
rpc_service_->ShutDown();
server_thread_->join();
}
std::string GetGradVarNameForTrainer(const std::string &varname) const {
if (grads_counter_.find(varname) == grads_counter_.end()) {
grads_counter_[varname] = 0;
}
return string::Sprintf("%s.trainer_%d", varname, grads_counter_[varname]++);
}
void Run(const framework::Scope &scope,
const platform::Place &dev_place) const override {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(dev_place);
framework::Scope &recv_scope = scope.NewScope();
// FIXME(Yancey1989): initialize rpc server with lazy mode.
rpc_service_->SetScope(&recv_scope);
rpc_service_->SetDevCtx(&dev_ctx);
auto param_list = Attr<std::vector<std::string>>("ParamList");
auto grad_list = Attr<std::vector<std::string>>("GradList");
auto fan_in = Attr<int>("Fanin");
auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
auto *program = block->Program();
framework::Executor executor(dev_place);
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false;
while (!exit_flag) {
// Get from multiple trainers; we don't care about the order in which
// the gradients arrive, just add suffix 0~n and merge the gradient.
rpc_service_->SetCond(0);
size_t recv_var_cnt = 0;
int batch_barrier = 0;
while (batch_barrier != fan_in) {
const detail::MessageWithName &v = rpc_service_->Get();
auto grad_var_name = v.first;
if (grad_var_name == LISTEN_TERMINATE_MESSAGE) {
LOG(INFO) << "received terminate message and exit";
exit_flag = true;
break;
} else if (grad_var_name == BATCH_BARRIER_MESSAGE) {
VLOG(3) << "recv batch barrier message";
batch_barrier++;
continue;
} else {
// receive a variable
recv_var_cnt++;
auto it =
std::find(grad_list.begin(), grad_list.end(), grad_var_name);
std::string param_var_name;
if (it != grad_list.end()) {
param_var_name = param_list[it - grad_list.begin()];
} else {
LOG(ERROR) << "grad has no paired param:" << grad_var_name;
}
VLOG(3) << "received grad: " << grad_var_name
<< " updating param: " << param_var_name;
if (fan_in > 1) {
grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
}
auto *var = recv_scope.FindVar(grad_var_name);
if (var == nullptr) {
LOG(ERROR) << "Can not find server side var: " << grad_var_name;
PADDLE_THROW("Can not find server side var");
}
detail::DeserializeFromMessage(v.second, dev_ctx, var);
}
}
VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier.";
// TODO(Yancey1989): merge SelectedRows variables here
if (exit_flag) {
rpc_service_->ShutDown();
}
try {
executor.Run(*program, &recv_scope, block->ID(), /*global_block*/
false /*create_local_scope*/, false /*create_vars*/);
} catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
}
rpc_service_->SetCond(1);
rpc_service_->WaitClientGet(recv_var_cnt);
grads_counter_.clear();
} // while(true)
}
protected:
std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
std::shared_ptr<std::thread> server_thread_;
mutable std::unordered_map<std::string, int> grads_counter_;
};
class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
public:
ListenAndServOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddComment(R"DOC(
ListenAndServ operator
This operator will start a RPC server which can receive variables
from send_op and send back variables to recv_op.
)DOC");
AddAttr<std::string>("endpoint",
"(string, default 127.0.0.1:6164)"
"IP address to listen on.")
.SetDefault("127.0.0.1:6164")
.AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
AddAttr<framework::BlockDesc *>(kOptimizeBlock,
"BlockID to run on server side.");
AddAttr<std::vector<std::string>>(
"ParamList", "type list of string",
"grad->param name mapping to find which parameters to optimize.")
.SetDefault({});
AddAttr<std::vector<std::string>>(
"GradList", "type list of string",
"grad->param name mapping to find which parameters to optimize.")
.SetDefault({});
AddAttr<int>("Fanin", "type int",
"Number of trainers in the current cluster job")
.SetDefault(1);
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(listen_and_serv, ops::ListenAndServOp,
ops::ListenAndServOpMaker);
...@@ -12,187 +12,60 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ostream>
#include "paddle/framework/data_type.h"
#include "paddle/framework/framework.pb.h"
#include "paddle/framework/lod_tensor.h"
#include "paddle/framework/op_registry.h"
#include <future>
#include "paddle/operators/detail/grpc_client.h"
namespace paddle {
namespace operators {
class RecvOp : public framework::OperatorBase {
public:
RecvOp(const std::string& type, const framework::VariableNameMap& inputs,
const framework::VariableNameMap& outputs,
const framework::AttributeMap& attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void Run(const framework::Scope& scope,
const platform::Place& place) const override {
auto outs = Outputs("Out");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
for (size_t i = 0; i < outs.size(); i++) {
VLOG(3) << "getting " << outs[i];
client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}
PADDLE_ENFORCE(client_.Wait());
}
private:
mutable detail::RPCClient client_;
};
class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
public:
RecvOpMaker(OpProto* proto, OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddOutput("Out", "(Tensor) Variables to get from server.").AsDuplicable();
AddComment(R"DOC(
Recv operator
This operator can get variables from server side.
)DOC");
AddAttr<std::vector<std::string>>("epmap",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input "
"variables for mapping")
.SetDefault({});
}
};
......
...@@ -62,11 +62,13 @@ class SendOp : public framework::OperatorBase {
}
PADDLE_ENFORCE(rpc_client->Wait());
if (outs.size() > 0) {
for (size_t i = 0; i < outs.size(); i++) {
VLOG(3) << "getting " << outs[i] << " from " << epmap[i];
rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}
PADDLE_ENFORCE(rpc_client->Wait());
}
}
};
...@@ -85,6 +87,8 @@ Send operator
This operator will send tensor to recv_op at the parameter server.
)DOC");
// TODO(typhoonzero): remove this attr generate de-duplicated vector from
// epmap when initializing.
AddAttr<std::vector<std::string>>("endpoints", AddAttr<std::vector<std::string>>("endpoints",
"(string vector, default 127.0.0.1:6164)" "(string vector, default 127.0.0.1:6164)"
"Server endpoints to send variables to.") "Server endpoints to send variables to.")
......
...@@ -25,7 +25,7 @@ limitations under the License. */
#include "paddle/string/printf.h"
USE_NO_KERNEL_OP(send);
USE_NO_KERNEL_OP(listen_and_serv);
USE_OP(sum);
namespace f = paddle::framework;
...@@ -33,7 +33,7 @@ namespace p = paddle::platform;
namespace m = paddle::operators::math;
// global for simplicity.
std::unique_ptr<f::OperatorBase> listen_and_serv_op;
void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) {
p::CPUDeviceContext ctx(place);
...@@ -120,7 +120,7 @@ void StartServerNet(bool is_sparse) {
InitTensorsInScope(scope, place);
}
// sub program run in listen_and_serv_op, for simple test we use sum
f::ProgramDesc program;
f::BlockDesc *block = program.MutableBlock(0);
// X for server side tensors, RX for received tensors, must be of same shape.
...@@ -131,8 +131,9 @@ void StartServerNet(bool is_sparse) {
attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
attrs.insert({"OptimizeBlock", block});
listen_and_serv_op =
f::OpRegistry::CreateOp("listen_and_serv", {}, {}, attrs);
listen_and_serv_op->Run(scope, place);
}
TEST(SendRecvOp, CPUDense) {
...@@ -161,9 +162,9 @@ TEST(SendRecvOp, CPUDense) {
for (int64_t i = 0; i < target->numel(); ++i) {
EXPECT_EQ(expected[i] * 2, actual[i]);
}
listen_and_serv_op->Stop();
server_thread.join();
listen_and_serv_op.reset(nullptr);
}
TEST(SendRecvOp, CPUSparse) {
...@@ -200,7 +201,7 @@ TEST(SendRecvOp, CPUSparse) {
EXPECT_EQ(expect_value->mutable_data<float>(place)[i],
actual->mutable_data<float>(place)[i]);
}
listen_and_serv_op->Stop();
server_thread.join();
listen_and_serv_op.reset();
}
...@@ -53,6 +53,8 @@ class WhileOp : public framework::OperatorBase {
auto step_scopes =
scope.FindVar(Output(kStepScopes))->GetMutable<StepScopeVar>();
PADDLE_ENFORCE(platform::is_cpu_place(cond.place()),
"Condition of while op must in CPU memory.");
while (cond.data<bool>()[0]) {
auto &current_scope = scope.NewScope();
step_scopes->push_back(&current_scope);
...@@ -99,6 +101,9 @@ class WhileGradOp : public framework::OperatorBase {
void Run(const framework::Scope &scope,
const platform::Place &dev_place) const override {
// get device context from pool
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(dev_place);
framework::Executor executor(dev_place);
auto *block = Attr<framework::BlockDesc *>(kStepBlock);
auto *program = block->Program();
...@@ -205,6 +210,8 @@ class WhileGradOp : public framework::OperatorBase {
sum_op->Run(cur_scope, dev_place);
cur_scope.Rename(new_inside_name, inside_grad_name);
}
dev_ctx.Wait();
const_cast<framework::Scope &>(scope).DeleteScope(&cur_scope);
}
}
};
......
...@@ -32,7 +32,7 @@ function cmake_gen() {
cat <<EOF
========================================
Configuring cmake in /paddle/build ...
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE:-Release}
${PYTHON_FLAGS}
-DWITH_DOC=OFF
-DWITH_GPU=${WITH_GPU:-OFF}
...@@ -55,7 +55,7 @@ EOF
# docker environment is fully controlled by this script.
# See /Paddle/CMakeLists.txt, UNITTEST_USE_VIRTUALENV option.
cmake .. \
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE:-Release} \
${PYTHON_FLAGS} \
-DWITH_DOC=OFF \
-DWITH_GPU=${WITH_GPU:-OFF} \
......
...@@ -27,9 +27,10 @@ int main(int argc, char** argv) {
}
#ifdef PADDLE_WITH_CUDA
new_argv.push_back(
strdup("--tryfromenv=fraction_of_gpu_memory_to_use,use_pinned_memory,"
"warpctc_dir"));
#else
new_argv.push_back(strdup("--tryfromenv=use_pinned_memory,warpctc_dir"));
#endif
int new_argc = static_cast<int>(new_argv.size());
char** new_argv_address = new_argv.data();
......
...@@ -76,7 +76,9 @@ def __bootstrap__():
os.environ['OMP_NUM_THREADS'] = str(num_threads)
read_env_flags = [
'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir'
]
if core.is_compiled_with_cuda():
read_env_flags += ['fraction_of_gpu_memory_to_use']
core.init_gflags([sys.argv[0]] +
......
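Because these flags are consumed through gflags' `--tryfromenv` (see the `main()` change above), they can be set as `FLAGS_`-prefixed environment variables before the package is imported. A hedged sketch (the `FLAGS_<name>` convention is standard gflags behavior; the library path is hypothetical):
```python
import os

# gflags' --tryfromenv reads FLAGS_<name> from the environment.
os.environ['FLAGS_warpctc_dir'] = '/usr/local/warpctc/lib'  # hypothetical path
os.environ['FLAGS_use_pinned_memory'] = 'true'

import paddle.v2.fluid as fluid  # __bootstrap__() runs at import time
```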
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from graphviz import GraphPreviewGenerator
import proto.framework_pb2 as framework_pb2
def draw_block_graphviz(block, highlights=None, path="./temp.dot"):
'''
Generate a debug graph for block.
Args:
block(Block): a block.
'''
graph = GraphPreviewGenerator("some graph")
# collect parameters and args
protostr = block.desc.serialize_to_string()
desc = framework_pb2.BlockDesc.FromString(str(protostr))
def need_highlight(name):
if highlights is None: return False
for pattern in highlights:
assert type(pattern) is str
if re.match(pattern, name):
return True
return False
# draw parameters and args
vars = {}
for var in desc.vars:
shape = [str(i) for i in var.lod_tensor.tensor.dims]
if not shape:
shape = ['null']
# create var
if var.persistable:
varn = graph.add_param(
var.name, var.type, shape, highlight=need_highlight(var.name))
else:
varn = graph.add_arg(var.name, highlight=need_highlight(var.name))
vars[var.name] = varn
def add_op_link_var(op, var, op2var=False):
for arg in var.arguments:
if arg not in vars:
# add missing variables as argument
vars[arg] = graph.add_arg(arg, highlight=need_highlight(arg))
varn = vars[arg]
highlight = need_highlight(op.description) or need_highlight(
varn.description)
if op2var:
graph.add_edge(op, varn, highlight=highlight)
else:
graph.add_edge(varn, op, highlight=highlight)
for op in desc.ops:
opn = graph.add_op(op.type, highlight=need_highlight(op.type))
for var in op.inputs:
add_op_link_var(opn, var, False)
for var in op.outputs:
add_op_link_var(opn, var, True)
graph(path, show=True)
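A usage sketch for the helper above (hypothetical program; it assumes a fluid `Program` with a `global_block()` accessor and that the Graphviz `dot` binary is installed):
```python
import paddle.v2.fluid as fluid

program = fluid.Program()
# ... append ops/layers to `program` here ...

# Render the main block to ./debug.dot (and a PDF next to it),
# highlighting every variable whose name matches "fc.*".
draw_block_graphviz(program.global_block(),
                    highlights=["fc.*"],
                    path="./debug.dot")
```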
...@@ -478,9 +478,9 @@ class DistributeTranspiler:
else:
self._append_pserver_non_opt_ops(optimize_sub_program,
pserver_program, opt_op)
# Append the listen_and_serv op
pserver_program.global_block().append_op(
type="listen_and_serv",
inputs={},
outputs={},
attrs={
......
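For context, the truncated `attrs` dict above carries the attributes declared by `ListenAndServOpMaker` earlier in this commit; a hedged sketch with hypothetical parameter names (the real transpiler derives these from the program being split):
```python
pserver_program.global_block().append_op(
    type="listen_and_serv",
    inputs={},
    outputs={},
    attrs={
        "OptimizeBlock": optimize_sub_program.global_block(),  # block run per batch
        "endpoint": "127.0.0.1:6164",  # address this pserver listens on
        "ParamList": ["w0"],           # hypothetical parameter names
        "GradList": ["w0@GRAD"],       # matching gradient names
        "Fanin": 1,                    # number of trainers sending gradients
    })
```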
...@@ -451,9 +451,8 @@ class Operator(object):
if not given == need:
raise ValueError(("Incorrect setting for output(s) of "
"operator \"%s\". Need: [%s] Given: [%s]") %
(type, ", ".join(str(e) for e in need),
", ".join(str(e) for e in given)))
for out_proto in proto.outputs:
out_args = outputs[out_proto.name]
...@@ -489,7 +488,8 @@ class Operator(object): ...@@ -489,7 +488,8 @@ class Operator(object):
no_kernel_op_set = { no_kernel_op_set = {
'feed', 'fetch', 'save', 'load', 'recurrent', 'feed', 'fetch', 'save', 'load', 'recurrent',
'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send',
'recv', 'parallel_do' 'recv', 'listen_and_serv', 'parallel_do', 'save_combine',
'load_combine'
} }
if type not in no_kernel_op_set: if type not in no_kernel_op_set:
self.desc.infer_var_type(self.block.desc) self.desc.infer_var_type(self.block.desc)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import subprocess
import logging
def crepr(v):
if type(v) is str or type(v) is unicode:
return '"%s"' % v
return str(v)
class Rank(object):
def __init__(self, kind, name, priority):
'''
kind: str
name: str
priority: int
'''
self.kind = kind
self.name = name
self.priority = priority
self.nodes = []
def __str__(self):
if not self.nodes:
return ''
return '{' + 'rank={};'.format(self.kind) + \
','.join([node.name for node in self.nodes]) + '}'
class Graph(object):
rank_counter = 0
def __init__(self, title, **attrs):
self.title = title
self.attrs = attrs
self.nodes = []
self.edges = []
self.rank_groups = {}
def code(self):
return self.__str__()
def rank_group(self, kind, priority):
name = "rankgroup-%d" % Graph.rank_counter
Graph.rank_counter += 1
rank = Rank(kind, name, priority)
self.rank_groups[name] = rank
return name
def node(self, label, prefix, description="", **attrs):
node = Node(label, prefix, description, **attrs)
if 'rank' in attrs:
rank = self.rank_groups[attrs['rank']]
del attrs['rank']
rank.nodes.append(node)
self.nodes.append(node)
return node
def edge(self, source, target, **attrs):
edge = Edge(source, target, **attrs)
self.edges.append(edge)
return edge
def compile(self, dot_path):
file = open(dot_path, 'w')
file.write(self.__str__())
image_path = os.path.join(
os.path.dirname(__file__), dot_path[:-3] + "pdf")
cmd = ["dot", "-Tpdf", dot_path, "-o", image_path]
subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
logging.warning("write block debug graph to {}".format(image_path))
return image_path
def show(self, dot_path):
image = self.compile(dot_path)
cmd = ["open", image]
subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def _rank_repr(self):
ranks = sorted(
self.rank_groups.items(),
key=lambda item: item[1].priority)
repr = []
for x in ranks:
repr.append(str(x[1]))
return '\n'.join(repr) + '\n'
def __str__(self):
reprs = [
'digraph G {',
'title = {}'.format(crepr(self.title)),
]
for attr in self.attrs:
reprs.append("{key}={value};".format(
key=attr, value=crepr(self.attrs[attr])))
reprs.append(self._rank_repr())
random.shuffle(self.nodes)
reprs += [str(node) for node in self.nodes]
for x in self.edges:
reprs.append(str(x))
reprs.append('}')
return '\n'.join(reprs)
class Node(object):
counter = 1
def __init__(self, label, prefix, description="", **attrs):
self.label = label
self.name = "%s_%d" % (prefix, Node.counter)
self.description = description
self.attrs = attrs
Node.counter += 1
def __str__(self):
reprs = '{name} [label={label} {extra} ];'.format(
name=self.name,
label=self.label,
extra=',' + ','.join("%s=%s" % (key, crepr(value))
for key, value in self.attrs.items())
if self.attrs else "")
return reprs
class Edge(object):
def __init__(self, source, target, **attrs):
'''
Link source to target.
:param source: Node
:param target: Node
:param graph: Graph
:param attrs: dic
'''
self.source = source
self.target = target
self.attrs = attrs
def __str__(self):
repr = "{source} -> {target} {extra}".format(
source=self.source.name,
target=self.target.name,
extra="" if not self.attrs else
"[" + ','.join("{}={}".format(attr[0], crepr(attr[1]))
for attr in self.attrs.items()) + "]")
return repr
class GraphPreviewGenerator(object):
'''
Generate a graph image for ONNX proto.
'''
def __init__(self, title):
# init graphviz graph
self.graph = Graph(
title,
layout="dot",
concentrate="true",
rankdir="TB", )
self.op_rank = self.graph.rank_group('same', 2)
self.param_rank = self.graph.rank_group('same', 1)
self.arg_rank = self.graph.rank_group('same', 0)
def __call__(self, path='temp.dot', show=False):
if not show:
self.graph.compile(path)
else:
self.graph.show(path)
def add_param(self, name, data_type, shape, highlight=False):
label = '\n'.join([
'<<table cellpadding="5">',
' <tr>',
' <td bgcolor="#2b787e">',
' <b>',
name,
' </b>',
' </td>',
' </tr>',
' <tr>',
' <td>',
str(data_type),
' </td>',
' </tr>',
' <tr>',
' <td>',
'[%s]' % 'x'.join(shape),
' </td>',
' </tr>',
'</table>>',
])
return self.graph.node(
label,
prefix="param",
description=name,
shape="none",
style="rounded,filled,bold",
width="1.3",
color="#148b97" if not highlight else "orange",
fontcolor="#ffffff",
fontname="Arial")
def add_op(self, opType, **kwargs):
highlight = False
if 'highlight' in kwargs:
highlight = kwargs['highlight']
del kwargs['highlight']
return self.graph.node(
"<<B>%s</B>>" % opType,
prefix="op",
description=opType,
shape="box",
style="rounded, filled, bold",
color="#303A3A" if not highlight else "orange",
fontname="Arial",
fontcolor="#ffffff",
width="1.3",
height="0.84", )
def add_arg(self, name, highlight=False):
return self.graph.node(
crepr(name),
prefix="arg",
description=name,
shape="box",
style="rounded,filled,bold",
fontname="Arial",
fontcolor="#999999",
color="#dddddd" if not highlight else "orange")
def add_edge(self, source, target, **kwargs):
highlight = False
if 'highlight' in kwargs:
highlight = kwargs['highlight']
del kwargs['highlight']
return self.graph.edge(
source,
target,
color="#00000" if not highlight else "orange",
**kwargs)
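
The classes above can also be exercised directly; a minimal sketch, assuming `dot` is installed (all node names below are illustrative):

```python
# Hedged sketch: build a three-node graph by hand and emit tiny.dot,
# which compile() renders to tiny.pdf via the `dot` binary.
g = GraphPreviewGenerator("tiny graph")
w = g.add_param("W", "fp32", ["784", "10"])   # shape strings are joined with 'x'
x = g.add_arg("x")
fc = g.add_op("fc")
out = g.add_arg("out")
g.add_edge(x, fc)
g.add_edge(w, fc)
g.add_edge(fc, out)
g(path="tiny.dot", show=False)
```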
...@@ -46,6 +46,9 @@ def is_parameter(var): ...@@ -46,6 +46,9 @@ def is_parameter(var):
def is_persistable(var): def is_persistable(var):
if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var.desc.type() == core.VarDesc.VarType.FETCH_LIST:
return False
return var.persistable return var.persistable
...@@ -60,7 +63,12 @@ def _clone_var_in_block_(block, var): ...@@ -60,7 +63,12 @@ def _clone_var_in_block_(block, var):
persistable=True) persistable=True)
def save_vars(executor, dirname, main_program=None, vars=None, predicate=None): def save_vars(executor,
dirname,
main_program=None,
vars=None,
predicate=None,
save_file_name=None):
""" """
Save variables to directory by executor. Save variables to directory by executor.
...@@ -69,9 +77,12 @@ def save_vars(executor, dirname, main_program=None, vars=None, predicate=None): ...@@ -69,9 +77,12 @@ def save_vars(executor, dirname, main_program=None, vars=None, predicate=None):
:param main_program: program. If vars is None, then filter all variables in this :param main_program: program. If vars is None, then filter all variables in this
program which fit `predicate`. Default default_main_program. program which fit `predicate`. Default default_main_program.
:param predicate: The Predicate describes a callable that returns a variable :param predicate: The Predicate describes a callable that returns a variable
as a bool. If it returns true, the variables will be saved. as a bool. If it returns true, the corresponding input variable will be saved.
:param vars: variables need to be saved. If specify vars, program & predicate :param vars: variables that need to be saved. If vars is specified, program & predicate
will be ignored will be ignored
:param save_file_name: The name of a single file that all vars are saved to.
If it is None, save variables to separate files.
:return: None :return: None
""" """
if vars is None: if vars is None:
...@@ -83,21 +94,39 @@ def save_vars(executor, dirname, main_program=None, vars=None, predicate=None): ...@@ -83,21 +94,39 @@ def save_vars(executor, dirname, main_program=None, vars=None, predicate=None):
save_vars( save_vars(
executor, executor,
dirname=dirname, dirname=dirname,
vars=filter(predicate, main_program.list_vars())) vars=filter(predicate, main_program.list_vars()),
save_file_name=save_file_name)
else: else:
save_program = Program() save_program = Program()
save_block = save_program.global_block() save_block = save_program.global_block()
save_var_map = {}
for each_var in vars: for each_var in vars:
new_var = _clone_var_in_block_(save_block, each_var) new_var = _clone_var_in_block_(save_block, each_var)
if save_file_name is None:
save_block.append_op(
type='save',
inputs={'X': [new_var]},
outputs={},
attrs={'file_path': os.path.join(dirname, new_var.name)})
else:
save_var_map[new_var.name] = new_var
if save_file_name is not None:
save_var_list = []
for name in sorted(save_var_map.keys()):
save_var_list.append(save_var_map[name])
save_block.append_op( save_block.append_op(
type='save', type='save_combine',
inputs={'X': [new_var]}, inputs={'X': save_var_list},
outputs={}, outputs={},
attrs={'file_path': os.path.join(dirname, new_var.name)}) attrs={'file_path': os.path.join(dirname, save_file_name)})
executor.run(save_program) executor.run(save_program)
def save_params(executor, dirname, main_program=None): def save_params(executor, dirname, main_program=None, save_file_name=None):
""" """
Save all parameters to directory with executor. Save all parameters to directory with executor.
""" """
...@@ -106,10 +135,12 @@ def save_params(executor, dirname, main_program=None): ...@@ -106,10 +135,12 @@ def save_params(executor, dirname, main_program=None):
dirname=dirname, dirname=dirname,
main_program=main_program, main_program=main_program,
vars=None, vars=None,
predicate=is_parameter) predicate=is_parameter,
save_file_name=save_file_name)
def save_persistables(executor, dirname, main_program=None): def save_persistables(executor, dirname, main_program=None,
save_file_name=None):
""" """
Save all persistables to directory with executor. Save all persistables to directory with executor.
""" """
...@@ -118,21 +149,30 @@ def save_persistables(executor, dirname, main_program=None): ...@@ -118,21 +149,30 @@ def save_persistables(executor, dirname, main_program=None):
dirname=dirname, dirname=dirname,
main_program=main_program, main_program=main_program,
vars=None, vars=None,
predicate=is_persistable) predicate=is_persistable,
save_file_name=save_file_name)
def load_vars(executor, dirname, main_program=None, vars=None, predicate=None): def load_vars(executor,
dirname,
main_program=None,
vars=None,
predicate=None,
load_file_name=None):
""" """
Load variables from directory by executor. Load variables from directory by executor.
:param executor: executor that save variable :param executor: executor that loads variables
:param dirname: directory path :param dirname: directory path
:param main_program: program. If vars is None, then filter all variables in this :param main_program: program. If vars is None, then filter all variables in this
program which fit `predicate`. Default default_main_program(). program which fit `predicate`. Default default_main_program().
:param predicate: The Predicate describes a callable that returns a variable :param predicate: The Predicate describes a callable that returns a variable
as a bool. If it returns true, the variables will be loaded. as a bool. If it returns true, the corresponding input variable will be loaded.
:param vars: variables need to be loaded. If specify vars, program & :param vars: variables that need to be loaded. If vars is specified, program &
predicate will be ignored predicate will be ignored
:param load_file_name: The name of the single file that all vars are loaded from.
If it is None, load variables from separate files.
:return: None :return: None
""" """
if vars is None: if vars is None:
...@@ -144,23 +184,40 @@ def load_vars(executor, dirname, main_program=None, vars=None, predicate=None): ...@@ -144,23 +184,40 @@ def load_vars(executor, dirname, main_program=None, vars=None, predicate=None):
load_vars( load_vars(
executor, executor,
dirname=dirname, dirname=dirname,
vars=filter(predicate, main_program.list_vars())) vars=filter(predicate, main_program.list_vars()),
load_file_name=load_file_name)
else: else:
load_prog = Program() load_prog = Program()
load_block = load_prog.global_block() load_block = load_prog.global_block()
load_var_map = {}
for each_var in vars: for each_var in vars:
assert isinstance(each_var, Variable) assert isinstance(each_var, Variable)
new_var = _clone_var_in_block_(load_block, each_var) new_var = _clone_var_in_block_(load_block, each_var)
if load_file_name is None:
load_block.append_op(
type='load',
inputs={},
outputs={'Out': [new_var]},
attrs={'file_path': os.path.join(dirname, new_var.name)})
else:
load_var_map[new_var.name] = new_var
if load_file_name is not None:
load_var_list = []
for name in sorted(load_var_map.keys()):
load_var_list.append(load_var_map[name])
load_block.append_op( load_block.append_op(
type='load', type='load_combine',
inputs={}, inputs={},
outputs={"Out": [new_var]}, outputs={"Out": load_var_list},
attrs={'file_path': os.path.join(dirname, new_var.name)}) attrs={'file_path': os.path.join(dirname, load_file_name)})
executor.run(load_prog) executor.run(load_prog)
def load_params(executor, dirname, main_program=None): def load_params(executor, dirname, main_program=None, load_file_name=None):
""" """
load all parameters from directory by executor. load all parameters from directory by executor.
""" """
...@@ -168,10 +225,12 @@ def load_params(executor, dirname, main_program=None): ...@@ -168,10 +225,12 @@ def load_params(executor, dirname, main_program=None):
executor, executor,
dirname=dirname, dirname=dirname,
main_program=main_program, main_program=main_program,
predicate=is_parameter) predicate=is_parameter,
load_file_name=load_file_name)
def load_persistables(executor, dirname, main_program=None): def load_persistables(executor, dirname, main_program=None,
load_file_name=None):
""" """
load all persistables from directory by executor. load all persistables from directory by executor.
""" """
...@@ -179,7 +238,8 @@ def load_persistables(executor, dirname, main_program=None): ...@@ -179,7 +238,8 @@ def load_persistables(executor, dirname, main_program=None):
executor, executor,
dirname=dirname, dirname=dirname,
main_program=main_program, main_program=main_program,
predicate=is_persistable) predicate=is_persistable,
load_file_name=load_file_name)
def get_inference_program(target_vars, main_program=None): def get_inference_program(target_vars, main_program=None):
...@@ -238,7 +298,8 @@ def save_inference_model(dirname, ...@@ -238,7 +298,8 @@ def save_inference_model(dirname,
feeded_var_names, feeded_var_names,
target_vars, target_vars,
executor, executor,
main_program=None): main_program=None,
save_file_name=None):
""" """
Build a model especially for inference, Build a model especially for inference,
and save it to directory by the executor. and save it to directory by the executor.
...@@ -249,6 +310,8 @@ def save_inference_model(dirname, ...@@ -249,6 +310,8 @@ def save_inference_model(dirname,
:param executor: executor that save inference model :param executor: executor that save inference model
:param main_program: original program, which will be pruned to build the inference model. :param main_program: original program, which will be pruned to build the inference model.
Default default_main_program(). Default default_main_program().
:param save_file_name: The name of a single file that all parameters are saved to.
If it is None, save parameters to separate files.
:return: None :return: None
""" """
...@@ -283,25 +346,7 @@ def save_inference_model(dirname, ...@@ -283,25 +346,7 @@ def save_inference_model(dirname,
with open(model_file_name, "wb") as f: with open(model_file_name, "wb") as f:
f.write(inference_program.desc.serialize_to_string()) f.write(inference_program.desc.serialize_to_string())
save_params(executor, dirname, main_program) save_persistables(executor, dirname, inference_program, save_file_name)
def load_persistables_if_exist(executor, dirname, main_program=None):
filenames = next(os.walk(dirname))[2]
filenames = set(filenames)
def _is_presistable_and_exist_(var):
if not is_persistable(var):
return False
else:
return var.name in filenames
load_vars(
executor,
dirname,
main_program=main_program,
vars=None,
predicate=_is_presistable_and_exist_)
def get_feed_targets_names(program): def get_feed_targets_names(program):
...@@ -322,13 +367,15 @@ def get_fetch_targets_names(program): ...@@ -322,13 +367,15 @@ def get_fetch_targets_names(program):
return fetch_targets_names return fetch_targets_names
def load_inference_model(dirname, executor): def load_inference_model(dirname, executor, load_file_name=None):
""" """
Load inference model from a directory Load inference model from a directory
:param dirname: directory path :param dirname: directory path
:param executor: executor that load inference model :param executor: executor that load inference model
:param load_file_name: The name of the single file that all parameters are loaded from.
If it is None, load parameters from separate files.
:return: [program, feed_target_names, fetch_targets] :return: [program, feed_target_names, fetch_targets]
program: program especially for inference. program: program especially for inference.
feed_target_names: Names of variables that need to feed data feed_target_names: Names of variables that need to feed data
...@@ -342,7 +389,7 @@ def load_inference_model(dirname, executor): ...@@ -342,7 +389,7 @@ def load_inference_model(dirname, executor):
program_desc_str = f.read() program_desc_str = f.read()
program = Program.parse_from_string(program_desc_str) program = Program.parse_from_string(program_desc_str)
load_persistables_if_exist(executor, dirname, program) load_persistables(executor, dirname, program, load_file_name)
feed_target_names = get_feed_targets_names(program) feed_target_names = get_feed_targets_names(program)
fetch_target_names = get_fetch_targets_names(program) fetch_target_names = get_fetch_targets_names(program)
...@@ -359,6 +406,7 @@ def get_parameter_value(para, executor): ...@@ -359,6 +406,7 @@ def get_parameter_value(para, executor):
:param executor: executor for retrieving the value :param executor: executor for retrieving the value
:param para: the given parameter :param para: the given parameter
:return: the LoDTensor for the parameter :return: the LoDTensor for the parameter
""" """
assert is_parameter(para) assert is_parameter(para)
...@@ -377,6 +425,7 @@ def get_parameter_value_by_name(name, executor, program=None): ...@@ -377,6 +425,7 @@ def get_parameter_value_by_name(name, executor, program=None):
:param name: the name of the parameter :param name: the name of the parameter
:param program: the program where the variable is found :param program: the program where the variable is found
Default default_main_program(). Default default_main_program().
:return: the LoDTensor for the variable :return: the LoDTensor for the variable
""" """
if program is None: if program is None:
......
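
Since `save_inference_model` now routes through `save_persistables`, the same single-file switch applies end to end; a hedged sketch (`predict`, `"x"`, and `exe` are assumed to come from a previously built network, "params" is illustrative):

```python
# Hedged sketch: save an inference model with all parameters combined
# into one file, then reload it with the matching load_file_name.
fluid.io.save_inference_model(
    "./infer_model", ["x"], [predict], exe, save_file_name="params")
program, feed_names, fetch_targets = fluid.io.load_inference_model(
    "./infer_model", exe, load_file_name="params")
```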
...@@ -108,7 +108,7 @@ class ListenAndServ(object): ...@@ -108,7 +108,7 @@ class ListenAndServ(object):
""" """
def __init__(self, endpoint, fan_in=1, optimizer_mode=True): def __init__(self, endpoint, fan_in=1, optimizer_mode=True):
self.helper = LayerHelper("recv") self.helper = LayerHelper("listen_and_serv")
self.inputs = [] self.inputs = []
self.outputs = [] self.outputs = []
self.endpoint = endpoint self.endpoint = endpoint
...@@ -158,7 +158,7 @@ class ListenAndServ(object): ...@@ -158,7 +158,7 @@ class ListenAndServ(object):
param_names = [p.name for p in params] param_names = [p.name for p in params]
grad_names = [g.name for g in grads] grad_names = [g.name for g in grads]
parent_block.append_op( parent_block.append_op(
type='recv', type='listen_and_serv',
inputs={}, inputs={},
outputs={}, outputs={},
attrs={ attrs={
...@@ -196,3 +196,31 @@ def Send(endpoints, send_vars, get_vars): ...@@ -196,3 +196,31 @@ def Send(endpoints, send_vars, get_vars):
outputs={"Out": get_vars}, outputs={"Out": get_vars},
attrs={"endpoints": endpoints, attrs={"endpoints": endpoints,
"epmap": epmap}) "epmap": epmap})
def Recv(endpoints, get_vars):
"""
Recv layer
Args:
endpoints: comma separated IP:PORT pairs in the order
of the variables to fetch
get_vars: vars to get from the server after the send completes.
Get variables from the server side once the server has
finished running its side of the program.
"""
assert (type(get_vars) == list)
epmap = endpoints.split(",")
endpoints = list(set(epmap))
helper = LayerHelper("Recv", **locals())
helper.append_op(
type="recv",
inputs={"X": get_vars},
outputs={"Out": get_vars},
attrs={"endpoints": endpoints,
"epmap": epmap})
...@@ -59,6 +59,7 @@ __all__ = [ ...@@ -59,6 +59,7 @@ __all__ = [
'elementwise_pow', 'elementwise_pow',
'clip', 'clip',
'clip_by_norm', 'clip_by_norm',
'softmax',
'sequence_softmax', 'sequence_softmax',
] + __activations__ ] + __activations__
......
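
The `softmax` layer is now exported from `fluid.layers`; a minimal sketch mirroring the unit test added later in this commit:

```python
# Hedged sketch: softmax over the output of a fully connected layer.
import paddle.v2.fluid as fluid

data = fluid.layers.data(name='data', shape=[10], dtype='float32')
hid = fluid.layers.fc(input=data, size=20)
prob = fluid.layers.softmax(x=hid)
```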
...@@ -295,7 +295,7 @@ def fill_constant_batch_size_like(input, ...@@ -295,7 +295,7 @@ def fill_constant_batch_size_like(input,
return out return out
def ones(shape, dtype): def ones(shape, dtype, force_cpu=False):
""" """
**ones** **ones**
...@@ -319,7 +319,7 @@ def ones(shape, dtype): ...@@ -319,7 +319,7 @@ def ones(shape, dtype):
return fill_constant(value=1.0, **locals()) return fill_constant(value=1.0, **locals())
def zeros(shape, dtype): def zeros(shape, dtype, force_cpu=False):
""" """
**zeros** **zeros**
......
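
`force_cpu` pins the constant on the host even when the program otherwise runs on GPU; the beam-search decoder further down uses it for its step counter. A minimal sketch:

```python
# Hedged sketch: a step counter kept in CPU memory so control-flow ops
# can read it cheaply, matching the decoder change later in this commit.
counter = fluid.layers.zeros(shape=[1], dtype='int64', force_cpu=True)
```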
...@@ -31,7 +31,7 @@ dtype_to_size = { ...@@ -31,7 +31,7 @@ dtype_to_size = {
class ControlFlowGraph(object): class ControlFlowGraph(object):
def __init__(self, Program, ops, forward_num): def __init__(self, Program, ops, forward_num, skip_opt):
self._program = Program self._program = Program
self._ops = ops self._ops = ops
self._forward_num = forward_num self._forward_num = forward_num
...@@ -41,6 +41,7 @@ class ControlFlowGraph(object): ...@@ -41,6 +41,7 @@ class ControlFlowGraph(object):
self._defs = defaultdict(set) self._defs = defaultdict(set)
self._live_in = defaultdict(set) self._live_in = defaultdict(set)
self._live_out = defaultdict(set) self._live_out = defaultdict(set)
self._skip_opt = skip_opt
def _add_connections(self, connections): def _add_connections(self, connections):
for node1, node2 in connections: for node1, node2 in connections:
...@@ -130,6 +131,10 @@ class ControlFlowGraph(object): ...@@ -130,6 +131,10 @@ class ControlFlowGraph(object):
block_desc, x, block_desc, x,
is_forward).type() != core.VarDesc.VarType.LOD_TENSOR: is_forward).type() != core.VarDesc.VarType.LOD_TENSOR:
return False return False
if x in self._skip_opt:
return False
if not self._find_var(block_desc, x, is_forward).shape():
return False
return True return True
self._build_graph() self._build_graph()
...@@ -140,6 +145,7 @@ class ControlFlowGraph(object): ...@@ -140,6 +145,7 @@ class ControlFlowGraph(object):
if op.type() == "while" or op.type() == "while_grad": if op.type() == "while" or op.type() == "while_grad":
continue continue
block_desc = op.block() block_desc = op.block()
self.current_block_desc = block_desc
is_forward = i < self._forward_num is_forward = i < self._forward_num
if self.pool: if self.pool:
defs_can_optimize = filter( defs_can_optimize = filter(
...@@ -197,28 +203,32 @@ def get_cfgs(input_program): ...@@ -197,28 +203,32 @@ def get_cfgs(input_program):
block_desc = pdesc.block(0) block_desc = pdesc.block(0)
op_size = block_desc.op_size() op_size = block_desc.op_size()
# Get global block ops # Get global block ops
ops_list.append(([block_desc.op(i) for i in range(op_size)], op_size)) ops_list.append(
([block_desc.op(i) for i in range(op_size)], op_size, set()))
while_sub_block_ids = [] while_sub_block_ids = []
while_grad_sub_block_ids = [] while_grad_sub_block_ids = []
while_pair = [] while_op_output = set()
while_block_id_pair = []
for i in range(op_size): for i in range(op_size):
op = block_desc.op(i) op = block_desc.op(i)
if op.type() == "while": if op.type() == "while":
while_sub_block_ids.append(op.attr("sub_block").id) while_sub_block_ids.append(op.attr("sub_block").id)
while_op_output.update(op.output_arg_names())
elif op.type() == "while_grad": elif op.type() == "while_grad":
while_grad_sub_block_ids.append(op.attr("sub_block").id) while_grad_sub_block_ids.append(op.attr("sub_block").id)
while_op_output.update(op.output_arg_names())
# Find while/while_grad block pair # Find while/while_grad block pair
for grad_id in while_grad_sub_block_ids: for grad_id in while_grad_sub_block_ids:
parent_id = pdesc.block(grad_id).parent parent_id = pdesc.block(grad_id).parent
if parent_id in while_sub_block_ids: if parent_id in while_sub_block_ids:
while_pair.append((parent_id, grad_id)) while_block_id_pair.append((parent_id, grad_id))
while_sub_block_ids.remove(parent_id) while_sub_block_ids.remove(parent_id)
# Get while/while_grad block ops # Get while/while_grad block ops
for parent_id, grad_id in while_pair: for parent_id, grad_id in while_block_id_pair:
while_block_ops = [] while_block_ops = []
while_block = pdesc.block(parent_id) while_block = pdesc.block(parent_id)
while_block_op_size = while_block.op_size() while_block_op_size = while_block.op_size()
...@@ -230,7 +240,7 @@ def get_cfgs(input_program): ...@@ -230,7 +240,7 @@ def get_cfgs(input_program):
for i in range(while_grad_block_op_size): for i in range(while_grad_block_op_size):
while_block_ops.append(while_grad_block.op(i)) while_block_ops.append(while_grad_block.op(i))
ops_list.append((while_block_ops, while_block_op_size)) ops_list.append((while_block_ops, while_block_op_size, while_op_output))
# Process rest while block ops # Process rest while block ops
for parent_id in while_sub_block_ids: for parent_id in while_sub_block_ids:
...@@ -242,7 +252,7 @@ def get_cfgs(input_program): ...@@ -242,7 +252,7 @@ def get_cfgs(input_program):
ops_list.append((while_block_ops, while_block_op_size)) ops_list.append((while_block_ops, while_block_op_size, while_op_output))
cfgs = [ControlFlowGraph(input_program, i, j) for i, j in ops_list] cfgs = [ControlFlowGraph(input_program, i, j, k) for i, j, k in ops_list]
return cfgs return cfgs
......
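
The transpiler's public entry point rewrites a program in place so short-lived variables reuse buffers; a hedged sketch, assuming it is exported as `fluid.memory_optimize` at this revision:

```python
# Hedged sketch: call before the executor runs; the export name is an
# assumption about this revision's fluid/__init__.py.
import paddle.v2.fluid as fluid

fluid.memory_optimize(fluid.default_main_program())
```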
...@@ -5,9 +5,11 @@ if(NOT WITH_DISTRIBUTE) ...@@ -5,9 +5,11 @@ if(NOT WITH_DISTRIBUTE)
list(REMOVE_ITEM TEST_OPS test_recv_op) list(REMOVE_ITEM TEST_OPS test_recv_op)
endif(NOT WITH_DISTRIBUTE) endif(NOT WITH_DISTRIBUTE)
list(REMOVE_ITEM TEST_OPS test_warpctc_op)
foreach(src ${TEST_OPS}) foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py) py_test(${src} SRCS ${src}.py)
endforeach() endforeach()
py_test(test_warpctc_op SRCS test_warpctc_op.py ENVS FLAGS_warpctc_dir=${WARPCTC_LIB_DIR})
add_subdirectory(book) add_subdirectory(book)
add_subdirectory(book_distribute) add_subdirectory(book_distribute)
......
recognize_digits_*.inference.model
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
list(REMOVE_ITEM TEST_OPS test_image_classification_train test_recognize_digits)
py_test(test_image_classification_train_resnet SRCS test_image_classification_train.py ARGS resnet)
py_test(test_image_classification_train_vgg SRCS test_image_classification_train.py ARGS vgg)
py_test(test_recognize_digits_mlp_cpu
SRCS test_recognize_digits.py
ARGS mlp)
py_test(test_recognize_digits_mlp_cuda
SRCS test_recognize_digits.py
ARGS mlp --use_cuda)
py_test(test_recognize_digits_conv_cpu
SRCS test_recognize_digits.py
ARGS conv)
py_test(test_recognize_digits_conv_cuda
SRCS test_recognize_digits.py
ARGS conv --use_cuda)
py_test(test_recognize_digits_mlp_cpu_parallel
SRCS test_recognize_digits.py
ARGS mlp --parallel)
py_test(test_recognize_digits_mlp_cuda_parallel
SRCS test_recognize_digits.py
ARGS mlp --use_cuda --parallel)
py_test(test_recognize_digits_conv_cpu_parallel
SRCS test_recognize_digits.py
ARGS conv --parallel)
py_test(test_recognize_digits_conv_cuda_parallel
SRCS test_recognize_digits.py
ARGS conv --use_cuda --parallel)
# default test # default test
foreach(src ${TEST_OPS}) foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py) py_test(${src} SRCS ${src}.py)
......
...@@ -12,44 +12,74 @@ ...@@ -12,44 +12,74 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import numpy as np
import paddle.v2 as paddle import paddle.v2 as paddle
import paddle.v2.fluid as fluid import paddle.v2.fluid as fluid
import contextlib
import unittest
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None) def main(use_cuda):
if use_cuda and not fluid.core.is_compiled_with_cuda():
return
y = fluid.layers.data(name='y', shape=[1], dtype='float32') x = fluid.layers.data(name='x', shape=[13], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y) y_predict = fluid.layers.fc(input=x, size=1, act=None)
avg_cost = fluid.layers.mean(x=cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) y = fluid.layers.data(name='y', shape=[1], dtype='float32')
sgd_optimizer.minimize(avg_cost)
BATCH_SIZE = 20 cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(x=cost)
train_reader = paddle.batch( sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
paddle.reader.shuffle( sgd_optimizer.minimize(avg_cost)
paddle.dataset.uci_housing.train(), buf_size=500),
batch_size=BATCH_SIZE)
place = fluid.CPUPlace() BATCH_SIZE = 20
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program()) train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.train(), buf_size=500),
batch_size=BATCH_SIZE)
PASS_NUM = 100 place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
for pass_id in range(PASS_NUM): feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
fluid.io.save_persistables(exe, "./fit_a_line.model/") exe = fluid.Executor(place)
fluid.io.load_persistables(exe, "./fit_a_line.model/")
for data in train_reader(): exe.run(fluid.default_startup_program())
avg_loss_value, = exe.run(fluid.default_main_program(),
feed=feeder.feed(data), PASS_NUM = 100
fetch_list=[avg_cost]) for pass_id in range(PASS_NUM):
print(avg_loss_value) fluid.io.save_persistables(exe, "./fit_a_line.model/")
if avg_loss_value[0] < 10.0: fluid.io.load_persistables(exe, "./fit_a_line.model/")
exit(0) # if avg cost less than 10.0, we think our code is good. for data in train_reader():
exit(1) avg_loss_value, = exe.run(fluid.default_main_program(),
feed=feeder.feed(data),
fetch_list=[avg_cost])
print(avg_loss_value)
if avg_loss_value[0] < 10.0:
return
raise AssertionError("Fit a line cost is too large, {0:2.2}".format(
avg_loss_value[0]))
class TestFitALine(unittest.TestCase):
def test_cpu(self):
with self.program_scope_guard():
main(use_cuda=False)
def test_cuda(self):
with self.program_scope_guard():
main(use_cuda=True)
@contextlib.contextmanager
def program_scope_guard(self):
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
yield
if __name__ == '__main__':
unittest.main()
...@@ -14,10 +14,10 @@ ...@@ -14,10 +14,10 @@
from __future__ import print_function from __future__ import print_function
import sys
import paddle.v2 as paddle import paddle.v2 as paddle
import paddle.v2.fluid as fluid import paddle.v2.fluid as fluid
import unittest
import contextlib
def resnet_cifar10(input, depth=32): def resnet_cifar10(input, depth=32):
...@@ -89,56 +89,89 @@ def vgg16_bn_drop(input): ...@@ -89,56 +89,89 @@ def vgg16_bn_drop(input):
return fc2 return fc2
classdim = 10 def main(net_type, use_cuda):
data_shape = [3, 32, 32] if use_cuda and not fluid.core.is_compiled_with_cuda():
return
images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64') classdim = 10
data_shape = [3, 32, 32]
net_type = "vgg"
if len(sys.argv) >= 2: images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32')
net_type = sys.argv[1] label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if net_type == "vgg": if net_type == "vgg":
print("train vgg net") print("train vgg net")
net = vgg16_bn_drop(images) net = vgg16_bn_drop(images)
elif net_type == "resnet": elif net_type == "resnet":
print("train resnet") print("train resnet")
net = resnet_cifar10(images, 32) net = resnet_cifar10(images, 32)
else: else:
raise ValueError("%s network is not supported" % net_type) raise ValueError("%s network is not supported" % net_type)
predict = fluid.layers.fc(input=net, size=classdim, act='softmax') predict = fluid.layers.fc(input=net, size=classdim, act='softmax')
cost = fluid.layers.cross_entropy(input=predict, label=label) cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost) avg_cost = fluid.layers.mean(x=cost)
optimizer = fluid.optimizer.Adam(learning_rate=0.001) optimizer = fluid.optimizer.Adam(learning_rate=0.001)
opts = optimizer.minimize(avg_cost) optimizer.minimize(avg_cost)
accuracy = fluid.evaluator.Accuracy(input=predict, label=label) accuracy = fluid.evaluator.Accuracy(input=predict, label=label)
BATCH_SIZE = 128 BATCH_SIZE = 128
PASS_NUM = 1 PASS_NUM = 1
train_reader = paddle.batch( train_reader = paddle.batch(
paddle.reader.shuffle( paddle.reader.shuffle(
paddle.dataset.cifar.train10(), buf_size=128 * 10), paddle.dataset.cifar.train10(), buf_size=128 * 10),
batch_size=BATCH_SIZE) batch_size=BATCH_SIZE)
place = fluid.CPUPlace() place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
feeder = fluid.DataFeeder(place=place, feed_list=[images, label]) feeder = fluid.DataFeeder(place=place, feed_list=[images, label])
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM): loss = 0.0
accuracy.reset(exe) for pass_id in range(PASS_NUM):
for data in train_reader(): accuracy.reset(exe)
loss, acc = exe.run(fluid.default_main_program(), for data in train_reader():
feed=feeder.feed(data), loss, acc = exe.run(fluid.default_main_program(),
fetch_list=[avg_cost] + accuracy.metrics) feed=feeder.feed(data),
pass_acc = accuracy.eval(exe) fetch_list=[avg_cost] + accuracy.metrics)
print("loss:" + str(loss) + " acc:" + str(acc) + " pass_acc:" + str( pass_acc = accuracy.eval(exe)
pass_acc)) print("loss:" + str(loss) + " acc:" + str(acc) + " pass_acc:" + str(
# this model is slow, so if we can train two mini batch, we think it works properly. pass_acc))
exit(0) return
exit(1)
raise AssertionError(
"Image classification loss is too large, {0:2.2}".format(loss))
class TestImageClassification(unittest.TestCase):
def test_vgg_cuda(self):
with self.scope_prog_guard():
main('vgg', use_cuda=True)
def test_resnet_cuda(self):
with self.scope_prog_guard():
main('resnet', use_cuda=True)
def test_vgg_cpu(self):
with self.scope_prog_guard():
main('vgg', use_cuda=False)
def test_resnet_cpu(self):
with self.scope_prog_guard():
main('resnet', use_cuda=False)
@contextlib.contextmanager
def scope_prog_guard(self):
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
yield
if __name__ == '__main__':
unittest.main()
...@@ -11,21 +11,20 @@ ...@@ -11,21 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import contextlib
import numpy as np import numpy as np
import paddle.v2 as paddle import paddle.v2 as paddle
import paddle.v2.fluid as fluid import paddle.v2.fluid as fluid
import paddle.v2.fluid.core as core
import paddle.v2.fluid.framework as framework import paddle.v2.fluid.framework as framework
import paddle.v2.fluid.layers as pd import paddle.v2.fluid.layers as pd
from paddle.v2.fluid.executor import Executor from paddle.v2.fluid.executor import Executor
import unittest
dict_size = 30000 dict_size = 30000
source_dict_dim = target_dict_dim = dict_size source_dict_dim = target_dict_dim = dict_size
src_dict, trg_dict = paddle.dataset.wmt14.get_dict(dict_size)
hidden_dim = 32 hidden_dim = 32
word_dim = 16 word_dim = 16
IS_SPARSE = True
batch_size = 2 batch_size = 2
max_length = 8 max_length = 8
topk_size = 50 topk_size = 50
...@@ -34,10 +33,8 @@ beam_size = 2 ...@@ -34,10 +33,8 @@ beam_size = 2
decoder_size = hidden_dim decoder_size = hidden_dim
place = core.CPUPlace()
def encoder(is_sparse):
def encoder():
# encoder # encoder
src_word_id = pd.data( src_word_id = pd.data(
name="src_word_id", shape=[1], dtype='int64', lod_level=1) name="src_word_id", shape=[1], dtype='int64', lod_level=1)
...@@ -45,7 +42,7 @@ def encoder(): ...@@ -45,7 +42,7 @@ def encoder():
input=src_word_id, input=src_word_id,
size=[dict_size, word_dim], size=[dict_size, word_dim],
dtype='float32', dtype='float32',
is_sparse=IS_SPARSE, is_sparse=is_sparse,
param_attr=fluid.ParamAttr(name='vemb')) param_attr=fluid.ParamAttr(name='vemb'))
fc1 = pd.fc(input=src_embedding, size=hidden_dim * 4, act='tanh') fc1 = pd.fc(input=src_embedding, size=hidden_dim * 4, act='tanh')
...@@ -54,7 +51,7 @@ def encoder(): ...@@ -54,7 +51,7 @@ def encoder():
return encoder_out return encoder_out
def decoder_train(context): def decoder_train(context, is_sparse):
# decoder # decoder
trg_language_word = pd.data( trg_language_word = pd.data(
name="target_language_word", shape=[1], dtype='int64', lod_level=1) name="target_language_word", shape=[1], dtype='int64', lod_level=1)
...@@ -62,7 +59,7 @@ def decoder_train(context): ...@@ -62,7 +59,7 @@ def decoder_train(context):
input=trg_language_word, input=trg_language_word,
size=[dict_size, word_dim], size=[dict_size, word_dim],
dtype='float32', dtype='float32',
is_sparse=IS_SPARSE, is_sparse=is_sparse,
param_attr=fluid.ParamAttr(name='vemb')) param_attr=fluid.ParamAttr(name='vemb'))
rnn = pd.DynamicRNN() rnn = pd.DynamicRNN()
...@@ -82,10 +79,10 @@ def decoder_train(context): ...@@ -82,10 +79,10 @@ def decoder_train(context):
return rnn() return rnn()
def decoder_decode(context): def decoder_decode(context, is_sparse):
init_state = context init_state = context
array_len = pd.fill_constant(shape=[1], dtype='int64', value=max_length) array_len = pd.fill_constant(shape=[1], dtype='int64', value=max_length)
counter = pd.zeros(shape=[1], dtype='int64') counter = pd.zeros(shape=[1], dtype='int64', force_cpu=True)
# fill the first element with init_state # fill the first element with init_state
state_array = pd.create_array('float32') state_array = pd.create_array('float32')
...@@ -117,7 +114,7 @@ def decoder_decode(context): ...@@ -117,7 +114,7 @@ def decoder_decode(context):
input=pre_ids, input=pre_ids,
size=[dict_size, word_dim], size=[dict_size, word_dim],
dtype='float32', dtype='float32',
is_sparse=IS_SPARSE) is_sparse=is_sparse)
# use rnn unit to update rnn # use rnn unit to update rnn
current_state = pd.fc(input=[pre_ids_emb, pre_state_expanded], current_state = pd.fc(input=[pre_ids_emb, pre_state_expanded],
...@@ -150,7 +147,7 @@ def decoder_decode(context): ...@@ -150,7 +147,7 @@ def decoder_decode(context):
def set_init_lod(data, lod, place): def set_init_lod(data, lod, place):
res = core.LoDTensor() res = fluid.LoDTensor()
res.set(data, place) res.set(data, place)
res.set_lod(lod) res.set_lod(lod)
return res return res
...@@ -165,15 +162,19 @@ def to_lodtensor(data, place): ...@@ -165,15 +162,19 @@ def to_lodtensor(data, place):
lod.append(cur_len) lod.append(cur_len)
flattened_data = np.concatenate(data, axis=0).astype("int64") flattened_data = np.concatenate(data, axis=0).astype("int64")
flattened_data = flattened_data.reshape([len(flattened_data), 1]) flattened_data = flattened_data.reshape([len(flattened_data), 1])
res = core.LoDTensor() res = fluid.LoDTensor()
res.set(flattened_data, place) res.set(flattened_data, place)
res.set_lod([lod]) res.set_lod([lod])
return res return res
def train_main(): def train_main(use_cuda, is_sparse):
context = encoder() if use_cuda and not fluid.core.is_compiled_with_cuda():
rnn_out = decoder_train(context) return
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
context = encoder(is_sparse)
rnn_out = decoder_train(context, is_sparse)
label = pd.data( label = pd.data(
name="target_language_next_word", shape=[1], dtype='int64', lod_level=1) name="target_language_next_word", shape=[1], dtype='int64', lod_level=1)
cost = pd.cross_entropy(input=rnn_out, label=label) cost = pd.cross_entropy(input=rnn_out, label=label)
...@@ -212,9 +213,13 @@ def train_main(): ...@@ -212,9 +213,13 @@ def train_main():
batch_id += 1 batch_id += 1
def decode_main(): def decode_main(use_cuda, is_sparse):
context = encoder() if use_cuda and not fluid.core.is_compiled_with_cuda():
translation_ids, translation_scores = decoder_decode(context) return
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
context = encoder(is_sparse)
translation_ids, translation_scores = decoder_decode(context, is_sparse)
exe = Executor(place) exe = Executor(place)
exe.run(framework.default_startup_program()) exe.run(framework.default_startup_program())
...@@ -250,6 +255,60 @@ def decode_main(): ...@@ -250,6 +255,60 @@ def decode_main():
break break
class TestMachineTranslation(unittest.TestCase):
pass
@contextlib.contextmanager
def scope_prog_guard():
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
yield
def inject_test_train(use_cuda, is_sparse):
f_name = 'test_{0}_{1}_train'.format('cuda' if use_cuda else 'cpu', 'sparse'
if is_sparse else 'dense')
def f(*args):
with scope_prog_guard():
train_main(use_cuda, is_sparse)
setattr(TestMachineTranslation, f_name, f)
def inject_test_decode(use_cuda, is_sparse, decorator=None):
f_name = 'test_{0}_{1}_decode'.format('cuda'
if use_cuda else 'cpu', 'sparse'
if is_sparse else 'dense')
def f(*args):
with scope_prog_guard():
decode_main(use_cuda, is_sparse)
if decorator is not None:
f = decorator(f)
setattr(TestMachineTranslation, f_name, f)
for _use_cuda_ in (False, True):
for _is_sparse_ in (False, True):
inject_test_train(_use_cuda_, _is_sparse_)
for _use_cuda_ in (False, True):
for _is_sparse_ in (False, True):
_decorator_ = None
if _use_cuda_:
_decorator_ = unittest.skip(
reason='Beam Search does not support CUDA!')
inject_test_decode(
is_sparse=_is_sparse_, use_cuda=_use_cuda_, decorator=_decorator_)
if __name__ == '__main__': if __name__ == '__main__':
# train_main() unittest.main()
decode_main()
...@@ -17,6 +17,7 @@ import paddle.v2.fluid as fluid ...@@ -17,6 +17,7 @@ import paddle.v2.fluid as fluid
import paddle.v2 as paddle import paddle.v2 as paddle
import sys import sys
import numpy import numpy
import unittest
def parse_arg(): def parse_arg():
...@@ -74,18 +75,18 @@ def conv_net(img, label): ...@@ -74,18 +75,18 @@ def conv_net(img, label):
return loss_net(conv_pool_2, label) return loss_net(conv_pool_2, label)
def train(args, save_dirname=None): def train(nn_type, use_cuda, parallel, save_dirname):
print("recognize digits with args: {0}".format(" ".join(sys.argv[1:]))) if use_cuda and not fluid.core.is_compiled_with_cuda():
return
img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64') label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if args.nn_type == 'mlp': if nn_type == 'mlp':
net_conf = mlp net_conf = mlp
else: else:
net_conf = conv_net net_conf = conv_net
if args.parallel: if parallel:
places = fluid.layers.get_places() places = fluid.layers.get_places()
pd = fluid.layers.ParallelDo(places) pd = fluid.layers.ParallelDo(places)
with pd.do(): with pd.do():
...@@ -107,7 +108,7 @@ def train(args, save_dirname=None): ...@@ -107,7 +108,7 @@ def train(args, save_dirname=None):
optimizer = fluid.optimizer.Adam(learning_rate=0.001) optimizer = fluid.optimizer.Adam(learning_rate=0.001)
optimizer.minimize(avg_loss) optimizer.minimize(avg_loss)
place = fluid.CUDAPlace(0) if args.use_cuda else fluid.CPUPlace() place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
...@@ -147,13 +148,14 @@ def train(args, save_dirname=None): ...@@ -147,13 +148,14 @@ def train(args, save_dirname=None):
'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'. 'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'.
format(pass_id, batch_id + 1, format(pass_id, batch_id + 1,
float(avg_loss_val), float(acc_val))) float(avg_loss_val), float(acc_val)))
raise AssertionError("Loss of recognize digits is too large")
def infer(args, save_dirname=None): def infer(use_cuda, save_dirname=None):
if save_dirname is None: if save_dirname is None:
return return
place = fluid.CUDAPlace(0) if args.use_cuda else fluid.CPUPlace() place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
# Use fluid.io.load_inference_model to obtain the inference program desc, # Use fluid.io.load_inference_model to obtain the inference program desc,
...@@ -174,11 +176,48 @@ def infer(args, save_dirname=None): ...@@ -174,11 +176,48 @@ def infer(args, save_dirname=None):
print("infer results: ", results[0]) print("infer results: ", results[0])
if __name__ == '__main__': def main(use_cuda, parallel, nn_type):
args = parse_arg() if not use_cuda and not parallel:
if not args.use_cuda and not args.parallel: save_dirname = "recognize_digits_" + nn_type + ".inference.model"
save_dirname = "recognize_digits_" + args.nn_type + ".inference.model"
else: else:
save_dirname = None save_dirname = None
train(args, save_dirname)
infer(args, save_dirname) train(
nn_type=nn_type,
use_cuda=use_cuda,
parallel=parallel,
save_dirname=save_dirname)
infer(use_cuda=use_cuda, save_dirname=save_dirname)
class TestRecognizeDigits(unittest.TestCase):
pass
def inject_test_method(use_cuda, parallel, nn_type):
def __impl__(self):
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
main(use_cuda, parallel, nn_type)
fn = 'test_{0}_{1}_{2}'.format(nn_type, 'cuda'
if use_cuda else 'cpu', 'parallel'
if parallel else 'normal')
setattr(TestRecognizeDigits, fn, __impl__)
def inject_all_tests():
for use_cuda in (False, True):
for parallel in (False, True):
for nn_type in ('mlp', 'conv'):
inject_test_method(use_cuda, parallel, nn_type)
inject_all_tests()
if __name__ == '__main__':
unittest.main()
...@@ -223,6 +223,14 @@ class TestBook(unittest.TestCase): ...@@ -223,6 +223,14 @@ class TestBook(unittest.TestCase):
self.assertIsNotNone(layers.sequence_softmax(x=seq)) self.assertIsNotNone(layers.sequence_softmax(x=seq))
print(str(program)) print(str(program))
def test_softmax(self):
program = Program()
with program_guard(program):
data = layers.data(name='data', shape=[10], dtype='float32')
hid = layers.fc(input=data, size=20)
self.assertIsNotNone(layers.softmax(x=hid))
print(str(program))
def test_get_places(self): def test_get_places(self):
program = Program() program = Program()
with program_guard(program): with program_guard(program):
......
...@@ -19,6 +19,7 @@ import paddle.v2.fluid.layers as layers ...@@ -19,6 +19,7 @@ import paddle.v2.fluid.layers as layers
import numpy import numpy
from multiprocessing import Process from multiprocessing import Process
import os, sys import os, sys
import time
class TestRecvOp(unittest.TestCase): class TestRecvOp(unittest.TestCase):
...@@ -28,6 +29,7 @@ class TestRecvOp(unittest.TestCase): ...@@ -28,6 +29,7 @@ class TestRecvOp(unittest.TestCase):
p = Process(target=self.init_serv, args=(place, )) p = Process(target=self.init_serv, args=(place, ))
p.daemon = True p.daemon = True
p.start() p.start()
time.sleep(1)
self.init_client(place) self.init_client(place)
# FIXME(typhoonzero): find a way to gracefully shutdown the server. # FIXME(typhoonzero): find a way to gracefully shutdown the server.
os.system("kill -9 %d" % p.pid) os.system("kill -9 %d" % p.pid)
......