提交 b0581899 编写于 作者: Q qiaolongfei

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into use-multi-thread-todo-update

......@@ -21,7 +21,7 @@ import argparse
import time
import distutils.util
import paddle.v2 as paddle
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.framework as framework
......
......@@ -20,7 +20,7 @@ import numpy as np
import argparse
import time
import paddle.v2 as paddle
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
......
......@@ -23,7 +23,7 @@ import time
import cProfile, pstats, StringIO
import paddle.v2 as paddle
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
......
......@@ -23,10 +23,10 @@ import random
import time
import numpy
import paddle.v2 as paddle
import paddle.v2.dataset.imdb as imdb
import paddle
import paddle.dataset.imdb as imdb
import paddle.fluid as fluid
from paddle.v2 import batch
import paddle.batch as batch
import paddle.fluid.profiler as profiler
......
......@@ -17,7 +17,7 @@ from __future__ import print_function
import sys
import time
import numpy as np
import paddle.v2 as paddle
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import argparse
......
Dataset
=======
.. automodule:: paddle.v2.dataset
.. automodule:: paddle.dataset
:members:
:noindex:
mnist
+++++
.. automodule:: paddle.v2.dataset.mnist
.. automodule:: paddle.dataset.mnist
:members:
:noindex:
cifar
+++++
.. automodule:: paddle.v2.dataset.cifar
.. automodule:: paddle.dataset.cifar
:members:
:noindex:
conll05
+++++++
.. automodule:: paddle.v2.dataset.conll05
.. automodule:: paddle.dataset.conll05
:members: get_dict,get_embedding,test
:noindex:
imdb
++++
.. automodule:: paddle.v2.dataset.imdb
.. automodule:: paddle.dataset.imdb
:members:
:noindex:
imikolov
++++++++
.. automodule:: paddle.v2.dataset.imikolov
.. automodule:: paddle.dataset.imikolov
:members:
:noindex:
movielens
+++++++++
.. automodule:: paddle.v2.dataset.movielens
.. automodule:: paddle.dataset.movielens
:members:
:noindex:
.. autoclass:: paddle.v2.dataset.movielens.MovieInfo
.. autoclass:: paddle.dataset.movielens.MovieInfo
:noindex:
.. autoclass:: paddle.v2.dataset.movielens.UserInfo
.. autoclass:: paddle.dataset.movielens.UserInfo
:noindex:
sentiment
+++++++++
.. automodule:: paddle.v2.dataset.sentiment
.. automodule:: paddle.dataset.sentiment
:members:
:noindex:
uci_housing
+++++++++++
.. automodule:: paddle.v2.dataset.uci_housing
.. automodule:: paddle.dataset.uci_housing
:members:
:noindex:
wmt14
+++++
.. automodule:: paddle.v2.dataset.wmt14
.. automodule:: paddle.dataset.wmt14
:members:
:noindex:
wmt16
+++++
.. automodule:: paddle.v2.dataset.wmt16
.. automodule:: paddle.dataset.wmt16
:members:
:noindex:
......@@ -17,6 +17,7 @@ limitations under the License. */
#include <NvInfer.h>
#include <cuda.h>
#include <glog/logging.h>
#include <string>
#include "paddle/fluid/inference/tensorrt/helper.h"
#include "paddle/fluid/platform/enforce.h"
......
......@@ -241,9 +241,9 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
auto optimize_prepared = executor->Prepare(*program, block_list);
std::unordered_map<std::string,
std::shared_ptr<framework::ExecutorPrepareContext>>
grad_to_prepared_block;
grad_to_prepared_ctx;
for (size_t i = 0; i < block_list.size(); ++i) {
grad_to_prepared_block[id_to_grad[block_list[i]]] = optimize_prepared[i];
grad_to_prepared_ctx[id_to_grad[block_list[i]]] = optimize_prepared[i];
}
VLOG(3) << "RunAsyncLoop into while";
......@@ -254,9 +254,9 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
for (auto iter = grad_to_queue.begin(); iter != grad_to_queue.end(); iter++) {
std::string grad_name = iter->first;
fs.push_back(framework::Async([grad_name, &exit_flag, &executor,
&grad_to_queue, &grad_to_prepared_block]() {
&grad_to_queue, &grad_to_prepared_ctx]() {
AsyncUpdateThread(exit_flag, grad_to_queue[grad_name], executor,
grad_to_prepared_block[grad_name].get());
grad_to_prepared_ctx[grad_name].get());
}));
}
......
......@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <vector>
#include "paddle/fluid/operators/math/depthwise_conv.h"
#include "paddle/fluid/platform/cuda_helper.h"
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <vector>
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/hostdevice.h"
......
......@@ -14,6 +14,7 @@ limitations under the License. */
#pragma once
#include <math.h>
#include <string>
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/hostdevice.h"
......
......@@ -14,6 +14,7 @@ limitations under the License. */
#include "paddle/fluid/operators/math/im2col.h"
#include <gtest/gtest.h>
#include <vector>
template <typename DeviceContext, typename Place>
void testIm2col() {
......
......@@ -13,6 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <algorithm>
#include <vector>
#include "paddle/fluid/operators/math/math_function.h"
namespace paddle {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "sampler.h"
#include "paddle/fluid/operators/math/sampler.h"
namespace paddle {
namespace random {
......
......@@ -13,41 +13,50 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include <vector>
#include "gtest/gtest.h"
#include "paddle/fluid/operators/math/math_function.h"
TEST(selected_rows_functor, cpu_add) {
using namespace paddle::framework;
using namespace paddle::platform;
using namespace paddle::operators::math;
CPUPlace cpu_place;
CPUDeviceContext ctx(cpu_place);
SetConstant<CPUDeviceContext, float> functor;
paddle::platform::CPUPlace cpu_place;
paddle::platform::CPUDeviceContext ctx(cpu_place);
paddle::operators::math::SetConstant<paddle::platform::CPUDeviceContext,
float>
functor;
int64_t height = 10;
int64_t row_numel = 10;
std::vector<int64_t> rows1{0, 4, 7};
std::unique_ptr<SelectedRows> selected_rows1{new SelectedRows(rows1, height)};
std::unique_ptr<paddle::framework::SelectedRows> selected_rows1{
new paddle::framework::SelectedRows(rows1, height)};
auto* in1_value = selected_rows1->mutable_value();
in1_value->mutable_data<float>(
make_ddim({static_cast<int64_t>(rows1.size()), row_numel}), cpu_place);
paddle::framework::make_ddim(
{static_cast<int64_t>(rows1.size()), row_numel}),
cpu_place);
functor(ctx, in1_value, 1.0);
std::vector<int64_t> rows2{0, 5, 7, 9};
std::unique_ptr<SelectedRows> selected_rows2{new SelectedRows(rows2, height)};
std::unique_ptr<paddle::framework::SelectedRows> selected_rows2{
new paddle::framework::SelectedRows(rows2, height)};
auto* in2_value = selected_rows2->mutable_value();
in2_value->mutable_data<float>(
make_ddim({static_cast<int64_t>(rows2.size()), row_numel}), cpu_place);
paddle::framework::make_ddim(
{static_cast<int64_t>(rows2.size()), row_numel}),
cpu_place);
functor(ctx, in2_value, 2.0);
std::unique_ptr<SelectedRows> output{new SelectedRows()};
std::unique_ptr<paddle::framework::SelectedRows> output{
new paddle::framework::SelectedRows()};
auto* out_value = output->mutable_value();
// simplely concat two SelectedRows
out_value->mutable_data<float>(make_ddim({7, 10}), cpu_place);
out_value->mutable_data<float>(paddle::framework::make_ddim({7, 10}),
cpu_place);
SelectedRowsAdd<CPUDeviceContext, float> add_functor;
paddle::operators::math::SelectedRowsAdd<paddle::platform::CPUDeviceContext,
float>
add_functor;
add_functor(ctx, *selected_rows1, *selected_rows2, output.get());
auto out_height = output->height();
......@@ -78,14 +87,20 @@ TEST(selected_rows_functor, cpu_add) {
EXPECT_EQ(out_data[5 * row_numel + 7], 2.0);
EXPECT_EQ(out_data[6 * row_numel + 9], 2.0);
std::unique_ptr<Tensor> tensor1{new Tensor()};
tensor1->mutable_data<float>(make_ddim({height, row_numel}), cpu_place);
std::unique_ptr<paddle::framework::Tensor> tensor1{
new paddle::framework::Tensor()};
tensor1->mutable_data<float>(
paddle::framework::make_ddim({height, row_numel}), cpu_place);
functor(ctx, tensor1.get(), 3.0);
std::unique_ptr<Tensor> tensor2{new Tensor()};
tensor2->mutable_data<float>(make_ddim({height, row_numel}), cpu_place);
std::unique_ptr<paddle::framework::Tensor> tensor2{
new paddle::framework::Tensor()};
tensor2->mutable_data<float>(
paddle::framework::make_ddim({height, row_numel}), cpu_place);
SelectedRowsAddTensor<CPUDeviceContext, float> add_tensor_functor;
paddle::operators::math::SelectedRowsAddTensor<
paddle::platform::CPUDeviceContext, float>
add_tensor_functor;
add_tensor_functor(ctx, *output, *tensor1, tensor2.get());
auto* tensor2_data = tensor2->data<float>();
......@@ -106,38 +121,46 @@ TEST(selected_rows_functor, cpu_add) {
}
TEST(selected_rows_functor, cpu_add_to) {
using namespace paddle::framework;
using namespace paddle::platform;
using namespace paddle::operators::math;
CPUPlace cpu_place;
CPUDeviceContext ctx(cpu_place);
SetConstant<CPUDeviceContext, float> functor;
paddle::platform::CPUPlace cpu_place;
paddle::platform::CPUDeviceContext ctx(cpu_place);
paddle::operators::math::SetConstant<paddle::platform::CPUDeviceContext,
float>
functor;
int64_t height = 10;
int64_t row_numel = 10;
std::vector<int64_t> rows1{0, 4, 7};
std::unique_ptr<SelectedRows> selected_rows1{new SelectedRows(rows1, height)};
std::unique_ptr<paddle::framework::SelectedRows> selected_rows1{
new paddle::framework::SelectedRows(rows1, height)};
auto* in1_value = selected_rows1->mutable_value();
in1_value->mutable_data<float>(
make_ddim({static_cast<int64_t>(rows1.size()), row_numel}), cpu_place);
paddle::framework::make_ddim(
{static_cast<int64_t>(rows1.size()), row_numel}),
cpu_place);
functor(ctx, in1_value, 1.0);
std::vector<int64_t> rows2{0, 5, 7, 9};
std::unique_ptr<SelectedRows> selected_rows2{new SelectedRows(rows2, height)};
std::unique_ptr<paddle::framework::SelectedRows> selected_rows2{
new paddle::framework::SelectedRows(rows2, height)};
auto* in2_value = selected_rows2->mutable_value();
in2_value->mutable_data<float>(
make_ddim({static_cast<int64_t>(rows2.size()), row_numel}), cpu_place);
paddle::framework::make_ddim(
{static_cast<int64_t>(rows2.size()), row_numel}),
cpu_place);
functor(ctx, in2_value, 2.0);
std::unique_ptr<SelectedRows> output{new SelectedRows()};
std::unique_ptr<paddle::framework::SelectedRows> output{
new paddle::framework::SelectedRows()};
output->set_height(height);
auto* out_value = output->mutable_value();
// simplely concat two SelectedRows
out_value->mutable_data<float>(make_ddim({7, 10}), cpu_place);
out_value->mutable_data<float>(paddle::framework::make_ddim({7, 10}),
cpu_place);
SelectedRowsAddTo<CPUDeviceContext, float> add_to_functor;
paddle::operators::math::SelectedRowsAddTo<paddle::platform::CPUDeviceContext,
float>
add_to_functor;
add_to_functor(ctx, *selected_rows1, 0, output.get());
add_to_functor(ctx, *selected_rows2, in1_value->numel(), output.get());
......@@ -169,11 +192,15 @@ TEST(selected_rows_functor, cpu_add_to) {
EXPECT_EQ(out_data[5 * row_numel + 7], 2.0);
EXPECT_EQ(out_data[6 * row_numel + 9], 2.0);
std::unique_ptr<Tensor> tensor1{new Tensor()};
tensor1->mutable_data<float>(make_ddim({height, row_numel}), cpu_place);
std::unique_ptr<paddle::framework::Tensor> tensor1{
new paddle::framework::Tensor()};
tensor1->mutable_data<float>(
paddle::framework::make_ddim({height, row_numel}), cpu_place);
functor(ctx, tensor1.get(), 3.0);
SelectedRowsAddToTensor<CPUDeviceContext, float> add_to_tensor_functor;
paddle::operators::math::SelectedRowsAddToTensor<
paddle::platform::CPUDeviceContext, float>
add_to_tensor_functor;
add_to_tensor_functor(ctx, *output, tensor1.get());
auto* tensor1_data = tensor1->data<float>();
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/math/sequence_pooling.h"
#include <string>
#include "paddle/fluid/operators/math/math_function.h"
namespace paddle {
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/math/vol2col.h"
#include <vector>
namespace paddle {
namespace operators {
......
......@@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <algorithm>
#include <vector>
#include "paddle/fluid/operators/math/vol2col.h"
#include "paddle/fluid/platform/cuda_helper.h"
......
......@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/operators/math/vol2col.h"
#include <gtest/gtest.h>
#include <iostream>
#include <vector>
template <typename DeviceContext, typename Place>
void testVol2col() {
......
......@@ -23,5 +23,7 @@ reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_o
reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc)
reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc)
# Export local libraries to parent
set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE)
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <condition_variable> // NOLINT
#include <deque>
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace operators {
namespace reader {
template <typename T>
class BlockingQueue {
// BlockingQueue is for buffered reading and is supposed to use only the
// reader package. It is true that we could and we should have been using
// framework::Channel, but which has currently a deadlock bug. BlockingQueue
// is a workaround and a simplified version of framework::Channel as it
// doesn't support GPU and it implements on buffered blocking queue.
public:
explicit BlockingQueue(size_t capacity)
: capacity_(capacity), closed_(false) {
PADDLE_ENFORCE_GT(
capacity_, 0,
"The capacity of a reader::BlockingQueue must be greater than 0.");
}
bool Send(const T& elem) {
std::unique_lock<std::mutex> lock(mutex_);
send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
if (closed_) {
VLOG(5)
<< "WARNING: Sending an element to a closed reader::BlokcingQueue.";
return false;
}
PADDLE_ENFORCE_LT(queue_.size(), capacity_);
queue_.push_back(elem);
receive_cv_.notify_one();
return true;
}
bool Send(T&& elem) {
std::unique_lock<std::mutex> lock(mutex_);
send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
if (closed_) {
VLOG(5)
<< "WARNING: Sending an element to a closed reader::BlokcingQueue.";
return false;
}
PADDLE_ENFORCE_LT(queue_.size(), capacity_);
queue_.emplace_back(std::move(elem));
receive_cv_.notify_one();
return true;
}
bool Receive(T* elem) {
std::unique_lock<std::mutex> lock(mutex_);
receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
if (!queue_.empty()) {
PADDLE_ENFORCE_NOT_NULL(elem);
*elem = queue_.front();
queue_.pop_front();
send_cv_.notify_one();
return true;
} else {
PADDLE_ENFORCE(closed_);
return false;
}
}
void Close() {
std::lock_guard<std::mutex> lock(mutex_);
closed_ = true;
send_cv_.notify_all();
receive_cv_.notify_all();
}
bool IsClosed() {
std::lock_guard<std::mutex> lock(mutex_);
return closed_;
}
size_t Cap() {
std::lock_guard<std::mutex> lock(mutex_);
return capacity_;
}
private:
size_t capacity_;
bool closed_;
std::deque<T> queue_;
std::mutex mutex_;
std::condition_variable receive_cv_;
std::condition_variable send_cv_;
};
} // namespace reader
} // namespace operators
} // namespace paddle
......@@ -14,7 +14,7 @@
#include <thread> // NOLINT
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace paddle {
......@@ -23,13 +23,13 @@ namespace reader {
// 'Double buffer' means we shall maintain two batches of input data at the same
// time. So the kCacheSize shoul be at least 2.
static constexpr size_t kCacheSize = 2;
static constexpr size_t kCacheSize = 3;
// There will be two bacthes out of the channel during training:
// 1. the one waiting to be sent to the channel
// 2. the one just be received from the channel, which is also being used by
// subsequent operators.
// So the channel size should be kChacheSize - 2
static constexpr size_t kChannelSize = 0; // kCacheSize - 2
static constexpr size_t kChannelSize = 1; // kCacheSize - 2
class DoubleBufferReader : public framework::DecoratedReader {
public:
......@@ -55,10 +55,8 @@ class DoubleBufferReader : public framework::DecoratedReader {
~DoubleBufferReader() { EndPrefetcher(); }
private:
bool HasNext() const;
void StartPrefetcher() {
channel_ = framework::MakeChannel<size_t>(kChannelSize);
channel_ = new reader::BlockingQueue<size_t>(kChannelSize);
prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
}
......@@ -74,7 +72,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
void PrefetchThreadFunc();
std::thread prefetcher_;
framework::Channel<size_t>* channel_;
reader::BlockingQueue<size_t>* channel_;
platform::Place place_;
std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache_;
std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache_;
......@@ -139,17 +137,16 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
};
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
out->clear();
if (HasNext()) {
size_t cached_tensor_id;
channel_->Receive(&cached_tensor_id);
if (channel_->Receive(&cached_tensor_id)) {
if (platform::is_gpu_place(place_)) {
*out = gpu_tensor_cache_[cached_tensor_id];
ctxs_[cached_tensor_id]->Wait();
} else {
// CPU place
*out = cpu_tensor_cache_[cached_tensor_id];
}
} else {
out->clear();
}
}
......@@ -159,12 +156,6 @@ void DoubleBufferReader::ReInit() {
StartPrefetcher();
}
bool DoubleBufferReader::HasNext() const {
while (!channel_->IsClosed() && !channel_->CanReceive()) {
}
return channel_->CanReceive();
}
void DoubleBufferReader::PrefetchThreadFunc() {
VLOG(5) << "A new prefetch thread starts.";
size_t cached_tensor_id = 0;
......@@ -185,10 +176,7 @@ void DoubleBufferReader::PrefetchThreadFunc() {
gpu_batch[i].set_lod(cpu_batch[i].lod());
}
}
try {
size_t tmp = cached_tensor_id;
channel_->Send(&tmp);
} catch (paddle::platform::EnforceNotMet e) {
if (!channel_->Send(cached_tensor_id)) {
VLOG(5) << "WARNING: The double buffer channel has been closed. The "
"prefetch thread will terminate.";
break;
......
......@@ -14,7 +14,7 @@
#include <thread> // NOLINT
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace paddle {
......@@ -37,7 +37,6 @@ class MultiFileReader : public framework::ReaderBase {
~MultiFileReader() { EndScheduler(); }
private:
bool HasNext();
void StartNewScheduler();
void EndScheduler();
void ScheduleThreadFunc();
......@@ -48,15 +47,14 @@ class MultiFileReader : public framework::ReaderBase {
std::thread scheduler_;
std::vector<std::thread> prefetchers_;
size_t buffer_size_;
framework::Channel<size_t>* waiting_file_idx_;
framework::Channel<size_t>* available_thread_idx_;
framework::Channel<std::vector<framework::LoDTensor>>* buffer_;
reader::BlockingQueue<size_t>* waiting_file_idx_;
reader::BlockingQueue<size_t>* available_thread_idx_;
reader::BlockingQueue<std::vector<framework::LoDTensor>>* buffer_;
};
void MultiFileReader::ReadNext(std::vector<framework::LoDTensor>* out) {
if (!buffer_->Receive(out)) {
out->clear();
if (HasNext()) {
buffer_->Receive(out);
}
}
......@@ -65,25 +63,19 @@ void MultiFileReader::ReInit() {
StartNewScheduler();
}
bool MultiFileReader::HasNext() {
while (!buffer_->IsClosed() && !buffer_->CanReceive()) {
}
return buffer_->CanReceive();
}
void MultiFileReader::StartNewScheduler() {
size_t thread_num = prefetchers_.size();
waiting_file_idx_ = framework::MakeChannel<size_t>(file_names_.size());
available_thread_idx_ = framework::MakeChannel<size_t>(thread_num);
buffer_ =
framework::MakeChannel<std::vector<framework::LoDTensor>>(buffer_size_);
waiting_file_idx_ = new reader::BlockingQueue<size_t>(file_names_.size());
available_thread_idx_ = new reader::BlockingQueue<size_t>(thread_num);
buffer_ = new reader::BlockingQueue<std::vector<framework::LoDTensor>>(
buffer_size_);
for (size_t i = 0; i < file_names_.size(); ++i) {
waiting_file_idx_->Send(&i);
waiting_file_idx_->Send(i);
}
waiting_file_idx_->Close();
for (size_t i = 0; i < thread_num; ++i) {
available_thread_idx_->Send(&i);
available_thread_idx_->Send(i);
}
scheduler_ = std::thread([this] { ScheduleThreadFunc(); });
......@@ -149,7 +141,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name,
break;
}
try {
buffer_->Send(&ins);
buffer_->Send(std::move(ins));
} catch (paddle::platform::EnforceNotMet e) {
VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch "
"thread of file '"
......@@ -158,9 +150,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name,
}
}
try {
available_thread_idx_->Send(&thread_idx);
} catch (paddle::platform::EnforceNotMet e) {
if (!available_thread_idx_->Send(thread_idx)) {
VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. "
"Fail to send thread_idx.";
}
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono> // NOLINT
#include <set>
#include <thread> // NOLINT
#include <vector>
#include "gtest/gtest.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
using paddle::operators::reader::BlockingQueue;
TEST(BlockingQueue, CapacityTest) {
size_t cap = 10;
BlockingQueue<int> q(cap);
EXPECT_EQ(q.Cap(), cap);
}
void FirstInFirstOut(size_t queue_cap, size_t elem_num, size_t send_time_gap,
size_t receive_time_gap) {
BlockingQueue<size_t> q(queue_cap);
std::thread sender([&]() {
for (size_t i = 0; i < elem_num; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(send_time_gap));
EXPECT_TRUE(q.Send(i));
}
q.Close();
});
size_t count = 0;
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(receive_time_gap));
size_t elem;
if (!q.Receive(&elem)) {
break;
}
EXPECT_EQ(elem, count++);
}
sender.join();
EXPECT_EQ(count, elem_num);
EXPECT_TRUE(q.IsClosed());
}
TEST(BlockingQueue, FirstInFirstOutTest) {
FirstInFirstOut(2, 5, 2, 50);
FirstInFirstOut(2, 5, 50, 2);
FirstInFirstOut(10, 3, 50, 2);
FirstInFirstOut(10, 3, 2, 50);
}
TEST(BlockingQueue, SenderBlockingTest) {
const size_t queue_cap = 2;
BlockingQueue<size_t> q(queue_cap);
size_t send_count = 0;
std::thread sender([&]() {
for (size_t i = 0; i < 5; ++i) {
if (!q.Send(i)) {
break;
}
++send_count;
}
});
std::this_thread::sleep_for(std::chrono::milliseconds(200));
q.Close();
sender.join();
EXPECT_EQ(send_count, queue_cap);
std::vector<size_t> res;
while (true) {
size_t elem;
if (!q.Receive(&elem)) {
break;
}
res.push_back(elem);
}
EXPECT_EQ(res.size(), queue_cap);
for (size_t i = 0; i < res.size(); ++i) {
EXPECT_EQ(res[i], i);
}
}
TEST(BlockingQueue, ReceiverBlockingTest) {
const size_t queue_cap = 5;
BlockingQueue<size_t> q(queue_cap);
std::vector<size_t> receive_res;
std::thread receiver([&]() {
size_t elem;
while (true) {
if (!q.Receive(&elem)) {
break;
}
receive_res.push_back(elem);
}
});
std::vector<size_t> to_send{2, 1, 7};
for (auto e : to_send) {
q.Send(e);
}
q.Close();
receiver.join();
EXPECT_EQ(receive_res.size(), to_send.size());
for (size_t i = 0; i < to_send.size(); ++i) {
EXPECT_EQ(receive_res[i], to_send[i]);
}
}
void CheckIsUnorderedSame(const std::vector<std::vector<size_t>>& v1,
const std::vector<std::vector<size_t>>& v2) {
std::set<size_t> s1;
std::set<size_t> s2;
for (auto vec : v1) {
for (size_t elem : vec) {
s1.insert(elem);
}
}
for (auto vec : v2) {
for (size_t elem : vec) {
s2.insert(elem);
}
}
EXPECT_EQ(s1.size(), s2.size());
auto it1 = s1.begin();
auto it2 = s2.begin();
while (it1 != s1.end()) {
EXPECT_EQ(*it1, *it2);
++it1;
++it2;
}
}
void MultiSenderMultiReceiver(const size_t queue_cap,
const std::vector<std::vector<size_t>>& to_send,
size_t receiver_num, size_t send_time_gap,
size_t receive_time_gap) {
BlockingQueue<size_t> q(queue_cap);
size_t sender_num = to_send.size();
std::vector<std::thread> senders;
for (size_t s_idx = 0; s_idx < sender_num; ++s_idx) {
senders.emplace_back(std::thread([&, s_idx] {
for (size_t elem : to_send[s_idx]) {
std::this_thread::sleep_for(std::chrono::milliseconds(send_time_gap));
EXPECT_TRUE(q.Send(elem));
}
}));
}
std::vector<std::thread> receivers;
std::mutex mu;
std::vector<std::vector<size_t>> res;
for (size_t r_idx = 0; r_idx < receiver_num; ++r_idx) {
receivers.emplace_back(std::thread([&] {
std::vector<size_t> receiver_res;
while (true) {
std::this_thread::sleep_for(
std::chrono::milliseconds(receive_time_gap));
size_t elem;
if (!q.Receive(&elem)) {
break;
}
receiver_res.push_back(elem);
}
std::lock_guard<std::mutex> lock(mu);
res.push_back(receiver_res);
}));
}
for (auto& t : senders) {
t.join();
}
q.Close();
for (auto& t : receivers) {
t.join();
}
CheckIsUnorderedSame(to_send, res);
}
TEST(BlockingQueue, MultiSenderMultiReaderTest) {
std::vector<std::vector<size_t>> to_send_1{{2, 3, 4}, {9}, {0, 7, 15, 6}};
MultiSenderMultiReceiver(2, to_send_1, 2, 0, 0);
MultiSenderMultiReceiver(10, to_send_1, 2, 0, 0);
MultiSenderMultiReceiver(2, to_send_1, 20, 0, 0);
MultiSenderMultiReceiver(2, to_send_1, 2, 50, 0);
MultiSenderMultiReceiver(2, to_send_1, 2, 0, 50);
std::vector<std::vector<size_t>> to_send_2{
{2, 3, 4}, {}, {0, 7, 15, 6, 9, 32}};
MultiSenderMultiReceiver(2, to_send_2, 3, 0, 0);
MultiSenderMultiReceiver(20, to_send_2, 3, 0, 0);
MultiSenderMultiReceiver(2, to_send_2, 30, 0, 0);
MultiSenderMultiReceiver(2, to_send_2, 3, 50, 0);
MultiSenderMultiReceiver(2, to_send_2, 3, 0, 50);
}
struct MyClass {
MyClass() : val_(0) {}
explicit MyClass(int val) : val_(val) {}
MyClass(const MyClass& b) { val_ = b.val_; }
MyClass(MyClass&& b) { val_ = b.val_; }
void operator=(const MyClass& b) { val_ = b.val_; }
int val_;
};
TEST(BlockingQueue, MyClassTest) {
BlockingQueue<MyClass> q(2);
MyClass a(200);
q.Send(std::move(a));
MyClass b;
q.Receive(&b);
EXPECT_EQ(a.val_, b.val_);
}
......@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "reader_op_registry.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
#include <string>
#include <vector>
namespace paddle {
namespace operators {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册