未验证 提交 b789a3a4 编写于 作者: Y yuyang18

Change code

上级 401e92f6
...@@ -28,15 +28,15 @@ BufferedReader::BufferedReader( ...@@ -28,15 +28,15 @@ BufferedReader::BufferedReader(
buffer_size_(buffer_size) { buffer_size_(buffer_size) {
cpu_buffer_.resize(buffer_size); cpu_buffer_.resize(buffer_size);
gpu_buffer_.resize(buffer_size); gpu_buffer_.resize(buffer_size);
AppendFutureToBatchSize(); ReadTillBufferFullAsync();
} }
void BufferedReader::AppendFutureToBatchSize() { void BufferedReader::ReadTillBufferFullAsync() {
PADDLE_ENFORCE_EQ(position_.size(), 0U); PADDLE_ENFORCE_EQ(position_.size(), 0U);
for (size_t i = 0; i < buffer_size_; ++i) { for (size_t i = 0; i < buffer_size_; ++i) {
AppendFuture(i); ReadAsync(i);
} }
} }
void BufferedReader::AppendFuture(size_t i) { void BufferedReader::ReadAsync(size_t i) {
position_.emplace(thread_pool_.enqueue([this, i]() -> size_t { position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
TensorVec &cpu = cpu_buffer_[i]; TensorVec &cpu = cpu_buffer_[i];
reader_->ReadNext(&cpu); reader_->ReadNext(&cpu);
...@@ -50,6 +50,7 @@ void BufferedReader::AppendFuture(size_t i) { ...@@ -50,6 +50,7 @@ void BufferedReader::AppendFuture(size_t i) {
gpu.resize(cpu.size()); gpu.resize(cpu.size());
for (size_t i = 0; i < cpu.size(); ++i) { for (size_t i = 0; i < cpu.size(); ++i) {
framework::TensorCopySync(cpu[i], place_, &gpu[i]); framework::TensorCopySync(cpu[i], place_, &gpu[i]);
gpu[i].set_lod(cpu[i].lod());
} }
} }
return i; return i;
...@@ -60,10 +61,11 @@ void BufferedReader::ShutdownImpl() { ...@@ -60,10 +61,11 @@ void BufferedReader::ShutdownImpl() {
while (!position_.empty()) { while (!position_.empty()) {
position_.pop(); position_.pop();
} }
prev_pos_ = -1UL;
} }
void BufferedReader::StartImpl() { void BufferedReader::StartImpl() {
reader_->Start(); reader_->Start();
AppendFutureToBatchSize(); ReadTillBufferFullAsync();
} }
void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) { void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
if (position_.empty()) { if (position_.empty()) {
...@@ -79,7 +81,14 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) { ...@@ -79,7 +81,14 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
} }
*out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i]; *out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i];
AppendFuture(i);
// Do not push current position into ReadAsync. Push the previous position
// Since all computation in fluid are async, change the data of
// current position may cause data error.
if (prev_pos_ != -1Ul) {
ReadAsync(prev_pos_);
}
prev_pos_ = i;
} }
} // namespace reader } // namespace reader
......
...@@ -35,9 +35,9 @@ class BufferedReader : public framework::DecoratedReader { ...@@ -35,9 +35,9 @@ class BufferedReader : public framework::DecoratedReader {
~BufferedReader() override; ~BufferedReader() override;
private: private:
void AppendFutureToBatchSize(); void ReadTillBufferFullAsync();
void AppendFuture(size_t i); void ReadAsync(size_t i);
protected: protected:
void ShutdownImpl() override; void ShutdownImpl() override;
...@@ -50,8 +50,15 @@ class BufferedReader : public framework::DecoratedReader { ...@@ -50,8 +50,15 @@ class BufferedReader : public framework::DecoratedReader {
const size_t buffer_size_; const size_t buffer_size_;
std::queue<std::future<size_t>> position_; std::queue<std::future<size_t>> position_;
// The buffer for reading data.
// NOTE: the simplest way to implement buffered reader is do not use any
// buffer, just async read and create futures as buffer size. However, to
// malloc Tensor every time is extremely slow. Here we store all data in
// buffers and prevent alloc every time.
std::vector<TensorVec> cpu_buffer_; std::vector<TensorVec> cpu_buffer_;
std::vector<TensorVec> gpu_buffer_; std::vector<TensorVec> gpu_buffer_;
size_t prev_pos_{-1UL};
}; };
} // namespace reader } // namespace reader
......
...@@ -45,12 +45,12 @@ class TestPyReader(unittest.TestCase): ...@@ -45,12 +45,12 @@ class TestPyReader(unittest.TestCase):
) else fluid.CPUPlace() ) else fluid.CPUPlace()
executor = fluid.Executor(place) executor = fluid.Executor(place)
data_file, feed_queue = fluid.layers.py_reader( data_file = fluid.layers.py_reader(
capacity=self.capacity, capacity=self.capacity,
dtypes=self.dtypes, dtypes=self.dtypes,
lod_levels=self.lod_levels, lod_levels=self.lod_levels,
shapes=self.shapes) shapes=self.shapes)
feed_queue = data_file.queue
read_out_data = fluid.layers.read_file(data_file) read_out_data = fluid.layers.read_file(data_file)
self.inputs = [] self.inputs = []
......
...@@ -52,11 +52,12 @@ def simple_fc_net(in_size, ...@@ -52,11 +52,12 @@ def simple_fc_net(in_size,
batch_size, batch_size,
queue_capacity, queue_capacity,
use_double_buffer=False): use_double_buffer=False):
reader, feed_queue = fluid.layers.py_reader( reader = fluid.layers.py_reader(
capacity=queue_capacity, capacity=queue_capacity,
shapes=[[-1, in_size], [-1, 1]], shapes=[[-1, in_size], [-1, 1]],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=['float32', 'int64']) dtypes=['float32', 'int64'])
feed_queue = reader.queue
reader = fluid.layers.batch(reader, batch_size=batch_size) reader = fluid.layers.batch(reader, batch_size=batch_size)
if use_double_buffer: if use_double_buffer:
reader = fluid.layers.double_buffer(reader) reader = fluid.layers.double_buffer(reader)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册