未验证 提交 aa3f5058 编写于 作者: F fengjiayi 提交者: GitHub

Merge pull request #8841 from JiayiFeng/dev_double_buffer_for_cpp_reader

Basic double buffer for cpp reader
......@@ -20,9 +20,8 @@ class ReaderBase {
PADDLE_ENFORCE(!shapes_.empty());
}
// Read the next batch of data. (A 'batch' can be only one instance)
// If the next batch doesn't exist, the '*out' will be an empty std::vector.
virtual void ReadNext(std::vector<LoDTensor>* out) = 0;
// Show whether the next bacth exists.
virtual bool HasNext() const = 0;
// Reinitialize the reader and read the file from the begin.
virtual void ReInit() = 0;
......
......@@ -26,7 +26,6 @@ class ReaderBase {
PADDLE_ENFORCE(!shapes_.empty());
}
virtual void ReadNext(std::vector<LoDTensor>* out) = 0;
virtual bool HasNext() const = 0;
virtual void ReInit() = 0;
......@@ -52,8 +51,6 @@ class DecoratedReader : public ReaderBase {
PADDLE_ENFORCE_NOT_NULL(reader_);
}
bool HasNext() const override { return reader_->HasNext(); }
void ReInit() override { reader_->ReInit(); }
protected:
......@@ -69,7 +66,6 @@ class ReaderHolder {
ReaderBase* Get() const { return reader_.get(); }
void ReadNext(std::vector<LoDTensor>* out) { reader_->ReadNext(out); }
bool HasNext() const { return reader_->HasNext(); }
void ReInit() { reader_->ReInit(); }
DDim shape(size_t idx) const { return reader_->shape(idx); }
......
......@@ -60,15 +60,16 @@ class ReadOp : public framework::OperatorBase {
const platform::Place& dev_place) const override {
framework::ReaderHolder* reader =
scope.FindVar(Input("Reader"))->GetMutable<framework::ReaderHolder>();
if (!reader->HasNext()) {
std::vector<std::string> out_arg_names = Outputs("Out");
std::vector<framework::LoDTensor> ins;
reader->ReadNext(&ins);
if (ins.empty()) {
reader->ReInit();
reader->ReadNext(&ins);
PADDLE_ENFORCE(
reader->HasNext(),
!ins.empty(),
"Reader can not read the next data even it has been re-initialized.");
}
std::vector<std::string> out_arg_names = Outputs("Out");
std::vector<framework::LoDTensor> ins;
reader->ReadNext(&ins);
PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size());
for (size_t i = 0; i < ins.size(); ++i) {
auto* out =
......
......@@ -2,4 +2,5 @@ cc_library(reader_op_registry SRCS reader_op_registry.cc DEPS operator op_regist
op_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc DEPS reader_op_registry)
op_library(create_shuffle_reader_op SRCS create_shuffle_reader_op.cc DEPS reader_op_registry)
op_library(create_batch_reader_op SRCS create_batch_reader_op.cc DEPS reader_op_registry)
set(READER_LIBRARY create_random_data_generator_op create_shuffle_reader_op create_batch_reader_op PARENT_SCOPE)
op_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc DEPS reader_op_registry)
set(READER_LIBRARY create_random_data_generator_op create_shuffle_reader_op create_batch_reader_op create_double_buffer_reader_op PARENT_SCOPE)
......@@ -68,10 +68,10 @@ void BatchReader::ReadNext(std::vector<framework::LoDTensor>* out) {
buffer_.clear();
buffer_.reserve(batch_size_);
for (int i = 0; i < batch_size_; ++i) {
if (reader_->HasNext()) {
buffer_.push_back(std::vector<framework::LoDTensor>());
reader_->ReadNext(&buffer_.back());
} else {
if (buffer_.back().empty()) {
buffer_.pop_back();
break;
}
}
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <thread>
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace paddle {
namespace operators {
namespace reader {
static constexpr size_t kDoubleBufferSize = 2;
class DoubleBufferReader : public framework::DecoratedReader {
public:
explicit DoubleBufferReader(ReaderBase* reader)
: DecoratedReader(reader),
buffer_(framework::MakeChannel<std::vector<framework::LoDTensor>>(
kDoubleBufferSize)) {
std::thread prefetch(&DoubleBufferReader::PrefetchThreadFunc, this);
prefetch.detach();
}
void ReadNext(std::vector<framework::LoDTensor>* out) override;
void ReInit() override;
~DoubleBufferReader() { buffer_->Close(); }
private:
void PrefetchThreadFunc();
framework::Channel<std::vector<framework::LoDTensor>>* buffer_;
};
class CreateDoubleBufferReaderOp : public framework::OperatorBase {
public:
using framework::OperatorBase::OperatorBase;
private:
void RunImpl(const framework::Scope& scope,
const platform::Place& dev_place) const override {
const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
->Get<framework::ReaderHolder>();
auto* out = scope.FindVar(Output("Out"))
->template GetMutable<framework::ReaderHolder>();
out->Reset(new DoubleBufferReader(underlying_reader.Get()));
}
};
class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
public:
CreateDoubleBufferReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker)
: DecoratedReaderMakerBase(op_proto, op_checker) {
AddComment(R"DOC(
CreateDoubleBufferReader Operator
A double buffer reader takes another reader as its 'underlying reader'.
It launches another thread to execute the 'underlying reader' asynchronously,
which prevents reading process from blocking subsequent training.
)DOC");
}
};
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
out->clear();
buffer_->Receive(out);
}
void DoubleBufferReader::ReInit() {
reader_->ReInit();
buffer_->Close();
// The existing prefetch thread will terminate for the buffer_ is closed.
buffer_ = framework::MakeChannel<std::vector<framework::LoDTensor>>(
kDoubleBufferSize);
std::thread prefetch(&DoubleBufferReader::PrefetchThreadFunc, this);
prefetch.detach();
}
void DoubleBufferReader::PrefetchThreadFunc() {
VLOG(5) << "A new prefetch thread starts.";
while (true) {
std::vector<framework::LoDTensor> batch;
reader_->ReadNext(&batch);
if (batch.empty()) {
// EOF
buffer_->Close();
VLOG(5) << "Reached the end of the file. The prefetch thread terminates.";
break;
}
if (!buffer_->Send(&batch)) {
VLOG(5) << "WARNING: The double buffer channel has been closed. The "
"prefetch thread terminates.";
break;
}
}
}
} // namespace reader
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators::reader;
REGISTER_DECORATED_READER_OPERATOR(create_double_buffer_reader,
ops::CreateDoubleBufferReaderOp,
ops::CreateDoubleBufferReaderOpMaker);
......@@ -50,8 +50,6 @@ class RandomDataGenerator : public framework::FileReader {
}
}
bool HasNext() const override { return true; }
void ReInit() override { return; }
private:
......
......@@ -39,10 +39,10 @@ void ShuffleReader::ReadNext(std::vector<framework::LoDTensor>* out) {
buffer_.clear();
buffer_.reserve(buffer_size_);
for (int i = 0; i < buffer_size_; ++i) {
if (reader_->HasNext()) {
buffer_.push_back(std::vector<framework::LoDTensor>());
reader_->ReadNext(&buffer_.back());
} else {
if (buffer_.back().empty()) {
buffer_.pop_back();
break;
}
}
......
......@@ -15,16 +15,30 @@
import paddle.v2 as paddle
import paddle.fluid as fluid
import numpy as np
import sys
prog = fluid.framework.Program()
block = prog.current_block()
startup_prog = fluid.framework.Program()
startup_block = startup_prog.current_block()
random_reader = block.create_var(
random_reader = startup_block.create_var(
type=fluid.core.VarDesc.VarType.READER, name="RandomDataGenerator")
random_reader.desc.set_dtypes(
[fluid.core.VarDesc.VarType.FP32, fluid.core.VarDesc.VarType.FP32])
random_reader.persistable = True
shuffle_reader = startup_block.create_var(
type=fluid.core.VarDesc.VarType.READER, name="ShuffleReader")
shuffle_reader.persistable = True
batch_reader = startup_block.create_var(
type=fluid.core.VarDesc.VarType.READER, name="BatchReader")
batch_reader.persistable = True
double_buffer = startup_block.create_var(
type=fluid.core.VarDesc.VarType.READER, name="DoubleBuffer")
double_buffer.persistable = True
main_prog = startup_prog.clone()
main_block = main_prog.current_block()
create_random_data_generator_op = block.append_op(
create_random_data_generator_op = startup_block.append_op(
type="create_random_data_generator",
outputs={"Out": random_reader},
attrs={
......@@ -34,37 +48,45 @@ create_random_data_generator_op = block.append_op(
"max": 1.0,
'lod_levels': [0, 0]
})
shuffle_reader = block.create_var(
type=fluid.core.VarDesc.VarType.READER, name="ShuffleReader")
create_shuffle_reader_op = block.append_op(
create_shuffle_reader_op = startup_block.append_op(
type="create_shuffle_reader",
inputs={"UnderlyingReader": random_reader},
outputs={"Out": shuffle_reader},
attrs={"buffer_size": 7})
batch_reader = block.create_var(
type=fluid.core.VarDesc.VarType.READER, name="BatchReader")
create_batch_reader_op = block.append_op(
create_batch_reader_op = startup_block.append_op(
type="create_batch_reader",
inputs={"UnderlyingReader": shuffle_reader},
outputs={"Out": batch_reader},
attrs={"batch_size": 10})
out1 = block.create_var(type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out1")
out2 = block.create_var(type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out2")
create_double_buffer_reader_op = startup_block.append_op(
type="create_double_buffer_reader",
inputs={"UnderlyingReader": batch_reader},
outputs={"Out": double_buffer})
out1 = main_block.create_var(
type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out1")
out2 = main_block.create_var(
type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out2")
read_op = block.append_op(
type="read", inputs={"Reader": batch_reader},
main_block.var("DoubleBuffer").desc.set_shapes(double_buffer.desc.shapes())
main_block.var("DoubleBuffer").desc.set_dtypes(double_buffer.desc.dtypes())
main_block.var("DoubleBuffer").desc.set_lod_levels(
double_buffer.desc.lod_levels())
read_op = main_block.append_op(
type="read",
inputs={"Reader": double_buffer},
outputs={"Out": [out1, out2]})
place = fluid.CPUPlace()
exe = fluid.Executor(place)
[res1, res2] = exe.run(prog, fetch_list=[out1, out2])
exe.run(startup_prog)
if not (res1.shape == (10, 2) and res2.shape == (10, 1)):
for i in range(1, 100):
[res1, res2] = exe.run(main_prog, fetch_list=[out1, out2])
if not (res1.shape == (10, 2) and res2.shape == (10, 1)):
exit(1)
exit(0)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册