diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec
index 30c004b412670202862dae7c11189c7f14155da7..69d665b80fde22c10d9d57687b0e45dae7291969 100644
--- a/paddle/fluid/API.spec
+++ b/paddle/fluid/API.spec
@@ -180,13 +180,13 @@ paddle.fluid.layers.log ArgSpec(args=['x'], varargs=None, keywords=None, default
 paddle.fluid.layers.crop ArgSpec(args=['x', 'shape', 'offsets', 'name'], varargs=None, keywords=None, defaults=(None, None, None))
 paddle.fluid.layers.data ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True))
 paddle.fluid.layers.open_recordio_file ArgSpec(args=['filename', 'shapes', 'lod_levels', 'dtypes', 'pass_num', 'for_parallel'], varargs=None, keywords=None, defaults=(1, True))
-paddle.fluid.layers.open_files ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'for_parallel'], varargs=None, keywords=None, defaults=(1, None, 1, True))
+paddle.fluid.layers.open_files ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'is_test'], varargs=None, keywords=None, defaults=(None, None, 1, None))
 paddle.fluid.layers.read_file ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.layers.shuffle ArgSpec(args=['reader', 'buffer_size'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
-paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels'], varargs=None, keywords=None, defaults=(None,))
+paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
 paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
 paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
index 38cde13fe279d264c51baff71cffcab7b6ebb227..f85c62dd6c4a8033a037b1e001ece6a9cc90ca98 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
@@ -171,7 +171,12 @@ void ThreadedSSAGraphExecutor::InsertFetchOps(
   for (size_t i = 0; i < fetch_tensors.size(); ++i) {
     auto &var_name = fetch_tensors[i];
-    auto &vars = fetched_vars.at(var_name);
+    auto fetched_var_it = fetched_vars.find(var_name);
+    PADDLE_ENFORCE(fetched_var_it != fetched_vars.end(),
+                   "Cannot find fetched variable. (Perhaps the main_program "
+                   "is not set to ParallelExecutor)");
+
+    auto &vars = fetched_var_it->second;
 
     temp_nodes->emplace_back(new ir::Node("fetch", ir::Node::Type::kOperation));
     auto *op = new FetchOpHandle(temp_nodes->back().get(), fetch_data, i,
diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc
index cba0064f38f89c1dd27cfac1ddb2339a5ee6c93f..919029c38f2f26a6f5e02da645c4f7718044cdae 100644
--- a/paddle/fluid/framework/lod_tensor.cc
+++ b/paddle/fluid/framework/lod_tensor.cc
@@ -312,19 +312,22 @@ void WriteToRecordIO(recordio::Writer *writer,
   writer->Write(buffer.str());
 }
 
-std::vector<LoDTensor> ReadFromRecordIO(
-    recordio::Scanner *scanner, const platform::DeviceContext &dev_ctx) {
-  std::vector<LoDTensor> result;
-  if (scanner->HasNext()) {
-    std::istringstream sin(scanner->Next());
-    uint32_t sz;
-    sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
-    result.resize(sz);
-    for (uint32_t i = 0; i < sz; ++i) {
-      DeserializeFromStream(sin, &result[i], dev_ctx);
-    }
+bool ReadFromRecordIO(recordio::Scanner *scanner,
+                      const platform::DeviceContext &dev_ctx,
+                      std::vector<LoDTensor> *result_ptr) {
+  if (!scanner->HasNext()) {
+    return false;
   }
-  return result;
+  std::istringstream sin(scanner->Next());
+  uint32_t sz;
+  sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
+  auto &result = *result_ptr;
+  result.resize(sz);
+  for (uint32_t i = 0; i < sz; ++i) {
+    DeserializeFromStream(sin, &result[i], dev_ctx);
+  }
+
+  return true;
 }
 
 std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h
index 4a2729373b5c63176ed1e856f4acf29fd1e73254..e9b473d547252e80ed26ec61e1a33fbe1742dbe0 100644
--- a/paddle/fluid/framework/lod_tensor.h
+++ b/paddle/fluid/framework/lod_tensor.h
@@ -223,8 +223,9 @@ extern void WriteToRecordIO(recordio::Writer* writer,
                             const std::vector<LoDTensor>& tensor,
                             const platform::DeviceContext& dev_ctx);
 
-extern std::vector<LoDTensor> ReadFromRecordIO(
-    recordio::Scanner* scanner, const platform::DeviceContext& dev_ctx);
+extern bool ReadFromRecordIO(recordio::Scanner* scanner,
+                             const platform::DeviceContext& dev_ctx,
+                             std::vector<LoDTensor>* result_ptr);
 
 /*
  * Convert between length-based LoD and offset-based LoD.
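The signature change turns end-of-file into a boolean result, so callers can drive a read loop directly instead of probing for an empty vector. A minimal consumer sketch against the new declaration (the `scanner`/`dev_ctx` setup and the `Process` consumer are illustrative, not part of this patch):

```cpp
// Drain a RecordIO stream with the bool-returning API. `batch` is reused
// across iterations, so its tensor buffers get recycled between reads.
std::vector<paddle::framework::LoDTensor> batch;
while (paddle::framework::ReadFromRecordIO(&scanner, dev_ctx, &batch)) {
  Process(batch);  // hypothetical consumer of one batch
}
// A false return means the scanner is exhausted; `batch` keeps its
// previous contents, since the function returns before touching it.
```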
diff --git a/paddle/fluid/framework/lod_tensor_test.cc b/paddle/fluid/framework/lod_tensor_test.cc
index 38d3cd96d65f0a54b0ea87b4c677013f3802adfb..cd50aaa26054b78f1b1e8f0d470b397892155a2b 100644
--- a/paddle/fluid/framework/lod_tensor_test.cc
+++ b/paddle/fluid/framework/lod_tensor_test.cc
@@ -301,11 +301,12 @@ static void TestRecordIO() {
   {
     std::unique_ptr<std::istream> stream_ptr(stream);
     recordio::Scanner scanner(std::move(stream_ptr));
-    auto tensors = ReadFromRecordIO(&scanner, ctx);
+    std::vector<framework::LoDTensor> tensors;
+    ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors));
     ASSERT_EQ(tensors.size(), static_cast<size_t>(2));
     assert_tensor_ok(tensors[0]);
     assert_tensor_ok(tensors[1]);
-    tensors = ReadFromRecordIO(&scanner, ctx);
+    ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors));
     ASSERT_EQ(tensors.size(), static_cast<size_t>(2));
     assert_tensor_ok(tensors[0]);
     assert_tensor_ok(tensors[1]);
diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc
index 5897d320a8b7e5af541098cadff8e78f8324949c..40eafda9bf294f7e8ddd067e9014447f4de1cc0e 100644
--- a/paddle/fluid/framework/reader.cc
+++ b/paddle/fluid/framework/reader.cc
@@ -67,7 +67,8 @@ void ReaderBase::Start() {
   }
 }
 
-ReaderBase::~ReaderBase() { Shutdown(); }
+ReaderBase::~ReaderBase() {}
+DecoratedReader::~DecoratedReader() { reader_->Shutdown(); }
 
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h
index a8d04feb42456607159bcbede0574fe90dfe995c..82562bf883d88787858912f7039cf8fef003eccf 100644
--- a/paddle/fluid/framework/reader.h
+++ b/paddle/fluid/framework/reader.h
@@ -25,8 +25,6 @@ namespace paddle {
 namespace framework {
 
-enum ReaderStatus { kRunning, kStopped };
-
 class ReaderBase {
  public:
   virtual void ReadNext(std::vector<LoDTensor>* out);
@@ -48,6 +46,8 @@ class ReaderBase {
 
   virtual void StartImpl() {}
 
+  enum ReaderStatus { kRunning, kStopped };
+
   ReaderStatus status_{kRunning};
 
   mutable std::mutex mu_;
@@ -74,6 +74,8 @@ class DecoratedReader : public ReaderBase,
     reader_->InsertDecoratedReader(shared_from_this());
   }
 
+  ~DecoratedReader();
+
  protected:
   void ShutdownImpl() override { reader_->Shutdown(); }
diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt
index 9dbcc35e6f5bb01c159980a49dd4b4c9d37d2aab..728197377df04df8c993a48bc282431473fe9959 100644
--- a/paddle/fluid/operators/reader/CMakeLists.txt
+++ b/paddle/fluid/operators/reader/CMakeLists.txt
@@ -15,12 +15,13 @@ function(reader_library TARGET_NAME)
     PARENT_SCOPE)
 endfunction()
 
-reader_library(open_files_op SRCS open_files_op.cc)
+cc_library(buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool)
+reader_library(open_files_op SRCS open_files_op.cc DEPS buffered_reader)
 reader_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc)
 reader_library(create_shuffle_reader_op SRCS create_shuffle_reader_op.cc)
 reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc)
 reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc)
-reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc)
+reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc DEPS buffered_reader)
 reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
 reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc)
 reader_library(create_py_reader_op SRCS create_py_reader_op.cc)
diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc
new file mode 100644
index 0000000000000000000000000000000000000000..ba1b3d3e583868c5c360d4766e14c15538b22b33
--- /dev/null
+++ b/paddle/fluid/operators/reader/buffered_reader.cc
@@ -0,0 +1,96 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/operators/reader/buffered_reader.h"
+#include <vector>
+
+namespace paddle {
+namespace operators {
+namespace reader {
+BufferedReader::~BufferedReader() { reader_->Shutdown(); }
+
+BufferedReader::BufferedReader(
+    const std::shared_ptr<framework::ReaderBase> &reader,
+    const platform::Place &place, size_t buffer_size)
+    : framework::DecoratedReader(reader),
+      thread_pool_(1),
+      place_(place),
+      buffer_size_(buffer_size) {
+  cpu_buffer_.resize(buffer_size);
+  gpu_buffer_.resize(buffer_size);
+  ReadTillBufferFullAsync();
+}
+
+void BufferedReader::ReadTillBufferFullAsync() {
+  PADDLE_ENFORCE_EQ(position_.size(), 0U);
+  for (size_t i = 0; i < buffer_size_; ++i) {
+    ReadAsync(i);
+  }
+}
+
+void BufferedReader::ReadAsync(size_t i) {
+  position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
+    TensorVec &cpu = cpu_buffer_[i];
+    reader_->ReadNext(&cpu);
+
+    if (cpu.empty()) {
+      return -1UL;
+    }
+
+    if (platform::is_gpu_place(place_)) {
+      TensorVec &gpu = gpu_buffer_[i];
+      gpu.resize(cpu.size());
+      for (size_t i = 0; i < cpu.size(); ++i) {
+        framework::TensorCopySync(cpu[i], place_, &gpu[i]);
+        gpu[i].set_lod(cpu[i].lod());
+      }
+    }
+    return i;
+  }));
+}
+
+void BufferedReader::ShutdownImpl() {
+  reader_->Shutdown();
+  while (!position_.empty()) {
+    position_.pop();
+  }
+  prev_pos_ = -1UL;
+}
+
+void BufferedReader::StartImpl() {
+  reader_->Start();
+  ReadTillBufferFullAsync();
+}
+
+void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
+  if (position_.empty()) {
+    out->clear();
+    return;
+  }
+  size_t i = position_.front().get();
+  position_.pop();
+
+  if (i == -1UL) {
+    ReadNextImpl(out);
+    return;
+  }
+
+  *out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i];
+
+  // Do not push the current position into ReadAsync; push the previous
+  // position instead. Since all computation in Fluid is asynchronous,
+  // changing the data at the current position may cause data errors.
+  if (prev_pos_ != -1UL) {
+    ReadAsync(prev_pos_);
+  }
+  prev_pos_ = i;
+}
+
+}  // namespace reader
+}  // namespace operators
+}  // namespace paddle
diff --git a/paddle/fluid/operators/reader/buffered_reader.h b/paddle/fluid/operators/reader/buffered_reader.h
new file mode 100644
index 0000000000000000000000000000000000000000..cbe2bc1b5fdd69d1a843b768e3289acd621369a6
--- /dev/null
+++ b/paddle/fluid/operators/reader/buffered_reader.h
@@ -0,0 +1,66 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <list>
+#include <queue>
+#include <vector>
+#include "ThreadPool.h"
+#include "paddle/fluid/framework/reader.h"
+
+namespace paddle {
+namespace operators {
+namespace reader {
+
+class BufferedReader : public framework::DecoratedReader {
+  using TensorVec = std::vector<framework::LoDTensor>;
+  using VecFuture = std::future<TensorVec>;
+
+ public:
+  BufferedReader(const std::shared_ptr<framework::ReaderBase>& reader,
+                 const platform::Place& place, size_t buffer_size);
+
+  ~BufferedReader() override;
+
+ private:
+  void ReadTillBufferFullAsync();
+
+  void ReadAsync(size_t i);
+
+ protected:
+  void ShutdownImpl() override;
+  void StartImpl() override;
+  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
+
+ private:
+  ThreadPool thread_pool_;
+  platform::Place place_;
+  const size_t buffer_size_;
+
+  std::queue<std::future<size_t>> position_;
+
+  // The buffer for reading data.
+  // NOTE: the simplest way to implement a buffered reader is to use no
+  // buffer at all and just read asynchronously, creating as many futures
+  // as the buffer size. However, allocating tensors every time is
+  // extremely slow. Here we store all the data in preallocated buffers to
+  // avoid allocating on every read.
+  std::vector<TensorVec> cpu_buffer_;
+  std::vector<TensorVec> gpu_buffer_;
+  size_t prev_pos_{-1UL};
+};
+
+}  // namespace reader
+}  // namespace operators
+}  // namespace paddle
diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
index 7b14370f4fd64e8fd5b8d9038006494b88d671dc..ed719f91d0980480aa62a5cd3c1f819e6c0e7475 100644
--- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
@@ -12,83 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <thread>  // NOLINT
-
-#include "paddle/fluid/operators/reader/blocking_queue.h"
+#include "paddle/fluid/operators/reader/buffered_reader.h"
 #include "paddle/fluid/operators/reader/reader_op_registry.h"
 
 namespace paddle {
 namespace operators {
 namespace reader {
-
-// 'Double buffer' means we shall maintain two batches of input data at the same
-// time. So the kCacheSize shoul be at least 2.
-static constexpr size_t kCacheSize = 3;
-// There will be two bacthes out of the channel during training:
-// 1. the one waiting to be sent to the channel
-// 2. the one just be received from the channel, which is also being used by
-// subsequent operators.
-// So the channel size should be kChacheSize - 2
-static constexpr size_t kChannelSize = 1;  // kCacheSize - 2
-
-class DoubleBufferReader : public framework::DecoratedReader {
- public:
-  explicit DoubleBufferReader(
-      const std::shared_ptr<framework::ReaderBase>& reader,
-      platform::Place target_place = platform::CPUPlace())
-      : DecoratedReader(reader), place_(target_place) {
-    cpu_tensor_cache_.resize(kCacheSize);
-    gpu_tensor_cache_.resize(kCacheSize);
-#ifdef PADDLE_WITH_CUDA
-    if (platform::is_gpu_place(place_)) {
-      for (size_t i = 0; i < kCacheSize; ++i) {
-        ctxs_.emplace_back(new platform::CUDADeviceContext(
-            boost::get<platform::CUDAPlace>(place_)));
-      }
-    }
-#endif
-    StartPrefetcher();
-  }
-
-  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
-
-  ~DoubleBufferReader() { EndPrefetcher(); }
-
- private:
-  void ShutdownImpl() override {
-    EndPrefetcher();
-    reader_->Shutdown();
-  }
-
-  void StartImpl() override {
-    reader_->Start();
-    StartPrefetcher();
-  }
-
-  void StartPrefetcher() {
-    channel_ = new reader::BlockingQueue<size_t>(kChannelSize);
-    prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
-  }
-
-  void EndPrefetcher() {
-    channel_->Close();
-    if (prefetcher_.joinable()) {
-      prefetcher_.join();
-    }
-    delete channel_;
-    channel_ = nullptr;
-  }
-
-  void PrefetchThreadFunc();
-
-  std::thread prefetcher_;
-  reader::BlockingQueue<size_t>* channel_;
-  platform::Place place_;
-  std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache_;
-  std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache_;
-  std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
-};
-
 class CreateDoubleBufferReaderOp : public framework::OperatorBase {
  public:
   using framework::OperatorBase::OperatorBase;
@@ -118,8 +47,8 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
       place = platform::CUDAPlace(static_cast<int>(num));
     }
 
-    out->Reset(framework::MakeDecoratedReader<DoubleBufferReader>(
-        underlying_reader, place));
+    out->Reset(framework::MakeDecoratedReader<BufferedReader>(underlying_reader,
+                                                              place, 2));
   }
 };
 
@@ -146,51 +75,6 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
   }
 };
 
-void DoubleBufferReader::ReadNextImpl(std::vector<framework::LoDTensor>* out) {
-  size_t cached_tensor_id;
-  if (channel_->Receive(&cached_tensor_id)) {
-    if (platform::is_gpu_place(place_)) {
-      *out = gpu_tensor_cache_[cached_tensor_id];
-    } else {
-      // CPU place
-      *out = cpu_tensor_cache_[cached_tensor_id];
-    }
-  } else {
-    out->clear();
-  }
-}
-
-void DoubleBufferReader::PrefetchThreadFunc() {
-  VLOG(5) << "A new prefetch thread starts.";
-  size_t cached_tensor_id = 0;
-  while (true) {
-    auto& cpu_batch = cpu_tensor_cache_[cached_tensor_id];
-    reader_->ReadNext(&cpu_batch);
-    if (cpu_batch.empty()) {
-      // The underlying reader have no next data.
-      break;
-    }
-    if (platform::is_gpu_place(place_)) {
-      auto& gpu_batch = gpu_tensor_cache_[cached_tensor_id];
-      gpu_batch.resize(cpu_batch.size());
-      for (size_t i = 0; i < cpu_batch.size(); ++i) {
-        // TODO(fengjiayi): Use asynchronous TensorCopy instead
-        framework::TensorCopySync(cpu_batch[i], place_, &gpu_batch[i]);
-        gpu_batch[i].set_lod(cpu_batch[i].lod());
-      }
-    }
-    if (!channel_->Send(cached_tensor_id)) {
-      VLOG(5) << "WARNING: The double buffer channel has been closed. The "
-                 "prefetch thread will terminate.";
-      break;
-    }
-    ++cached_tensor_id;
-    cached_tensor_id %= kCacheSize;
-  }
-  channel_->Close();
-  VLOG(5) << "Prefetch thread terminates.";
-}
-
 }  // namespace reader
 }  // namespace operators
 }  // namespace paddle
diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc
index 833776f56eef0ffb2ae5e963919f0482bcd511b8..0f31ca1a94326956ae5e6dffd582daedeb55a9e3 100644
--- a/paddle/fluid/operators/reader/create_py_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_py_reader_op.cc
@@ -33,6 +33,8 @@ class PyReader : public framework::FileReader {
     if (!success) out->clear();
   }
 
+  ~PyReader() { queue_->Close(); }
+
   void Shutdown() override { queue_->Close(); }
 
   void Start() override { queue_->ReOpen(); }
diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
index b32f09b22524c8b67ce57cc6022ef46efc2e828d..a08a9dbd0da46e73082cdd24c019e8d210d8bcc4 100644
--- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
@@ -33,11 +33,14 @@ class RecordIOFileReader : public framework::FileReader {
 
  protected:
   void ReadNextImpl(std::vector<framework::LoDTensor>* out) override {
+    std::unique_ptr<std::lock_guard<std::mutex>> guard;
     if (ThreadSafe) {
-      std::lock_guard<std::mutex> guard(*mutex_);
-      *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_);
-    } else {
-      *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_);
+      guard.reset(new std::lock_guard<std::mutex>(*mutex_));
+    }
+
+    bool ok = framework::ReadFromRecordIO(&scanner_, dev_ctx_, out);
+    if (!ok) {
+      out->clear();
     }
   }
 
diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc
index 4b308abc290c10a8a5846672e719b503dfc79b21..3f72890a7cee1453585d50afa04fa62a9b059dc3 100644
--- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc
@@ -48,9 +48,9 @@ class ShuffleReader : public framework::DecoratedReader {
 
  private:
   void ShutdownImpl() override {
+    reader_->Shutdown();
     buffer_.clear();
     iteration_pos_ = 0;
-    reader_->Shutdown();
   }
 
   void StartImpl() override {
diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc
index 9a8d203672fa2d560440d063d93fa5f8523690ef..38223e069975a08791d58d6ae10e2112b79a61fe 100644
--- a/paddle/fluid/operators/reader/open_files_op.cc
+++ b/paddle/fluid/operators/reader/open_files_op.cc
@@ -12,150 +12,200 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <list>
+#include <queue>
 #include <thread>  // NOLINT
-
+#include "ThreadPool.h"
+#include "paddle/fluid/framework/blocking_queue.h"
 #include "paddle/fluid/operators/reader/blocking_queue.h"
+#include "paddle/fluid/operators/reader/buffered_reader.h"
 #include "paddle/fluid/operators/reader/reader_op_registry.h"
 
 namespace paddle {
 namespace operators {
 namespace reader {
 
-class MultiFileReader : public framework::ReaderBase {
+class IReaderContainer {
  public:
-  MultiFileReader(const std::vector<std::string>& file_names, size_t thread_num,
-                  size_t buffer_size)
-      : buffer_size_(buffer_size) {
-    readers_.reserve(file_names.size());
-    for (const std::string& f_name : file_names) {
-      readers_.emplace_back(CreateReaderByFileName(f_name));
+  virtual ~IReaderContainer() {}
+  virtual void AppendReader(
+      std::unique_ptr<framework::ReaderBase>&& readers) = 0;
+  virtual void Stop() = 0;
+  virtual void Start() = 0;
+  virtual void ReadNext(std::vector<framework::LoDTensor>* out) = 0;
+};
+
+class OrderedReaderContainer : public IReaderContainer {
+ public:
+  void AppendReader(std::unique_ptr<framework::ReaderBase>&& reader) override {
+    pending_.emplace(std::move(reader));
+  }
+
+  void Stop() override {
+    while (!pending_.empty()) {
+      MoveFrontPendingToDone();
     }
-    prefetchers_.resize(thread_num);
-    StartNewScheduler();
   }
 
-  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
+  void Start() override { std::swap(done_, pending_); }
 
-  ~MultiFileReader() { EndScheduler(); }
+  void ReadNext(std::vector<framework::LoDTensor>* out) override {
+    if (!pending_.empty()) {
+      pending_.front()->ReadNext(out);
+      if (out->empty()) {
+        MoveFrontPendingToDone();
+        ReadNext(out);
+      }
+    } else {
+      out->clear();
+    }
+  }
 
  private:
-  void ShutdownImpl() override { EndScheduler(); }
-
-  void StartImpl() override { StartNewScheduler(); }
-
-  void StartNewScheduler();
-  void EndScheduler();
-  void ScheduleThreadFunc();
-  void PrefetchThreadFunc(size_t reader_idx, size_t thread_idx);
-
-  std::vector<std::unique_ptr<framework::ReaderBase>> readers_;
-  std::thread scheduler_;
-  std::vector<std::thread> prefetchers_;
-  size_t buffer_size_;
-  reader::BlockingQueue<size_t>* waiting_reader_idx_;
-  reader::BlockingQueue<size_t>* available_thread_idx_;
-  reader::BlockingQueue<std::vector<framework::LoDTensor>>* buffer_;
+  void MoveFrontPendingToDone() {
+    pending_.front()->Shutdown();
+    pending_.front()->Start();
+    done_.emplace(std::move(pending_.front()));
+    pending_.pop();
+  }
+
+  std::queue<std::unique_ptr<framework::ReaderBase>> pending_;
+  std::queue<std::unique_ptr<framework::ReaderBase>> done_;
 };
 
-void MultiFileReader::ReadNextImpl(std::vector<framework::LoDTensor>* out) {
-  if (!buffer_->Receive(out)) {
-    out->clear();
-  }
-}
+class PreemptiveReaderContainer : public IReaderContainer {
+  using ReaderList = std::list<std::unique_ptr<framework::ReaderBase>>;
 
-void MultiFileReader::StartNewScheduler() {
-  size_t thread_num = prefetchers_.size();
-  waiting_reader_idx_ = new reader::BlockingQueue<size_t>(readers_.size());
-  available_thread_idx_ = new reader::BlockingQueue<size_t>(thread_num);
-  buffer_ = new reader::BlockingQueue<std::vector<framework::LoDTensor>>(
-      buffer_size_);
+  struct FutureItem {
+    std::vector<framework::LoDTensor> data_;
+    ReaderList::iterator reader_it_;
+    std::exception_ptr exception_;
+  };
 
-  for (size_t i = 0; i < readers_.size(); ++i) {
-    waiting_reader_idx_->Send(i);
-  }
-  waiting_reader_idx_->Close();
-  for (size_t i = 0; i < thread_num; ++i) {
-    available_thread_idx_->Send(i);
-  }
+  using FutureList = std::list<std::future<FutureItem>>;
 
-  scheduler_ = std::thread([this] { ScheduleThreadFunc(); });
-}
+ public:
+  explicit PreemptiveReaderContainer(size_t thread_num) : pool_(thread_num) {}
 
-void MultiFileReader::EndScheduler() {
-  available_thread_idx_->Close();
-  buffer_->Close();
-  waiting_reader_idx_->Close();
-  if (scheduler_.joinable()) {
-    scheduler_.join();
-  }
-  delete buffer_;
-  delete available_thread_idx_;
-  delete waiting_reader_idx_;
-}
-
-void MultiFileReader::ScheduleThreadFunc() {
-  VLOG(5) << "MultiFileReader schedule thread starts.";
-  size_t completed_thread_num = 0;
-  size_t thread_idx;
-  while (available_thread_idx_->Receive(&thread_idx)) {
-    std::thread& prefetcher = prefetchers_[thread_idx];
-    if (prefetcher.joinable()) {
-      prefetcher.join();
-    }
-    size_t reader_idx;
-    if (waiting_reader_idx_->Receive(&reader_idx)) {
-      // Still have files to read. Start a new prefetch thread.
-      prefetcher = std::thread([this, reader_idx, thread_idx] {
-        PrefetchThreadFunc(reader_idx, thread_idx);
-      });
-    } else {
-      // No more file to read.
-      ++completed_thread_num;
-      if (completed_thread_num == prefetchers_.size()) {
-        buffer_->Close();
-        break;
+  void Stop() override {
+    if (!pending_.empty()) {
+      for (auto& reader : pending_) {
+        reader->Shutdown();
+      }
+      for (auto& fu : futures_) {
+        fu.wait();
       }
+      futures_.clear();
+      for (auto& reader : pending_) {
+        reader->Start();
+        done_.emplace_back(std::move(reader));
+      }
+      pending_.clear();
+      bool timeout;
+      complete_queue_.PopAll(1000, &timeout);
+      PADDLE_ENFORCE(!timeout);
     }
   }
-  // If users invoke Shutdown() when scheduler is running, it will close the
-  // 'avaiable_thread_idx_' and prefecther threads have no way to tell scheduler
-  // to release their resource. So a check is needed before scheduler ends.
-  for (auto& p : prefetchers_) {
-    if (p.joinable()) {
-      p.join();
+
+  void Start() override {
+    for (auto& reader : done_) {
+      AppendReader(std::move(reader));
     }
+    done_.clear();
   }
-  VLOG(5) << "MultiFileReader schedule thread terminates.";
-}
-
-void MultiFileReader::PrefetchThreadFunc(size_t reader_idx, size_t thread_idx) {
-  VLOG(5) << "The prefetch thread of file idx '" << reader_idx << "' starts.";
-  std::unique_ptr<framework::ReaderBase>& reader = readers_[reader_idx];
-  while (true) {
-    std::vector<framework::LoDTensor> ins;
-    reader->ReadNext(&ins);
-    if (ins.empty()) {
-      reader->Shutdown();
-      reader->Start();
-      break;
+
+  void ReadNext(std::vector<framework::LoDTensor>* out) override {
+    if (!pending_.empty()) {
+      auto future_it = complete_queue_.Pop();
+      FutureItem item = future_it->get();
+      if (item.exception_) {
+        for (auto it = futures_.begin(); it != futures_.end(); ++it) {
+          if (it != future_it) {
+            it->wait();  // Wait all other threads complete.
+          }
+        }
+        std::rethrow_exception(item.exception_);
+
+      } else if (item.data_.empty()) {  // reader done.
+        done_.emplace_back(std::move(*item.reader_it_));
+        pending_.erase(item.reader_it_);
+        futures_.erase(future_it);
+        ReadNext(out);
+      } else {
+        *out = item.data_;
+        // continue read async
+        ReadAsync(item.reader_it_, &future_it);
+      }
+    } else {
+      out->clear();
     }
-    try {
-      buffer_->Send(std::move(ins));
-    } catch (paddle::platform::EnforceNotMet e) {
-      VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch "
-                 "thread of file idx '"
-              << reader_idx << "' will terminate.";
-      break;
+  }
+
+ private:
+  void AppendReader(std::unique_ptr<framework::ReaderBase>&& reader) override {
+    pending_.emplace_back(std::move(reader));
+    auto reader_it = pending_.end();
+    --reader_it;
+
+    futures_.emplace_back();
+    auto future_it = futures_.end();
+    --future_it;
+
+    ReadAsync(reader_it, &future_it);
+  }
+
+  void ReadAsync(const ReaderList::iterator& reader_it,
+                 FutureList::iterator* future_it_ptr) {
+    auto& future_it = *future_it_ptr;
+    *future_it = pool_.enqueue([reader_it, future_it, this] {
+      try {
+        FutureItem item;
+        item.reader_it_ = reader_it;
+        (*reader_it)->ReadNext(&item.data_);
+        if (item.data_.empty()) {
+          (*reader_it)->Shutdown();
+          (*reader_it)->Start();
+        }
+        complete_queue_.Push(future_it);
+        return item;
+      } catch (...) {
+        FutureItem item;
+        item.exception_ = std::current_exception();
+        complete_queue_.Push(future_it);
+        return item;
+      }
+    });
+  }
+
+  FutureList futures_;
+  ThreadPool pool_;
+  framework::BlockingQueue<FutureList::iterator> complete_queue_;
+  std::list<std::unique_ptr<framework::ReaderBase>> pending_;
+  std::list<std::unique_ptr<framework::ReaderBase>> done_;
+};
+
+class MultiFileReader : public framework::ReaderBase {
+ public:
+  MultiFileReader(const std::vector<std::string>& file_names,
+                  std::unique_ptr<IReaderContainer>&& container)
+      : container_(std::move(container)) {
+    for (auto& fn : file_names) {
+      container_->AppendReader(CreateReaderByFileName(fn));
    }
   }
-  if (!available_thread_idx_->Send(thread_idx)) {
-    VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. "
-               "Fail to send thread_idx.";
+
+  ~MultiFileReader() { container_->Stop(); }
+
+ protected:
+  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override {
+    container_->ReadNext(out);
   }
-  VLOG(5) << "The prefetch thread of file idx '" << reader_idx
-          << "' terminates.";
-}
+  void ShutdownImpl() override { container_->Stop(); }
+  void StartImpl() override { container_->Start(); }
+
+ private:
+  std::unique_ptr<IReaderContainer> container_;
+};
 
 class OpenFilesOp : public framework::OperatorBase {
  public:
@@ -173,13 +223,27 @@ class OpenFilesOp : public framework::OperatorBase {
                       "shape concat's length.");
     const auto& file_names = Attr<std::vector<std::string>>("file_names");
     PADDLE_ENFORCE(!file_names.empty(), "No file to be read!");
-    const size_t thread_num = Attr<int>("thread_num");
-    const size_t buffer_size = Attr<int>("buffer_size");
+    bool is_test = Attr<bool>("is_test");
 
     auto* out = scope.FindVar(Output("Out"))
                     ->template GetMutable<framework::ReaderHolder>();
-    out->Reset(
-        std::make_shared<MultiFileReader>(file_names, thread_num, buffer_size));
+    std::unique_ptr<IReaderContainer> container;
+
+    if (is_test) {
+      container.reset(new OrderedReaderContainer());
+    } else {
+      container.reset(new PreemptiveReaderContainer(
+          static_cast<size_t>(Attr<int>("thread_num"))));
+    }
+
+    std::shared_ptr<framework::ReaderBase> reader(
+        new MultiFileReader(file_names, std::move(container)));
+    auto buffer_size = Attr<int>("buffer_size");
+    if (buffer_size > 1) {
+      reader = framework::MakeDecoratedReader<BufferedReader>(
+          reader, platform::CPUPlace(), buffer_size);
+    }
+    out->Reset(reader);
   }
 };
 
@@ -187,9 +251,7 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
  protected:
   void Apply() override {
     AddAttr<std::vector<std::string>>("file_names", "Files to be read.");
-    AddAttr<int>("thread_num", "The maximal concurrent prefetch thread number.")
-        .GreaterThan(0);
-    AddAttr<int>("buffer_size", "The size of prefetch buffer.").GreaterThan(0);
+    AddAttr<bool>("is_test", "Used for testing data.").SetDefault(false);
 
     AddComment(R"DOC(
       OpenFiles Operator
@@ -197,6 +259,11 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
       An OpenFilesOp creates a MultiFileReader, which is able to
       read data multi-threaded from multiple files.
     )DOC");
+    AddAttr<int>("thread_num",
+                 "The maximal concurrent prefetch thread number. Used only "
+                 "when is_test = False");
+    AddAttr<int>("buffer_size", "The reading buffer of these files.")
+        .GreaterThan(0);
   }
 };
diff --git a/paddle/fluid/recordio/scanner.cc b/paddle/fluid/recordio/scanner.cc
index 06a13e6c5b6ea76456e231e3f7b1eb33492b16ea..a0a2f984228db0e7a015630655a3176aa4d1a5a4 100644
--- a/paddle/fluid/recordio/scanner.cc
+++ b/paddle/fluid/recordio/scanner.cc
@@ -28,6 +28,7 @@ Scanner::Scanner(std::unique_ptr<std::istream> &&stream)
 
 Scanner::Scanner(const std::string &filename)
     : stream_(new std::ifstream(filename)), parser_(*stream_) {
+  PADDLE_ENFORCE(static_cast<bool>(*stream_), "Cannot open file %s", filename);
   Reset();
 }
 
diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh
index 9837117fbbc0b4218dc89fc5de0c176abcd32b83..65e7b10625583327c2919efd45ef1fe01fa9eb3c 100755
--- a/paddle/scripts/paddle_build.sh
+++ b/paddle/scripts/paddle_build.sh
@@ -600,11 +600,11 @@ function main() {
       cicheck)
         cmake_gen ${PYTHON_ABI:-""}
         build
-        assert_api_not_changed
        run_test
        gen_capi_package
        gen_fluid_inference_lib
        test_fluid_inference_lib
+        assert_api_not_changed
         ;;
       *)
         print_usage
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 0665c09bfb52c932219be68ca801cfa951d672d3..07a7ef15ac5cfd39b4cdb30bcff95fc499ae50ae 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -12,14 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import contextlib
+import multiprocessing
+import threading
 
-from .. import core
-from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program
-from ..unique_name import generate as unique_name
+from ..data_feeder import DataFeeder
 from control_flow import BlockGuard
-from ..layer_helper import LayerHelper
+from layer_function_generator import templatedoc
+from .. import core
 from ..executor import global_scope
-from layer_function_generator import generate_layer_fn, templatedoc
+from ..framework import convert_np_dtype_to_dtype_, default_main_program, \
+    default_startup_program, program_guard, Program
+from ..layer_helper import LayerHelper
+from ..unique_name import generate as unique_name
 
 __all__ = [
     'data', 'open_recordio_file', 'open_files', 'read_file', 'shuffle', 'batch',
@@ -445,7 +449,12 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
     return monkey_patch_reader_methods(main_prog_var)
 
 
-def py_reader(capacity, shapes, dtypes, lod_levels=None):
+def py_reader(capacity,
+              shapes,
+              dtypes,
+              lod_levels=None,
+              name=None,
+              use_double_buffer=True):
     """
     Create a reader and blocking queue for data feeding in Python
 
@@ -458,10 +467,13 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
        using `close()` method when unused.
 
     Args:
+       use_double_buffer(bool): Whether to use double buffer or not.
        capacity(int): The maximum capacity of the BlockingQueue.
-       shapes(list): List of tuples which declaring data shapes.
-       dtypes(list): List of strs which declaring data type.
-       lod_levels(list): List of ints which declaring data lod_level.
+       shapes(list|tuple): List of tuples which declare data shapes.
+       dtypes(list|tuple): List of strs which declare data types.
+       lod_levels(list|tuple): List of ints which declare data lod_level.
+       name(basestring): The prefix of the Python queue name and Reader name.
+          If None, it will be generated automatically.
 
     Returns:
        tuple(Variable, BlockingQueue):
@@ -502,15 +514,23 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
     if lod_levels is None:
         lod_levels = [0] * len(shapes)
 
-    queue_name = unique_name('lod_tensor_blocking_queue')
+    if name is None:
+        queue_name = unique_name('lod_tensor_blocking_queue')
+        reader_name = unique_name('create_py_reader')
+        double_buffer_name = unique_name('double_buffer')
+    else:
+        queue_name = "_".join([name, "queue"])
+        reader_name = "_".join([name, "reader"])
+        double_buffer_name = "_".join([name, "double_buffer"])
+
     var = global_scope().var(queue_name)
     feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
 
     startup_blk = default_startup_program().current_block()
-    startup_var = startup_blk.create_var(name=unique_name('create_py_reader'))
+    startup_var = startup_blk.create_var(name=reader_name)
     startup_blk.append_op(
         type='create_py_reader',
-        inputs={'blocking_queue': queue_name},
+        inputs={'blocking_queue': [queue_name]},
         outputs={'Out': [startup_var]},
         attrs={
             'shape_concat': shape_concat,
@@ -524,17 +544,96 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
     main_prog_var = _copy_reader_var_(default_main_program().current_block(),
                                       startup_var)
 
-    return monkey_patch_reader_methods(main_prog_var), feed_queue
+    reader = monkey_patch_reader_methods(main_prog_var)
+    if use_double_buffer:
+        double_buffer_reader = double_buffer(reader, name=double_buffer_name)
+        # we return a double buffer reader. However, the reset method comes
+        # from py_reader.
+        double_buffer_reader.reset = reader.reset
+        reader = double_buffer_reader
+
+    # monkey patch py_reader special methods
+    reader.queue = feed_queue
+    current_reset_method = reader.reset
+    reader.thread = None
+    reader.tensor_provider = None
+    reader.exited = False
+
+    def start_provide_thread(func):
+        def __provider_thread__():
+            for tensors in func():
+                array = core.LoDTensorArray()
+                for item in tensors:
+                    if not isinstance(item, core.LoDTensor):
+                        tmp = core.LoDTensor()
+                        tmp.set(item, core.CPUPlace())
+                        item = tmp
+
+                    array.append(item)
+
+                if reader.exited:
+                    break
+                feed_queue.push(array)
+                if reader.exited:
+                    break
+            feed_queue.close()
+
+        reader.thread = threading.Thread(target=__provider_thread__)
+        reader.thread.start()
+
+    def __set_tensor_provider__(func):
+        reader.tensor_provider = func
+
+    def __set_paddle_reader__(paddle_reader):
+        with program_guard(Program(), Program()):
+            feed_list = []
+            counter = 0
+            for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
+                name = str(counter)
+                feed_list.append(
+                    data(
+                        name=name,
+                        dtype=dtype,
+                        shape=shape,
+                        lod_level=lod_level))
+                counter += 1
+
+            feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
+            paddle_reader = feeder.decorate_reader(
+                paddle_reader, multi_devices=False)
+
+        def __tensor_provider__():
+            for slots in paddle_reader():
+                yield [slots[str(idx)] for idx in xrange(counter)]
+
+        __set_tensor_provider__(__tensor_provider__)
+
+    def __reset__():
+        current_reset_method()
+        if reader.thread is not None and reader.tensor_provider is not None:
+            reader.exited = True
+            reader.thread.join()
+            reader.exited = False
+
+    def __start__():
+        start_provide_thread(reader.tensor_provider)
+
+    reader.reset = __reset__
+    reader.decorate_tensor_provider = __set_tensor_provider__
+    reader.decorate_paddle_reader = __set_paddle_reader__
+    reader.start = __start__
+
+    return reader
 
 
 def open_files(filenames,
                shapes,
                lod_levels,
                dtypes,
-               thread_num=1,
+               thread_num=None,
                buffer_size=None,
                pass_num=1,
-               for_parallel=True):
+               is_test=None):
     """
     Open files
 
@@ -547,14 +646,14 @@ def open_files(filenames,
        shapes(list): List of tuples which declaring data shapes.
        lod_levels(list): List of ints which declaring data lod_level.
        dtypes(list): List of strs which declaring data type.
-       thread_num(int): The maximal concurrent prefetch thread number.
-       buffer_size(int|None): The size of prefetch buffer. If it is setted None,
-                              buffer size will be thread_num * 3.
-                              Default: None
+       thread_num(int|None): The number of threads used to read files.
+           Default: min(len(filenames), cpu_count).
+       buffer_size(int|None): The reading buffer size of the reader.
+           Default: 3 * thread_num.
        pass_num(int): Number of passes to run.
-       for_parallel(Bool): Set it as True if you are going to run
-                           subsequent operators in parallel.
-                           Default: True
+       is_test(bool|None): Whether `open_files` is used for testing or not. If
+           it is used for testing, the order of the data generated is the same
+           as the file order. Otherwise, the data order between epochs is not
+           guaranteed. [Default: False].
 
     Returns:
        Variable: A Reader Variable via which we can get file data.
@@ -566,15 +665,21 @@ def open_files(filenames,
                                                         './data2.recordio'],
                                              shapes=[(3,224,224), (1)],
                                              lod_levels=[0, 0],
-                                             dtypes=['float32', 'int64'],
-                                             thread_num=2,
-                                             buffer_size=2)
+                                             dtypes=['float32', 'int64'])
 
          # Via the reader, we can use 'read_file' layer to get data:
          image, label = fluid.layers.io.read_file(reader)
     """
+    if thread_num is None:
+        thread_num = min(len(filenames), multiprocessing.cpu_count())
+    else:
+        thread_num = int(thread_num)
+
     if buffer_size is None:
-        buffer_size = thread_num * 3
+        buffer_size = 3 * thread_num
+    else:
+        buffer_size = int(buffer_size)
+
     if isinstance(filenames, basestring):
         filenames = [filenames]
     dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
@@ -588,17 +693,18 @@ def open_files(filenames,
     multi_file_reader_name = unique_name('multi_file_reader')
     startup_blk = default_startup_program().current_block()
     startup_reader = startup_blk.create_var(name=multi_file_reader_name)
+    attrs = {
+        'shape_concat': shape_concat,
+        'lod_levels': lod_levels,
+        'ranks': ranks,
+        'file_names': filenames,
+        'thread_num': thread_num,
+        'buffer_size': buffer_size
+    }
+    if is_test is not None:
+        attrs['is_test'] = is_test
     startup_blk.append_op(
-        type='open_files',
-        outputs={'Out': [startup_reader]},
-        attrs={
-            'shape_concat': shape_concat,
-            'lod_levels': lod_levels,
-            'ranks': ranks,
-            'file_names': filenames,
-            'thread_num': thread_num,
-            'buffer_size': buffer_size
-        })
+        type='open_files', outputs={'Out': [startup_reader]}, attrs=attrs)
 
     startup_reader.desc.set_dtypes(dtypes)
     startup_reader.persistable = True
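Besides `decorate_paddle_reader` (exercised by the demo below), the patched `py_reader` exposes `decorate_tensor_provider`, which takes a generator yielding one list of numpy arrays or LoDTensors per batch. A minimal sketch with made-up shapes and batch counts:

```python
import numpy
import paddle.fluid as fluid

reader = fluid.layers.py_reader(
    capacity=4,
    shapes=((-1, 784), (-1, 1)),
    dtypes=('float32', 'int64'),
    name='provider_example')

def provider():
    # Each yielded list feeds one batch; numpy arrays are converted to
    # LoDTensors on CPUPlace by the provider thread.
    for _ in range(16):
        yield [numpy.random.random([32, 784]).astype('float32'),
               numpy.random.randint(0, 10, size=[32, 1]).astype('int64')]

reader.decorate_tensor_provider(provider)
# reader.start() launches the feeding thread; call reader.reset() after
# catching fluid.core.EOFException, as the demo below does.
```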
diff --git a/python/paddle/fluid/tests/demo/pyreader.py b/python/paddle/fluid/tests/demo/pyreader.py
new file mode 100644
index 0000000000000000000000000000000000000000..3a7dbf8106268582c9c771c7ab9e2a7be7a2f313
--- /dev/null
+++ b/python/paddle/fluid/tests/demo/pyreader.py
@@ -0,0 +1,95 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy
+
+import paddle
+import paddle.dataset.mnist as mnist
+import paddle.fluid as fluid
+import paddle.v2
+
+
+def network(is_train):
+    reader = fluid.layers.py_reader(
+        capacity=10,
+        shapes=((-1, 784), (-1, 1)),
+        dtypes=('float32', 'int64'),
+        name="train_reader" if is_train else "test_reader")
+    img, label = fluid.layers.read_file(reader)
+
+    hidden = img
+
+    for i in xrange(2):
+        hidden = fluid.layers.fc(input=hidden, size=100, act='tanh')
+        hidden = fluid.layers.dropout(
+            hidden, dropout_prob=0.5, is_test=not is_train)
+
+    prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
+    loss = fluid.layers.cross_entropy(input=prediction, label=label)
+    return fluid.layers.mean(loss), reader
+
+
+def main():
+    train_prog = fluid.Program()
+    startup_prog = fluid.Program()
+
+    with fluid.program_guard(train_prog, startup_prog):
+        with fluid.unique_name.guard():
+            loss, train_reader = network(True)
+            adam = fluid.optimizer.Adam(learning_rate=0.01)
+            adam.minimize(loss)
+
+    test_prog = fluid.Program()
+    test_startup = fluid.Program()
+    with fluid.program_guard(test_prog, test_startup):
+        with fluid.unique_name.guard():
+            test_loss, test_reader = network(False)
+
+    fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)
+    fluid.Executor(fluid.CUDAPlace(0)).run(test_startup)
+
+    trainer = fluid.ParallelExecutor(
+        use_cuda=True, loss_name=loss.name, main_program=train_prog)
+
+    tester = fluid.ParallelExecutor(
+        use_cuda=True, share_vars_from=trainer, main_program=test_prog)
+
+    train_reader.decorate_paddle_reader(
+        paddle.v2.reader.shuffle(
+            paddle.batch(mnist.train(), 512), buf_size=8192))
+
+    test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512))
+
+    for epoch_id in xrange(10):
+        train_reader.start()
+        try:
+            while True:
+                print 'train_loss', numpy.array(
+                    trainer.run(fetch_list=[loss.name]))
+        except fluid.core.EOFException:
+            print 'End of epoch', epoch_id
+            train_reader.reset()
+
+        test_reader.start()
+        try:
+            while True:
+                print 'test loss', numpy.array(
+                    tester.run(fetch_list=[test_loss.name]))
+        except fluid.core.EOFException:
+            print 'End of testing'
+            test_reader.reset()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py b/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py
index 9425d472a48056e71da5da364f659971ef6c2520..8244617711138d590193b2898de5d2f3aeb1e11e 100644
--- a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py
+++ b/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py
@@ -31,7 +31,10 @@ def load_vocab(filename):
 
 
 # load word dict with paddle inner function
-word_dict = load_vocab(sys.argv[1])
+if len(sys.argv) == 1:
+    word_dict = paddle.dataset.imdb.word_dict()
+else:
+    word_dict = load_vocab(sys.argv[1])
 word_dict["<unk>"] = len(word_dict)
 print "Dict dim = ", len(word_dict)
 
diff --git a/python/paddle/fluid/tests/demo/text_classification/train.py b/python/paddle/fluid/tests/demo/text_classification/train.py
index e408684c6e0941a1b317ffeac66f071c1382836d..281c2869d642c7fe41a386c42208ca2da1dc2891 100644
--- a/python/paddle/fluid/tests/demo/text_classification/train.py
+++ b/python/paddle/fluid/tests/demo/text_classification/train.py
@@ -41,16 +41,14 @@ def network_cfg(is_train, pass_num=100):
         pass_num=pass_num,
         shapes=[[-1, 1], [-1, 1]],
         lod_levels=[1, 0],
-        dtypes=['int64', 'int64'],
-        thread_num=1)
+        dtypes=['int64', 'int64'])
 
     test_file_obj = fluid.layers.open_files(
         filenames=TEST_FILES,
         pass_num=1,
         shapes=[[-1, 1], [-1, 1]],
         lod_levels=[1, 0],
-        dtypes=['int64', 'int64'],
-        thread_num=1)
+        dtypes=['int64', 'int64'])
 
     if is_train:
         file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000)
diff --git a/python/paddle/fluid/tests/unittests/test_data_balance.py b/python/paddle/fluid/tests/unittests/test_data_balance.py
index 6d810920d55ccf069ff408c553069e8f5e590271..aa09b0ea445adccae3f741b53850f8182f3270cc 100644
--- a/python/paddle/fluid/tests/unittests/test_data_balance.py
+++ b/python/paddle/fluid/tests/unittests/test_data_balance.py
@@ -142,8 +142,7 @@ class TestDataBalance(unittest.TestCase):
                 filenames=[self.lod_data_file_name],
                 shapes=[[-1, 3], [-1, 1]],
                 lod_levels=[1, 0],
-                dtypes=['float32', 'int32'],
-                thread_num=1)
+                dtypes=['float32', 'int32'])
             ins, label = fluid.layers.read_file(data_reader)
 
         place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace()
@@ -156,7 +155,7 @@ class TestDataBalance(unittest.TestCase):
             main_program=main_prog,
             build_strategy=build_strategy)
 
-        if (parallel_exe.device_count > self.batch_size):
+        if parallel_exe.device_count > self.batch_size:
             print("WARNING: Unittest TestDataBalance skipped. \
                 For the result is not correct when device count \
                 is larger than batch size.")
@@ -190,3 +189,7 @@ class TestDataBalance(unittest.TestCase):
     def test_all(self):
         self.main()
         self.main_lod()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py
index dbd510e64ffdd6f3b78b22bb0d37d9a7ba3fd9b5..cb0ea96ff69ce32b0bb1b49f0318c353aa08d388 100644
--- a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py
+++ b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py
@@ -39,17 +39,17 @@ class TestMultipleReader(unittest.TestCase):
         copyfile('./mnist_0.recordio', './mnist_1.recordio')
         copyfile('./mnist_0.recordio', './mnist_2.recordio')
 
-    def main(self, thread_num):
+    def main(self, is_test=False):
         file_list = [
             './mnist_0.recordio', './mnist_1.recordio', './mnist_2.recordio'
         ]
         with fluid.program_guard(fluid.Program(), fluid.Program()):
             data_files = fluid.layers.open_files(
                 filenames=file_list,
-                thread_num=thread_num,
                 shapes=[(-1, 784), (-1, 1)],
                 lod_levels=[0, 0],
-                dtypes=['float32', 'int64'])
+                dtypes=['float32', 'int64'],
+                is_test=is_test)
             img, label = fluid.layers.read_file(data_files)
 
             if fluid.core.is_compiled_with_cuda():
@@ -71,6 +71,9 @@ class TestMultipleReader(unittest.TestCase):
         self.assertEqual(batch_count, self.num_batch * 3)
 
     def test_main(self):
-        self.main(thread_num=3)  # thread number equals to file number
-        self.main(thread_num=10)  # thread number is larger than file number
-        self.main(thread_num=2)  # thread number is less than file number
+        self.main(is_test=False)
+        self.main(is_test=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
index d999ca8d3ca0e7c1a0fa0482ff7ce816cd8b5da7..b21e16439a5070e5f6d763e1617d4cfffe8bd618 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
@@ -33,9 +33,7 @@ def simple_fc_net(use_feed):
             filenames=[MNIST_RECORDIO_FILE],
             shapes=[[-1, 784], [-1, 1]],
             lod_levels=[0, 0],
-            dtypes=['float32', 'int64'],
-            thread_num=1,
-            for_parallel=True)
+            dtypes=['float32', 'int64'])
         reader = fluid.layers.io.double_buffer(reader)
         img, label = fluid.layers.read_file(reader)
     hidden = img
@@ -61,9 +59,7 @@ def fc_with_batchnorm(use_feed):
             filenames=[MNIST_RECORDIO_FILE],
             shapes=[[-1, 784], [-1, 1]],
             lod_levels=[0, 0],
-            dtypes=['float32', 'int64'],
-            thread_num=1,
-            for_parallel=True)
+            dtypes=['float32', 'int64'])
         reader = fluid.layers.io.double_buffer(reader)
         img, label = fluid.layers.read_file(reader)
 
diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py b/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py
index 05715464848d835684dd3cf0e99e5d4dd482e0b6..91b1fd2af7d8aaf85d17965f8b02c35ee3990291 100644
--- a/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py
+++ b/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py
@@ -45,12 +45,12 @@ class TestPyReader(unittest.TestCase):
         ) else fluid.CPUPlace()
         executor = fluid.Executor(place)
 
-        data_file, feed_queue = fluid.layers.py_reader(
+        data_file = fluid.layers.py_reader(
             capacity=self.capacity,
             dtypes=self.dtypes,
             lod_levels=self.lod_levels,
             shapes=self.shapes)
-
+        feed_queue = data_file.queue
         read_out_data = fluid.layers.read_file(data_file)
         self.inputs = []
 
diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
index 9a5b69eea46e74deeba87aefae4afac84c7745f1..9a379bdbaa7e278879117a8cdc2dddb335a10ca1 100644
--- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
@@ -52,11 +52,13 @@ def simple_fc_net(in_size,
                   batch_size,
                   queue_capacity,
                   use_double_buffer=False):
-    reader, feed_queue = fluid.layers.py_reader(
+    reader = fluid.layers.py_reader(
         capacity=queue_capacity,
         shapes=[[-1, in_size], [-1, 1]],
         lod_levels=[0, 0],
-        dtypes=['float32', 'int64'])
+        dtypes=['float32', 'int64'],
+        use_double_buffer=False)
+    feed_queue = reader.queue
     reader = fluid.layers.batch(reader, batch_size=batch_size)
     if use_double_buffer:
        reader = fluid.layers.double_buffer(reader)
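As the updated tests show, the feed queue is now an attribute of the returned reader rather than a second return value. A minimal push-side sketch (CPU-only, shapes illustrative):

```python
import numpy
import paddle.fluid as fluid

reader = fluid.layers.py_reader(
    capacity=8,
    shapes=[[-1, 4]],
    dtypes=['float32'],
    use_double_buffer=False)
feed_queue = reader.queue  # previously: reader, feed_queue = py_reader(...)

# Feed one batch by hand, mirroring what the provider thread does.
tensor = fluid.core.LoDTensor()
tensor.set(numpy.random.random([2, 4]).astype('float32'),
           fluid.core.CPUPlace())
array = fluid.core.LoDTensorArray()
array.append(tensor)
feed_queue.push(array)
feed_queue.close()  # signals end of data to read_file(reader)
```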