提交 89c9f79c 编写于 作者: W wanghaoshuang

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into adadelta

...@@ -26,7 +26,7 @@ lookup of rows. ...@@ -26,7 +26,7 @@ lookup of rows.
The following figure illustrates the multiplication of x with two The following figure illustrates the multiplication of x with two
non-zero elements, or say, two symbols, and a lookup table W: non-zero elements, or say, two symbols, and a lookup table W:
![lookup table](./lookup_table.png) ![lookup table](./src/lookup_table.png)
### The Backward Algorithm ### The Backward Algorithm
...@@ -42,7 +42,7 @@ or some more sophisticated algorithms that rely on both W' and W: ...@@ -42,7 +42,7 @@ or some more sophisticated algorithms that rely on both W' and W:
$$W = f(W, W')$$ $$W = f(W, W')$$
The following figure illustrates the backward pass of the lookup The following figure illustrates the backward pass of the lookup
operator: ![lookup table training](./lookup_table_training.png) operator: ![lookup table training](./src/lookup_table_training.png)
## Distributed Storage Service ## Distributed Storage Service
......
...@@ -103,7 +103,7 @@ In computability theory, a system of data-manipulation rules, such as a programm ...@@ -103,7 +103,7 @@ In computability theory, a system of data-manipulation rules, such as a programm
There are two ways to execute a Fluid program. When a program is executed, it creates a protobuf message [`ProgramDesc`](https://github.com/PaddlePaddle/Paddle/blob/a91efdde6910ce92a78e3aa7157412c4c88d9ee8/paddle/framework/framework.proto#L145) that describes the process and is conceptually like an [abstract syntax tree](https://en.wikipedia.org/wiki/Abstract_syntax_tree). There are two ways to execute a Fluid program. When a program is executed, it creates a protobuf message [`ProgramDesc`](https://github.com/PaddlePaddle/Paddle/blob/a91efdde6910ce92a78e3aa7157412c4c88d9ee8/paddle/framework/framework.proto#L145) that describes the process and is conceptually like an [abstract syntax tree](https://en.wikipedia.org/wiki/Abstract_syntax_tree).
There is a C++ class [`Executor`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.h), which runs a `ProgramDesc`, similar to how an interpreter runs a Python program. There is a C++ class [`Executor`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/framework/executor.h), which runs a `ProgramDesc`, similar to how an interpreter runs a Python program.
Fluid is moving towards the direction of a compiler, which is explain in [fluid_compiler.md](fluid_compiler.md). Fluid is moving towards the direction of a compiler, which is explain in [fluid_compiler.md](fluid_compiler.md).
......
...@@ -871,3 +871,67 @@ TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) { ...@@ -871,3 +871,67 @@ TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) {
ch->Reset<int>(0); ch->Reset<int>(0);
ChannelHolderDestroyUnblockSenders(ch, false); ChannelHolderDestroyUnblockSenders(ch, false);
} }
// This tests that closing a channelholder many times.
void ChannelHolderManyTimesClose(ChannelHolder *ch) {
const int num_threads = 15;
std::thread t[num_threads];
bool thread_ended[num_threads];
// Launches threads that try to send data to channel.
for (size_t i = 0; i < num_threads / 3; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *ended) {
int data = 10;
ch->Send(&data);
*ended = true;
},
&thread_ended[i]);
}
// Launches threads that try to receive data to channel.
for (size_t i = num_threads / 3; i < 2 * num_threads / 3; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
if (ch->Receive(&data)) {
EXPECT_EQ(data, 10);
}
*p = true;
},
&thread_ended[i]);
}
// Launches threads that try to close the channel.
for (size_t i = 2 * num_threads / 3; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
if (!ch->IsClosed()) {
ch->close();
}
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
// Verify that all threads are unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}
EXPECT_TRUE(ch->IsClosed());
// delete the channel
delete ch;
for (size_t i = 0; i < num_threads; i++) t[i].join();
}
TEST(ChannelHolder, ChannelHolderManyTimesCloseTest) {
// Check for Buffered Channel
ChannelHolder *ch = new ChannelHolder();
ch->Reset<int>(10);
ChannelHolderManyTimesClose(ch);
}
...@@ -43,7 +43,7 @@ math_library(sequence2batch) ...@@ -43,7 +43,7 @@ math_library(sequence2batch)
math_library(sequence_padding) math_library(sequence_padding)
math_library(sequence_pooling DEPS math_function) math_library(sequence_pooling DEPS math_function)
math_library(sequence_scale) math_library(sequence_scale)
math_library(softmax) math_library(softmax DEPS math_function)
math_library(unpooling) math_library(unpooling)
math_library(vol2col) math_library(vol2col)
......
...@@ -44,7 +44,7 @@ class ConcatFunctor<platform::CPUDeviceContext, T> { ...@@ -44,7 +44,7 @@ class ConcatFunctor<platform::CPUDeviceContext, T> {
out_cols += t_cols; out_cols += t_cols;
input_cols[i] = t_cols; input_cols[i] = t_cols;
} }
auto& cpu_place = boost::get<platform::CPUPlace>(context.GetPlace()); auto cpu_place = boost::get<platform::CPUPlace>(context.GetPlace());
// computation // computation
for (int k = 0; k < out_rows; ++k) { for (int k = 0; k < out_rows; ++k) {
...@@ -87,7 +87,7 @@ class ConcatGradFunctor<platform::CPUDeviceContext, T> { ...@@ -87,7 +87,7 @@ class ConcatGradFunctor<platform::CPUDeviceContext, T> {
input_cols += t_cols; input_cols += t_cols;
output_cols[i] = t_cols; output_cols[i] = t_cols;
} }
auto& cpu_place = boost::get<platform::CPUPlace>(context.GetPlace()); auto cpu_place = boost::get<platform::CPUPlace>(context.GetPlace());
// computation // computation
for (int k = 0; k < input_rows; ++k) { for (int k = 0; k < input_rows; ++k) {
......
...@@ -48,20 +48,24 @@ class DoubleBufferReader : public framework::DecoratedReader { ...@@ -48,20 +48,24 @@ class DoubleBufferReader : public framework::DecoratedReader {
void start_thread() { void start_thread() {
buffer_ = framework::MakeChannel<Item>(kDoubleBufferSize); buffer_ = framework::MakeChannel<Item>(kDoubleBufferSize);
std::thread prefetch([this] { PrefetchThreadFunc(); }); prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
prefetch.detach();
} }
void ReadNext(std::vector<framework::LoDTensor>* out) override; void ReadNext(std::vector<framework::LoDTensor>* out) override;
void ReInit() override; void ReInit() override;
~DoubleBufferReader() { buffer_->Close(); } ~DoubleBufferReader() {
buffer_->Close();
prefetcher_.join();
delete buffer_;
}
bool HasNext() const override; bool HasNext() const override;
private: private:
void PrefetchThreadFunc(); void PrefetchThreadFunc();
std::thread prefetcher_;
framework::Channel<Item>* buffer_; framework::Channel<Item>* buffer_;
platform::Place place_; platform::Place place_;
std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_; std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
...@@ -134,6 +138,8 @@ void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) { ...@@ -134,6 +138,8 @@ void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
void DoubleBufferReader::ReInit() { void DoubleBufferReader::ReInit() {
reader_->ReInit(); reader_->ReInit();
buffer_->Close(); buffer_->Close();
prefetcher_.join();
delete buffer_;
start_thread(); start_thread();
} }
...@@ -159,11 +165,12 @@ void DoubleBufferReader::PrefetchThreadFunc() { ...@@ -159,11 +165,12 @@ void DoubleBufferReader::PrefetchThreadFunc() {
if (!buffer_->Send(&batch)) { if (!buffer_->Send(&batch)) {
VLOG(5) << "WARNING: The double buffer channel has been closed. The " VLOG(5) << "WARNING: The double buffer channel has been closed. The "
"prefetch thread terminates."; "prefetch thread will terminate.";
break; break;
} }
} }
buffer_->Close(); buffer_->Close();
VLOG(5) << "Prefetch thread terminates.";
} }
bool DoubleBufferReader::HasNext() const { bool DoubleBufferReader::HasNext() const {
......
...@@ -34,6 +34,9 @@ class ShuffleReader : public framework::DecoratedReader { ...@@ -34,6 +34,9 @@ class ShuffleReader : public framework::DecoratedReader {
} }
void ReadNext(std::vector<framework::LoDTensor>* out) override { void ReadNext(std::vector<framework::LoDTensor>* out) override {
if (!HasNext()) {
PADDLE_THROW("There is no next data!");
}
if (iteration_pos_ >= buffer_.size()) { if (iteration_pos_ >= buffer_.size()) {
VLOG(10) << "Resetting shuffle buffer"; VLOG(10) << "Resetting shuffle buffer";
ReadIntoBuffers(); ReadIntoBuffers();
...@@ -50,7 +53,6 @@ class ShuffleReader : public framework::DecoratedReader { ...@@ -50,7 +53,6 @@ class ShuffleReader : public framework::DecoratedReader {
buffer_.clear(); buffer_.clear();
buffer_.reserve(buffer_size_); buffer_.reserve(buffer_size_);
iteration_pos_ = 0; iteration_pos_ = 0;
PADDLE_ENFORCE(reader_->HasNext());
for (size_t i = 0; i < buffer_size_; ++i) { for (size_t i = 0; i < buffer_size_; ++i) {
if (!reader_->HasNext()) { if (!reader_->HasNext()) {
break; break;
......
...@@ -131,7 +131,7 @@ def make_channel(dtype, capacity=0): ...@@ -131,7 +131,7 @@ def make_channel(dtype, capacity=0):
return channel return channel
def channel_send(channel, value): def channel_send(channel, value, copy=False):
""" """
Sends a value through a channel variable. Used by an unbuffered or buffered Sends a value through a channel variable. Used by an unbuffered or buffered
channel to pass data from within or to a concurrent Go block, where channel to pass data from within or to a concurrent Go block, where
...@@ -141,6 +141,8 @@ def channel_send(channel, value): ...@@ -141,6 +141,8 @@ def channel_send(channel, value):
channel (Variable|Channel): Channel variable created using channel (Variable|Channel): Channel variable created using
`make_channel`. `make_channel`.
value (Variable): Value to send to channel value (Variable): Value to send to channel
copy (bool): Copy data while channel send. If False, then data
is moved. The input cannot be used after move.
Returns: Returns:
Variable: The boolean status on whether or not the channel Variable: The boolean status on whether or not the channel
successfully sent the passed value. successfully sent the passed value.
...@@ -162,11 +164,26 @@ def channel_send(channel, value): ...@@ -162,11 +164,26 @@ def channel_send(channel, value):
type=core.VarDesc.VarType.LOD_TENSOR, type=core.VarDesc.VarType.LOD_TENSOR,
dtype=core.VarDesc.VarType.BOOL) dtype=core.VarDesc.VarType.BOOL)
X = value
if copy is True:
copied_X = helper.create_variable(
name=unique_name.generate(value.name + '_copy'),
type=value.type,
dtype=value.dtype,
shape=value.shape,
lod_level=value.lod_level,
capacity=value.capacity)
assign_op = channel_send_block.append_op(
type="assign_op", inputs={"X": value}, outputs={"Out": copied_X})
X = copied_X
channel_send_op = channel_send_block.append_op( channel_send_op = channel_send_block.append_op(
type="channel_send", type="channel_send",
inputs={ inputs={
"Channel": channel, "Channel": channel,
"X": value, "X": X,
}, },
outputs={"Status": status}) outputs={"Status": status})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册