未验证 提交 b0630938 编写于 作者: Y Yu Yang 提交者: GitHub

Merge pull request #12149 from reyoung/feature/combine_open_files_and_double_buffer

Change and polish readers
......@@ -180,13 +180,13 @@ paddle.fluid.layers.log ArgSpec(args=['x'], varargs=None, keywords=None, default
paddle.fluid.layers.crop ArgSpec(args=['x', 'shape', 'offsets', 'name'], varargs=None, keywords=None, defaults=(None, None, None))
paddle.fluid.layers.data ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True))
paddle.fluid.layers.open_recordio_file ArgSpec(args=['filename', 'shapes', 'lod_levels', 'dtypes', 'pass_num', 'for_parallel'], varargs=None, keywords=None, defaults=(1, True))
paddle.fluid.layers.open_files ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'for_parallel'], varargs=None, keywords=None, defaults=(1, None, 1, True))
paddle.fluid.layers.open_files ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'is_test'], varargs=None, keywords=None, defaults=(None, None, 1, None))
paddle.fluid.layers.read_file ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None)
paddle.fluid.layers.shuffle ArgSpec(args=['reader', 'buffer_size'], varargs=None, keywords=None, defaults=None)
paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None)
paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
......
......@@ -171,7 +171,12 @@ void ThreadedSSAGraphExecutor::InsertFetchOps(
for (size_t i = 0; i < fetch_tensors.size(); ++i) {
auto &var_name = fetch_tensors[i];
auto &vars = fetched_vars.at(var_name);
auto fetched_var_it = fetched_vars.find(var_name);
PADDLE_ENFORCE(fetched_var_it != fetched_vars.end(),
"Cannot find fetched variable.(Perhaps the main_program "
"is not set to ParallelExecutor)");
auto &vars = fetched_var_it->second;
temp_nodes->emplace_back(new ir::Node("fetch", ir::Node::Type::kOperation));
auto *op = new FetchOpHandle(temp_nodes->back().get(), fetch_data, i,
......
......@@ -312,19 +312,22 @@ void WriteToRecordIO(recordio::Writer *writer,
writer->Write(buffer.str());
}
std::vector<LoDTensor> ReadFromRecordIO(
recordio::Scanner *scanner, const platform::DeviceContext &dev_ctx) {
std::vector<LoDTensor> result;
if (scanner->HasNext()) {
std::istringstream sin(scanner->Next());
uint32_t sz;
sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
result.resize(sz);
for (uint32_t i = 0; i < sz; ++i) {
DeserializeFromStream(sin, &result[i], dev_ctx);
}
bool ReadFromRecordIO(recordio::Scanner *scanner,
const platform::DeviceContext &dev_ctx,
std::vector<LoDTensor> *result_ptr) {
if (!scanner->HasNext()) {
return false;
}
return result;
std::istringstream sin(scanner->Next());
uint32_t sz;
sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
auto &result = *result_ptr;
result.resize(sz);
for (uint32_t i = 0; i < sz; ++i) {
DeserializeFromStream(sin, &result[i], dev_ctx);
}
return true;
}
std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
......
......@@ -223,8 +223,9 @@ extern void WriteToRecordIO(recordio::Writer* writer,
const std::vector<LoDTensor>& tensor,
const platform::DeviceContext& dev_ctx);
extern std::vector<LoDTensor> ReadFromRecordIO(
recordio::Scanner* scanner, const platform::DeviceContext& dev_ctx);
extern bool ReadFromRecordIO(recordio::Scanner* scanner,
const platform::DeviceContext& dev_ctx,
std::vector<LoDTensor>* result_ptr);
/*
* Convert between length-based LoD and offset-based LoD.
......
......@@ -301,11 +301,12 @@ static void TestRecordIO() {
{
std::unique_ptr<std::istream> stream_ptr(stream);
recordio::Scanner scanner(std::move(stream_ptr));
auto tensors = ReadFromRecordIO(&scanner, ctx);
std::vector<framework::LoDTensor> tensors;
ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors));
ASSERT_EQ(tensors.size(), static_cast<size_t>(2));
assert_tensor_ok(tensors[0]);
assert_tensor_ok(tensors[1]);
tensors = ReadFromRecordIO(&scanner, ctx);
ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors));
ASSERT_EQ(tensors.size(), static_cast<size_t>(2));
assert_tensor_ok(tensors[0]);
assert_tensor_ok(tensors[1]);
......
......@@ -67,7 +67,8 @@ void ReaderBase::Start() {
}
}
ReaderBase::~ReaderBase() { Shutdown(); }
ReaderBase::~ReaderBase() {}
DecoratedReader::~DecoratedReader() { reader_->Shutdown(); }
} // namespace framework
} // namespace paddle
......@@ -25,8 +25,6 @@
namespace paddle {
namespace framework {
enum ReaderStatus { kRunning, kStopped };
class ReaderBase {
public:
virtual void ReadNext(std::vector<LoDTensor>* out);
......@@ -48,6 +46,8 @@ class ReaderBase {
virtual void StartImpl() {}
enum ReaderStatus { kRunning, kStopped };
ReaderStatus status_{kRunning};
mutable std::mutex mu_;
......@@ -74,6 +74,8 @@ class DecoratedReader : public ReaderBase,
reader_->InsertDecoratedReader(shared_from_this());
}
~DecoratedReader();
protected:
void ShutdownImpl() override { reader_->Shutdown(); }
......
......@@ -15,12 +15,13 @@ function(reader_library TARGET_NAME)
PARENT_SCOPE)
endfunction()
reader_library(open_files_op SRCS open_files_op.cc)
cc_library(buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool)
reader_library(open_files_op SRCS open_files_op.cc DEPS buffered_reader)
reader_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc)
reader_library(create_shuffle_reader_op SRCS create_shuffle_reader_op.cc)
reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc)
reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc)
reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc)
reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc DEPS buffered_reader)
reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc)
reader_library(create_py_reader_op SRCS create_py_reader_op.cc)
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/reader/buffered_reader.h"
#include <vector>
namespace paddle {
namespace operators {
namespace reader {
BufferedReader::~BufferedReader() { reader_->Shutdown(); }
BufferedReader::BufferedReader(
const std::shared_ptr<framework::ReaderBase> &reader,
const platform::Place &place, size_t buffer_size)
: framework::DecoratedReader(reader),
thread_pool_(1),
place_(place),
buffer_size_(buffer_size) {
cpu_buffer_.resize(buffer_size);
gpu_buffer_.resize(buffer_size);
ReadTillBufferFullAsync();
}
void BufferedReader::ReadTillBufferFullAsync() {
PADDLE_ENFORCE_EQ(position_.size(), 0U);
for (size_t i = 0; i < buffer_size_; ++i) {
ReadAsync(i);
}
}
void BufferedReader::ReadAsync(size_t i) {
position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
TensorVec &cpu = cpu_buffer_[i];
reader_->ReadNext(&cpu);
if (cpu.empty()) {
return -1UL;
}
if (platform::is_gpu_place(place_)) {
TensorVec &gpu = gpu_buffer_[i];
gpu.resize(cpu.size());
for (size_t i = 0; i < cpu.size(); ++i) {
framework::TensorCopySync(cpu[i], place_, &gpu[i]);
gpu[i].set_lod(cpu[i].lod());
}
}
return i;
}));
}
void BufferedReader::ShutdownImpl() {
reader_->Shutdown();
while (!position_.empty()) {
position_.pop();
}
prev_pos_ = -1UL;
}
void BufferedReader::StartImpl() {
reader_->Start();
ReadTillBufferFullAsync();
}
void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
if (position_.empty()) {
out->clear();
return;
}
size_t i = position_.front().get();
position_.pop();
if (i == -1UL) {
ReadNextImpl(out);
return;
}
*out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i];
// Do not push current position into ReadAsync. Push the previous position
// Since all computation in fluid are async, change the data of
// current position may cause data error.
if (prev_pos_ != -1Ul) {
ReadAsync(prev_pos_);
}
prev_pos_ = i;
}
} // namespace reader
} // namespace operators
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <list>
#include <queue>
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/reader.h"
namespace paddle {
namespace operators {
namespace reader {
class BufferedReader : public framework::DecoratedReader {
using TensorVec = std::vector<framework::LoDTensor>;
using VecFuture = std::future<TensorVec>;
public:
BufferedReader(const std::shared_ptr<framework::ReaderBase>& reader,
const platform::Place& place, size_t buffer_size);
~BufferedReader() override;
private:
void ReadTillBufferFullAsync();
void ReadAsync(size_t i);
protected:
void ShutdownImpl() override;
void StartImpl() override;
void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
private:
ThreadPool thread_pool_;
platform::Place place_;
const size_t buffer_size_;
std::queue<std::future<size_t>> position_;
// The buffer for reading data.
// NOTE: the simplest way to implement buffered reader is do not use any
// buffer, just read async and create futures as buffer size. However, to
// malloc tensors every time is extremely slow. Here we store all data in
// buffers and prevent alloc every time.
std::vector<TensorVec> cpu_buffer_;
std::vector<TensorVec> gpu_buffer_;
size_t prev_pos_{-1UL};
};
} // namespace reader
} // namespace operators
} // namespace paddle
......@@ -12,83 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <thread> // NOLINT
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/operators/reader/buffered_reader.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace paddle {
namespace operators {
namespace reader {
// 'Double buffer' means we shall maintain two batches of input data at the same
// time. So the kCacheSize shoul be at least 2.
static constexpr size_t kCacheSize = 3;
// There will be two bacthes out of the channel during training:
// 1. the one waiting to be sent to the channel
// 2. the one just be received from the channel, which is also being used by
// subsequent operators.
// So the channel size should be kChacheSize - 2
static constexpr size_t kChannelSize = 1; // kCacheSize - 2
class DoubleBufferReader : public framework::DecoratedReader {
public:
explicit DoubleBufferReader(
const std::shared_ptr<ReaderBase>& reader,
platform::Place target_place = platform::CPUPlace())
: DecoratedReader(reader), place_(target_place) {
cpu_tensor_cache_.resize(kCacheSize);
gpu_tensor_cache_.resize(kCacheSize);
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
for (size_t i = 0; i < kCacheSize; ++i) {
ctxs_.emplace_back(new platform::CUDADeviceContext(
boost::get<platform::CUDAPlace>(place_)));
}
}
#endif
StartPrefetcher();
}
void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
~DoubleBufferReader() { EndPrefetcher(); }
private:
void ShutdownImpl() override {
EndPrefetcher();
reader_->Shutdown();
}
void StartImpl() override {
reader_->Start();
StartPrefetcher();
}
void StartPrefetcher() {
channel_ = new reader::BlockingQueue<size_t>(kChannelSize);
prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
}
void EndPrefetcher() {
channel_->Close();
if (prefetcher_.joinable()) {
prefetcher_.join();
}
delete channel_;
channel_ = nullptr;
}
void PrefetchThreadFunc();
std::thread prefetcher_;
reader::BlockingQueue<size_t>* channel_;
platform::Place place_;
std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache_;
std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache_;
std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
};
class CreateDoubleBufferReaderOp : public framework::OperatorBase {
public:
using framework::OperatorBase::OperatorBase;
......@@ -118,8 +47,8 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
place = platform::CUDAPlace(static_cast<int>(num));
}
out->Reset(framework::MakeDecoratedReader<DoubleBufferReader>(
underlying_reader, place));
out->Reset(framework::MakeDecoratedReader<BufferedReader>(underlying_reader,
place, 2));
}
};
......@@ -146,51 +75,6 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
}
};
void DoubleBufferReader::ReadNextImpl(std::vector<framework::LoDTensor>* out) {
size_t cached_tensor_id;
if (channel_->Receive(&cached_tensor_id)) {
if (platform::is_gpu_place(place_)) {
*out = gpu_tensor_cache_[cached_tensor_id];
} else {
// CPU place
*out = cpu_tensor_cache_[cached_tensor_id];
}
} else {
out->clear();
}
}
void DoubleBufferReader::PrefetchThreadFunc() {
VLOG(5) << "A new prefetch thread starts.";
size_t cached_tensor_id = 0;
while (true) {
auto& cpu_batch = cpu_tensor_cache_[cached_tensor_id];
reader_->ReadNext(&cpu_batch);
if (cpu_batch.empty()) {
// The underlying reader have no next data.
break;
}
if (platform::is_gpu_place(place_)) {
auto& gpu_batch = gpu_tensor_cache_[cached_tensor_id];
gpu_batch.resize(cpu_batch.size());
for (size_t i = 0; i < cpu_batch.size(); ++i) {
// TODO(fengjiayi): Use asynchronous TensorCopy instead
framework::TensorCopySync(cpu_batch[i], place_, &gpu_batch[i]);
gpu_batch[i].set_lod(cpu_batch[i].lod());
}
}
if (!channel_->Send(cached_tensor_id)) {
VLOG(5) << "WARNING: The double buffer channel has been closed. The "
"prefetch thread will terminate.";
break;
}
++cached_tensor_id;
cached_tensor_id %= kCacheSize;
}
channel_->Close();
VLOG(5) << "Prefetch thread terminates.";
}
} // namespace reader
} // namespace operators
} // namespace paddle
......
......@@ -33,6 +33,8 @@ class PyReader : public framework::FileReader {
if (!success) out->clear();
}
~PyReader() { queue_->Close(); }
void Shutdown() override { queue_->Close(); }
void Start() override { queue_->ReOpen(); }
......
......@@ -33,11 +33,14 @@ class RecordIOFileReader : public framework::FileReader {
protected:
void ReadNextImpl(std::vector<framework::LoDTensor>* out) override {
std::unique_ptr<std::lock_guard<std::mutex>> guard;
if (ThreadSafe) {
std::lock_guard<std::mutex> guard(*mutex_);
*out = framework::ReadFromRecordIO(&scanner_, dev_ctx_);
} else {
*out = framework::ReadFromRecordIO(&scanner_, dev_ctx_);
guard.reset(new std::lock_guard<std::mutex>(*mutex_));
}
bool ok = framework::ReadFromRecordIO(&scanner_, dev_ctx_, out);
if (!ok) {
out->clear();
}
}
......
......@@ -48,9 +48,9 @@ class ShuffleReader : public framework::DecoratedReader {
private:
void ShutdownImpl() override {
reader_->Shutdown();
buffer_.clear();
iteration_pos_ = 0;
reader_->Shutdown();
}
void StartImpl() override {
......
......@@ -12,150 +12,200 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cmath>
#include <stdexcept>
#include <thread> // NOLINT
#include "ThreadPool.h"
#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/operators/reader/buffered_reader.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace paddle {
namespace operators {
namespace reader {
class MultiFileReader : public framework::ReaderBase {
class IReaderContainer {
public:
MultiFileReader(const std::vector<std::string>& file_names, size_t thread_num,
size_t buffer_size)
: buffer_size_(buffer_size) {
readers_.reserve(file_names.size());
for (const std::string& f_name : file_names) {
readers_.emplace_back(CreateReaderByFileName(f_name));
virtual ~IReaderContainer() {}
virtual void AppendReader(
std::unique_ptr<framework::ReaderBase>&& readers) = 0;
virtual void Stop() = 0;
virtual void Start() = 0;
virtual void ReadNext(std::vector<framework::LoDTensor>* out) = 0;
};
class OrderedReaderContainer : public IReaderContainer {
public:
void AppendReader(std::unique_ptr<framework::ReaderBase>&& reader) override {
pending_.emplace(std::move(reader));
}
void Stop() override {
while (!pending_.empty()) {
MoveFrontPendingToDone();
}
prefetchers_.resize(thread_num);
StartNewScheduler();
}
void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
void Start() override { std::swap(done_, pending_); }
~MultiFileReader() { EndScheduler(); }
void ReadNext(std::vector<framework::LoDTensor>* out) override {
if (!pending_.empty()) {
pending_.front()->ReadNext(out);
if (out->empty()) {
MoveFrontPendingToDone();
ReadNext(out);
}
} else {
out->clear();
}
}
private:
void ShutdownImpl() override { EndScheduler(); }
void StartImpl() override { StartNewScheduler(); }
void StartNewScheduler();
void EndScheduler();
void ScheduleThreadFunc();
void PrefetchThreadFunc(size_t reader_idx, size_t thread_idx);
std::vector<std::unique_ptr<framework::ReaderBase>> readers_;
std::thread scheduler_;
std::vector<std::thread> prefetchers_;
size_t buffer_size_;
reader::BlockingQueue<size_t>* waiting_reader_idx_;
reader::BlockingQueue<size_t>* available_thread_idx_;
reader::BlockingQueue<std::vector<framework::LoDTensor>>* buffer_;
void MoveFrontPendingToDone() {
pending_.front()->Shutdown();
pending_.front()->Start();
done_.emplace(move(pending_.front()));
pending_.pop();
}
std::queue<std::unique_ptr<framework::ReaderBase>> pending_;
std::queue<std::unique_ptr<framework::ReaderBase>> done_;
};
void MultiFileReader::ReadNextImpl(std::vector<framework::LoDTensor>* out) {
if (!buffer_->Receive(out)) {
out->clear();
}
}
class PreemptiveReaderContainer : public IReaderContainer {
using ReaderList = std::list<std::unique_ptr<framework::ReaderBase>>;
void MultiFileReader::StartNewScheduler() {
size_t thread_num = prefetchers_.size();
waiting_reader_idx_ = new reader::BlockingQueue<size_t>(readers_.size());
available_thread_idx_ = new reader::BlockingQueue<size_t>(thread_num);
buffer_ = new reader::BlockingQueue<std::vector<framework::LoDTensor>>(
buffer_size_);
struct FutureItem {
std::vector<framework::LoDTensor> data_;
ReaderList::iterator reader_it_;
std::exception_ptr exception_;
};
for (size_t i = 0; i < readers_.size(); ++i) {
waiting_reader_idx_->Send(i);
}
waiting_reader_idx_->Close();
for (size_t i = 0; i < thread_num; ++i) {
available_thread_idx_->Send(i);
}
using FutureList = std::list<std::future<FutureItem>>;
scheduler_ = std::thread([this] { ScheduleThreadFunc(); });
}
public:
explicit PreemptiveReaderContainer(size_t thread_num) : pool_(thread_num) {}
void MultiFileReader::EndScheduler() {
available_thread_idx_->Close();
buffer_->Close();
waiting_reader_idx_->Close();
if (scheduler_.joinable()) {
scheduler_.join();
}
delete buffer_;
delete available_thread_idx_;
delete waiting_reader_idx_;
}
void MultiFileReader::ScheduleThreadFunc() {
VLOG(5) << "MultiFileReader schedule thread starts.";
size_t completed_thread_num = 0;
size_t thread_idx;
while (available_thread_idx_->Receive(&thread_idx)) {
std::thread& prefetcher = prefetchers_[thread_idx];
if (prefetcher.joinable()) {
prefetcher.join();
}
size_t reader_idx;
if (waiting_reader_idx_->Receive(&reader_idx)) {
// Still have files to read. Start a new prefetch thread.
prefetcher = std::thread([this, reader_idx, thread_idx] {
PrefetchThreadFunc(reader_idx, thread_idx);
});
} else {
// No more file to read.
++completed_thread_num;
if (completed_thread_num == prefetchers_.size()) {
buffer_->Close();
break;
void Stop() override {
if (!pending_.empty()) {
for (auto& reader : pending_) {
reader->Shutdown();
}
for (auto& fu : futures_) {
fu.wait();
}
futures_.clear();
for (auto& reader : pending_) {
reader->Start();
done_.emplace_back(std::move(reader));
}
pending_.clear();
bool timeout;
complete_queue_.PopAll(1000, &timeout);
PADDLE_ENFORCE(!timeout);
}
}
// If users invoke Shutdown() when scheduler is running, it will close the
// 'avaiable_thread_idx_' and prefecther threads have no way to tell scheduler
// to release their resource. So a check is needed before scheduler ends.
for (auto& p : prefetchers_) {
if (p.joinable()) {
p.join();
void Start() override {
for (auto& reader : done_) {
AppendReader(std::move(reader));
}
done_.clear();
}
VLOG(5) << "MultiFileReader schedule thread terminates.";
}
void MultiFileReader::PrefetchThreadFunc(size_t reader_idx, size_t thread_idx) {
VLOG(5) << "The prefetch thread of file idx '" << reader_idx << "' starts.";
std::unique_ptr<framework::ReaderBase>& reader = readers_[reader_idx];
while (true) {
std::vector<framework::LoDTensor> ins;
reader->ReadNext(&ins);
if (ins.empty()) {
reader->Shutdown();
reader->Start();
break;
void ReadNext(std::vector<framework::LoDTensor>* out) override {
if (!pending_.empty()) {
auto future_it = complete_queue_.Pop();
FutureItem item = future_it->get();
if (item.exception_) {
for (auto it = futures_.begin(); it != futures_.end(); ++it) {
if (it != future_it) {
it->wait(); // Wait all other threads complete.
}
}
std::rethrow_exception(item.exception_);
} else if (item.data_.empty()) { // reader done.
done_.emplace_back(std::move(*item.reader_it_));
pending_.erase(item.reader_it_);
futures_.erase(future_it);
ReadNext(out);
} else {
*out = item.data_;
// continue read async
ReadAsync(item.reader_it_, &future_it);
}
} else {
out->clear();
}
try {
buffer_->Send(std::move(ins));
} catch (paddle::platform::EnforceNotMet e) {
VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch "
"thread of file idx '"
<< reader_idx << "' will terminate.";
break;
}
private:
void AppendReader(std::unique_ptr<framework::ReaderBase>&& reader) override {
pending_.emplace_back(std::move(reader));
auto reader_it = pending_.end();
--reader_it;
futures_.emplace_back();
auto future_it = futures_.end();
--future_it;
ReadAsync(reader_it, &future_it);
}
void ReadAsync(const ReaderList::iterator& reader_it,
FutureList::iterator* future_it_ptr) {
auto& future_it = *future_it_ptr;
*future_it = pool_.enqueue([reader_it, future_it, this] {
try {
FutureItem item;
item.reader_it_ = reader_it;
(*reader_it)->ReadNext(&item.data_);
if (item.data_.empty()) {
(*reader_it)->Shutdown();
(*reader_it)->Start();
}
complete_queue_.Push(future_it);
return item;
} catch (...) {
FutureItem item;
item.exception_ = std::current_exception();
complete_queue_.Push(future_it);
return item;
}
});
}
FutureList futures_;
ThreadPool pool_;
framework::BlockingQueue<FutureList::iterator> complete_queue_;
std::list<std::unique_ptr<framework::ReaderBase>> pending_;
std::list<std::unique_ptr<framework::ReaderBase>> done_;
};
class MultiFileReader : public framework::ReaderBase {
public:
MultiFileReader(const std::vector<std::string>& file_names,
std::unique_ptr<IReaderContainer>&& container)
: container_(std::move(container)) {
for (auto& fn : file_names) {
container_->AppendReader(CreateReaderByFileName(fn));
}
}
if (!available_thread_idx_->Send(thread_idx)) {
VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. "
"Fail to send thread_idx.";
~MultiFileReader() { container_->Stop(); }
protected:
void ReadNextImpl(std::vector<framework::LoDTensor>* out) override {
container_->ReadNext(out);
}
VLOG(5) << "The prefetch thread of file idx '" << reader_idx
<< "' terminates.";
}
void ShutdownImpl() override { container_->Stop(); }
void StartImpl() override { container_->Start(); }
private:
std::unique_ptr<IReaderContainer> container_;
};
class OpenFilesOp : public framework::OperatorBase {
public:
......@@ -173,13 +223,27 @@ class OpenFilesOp : public framework::OperatorBase {
"shape concat's length.");
const auto& file_names = Attr<std::vector<std::string>>("file_names");
PADDLE_ENFORCE(!file_names.empty(), "No file to be read!");
const size_t thread_num = Attr<int>("thread_num");
const size_t buffer_size = Attr<int>("buffer_size");
bool is_test = Attr<bool>("is_test");
auto* out = scope.FindVar(Output("Out"))
->template GetMutable<framework::ReaderHolder>();
out->Reset(
std::make_shared<MultiFileReader>(file_names, thread_num, buffer_size));
std::unique_ptr<IReaderContainer> container;
if (is_test) {
container.reset(new OrderedReaderContainer());
} else {
container.reset(new PreemptiveReaderContainer(
static_cast<size_t>(Attr<int>("thread_num"))));
}
std::shared_ptr<framework::ReaderBase> reader(
new MultiFileReader(file_names, std::move(container)));
auto buffer_size = Attr<int>("buffer_size");
if (buffer_size > 1) {
reader = framework::MakeDecoratedReader<BufferedReader>(
reader, platform::CPUPlace(), buffer_size);
}
out->Reset(reader);
}
};
......@@ -187,9 +251,7 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
protected:
void Apply() override {
AddAttr<std::vector<std::string>>("file_names", "Files to be read.");
AddAttr<int>("thread_num", "The maximal concurrent prefetch thread number.")
.GreaterThan(0);
AddAttr<int>("buffer_size", "The size of prefetch buffer.").GreaterThan(0);
AddAttr<bool>("is_test", "Used for testing data.").SetDefault(false);
AddComment(R"DOC(
OpenFiles Operator
......@@ -197,6 +259,11 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
An OpenFilesOp creates a MultiFileReader, which is able to
read data multi-threaded from multiple files.
)DOC");
AddAttr<int>("thread_num",
"The maximal concurrent prefetch thread number. Used only "
"when is_test = False");
AddAttr<int>("buffer_size", "The reading buffer of these files.")
.GreaterThan(0);
}
};
......
......@@ -28,6 +28,7 @@ Scanner::Scanner(std::unique_ptr<std::istream> &&stream)
Scanner::Scanner(const std::string &filename)
: stream_(new std::ifstream(filename)), parser_(*stream_) {
PADDLE_ENFORCE(static_cast<bool>(*stream_), "Cannot open file %s", filename);
Reset();
}
......
......@@ -600,11 +600,11 @@ function main() {
cicheck)
cmake_gen ${PYTHON_ABI:-""}
build
assert_api_not_changed
run_test
gen_capi_package
gen_fluid_inference_lib
test_fluid_inference_lib
assert_api_not_changed
;;
*)
print_usage
......
......@@ -12,14 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import multiprocessing
import threading
from .. import core
from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program
from ..unique_name import generate as unique_name
from ..data_feeder import DataFeeder
from control_flow import BlockGuard
from ..layer_helper import LayerHelper
from layer_function_generator import templatedoc
from .. import core
from ..executor import global_scope
from layer_function_generator import generate_layer_fn, templatedoc
from ..framework import convert_np_dtype_to_dtype_, default_main_program, \
default_startup_program, program_guard, Program
from ..layer_helper import LayerHelper
from ..unique_name import generate as unique_name
__all__ = [
'data', 'open_recordio_file', 'open_files', 'read_file', 'shuffle', 'batch',
......@@ -445,7 +449,12 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
return monkey_patch_reader_methods(main_prog_var)
def py_reader(capacity, shapes, dtypes, lod_levels=None):
def py_reader(capacity,
shapes,
dtypes,
lod_levels=None,
name=None,
use_double_buffer=True):
"""
Create a reader and blocking queue for data feeding in Python
......@@ -458,10 +467,13 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
using `close()` method when unused.
Args:
use_double_buffer(bool): Whether use double buffer or not.
capacity(int): The maximum capacity of the BlockingQueue.
shapes(list): List of tuples which declaring data shapes.
dtypes(list): List of strs which declaring data type.
lod_levels(list): List of ints which declaring data lod_level.
shapes(list|tuple): List of tuples which declaring data shapes.
dtypes(list|tuple): List of strs which declaring data type.
lod_levels(list|tuple): List of ints which declaring data lod_level.
name(basestring): The prefix Python queue name and Reader name. None will
be generated automatically.
Returns:
tuple(Variable, BlockingQueue):
......@@ -502,15 +514,23 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
if lod_levels is None:
lod_levels = [0] * len(shapes)
queue_name = unique_name('lod_tensor_blocking_queue')
if name is None:
queue_name = unique_name('lod_tensor_blocking_queue')
reader_name = unique_name('create_py_reader')
double_buffer_name = unique_name('double_buffer')
else:
queue_name = "_".join([name, "queue"])
reader_name = "_".join([name, "reader"])
double_buffer_name = "_".join([name, "double_buffer"])
var = global_scope().var(queue_name)
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=unique_name('create_py_reader'))
startup_var = startup_blk.create_var(name=reader_name)
startup_blk.append_op(
type='create_py_reader',
inputs={'blocking_queue': queue_name},
inputs={'blocking_queue': [queue_name]},
outputs={'Out': [startup_var]},
attrs={
'shape_concat': shape_concat,
......@@ -524,17 +544,96 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var)
return monkey_patch_reader_methods(main_prog_var), feed_queue
reader = monkey_patch_reader_methods(main_prog_var)
if use_double_buffer:
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader.reset = reader.reset
reader = double_buffer_reader
# monkey patch py_reader special methods
reader.queue = feed_queue
current_reset_method = reader.reset
reader.thread = None
reader.tensor_provider = None
reader.exited = False
def start_provide_thread(func):
def __provider_thread__():
for tensors in func():
array = core.LoDTensorArray()
for item in tensors:
if not isinstance(item, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(item, core.CPUPlace())
item = tmp
array.append(item)
if reader.exited:
break
feed_queue.push(array)
if reader.exited:
break
feed_queue.close()
reader.thread = threading.Thread(target=__provider_thread__)
reader.thread.start()
def __set_tensor_provider__(func):
reader.tensor_provider = func
def __set_paddle_reader__(paddle_reader):
with program_guard(Program(), Program()):
feed_list = []
counter = 0
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
name = str(counter)
feed_list.append(
data(
name=name,
dtype=dtype,
shape=shape,
lod_level=lod_level))
counter += 1
feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
paddle_reader = feeder.decorate_reader(
paddle_reader, multi_devices=False)
def __tensor_provider__():
for slots in paddle_reader():
yield [slots[str(idx)] for idx in xrange(counter)]
__set_tensor_provider__(__tensor_provider__)
def __reset__():
current_reset_method()
if reader.thread is not None and reader.tensor_provider is not None:
reader.exited = True
reader.thread.join()
reader.exited = False
def __start__():
start_provide_thread(reader.tensor_provider)
reader.reset = __reset__
reader.decorate_tensor_provider = __set_tensor_provider__
reader.decorate_paddle_reader = __set_paddle_reader__
reader.start = __start__
return reader
def open_files(filenames,
shapes,
lod_levels,
dtypes,
thread_num=1,
thread_num=None,
buffer_size=None,
pass_num=1,
for_parallel=True):
is_test=None):
"""
Open files
......@@ -547,14 +646,14 @@ def open_files(filenames,
shapes(list): List of tuples which declaring data shapes.
lod_levels(list): List of ints which declaring data lod_level.
dtypes(list): List of strs which declaring data type.
thread_num(int): The maximal concurrent prefetch thread number.
buffer_size(int|None): The size of prefetch buffer. If it is setted None,
buffer size will be thread_num * 3.
Default: None
thread_num(None): The number of thread to read files.
Default: min(len(filenames), cpu_number).
buffer_size(None): The buffer size of reader. Default: 3 * thread_num
pass_num(int): Number of passes to run.
for_parallel(Bool): Set it as True if you are going to run
subsequent operators in parallel.
Default: True
is_test(bool|None): Whether `open_files` used for testing or not. If it
is used for testing, the order of data generated is same as the file
order. Otherwise, it is not guaranteed the order of data is same
between every epoch. [Default: False].
Returns:
Variable: A Reader Variable via which we can get file data.
......@@ -566,15 +665,21 @@ def open_files(filenames,
'./data2.recordio'],
shapes=[(3,224,224), (1)],
lod_levels=[0, 0],
dtypes=['float32', 'int64'],
thread_num=2,
buffer_size=2)
dtypes=['float32', 'int64'])
# Via the reader, we can use 'read_file' layer to get data:
image, label = fluid.layers.io.read_file(reader)
"""
if thread_num is None:
thread_num = min(len(filenames), multiprocessing.cpu_count())
else:
thread_num = int(thread_num)
if buffer_size is None:
buffer_size = thread_num * 3
buffer_size = 3 * thread_num
else:
buffer_size = int(buffer_size)
if isinstance(filenames, basestring):
filenames = [filenames]
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
......@@ -588,17 +693,18 @@ def open_files(filenames,
multi_file_reader_name = unique_name('multi_file_reader')
startup_blk = default_startup_program().current_block()
startup_reader = startup_blk.create_var(name=multi_file_reader_name)
attrs = {
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks,
'file_names': filenames,
'thread_num': thread_num,
'buffer_size': buffer_size
}
if is_test is not None:
attrs['is_test'] = is_test
startup_blk.append_op(
type='open_files',
outputs={'Out': [startup_reader]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks,
'file_names': filenames,
'thread_num': thread_num,
'buffer_size': buffer_size
})
type='open_files', outputs={'Out': [startup_reader]}, attrs=attrs)
startup_reader.desc.set_dtypes(dtypes)
startup_reader.persistable = True
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy
import paddle
import paddle.dataset.mnist as mnist
import paddle.fluid as fluid
import paddle.v2
def network(is_train):
reader = fluid.layers.py_reader(
capacity=10,
shapes=((-1, 784), (-1, 1)),
dtypes=('float32', 'int64'),
name="train_reader" if is_train else "test_reader")
img, label = fluid.layers.read_file(reader)
hidden = img
for i in xrange(2):
hidden = fluid.layers.fc(input=hidden, size=100, act='tanh')
hidden = fluid.layers.dropout(
hidden, dropout_prob=0.5, is_test=not is_train)
prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
loss = fluid.layers.cross_entropy(input=prediction, label=label)
return fluid.layers.mean(loss), reader
def main():
train_prog = fluid.Program()
startup_prog = fluid.Program()
with fluid.program_guard(train_prog, startup_prog):
with fluid.unique_name.guard():
loss, train_reader = network(True)
adam = fluid.optimizer.Adam(learning_rate=0.01)
adam.minimize(loss)
test_prog = fluid.Program()
test_startup = fluid.Program()
with fluid.program_guard(test_prog, test_startup):
with fluid.unique_name.guard():
test_loss, test_reader = network(False)
fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)
fluid.Executor(fluid.CUDAPlace(0)).run(test_startup)
trainer = fluid.ParallelExecutor(
use_cuda=True, loss_name=loss.name, main_program=train_prog)
tester = fluid.ParallelExecutor(
use_cuda=True, share_vars_from=trainer, main_program=test_prog)
train_reader.decorate_paddle_reader(
paddle.v2.reader.shuffle(
paddle.batch(mnist.train(), 512), buf_size=8192))
test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512))
for epoch_id in xrange(10):
train_reader.start()
try:
while True:
print 'train_loss', numpy.array(
trainer.run(fetch_list=[loss.name]))
except fluid.core.EOFException:
print 'End of epoch', epoch_id
train_reader.reset()
test_reader.start()
try:
while True:
print 'test loss', numpy.array(
tester.run(fetch_list=[test_loss.name]))
except fluid.core.EOFException:
print 'End of testing'
test_reader.reset()
if __name__ == '__main__':
main()
......@@ -31,7 +31,10 @@ def load_vocab(filename):
# load word dict with paddle inner function
word_dict = load_vocab(sys.argv[1])
if len(sys.argv) == 1:
word_dict = paddle.dataset.imdb.word_dict()
else:
word_dict = load_vocab(sys.argv[1])
word_dict["<unk>"] = len(word_dict)
print "Dict dim = ", len(word_dict)
......
......@@ -41,16 +41,14 @@ def network_cfg(is_train, pass_num=100):
pass_num=pass_num,
shapes=[[-1, 1], [-1, 1]],
lod_levels=[1, 0],
dtypes=['int64', 'int64'],
thread_num=1)
dtypes=['int64', 'int64'])
test_file_obj = fluid.layers.open_files(
filenames=TEST_FILES,
pass_num=1,
shapes=[[-1, 1], [-1, 1]],
lod_levels=[1, 0],
dtypes=['int64', 'int64'],
thread_num=1)
dtypes=['int64', 'int64'])
if is_train:
file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000)
......
......@@ -142,8 +142,7 @@ class TestDataBalance(unittest.TestCase):
filenames=[self.lod_data_file_name],
shapes=[[-1, 3], [-1, 1]],
lod_levels=[1, 0],
dtypes=['float32', 'int32'],
thread_num=1)
dtypes=['float32', 'int32'])
ins, label = fluid.layers.read_file(data_reader)
place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace()
......@@ -156,7 +155,7 @@ class TestDataBalance(unittest.TestCase):
main_program=main_prog,
build_strategy=build_strategy)
if (parallel_exe.device_count > self.batch_size):
if parallel_exe.device_count > self.batch_size:
print("WARNING: Unittest TestDataBalance skipped. \
For the result is not correct when device count \
is larger than batch size.")
......@@ -190,3 +189,7 @@ class TestDataBalance(unittest.TestCase):
def test_all(self):
self.main()
self.main_lod()
if __name__ == '__main__':
unittest.main()
......@@ -39,17 +39,17 @@ class TestMultipleReader(unittest.TestCase):
copyfile('./mnist_0.recordio', './mnist_1.recordio')
copyfile('./mnist_0.recordio', './mnist_2.recordio')
def main(self, thread_num):
def main(self, is_test=False):
file_list = [
'./mnist_0.recordio', './mnist_1.recordio', './mnist_2.recordio'
]
with fluid.program_guard(fluid.Program(), fluid.Program()):
data_files = fluid.layers.open_files(
filenames=file_list,
thread_num=thread_num,
shapes=[(-1, 784), (-1, 1)],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
dtypes=['float32', 'int64'],
is_test=is_test)
img, label = fluid.layers.read_file(data_files)
if fluid.core.is_compiled_with_cuda():
......@@ -71,6 +71,9 @@ class TestMultipleReader(unittest.TestCase):
self.assertEqual(batch_count, self.num_batch * 3)
def test_main(self):
self.main(thread_num=3) # thread number equals to file number
self.main(thread_num=10) # thread number is larger than file number
self.main(thread_num=2) # thread number is less than file number
self.main(is_test=False)
self.main(is_test=True)
if __name__ == '__main__':
unittest.main()
......@@ -33,9 +33,7 @@ def simple_fc_net(use_feed):
filenames=[MNIST_RECORDIO_FILE],
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'],
thread_num=1,
for_parallel=True)
dtypes=['float32', 'int64'])
reader = fluid.layers.io.double_buffer(reader)
img, label = fluid.layers.read_file(reader)
hidden = img
......@@ -61,9 +59,7 @@ def fc_with_batchnorm(use_feed):
filenames=[MNIST_RECORDIO_FILE],
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'],
thread_num=1,
for_parallel=True)
dtypes=['float32', 'int64'])
reader = fluid.layers.io.double_buffer(reader)
img, label = fluid.layers.read_file(reader)
......
......@@ -45,12 +45,12 @@ class TestPyReader(unittest.TestCase):
) else fluid.CPUPlace()
executor = fluid.Executor(place)
data_file, feed_queue = fluid.layers.py_reader(
data_file = fluid.layers.py_reader(
capacity=self.capacity,
dtypes=self.dtypes,
lod_levels=self.lod_levels,
shapes=self.shapes)
feed_queue = data_file.queue
read_out_data = fluid.layers.read_file(data_file)
self.inputs = []
......
......@@ -52,11 +52,13 @@ def simple_fc_net(in_size,
batch_size,
queue_capacity,
use_double_buffer=False):
reader, feed_queue = fluid.layers.py_reader(
reader = fluid.layers.py_reader(
capacity=queue_capacity,
shapes=[[-1, in_size], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
dtypes=['float32', 'int64'],
use_double_buffer=False)
feed_queue = reader.queue
reader = fluid.layers.batch(reader, batch_size=batch_size)
if use_double_buffer:
reader = fluid.layers.double_buffer(reader)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册