diff --git a/benchmark/fluid/machine_translation.py b/benchmark/fluid/machine_translation.py index d7a421c10979c3b9d6865a8c0b99a6410e0f46a8..adde5f21acd4e77d58a453d6868abeccfca4bb5a 100644 --- a/benchmark/fluid/machine_translation.py +++ b/benchmark/fluid/machine_translation.py @@ -21,7 +21,7 @@ import argparse import time import distutils.util -import paddle.v2 as paddle +import paddle import paddle.fluid as fluid import paddle.fluid.core as core import paddle.fluid.framework as framework diff --git a/benchmark/fluid/mnist.py b/benchmark/fluid/mnist.py index dc10ac2ec195acc9a5693718141ddb32417dfb71..1e2185dfac1072d1f1046f4616a9d53a8fc76061 100644 --- a/benchmark/fluid/mnist.py +++ b/benchmark/fluid/mnist.py @@ -20,7 +20,7 @@ import numpy as np import argparse import time -import paddle.v2 as paddle +import paddle import paddle.fluid as fluid import paddle.fluid.profiler as profiler diff --git a/benchmark/fluid/resnet.py b/benchmark/fluid/resnet.py index 1af5eaf6b46be47cb6b778cedcf53830c201ef39..831fa2c019fc2868cd85b1ca7b2c8c76a2f1628c 100644 --- a/benchmark/fluid/resnet.py +++ b/benchmark/fluid/resnet.py @@ -23,7 +23,7 @@ import time import cProfile, pstats, StringIO -import paddle.v2 as paddle +import paddle import paddle.fluid as fluid import paddle.fluid.core as core import paddle.fluid.profiler as profiler diff --git a/benchmark/fluid/stacked_dynamic_lstm.py b/benchmark/fluid/stacked_dynamic_lstm.py index 5fcbdd64af9dc196c9d5b2b82ce4213478ea1418..73bcc47b4d404af2c01d61ca3dfb11971bbcfe9c 100644 --- a/benchmark/fluid/stacked_dynamic_lstm.py +++ b/benchmark/fluid/stacked_dynamic_lstm.py @@ -23,10 +23,10 @@ import random import time import numpy -import paddle.v2 as paddle -import paddle.v2.dataset.imdb as imdb +import paddle +import paddle.dataset.imdb as imdb import paddle.fluid as fluid -from paddle.v2 import batch +import paddle.batch as batch import paddle.fluid.profiler as profiler diff --git a/benchmark/fluid/vgg.py b/benchmark/fluid/vgg.py index 9d990eff62ec368dc7033f55cc0862fa974a64e0..53e34e0cbd15914791c305db6797f826ebfae34e 100644 --- a/benchmark/fluid/vgg.py +++ b/benchmark/fluid/vgg.py @@ -17,7 +17,7 @@ from __future__ import print_function import sys import time import numpy as np -import paddle.v2 as paddle +import paddle import paddle.fluid as fluid import paddle.fluid.core as core import argparse diff --git a/doc/v2/api/data/dataset.rst b/doc/v2/api/data/dataset.rst index 02e41564b1e48c07da6ac071fc4b60089169e05a..e7c8be4452bf55e0967d750c2e624e8e316e9330 100644 --- a/doc/v2/api/data/dataset.rst +++ b/doc/v2/api/data/dataset.rst @@ -1,82 +1,82 @@ Dataset ======= -.. automodule:: paddle.v2.dataset +.. automodule:: paddle.dataset :members: :noindex: mnist +++++ -.. automodule:: paddle.v2.dataset.mnist +.. automodule:: paddle.dataset.mnist :members: :noindex: cifar +++++ -.. automodule:: paddle.v2.dataset.cifar +.. automodule:: paddle.dataset.cifar :members: :noindex: conll05 +++++++ -.. automodule:: paddle.v2.dataset.conll05 +.. automodule:: paddle.dataset.conll05 :members: get_dict,get_embedding,test :noindex: imdb ++++ -.. automodule:: paddle.v2.dataset.imdb +.. automodule:: paddle.dataset.imdb :members: :noindex: imikolov ++++++++ -.. automodule:: paddle.v2.dataset.imikolov +.. automodule:: paddle.dataset.imikolov :members: :noindex: movielens +++++++++ -.. automodule:: paddle.v2.dataset.movielens +.. automodule:: paddle.dataset.movielens :members: :noindex: -.. autoclass:: paddle.v2.dataset.movielens.MovieInfo +.. autoclass:: paddle.dataset.movielens.MovieInfo :noindex: - -.. autoclass:: paddle.v2.dataset.movielens.UserInfo + +.. autoclass:: paddle.dataset.movielens.UserInfo :noindex: sentiment +++++++++ -.. automodule:: paddle.v2.dataset.sentiment +.. automodule:: paddle.dataset.sentiment :members: :noindex: uci_housing +++++++++++ -.. automodule:: paddle.v2.dataset.uci_housing +.. automodule:: paddle.dataset.uci_housing :members: :noindex: wmt14 +++++ -.. automodule:: paddle.v2.dataset.wmt14 +.. automodule:: paddle.dataset.wmt14 :members: :noindex: wmt16 +++++ -.. automodule:: paddle.v2.dataset.wmt16 +.. automodule:: paddle.dataset.wmt16 :members: :noindex: diff --git a/paddle/fluid/inference/tensorrt/engine.cc b/paddle/fluid/inference/tensorrt/engine.cc index 276502e4999df3aecf246fcb9591c59ea9d3f02b..03a25f8e8b52b452cb621aeddf3563cc21a033df 100644 --- a/paddle/fluid/inference/tensorrt/engine.cc +++ b/paddle/fluid/inference/tensorrt/engine.cc @@ -17,6 +17,7 @@ limitations under the License. */ #include #include #include +#include #include "paddle/fluid/inference/tensorrt/helper.h" #include "paddle/fluid/platform/enforce.h" diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 616a89a4132a07d16ac9deae4ddbf7071ab9b1aa..88b07d865f74c9c4bdd8f2762518492d22d50e8a 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -241,9 +241,9 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, auto optimize_prepared = executor->Prepare(*program, block_list); std::unordered_map> - grad_to_prepared_block; + grad_to_prepared_ctx; for (size_t i = 0; i < block_list.size(); ++i) { - grad_to_prepared_block[id_to_grad[block_list[i]]] = optimize_prepared[i]; + grad_to_prepared_ctx[id_to_grad[block_list[i]]] = optimize_prepared[i]; } VLOG(3) << "RunAsyncLoop into while"; @@ -254,9 +254,9 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, for (auto iter = grad_to_queue.begin(); iter != grad_to_queue.end(); iter++) { std::string grad_name = iter->first; fs.push_back(framework::Async([grad_name, &exit_flag, &executor, - &grad_to_queue, &grad_to_prepared_block]() { + &grad_to_queue, &grad_to_prepared_ctx]() { AsyncUpdateThread(exit_flag, grad_to_queue[grad_name], executor, - grad_to_prepared_block[grad_name].get()); + grad_to_prepared_ctx[grad_name].get()); })); } diff --git a/paddle/fluid/operators/math/depthwise_conv.cu b/paddle/fluid/operators/math/depthwise_conv.cu index a5e6e4031bbaddc2d09c660d34a975b2a496bc63..d360728484a73ce844b4a36fbffd7dc631f8e786 100644 --- a/paddle/fluid/operators/math/depthwise_conv.cu +++ b/paddle/fluid/operators/math/depthwise_conv.cu @@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include #include "paddle/fluid/operators/math/depthwise_conv.h" #include "paddle/fluid/platform/cuda_helper.h" diff --git a/paddle/fluid/operators/math/depthwise_conv.h b/paddle/fluid/operators/math/depthwise_conv.h index 081bda891d044041f6814964e7076e28e812039c..97aec401889a56d3fc9ac08e766d931bb3725b01 100644 --- a/paddle/fluid/operators/math/depthwise_conv.h +++ b/paddle/fluid/operators/math/depthwise_conv.h @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/hostdevice.h" diff --git a/paddle/fluid/operators/math/detail/activation_functions.h b/paddle/fluid/operators/math/detail/activation_functions.h index d205ebf210818b91c3cf6d4563fca1e56702bcf9..b127fbe8c8515e7fe57b07ea1d4291675ec4efca 100644 --- a/paddle/fluid/operators/math/detail/activation_functions.h +++ b/paddle/fluid/operators/math/detail/activation_functions.h @@ -14,6 +14,7 @@ limitations under the License. */ #pragma once #include +#include #include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/hostdevice.h" diff --git a/paddle/fluid/operators/math/im2col_test.cc b/paddle/fluid/operators/math/im2col_test.cc index b3978536bca7dc276bf5417bed36761a2d496536..b497c2a66146eed7b4b39ead6b05a9977cd4be21 100644 --- a/paddle/fluid/operators/math/im2col_test.cc +++ b/paddle/fluid/operators/math/im2col_test.cc @@ -14,6 +14,7 @@ limitations under the License. */ #include "paddle/fluid/operators/math/im2col.h" #include +#include template void testIm2col() { diff --git a/paddle/fluid/operators/math/matmul.h b/paddle/fluid/operators/math/matmul.h index 6e2d35cd0f3581c742197a66842696a8e3a936b1..0006c5062f3639da589eea44d47917d879933615 100644 --- a/paddle/fluid/operators/math/matmul.h +++ b/paddle/fluid/operators/math/matmul.h @@ -13,6 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include +#include #include "paddle/fluid/operators/math/math_function.h" namespace paddle { diff --git a/paddle/fluid/operators/math/sampler.cc b/paddle/fluid/operators/math/sampler.cc index 3ec6538d7fb0a902f468aa848c3064bd0260fabb..3066dc0ba284611af89c4927f45089a570ab88bc 100644 --- a/paddle/fluid/operators/math/sampler.cc +++ b/paddle/fluid/operators/math/sampler.cc @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "sampler.h" +#include "paddle/fluid/operators/math/sampler.h" namespace paddle { namespace random { diff --git a/paddle/fluid/operators/math/selected_rows_functor_test.cc b/paddle/fluid/operators/math/selected_rows_functor_test.cc index 679b6568ad09ce3a42a7ad4222310711e707d9a8..70bed820ee58885861fa8c5535c931f258625572 100644 --- a/paddle/fluid/operators/math/selected_rows_functor_test.cc +++ b/paddle/fluid/operators/math/selected_rows_functor_test.cc @@ -13,41 +13,50 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/math/selected_rows_functor.h" +#include #include "gtest/gtest.h" #include "paddle/fluid/operators/math/math_function.h" TEST(selected_rows_functor, cpu_add) { - using namespace paddle::framework; - using namespace paddle::platform; - using namespace paddle::operators::math; - - CPUPlace cpu_place; - CPUDeviceContext ctx(cpu_place); - SetConstant functor; + paddle::platform::CPUPlace cpu_place; + paddle::platform::CPUDeviceContext ctx(cpu_place); + paddle::operators::math::SetConstant + functor; int64_t height = 10; int64_t row_numel = 10; std::vector rows1{0, 4, 7}; - std::unique_ptr selected_rows1{new SelectedRows(rows1, height)}; + std::unique_ptr selected_rows1{ + new paddle::framework::SelectedRows(rows1, height)}; auto* in1_value = selected_rows1->mutable_value(); in1_value->mutable_data( - make_ddim({static_cast(rows1.size()), row_numel}), cpu_place); + paddle::framework::make_ddim( + {static_cast(rows1.size()), row_numel}), + cpu_place); functor(ctx, in1_value, 1.0); std::vector rows2{0, 5, 7, 9}; - std::unique_ptr selected_rows2{new SelectedRows(rows2, height)}; + std::unique_ptr selected_rows2{ + new paddle::framework::SelectedRows(rows2, height)}; auto* in2_value = selected_rows2->mutable_value(); in2_value->mutable_data( - make_ddim({static_cast(rows2.size()), row_numel}), cpu_place); + paddle::framework::make_ddim( + {static_cast(rows2.size()), row_numel}), + cpu_place); functor(ctx, in2_value, 2.0); - std::unique_ptr output{new SelectedRows()}; + std::unique_ptr output{ + new paddle::framework::SelectedRows()}; auto* out_value = output->mutable_value(); // simplely concat two SelectedRows - out_value->mutable_data(make_ddim({7, 10}), cpu_place); + out_value->mutable_data(paddle::framework::make_ddim({7, 10}), + cpu_place); - SelectedRowsAdd add_functor; + paddle::operators::math::SelectedRowsAdd + add_functor; add_functor(ctx, *selected_rows1, *selected_rows2, output.get()); auto out_height = output->height(); @@ -78,14 +87,20 @@ TEST(selected_rows_functor, cpu_add) { EXPECT_EQ(out_data[5 * row_numel + 7], 2.0); EXPECT_EQ(out_data[6 * row_numel + 9], 2.0); - std::unique_ptr tensor1{new Tensor()}; - tensor1->mutable_data(make_ddim({height, row_numel}), cpu_place); + std::unique_ptr tensor1{ + new paddle::framework::Tensor()}; + tensor1->mutable_data( + paddle::framework::make_ddim({height, row_numel}), cpu_place); functor(ctx, tensor1.get(), 3.0); - std::unique_ptr tensor2{new Tensor()}; - tensor2->mutable_data(make_ddim({height, row_numel}), cpu_place); + std::unique_ptr tensor2{ + new paddle::framework::Tensor()}; + tensor2->mutable_data( + paddle::framework::make_ddim({height, row_numel}), cpu_place); - SelectedRowsAddTensor add_tensor_functor; + paddle::operators::math::SelectedRowsAddTensor< + paddle::platform::CPUDeviceContext, float> + add_tensor_functor; add_tensor_functor(ctx, *output, *tensor1, tensor2.get()); auto* tensor2_data = tensor2->data(); @@ -106,38 +121,46 @@ TEST(selected_rows_functor, cpu_add) { } TEST(selected_rows_functor, cpu_add_to) { - using namespace paddle::framework; - using namespace paddle::platform; - using namespace paddle::operators::math; - - CPUPlace cpu_place; - CPUDeviceContext ctx(cpu_place); - SetConstant functor; + paddle::platform::CPUPlace cpu_place; + paddle::platform::CPUDeviceContext ctx(cpu_place); + paddle::operators::math::SetConstant + functor; int64_t height = 10; int64_t row_numel = 10; std::vector rows1{0, 4, 7}; - std::unique_ptr selected_rows1{new SelectedRows(rows1, height)}; + std::unique_ptr selected_rows1{ + new paddle::framework::SelectedRows(rows1, height)}; auto* in1_value = selected_rows1->mutable_value(); in1_value->mutable_data( - make_ddim({static_cast(rows1.size()), row_numel}), cpu_place); + paddle::framework::make_ddim( + {static_cast(rows1.size()), row_numel}), + cpu_place); functor(ctx, in1_value, 1.0); std::vector rows2{0, 5, 7, 9}; - std::unique_ptr selected_rows2{new SelectedRows(rows2, height)}; + std::unique_ptr selected_rows2{ + new paddle::framework::SelectedRows(rows2, height)}; auto* in2_value = selected_rows2->mutable_value(); in2_value->mutable_data( - make_ddim({static_cast(rows2.size()), row_numel}), cpu_place); + paddle::framework::make_ddim( + {static_cast(rows2.size()), row_numel}), + cpu_place); functor(ctx, in2_value, 2.0); - std::unique_ptr output{new SelectedRows()}; + std::unique_ptr output{ + new paddle::framework::SelectedRows()}; output->set_height(height); auto* out_value = output->mutable_value(); // simplely concat two SelectedRows - out_value->mutable_data(make_ddim({7, 10}), cpu_place); + out_value->mutable_data(paddle::framework::make_ddim({7, 10}), + cpu_place); - SelectedRowsAddTo add_to_functor; + paddle::operators::math::SelectedRowsAddTo + add_to_functor; add_to_functor(ctx, *selected_rows1, 0, output.get()); add_to_functor(ctx, *selected_rows2, in1_value->numel(), output.get()); @@ -169,11 +192,15 @@ TEST(selected_rows_functor, cpu_add_to) { EXPECT_EQ(out_data[5 * row_numel + 7], 2.0); EXPECT_EQ(out_data[6 * row_numel + 9], 2.0); - std::unique_ptr tensor1{new Tensor()}; - tensor1->mutable_data(make_ddim({height, row_numel}), cpu_place); + std::unique_ptr tensor1{ + new paddle::framework::Tensor()}; + tensor1->mutable_data( + paddle::framework::make_ddim({height, row_numel}), cpu_place); functor(ctx, tensor1.get(), 3.0); - SelectedRowsAddToTensor add_to_tensor_functor; + paddle::operators::math::SelectedRowsAddToTensor< + paddle::platform::CPUDeviceContext, float> + add_to_tensor_functor; add_to_tensor_functor(ctx, *output, tensor1.get()); auto* tensor1_data = tensor1->data(); diff --git a/paddle/fluid/operators/math/sequence_pooling.cc b/paddle/fluid/operators/math/sequence_pooling.cc index 5ae42ab973c81d3794fbbbe088e37ab02168c8dc..f25d3d3f1ee1f89d46b8e7c88ca68048f5203544 100644 --- a/paddle/fluid/operators/math/sequence_pooling.cc +++ b/paddle/fluid/operators/math/sequence_pooling.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/math/sequence_pooling.h" +#include #include "paddle/fluid/operators/math/math_function.h" namespace paddle { diff --git a/paddle/fluid/operators/math/vol2col.cc b/paddle/fluid/operators/math/vol2col.cc index 09e9f85cca349be1dc46a3f3a0b2d919485d6fa1..e92adc09ba01b032aba8eba94bcb4ba96524c641 100644 --- a/paddle/fluid/operators/math/vol2col.cc +++ b/paddle/fluid/operators/math/vol2col.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/math/vol2col.h" +#include namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/math/vol2col.cu b/paddle/fluid/operators/math/vol2col.cu index 619730d394d075d05a016e421fd75c4549015216..e0f3ef36879327c0592bb955dd800b44b228e721 100644 --- a/paddle/fluid/operators/math/vol2col.cu +++ b/paddle/fluid/operators/math/vol2col.cu @@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include +#include #include "paddle/fluid/operators/math/vol2col.h" #include "paddle/fluid/platform/cuda_helper.h" diff --git a/paddle/fluid/operators/math/vol2col_test.cc b/paddle/fluid/operators/math/vol2col_test.cc index eb91f862e39d9f158200f69fd48e7245dde47171..b4e4186c986605b4fd14b10cadab81f07be4b27b 100644 --- a/paddle/fluid/operators/math/vol2col_test.cc +++ b/paddle/fluid/operators/math/vol2col_test.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "paddle/fluid/operators/math/vol2col.h" #include #include +#include template void testVol2col() { diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 845528860f91d0b479bb3c4dbbe05e32c68dc16f..3106978eb0149b14849dfd1aaad8bbe76791f2f6 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -23,5 +23,7 @@ reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_o reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc) reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc) + +cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc) # Export local libraries to parent set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE) diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h new file mode 100644 index 0000000000000000000000000000000000000000..71684b14176edc8f71efbefa9a7decffc8f3011e --- /dev/null +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -0,0 +1,112 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include // NOLINT +#include + +#include "paddle/fluid/platform/enforce.h" + +namespace paddle { +namespace operators { +namespace reader { + +template +class BlockingQueue { + // BlockingQueue is for buffered reading and is supposed to use only the + // reader package. It is true that we could and we should have been using + // framework::Channel, but which has currently a deadlock bug. BlockingQueue + // is a workaround and a simplified version of framework::Channel as it + // doesn't support GPU and it implements on buffered blocking queue. + public: + explicit BlockingQueue(size_t capacity) + : capacity_(capacity), closed_(false) { + PADDLE_ENFORCE_GT( + capacity_, 0, + "The capacity of a reader::BlockingQueue must be greater than 0."); + } + + bool Send(const T& elem) { + std::unique_lock lock(mutex_); + send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; }); + if (closed_) { + VLOG(5) + << "WARNING: Sending an element to a closed reader::BlokcingQueue."; + return false; + } + PADDLE_ENFORCE_LT(queue_.size(), capacity_); + queue_.push_back(elem); + receive_cv_.notify_one(); + return true; + } + + bool Send(T&& elem) { + std::unique_lock lock(mutex_); + send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; }); + if (closed_) { + VLOG(5) + << "WARNING: Sending an element to a closed reader::BlokcingQueue."; + return false; + } + PADDLE_ENFORCE_LT(queue_.size(), capacity_); + queue_.emplace_back(std::move(elem)); + receive_cv_.notify_one(); + return true; + } + + bool Receive(T* elem) { + std::unique_lock lock(mutex_); + receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; }); + if (!queue_.empty()) { + PADDLE_ENFORCE_NOT_NULL(elem); + *elem = queue_.front(); + queue_.pop_front(); + send_cv_.notify_one(); + return true; + } else { + PADDLE_ENFORCE(closed_); + return false; + } + } + + void Close() { + std::lock_guard lock(mutex_); + closed_ = true; + send_cv_.notify_all(); + receive_cv_.notify_all(); + } + + bool IsClosed() { + std::lock_guard lock(mutex_); + return closed_; + } + + size_t Cap() { + std::lock_guard lock(mutex_); + return capacity_; + } + + private: + size_t capacity_; + bool closed_; + std::deque queue_; + + std::mutex mutex_; + std::condition_variable receive_cv_; + std::condition_variable send_cv_; +}; +} // namespace reader +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 4372f23fc1dbd85e43b04a9d644977392316c2e9..3fdc31dfa5242b6487c308d395d70d7ff348bc73 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -14,7 +14,7 @@ #include // NOLINT -#include "paddle/fluid/framework/channel.h" +#include "paddle/fluid/operators/reader/blocking_queue.h" #include "paddle/fluid/operators/reader/reader_op_registry.h" namespace paddle { @@ -23,13 +23,13 @@ namespace reader { // 'Double buffer' means we shall maintain two batches of input data at the same // time. So the kCacheSize shoul be at least 2. -static constexpr size_t kCacheSize = 2; +static constexpr size_t kCacheSize = 3; // There will be two bacthes out of the channel during training: // 1. the one waiting to be sent to the channel // 2. the one just be received from the channel, which is also being used by // subsequent operators. // So the channel size should be kChacheSize - 2 -static constexpr size_t kChannelSize = 0; // kCacheSize - 2 +static constexpr size_t kChannelSize = 1; // kCacheSize - 2 class DoubleBufferReader : public framework::DecoratedReader { public: @@ -55,10 +55,8 @@ class DoubleBufferReader : public framework::DecoratedReader { ~DoubleBufferReader() { EndPrefetcher(); } private: - bool HasNext() const; - void StartPrefetcher() { - channel_ = framework::MakeChannel(kChannelSize); + channel_ = new reader::BlockingQueue(kChannelSize); prefetcher_ = std::thread([this] { PrefetchThreadFunc(); }); } @@ -74,7 +72,7 @@ class DoubleBufferReader : public framework::DecoratedReader { void PrefetchThreadFunc(); std::thread prefetcher_; - framework::Channel* channel_; + reader::BlockingQueue* channel_; platform::Place place_; std::vector> cpu_tensor_cache_; std::vector> gpu_tensor_cache_; @@ -139,17 +137,16 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { }; void DoubleBufferReader::ReadNext(std::vector* out) { - out->clear(); - if (HasNext()) { - size_t cached_tensor_id; - channel_->Receive(&cached_tensor_id); + size_t cached_tensor_id; + if (channel_->Receive(&cached_tensor_id)) { if (platform::is_gpu_place(place_)) { *out = gpu_tensor_cache_[cached_tensor_id]; - ctxs_[cached_tensor_id]->Wait(); } else { // CPU place *out = cpu_tensor_cache_[cached_tensor_id]; } + } else { + out->clear(); } } @@ -159,12 +156,6 @@ void DoubleBufferReader::ReInit() { StartPrefetcher(); } -bool DoubleBufferReader::HasNext() const { - while (!channel_->IsClosed() && !channel_->CanReceive()) { - } - return channel_->CanReceive(); -} - void DoubleBufferReader::PrefetchThreadFunc() { VLOG(5) << "A new prefetch thread starts."; size_t cached_tensor_id = 0; @@ -185,10 +176,7 @@ void DoubleBufferReader::PrefetchThreadFunc() { gpu_batch[i].set_lod(cpu_batch[i].lod()); } } - try { - size_t tmp = cached_tensor_id; - channel_->Send(&tmp); - } catch (paddle::platform::EnforceNotMet e) { + if (!channel_->Send(cached_tensor_id)) { VLOG(5) << "WARNING: The double buffer channel has been closed. The " "prefetch thread will terminate."; break; diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 779dc8a6a0deb7792e0540071e3a2588102fa708..91ad7d56583446ee4686e74187de166f387125df 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -14,7 +14,7 @@ #include // NOLINT -#include "paddle/fluid/framework/channel.h" +#include "paddle/fluid/operators/reader/blocking_queue.h" #include "paddle/fluid/operators/reader/reader_op_registry.h" namespace paddle { @@ -37,7 +37,6 @@ class MultiFileReader : public framework::ReaderBase { ~MultiFileReader() { EndScheduler(); } private: - bool HasNext(); void StartNewScheduler(); void EndScheduler(); void ScheduleThreadFunc(); @@ -48,15 +47,14 @@ class MultiFileReader : public framework::ReaderBase { std::thread scheduler_; std::vector prefetchers_; size_t buffer_size_; - framework::Channel* waiting_file_idx_; - framework::Channel* available_thread_idx_; - framework::Channel>* buffer_; + reader::BlockingQueue* waiting_file_idx_; + reader::BlockingQueue* available_thread_idx_; + reader::BlockingQueue>* buffer_; }; void MultiFileReader::ReadNext(std::vector* out) { - out->clear(); - if (HasNext()) { - buffer_->Receive(out); + if (!buffer_->Receive(out)) { + out->clear(); } } @@ -65,25 +63,19 @@ void MultiFileReader::ReInit() { StartNewScheduler(); } -bool MultiFileReader::HasNext() { - while (!buffer_->IsClosed() && !buffer_->CanReceive()) { - } - return buffer_->CanReceive(); -} - void MultiFileReader::StartNewScheduler() { size_t thread_num = prefetchers_.size(); - waiting_file_idx_ = framework::MakeChannel(file_names_.size()); - available_thread_idx_ = framework::MakeChannel(thread_num); - buffer_ = - framework::MakeChannel>(buffer_size_); + waiting_file_idx_ = new reader::BlockingQueue(file_names_.size()); + available_thread_idx_ = new reader::BlockingQueue(thread_num); + buffer_ = new reader::BlockingQueue>( + buffer_size_); for (size_t i = 0; i < file_names_.size(); ++i) { - waiting_file_idx_->Send(&i); + waiting_file_idx_->Send(i); } waiting_file_idx_->Close(); for (size_t i = 0; i < thread_num; ++i) { - available_thread_idx_->Send(&i); + available_thread_idx_->Send(i); } scheduler_ = std::thread([this] { ScheduleThreadFunc(); }); @@ -149,7 +141,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name, break; } try { - buffer_->Send(&ins); + buffer_->Send(std::move(ins)); } catch (paddle::platform::EnforceNotMet e) { VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch " "thread of file '" @@ -158,9 +150,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name, } } - try { - available_thread_idx_->Send(&thread_idx); - } catch (paddle::platform::EnforceNotMet e) { + if (!available_thread_idx_->Send(thread_idx)) { VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. " "Fail to send thread_idx."; } diff --git a/paddle/fluid/operators/reader/reader_blocking_queue_test.cc b/paddle/fluid/operators/reader/reader_blocking_queue_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..7d1b381d56c8cdc1e79e594b18c1a1ed59ab5284 --- /dev/null +++ b/paddle/fluid/operators/reader/reader_blocking_queue_test.cc @@ -0,0 +1,219 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include // NOLINT +#include +#include // NOLINT +#include +#include "gtest/gtest.h" + +#include "paddle/fluid/operators/reader/blocking_queue.h" + +using paddle::operators::reader::BlockingQueue; + +TEST(BlockingQueue, CapacityTest) { + size_t cap = 10; + BlockingQueue q(cap); + EXPECT_EQ(q.Cap(), cap); +} + +void FirstInFirstOut(size_t queue_cap, size_t elem_num, size_t send_time_gap, + size_t receive_time_gap) { + BlockingQueue q(queue_cap); + std::thread sender([&]() { + for (size_t i = 0; i < elem_num; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(send_time_gap)); + EXPECT_TRUE(q.Send(i)); + } + q.Close(); + }); + size_t count = 0; + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds(receive_time_gap)); + size_t elem; + if (!q.Receive(&elem)) { + break; + } + EXPECT_EQ(elem, count++); + } + sender.join(); + EXPECT_EQ(count, elem_num); + EXPECT_TRUE(q.IsClosed()); +} + +TEST(BlockingQueue, FirstInFirstOutTest) { + FirstInFirstOut(2, 5, 2, 50); + FirstInFirstOut(2, 5, 50, 2); + FirstInFirstOut(10, 3, 50, 2); + FirstInFirstOut(10, 3, 2, 50); +} + +TEST(BlockingQueue, SenderBlockingTest) { + const size_t queue_cap = 2; + BlockingQueue q(queue_cap); + size_t send_count = 0; + std::thread sender([&]() { + for (size_t i = 0; i < 5; ++i) { + if (!q.Send(i)) { + break; + } + ++send_count; + } + }); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + q.Close(); + sender.join(); + EXPECT_EQ(send_count, queue_cap); + std::vector res; + while (true) { + size_t elem; + if (!q.Receive(&elem)) { + break; + } + res.push_back(elem); + } + EXPECT_EQ(res.size(), queue_cap); + for (size_t i = 0; i < res.size(); ++i) { + EXPECT_EQ(res[i], i); + } +} + +TEST(BlockingQueue, ReceiverBlockingTest) { + const size_t queue_cap = 5; + BlockingQueue q(queue_cap); + std::vector receive_res; + std::thread receiver([&]() { + size_t elem; + while (true) { + if (!q.Receive(&elem)) { + break; + } + receive_res.push_back(elem); + } + }); + std::vector to_send{2, 1, 7}; + for (auto e : to_send) { + q.Send(e); + } + q.Close(); + receiver.join(); + EXPECT_EQ(receive_res.size(), to_send.size()); + for (size_t i = 0; i < to_send.size(); ++i) { + EXPECT_EQ(receive_res[i], to_send[i]); + } +} + +void CheckIsUnorderedSame(const std::vector>& v1, + const std::vector>& v2) { + std::set s1; + std::set s2; + for (auto vec : v1) { + for (size_t elem : vec) { + s1.insert(elem); + } + } + for (auto vec : v2) { + for (size_t elem : vec) { + s2.insert(elem); + } + } + EXPECT_EQ(s1.size(), s2.size()); + auto it1 = s1.begin(); + auto it2 = s2.begin(); + while (it1 != s1.end()) { + EXPECT_EQ(*it1, *it2); + ++it1; + ++it2; + } +} + +void MultiSenderMultiReceiver(const size_t queue_cap, + const std::vector>& to_send, + size_t receiver_num, size_t send_time_gap, + size_t receive_time_gap) { + BlockingQueue q(queue_cap); + size_t sender_num = to_send.size(); + std::vector senders; + for (size_t s_idx = 0; s_idx < sender_num; ++s_idx) { + senders.emplace_back(std::thread([&, s_idx] { + for (size_t elem : to_send[s_idx]) { + std::this_thread::sleep_for(std::chrono::milliseconds(send_time_gap)); + EXPECT_TRUE(q.Send(elem)); + } + })); + } + std::vector receivers; + std::mutex mu; + std::vector> res; + for (size_t r_idx = 0; r_idx < receiver_num; ++r_idx) { + receivers.emplace_back(std::thread([&] { + std::vector receiver_res; + while (true) { + std::this_thread::sleep_for( + std::chrono::milliseconds(receive_time_gap)); + size_t elem; + if (!q.Receive(&elem)) { + break; + } + receiver_res.push_back(elem); + } + std::lock_guard lock(mu); + res.push_back(receiver_res); + })); + } + for (auto& t : senders) { + t.join(); + } + q.Close(); + for (auto& t : receivers) { + t.join(); + } + CheckIsUnorderedSame(to_send, res); +} + +TEST(BlockingQueue, MultiSenderMultiReaderTest) { + std::vector> to_send_1{{2, 3, 4}, {9}, {0, 7, 15, 6}}; + MultiSenderMultiReceiver(2, to_send_1, 2, 0, 0); + MultiSenderMultiReceiver(10, to_send_1, 2, 0, 0); + MultiSenderMultiReceiver(2, to_send_1, 20, 0, 0); + MultiSenderMultiReceiver(2, to_send_1, 2, 50, 0); + MultiSenderMultiReceiver(2, to_send_1, 2, 0, 50); + + std::vector> to_send_2{ + {2, 3, 4}, {}, {0, 7, 15, 6, 9, 32}}; + MultiSenderMultiReceiver(2, to_send_2, 3, 0, 0); + MultiSenderMultiReceiver(20, to_send_2, 3, 0, 0); + MultiSenderMultiReceiver(2, to_send_2, 30, 0, 0); + MultiSenderMultiReceiver(2, to_send_2, 3, 50, 0); + MultiSenderMultiReceiver(2, to_send_2, 3, 0, 50); +} + +struct MyClass { + MyClass() : val_(0) {} + explicit MyClass(int val) : val_(val) {} + MyClass(const MyClass& b) { val_ = b.val_; } + MyClass(MyClass&& b) { val_ = b.val_; } + void operator=(const MyClass& b) { val_ = b.val_; } + + int val_; +}; + +TEST(BlockingQueue, MyClassTest) { + BlockingQueue q(2); + MyClass a(200); + q.Send(std::move(a)); + MyClass b; + q.Receive(&b); + EXPECT_EQ(a.val_, b.val_); +} diff --git a/paddle/fluid/operators/reader/reader_op_registry.cc b/paddle/fluid/operators/reader/reader_op_registry.cc index fc8dc747ff0c2286f4516d8350f75d9887361924..3ff4536819b128d9c593b97f4942a0292a3b6b36 100644 --- a/paddle/fluid/operators/reader/reader_op_registry.cc +++ b/paddle/fluid/operators/reader/reader_op_registry.cc @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "reader_op_registry.h" +#include "paddle/fluid/operators/reader/reader_op_registry.h" +#include +#include namespace paddle { namespace operators {