From fecbe522008f8ae67b64ce724974c11771a865ef Mon Sep 17 00:00:00 2001 From: yuyang18 Date: Sat, 14 Jul 2018 11:19:42 +0800 Subject: [PATCH] Rewrite open_files --- .../fluid/operators/reader/open_files_op.cc | 39 ++++++++++---- paddle/fluid/recordio/scanner.cc | 1 + python/paddle/fluid/layers/io.py | 54 ++++++++++--------- .../convert_data_to_recordio.py | 5 +- .../tests/demo/text_classification/train.py | 6 +-- .../tests/unittests/test_multi_file_reader.py | 15 +++--- .../unittests/test_parallel_executor_mnist.py | 8 +-- 7 files changed, 76 insertions(+), 52 deletions(-) diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 7fdfb3d029..daeacdb8b4 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include // NOLINT #include "ThreadPool.h" #include "paddle/fluid/framework/blocking_queue.h" @@ -77,6 +78,7 @@ class PreemptiveReaderContainer : public IReaderContainer { struct FutureItem { std::vector data_; ReaderList::iterator reader_it_; + std::exception_ptr exception_; }; using FutureList = std::list>; @@ -115,7 +117,15 @@ class PreemptiveReaderContainer : public IReaderContainer { if (!pending_.empty()) { auto future_it = complete_queue_.Pop(); FutureItem item = future_it->get(); - if (item.data_.empty()) { // reader done. + if (item.exception_) { + for (auto it = futures_.begin(); it != futures_.end(); ++it) { + if (it != future_it) { + it->wait(); // Wait all other threads complete. + } + } + std::rethrow_exception(item.exception_); + + } else if (item.data_.empty()) { // reader done. done_.emplace_back(std::move(*item.reader_it_)); pending_.erase(item.reader_it_); futures_.erase(future_it); @@ -131,8 +141,8 @@ class PreemptiveReaderContainer : public IReaderContainer { } private: - void AppendReader(std::unique_ptr&& readers) override { - pending_.emplace_back(); + void AppendReader(std::unique_ptr&& reader) override { + pending_.emplace_back(std::move(reader)); auto reader_it = pending_.end(); --reader_it; @@ -147,15 +157,22 @@ class PreemptiveReaderContainer : public IReaderContainer { FutureList::iterator* future_it_ptr) { auto& future_it = *future_it_ptr; *future_it = pool_.enqueue([reader_it, future_it, this] { - FutureItem item; - item.reader_it_ = reader_it; - (*reader_it)->ReadNext(&item.data_); - if (item.data_.empty()) { - (*reader_it)->Shutdown(); - (*reader_it)->Start(); + try { + FutureItem item; + item.reader_it_ = reader_it; + (*reader_it)->ReadNext(&item.data_); + if (item.data_.empty()) { + (*reader_it)->Shutdown(); + (*reader_it)->Start(); + } + complete_queue_.Push(future_it); + return item; + } catch (...) { + FutureItem item; + item.exception_ = std::current_exception(); + complete_queue_.Push(future_it); + return item; } - complete_queue_.Push(future_it); - return item; }); } diff --git a/paddle/fluid/recordio/scanner.cc b/paddle/fluid/recordio/scanner.cc index 06a13e6c5b..a0a2f98422 100644 --- a/paddle/fluid/recordio/scanner.cc +++ b/paddle/fluid/recordio/scanner.cc @@ -28,6 +28,7 @@ Scanner::Scanner(std::unique_ptr &&stream) Scanner::Scanner(const std::string &filename) : stream_(new std::ifstream(filename)), parser_(*stream_) { + PADDLE_ENFORCE(static_cast(*stream_), "Cannot open file %s", filename); Reset(); } diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 34cdac52d3..9133038de2 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -20,6 +20,7 @@ from control_flow import BlockGuard from ..layer_helper import LayerHelper from ..executor import global_scope from layer_function_generator import generate_layer_fn, templatedoc +import sys __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv', @@ -532,10 +533,10 @@ def open_files(filenames, shapes, lod_levels, dtypes, - thread_num=1, + thread_num=None, buffer_size=None, pass_num=1, - for_parallel=True): + is_test=None): """ Open files @@ -548,14 +549,15 @@ def open_files(filenames, shapes(list): List of tuples which declaring data shapes. lod_levels(list): List of ints which declaring data lod_level. dtypes(list): List of strs which declaring data type. - thread_num(int): The maximal concurrent prefetch thread number. - buffer_size(int|None): The size of prefetch buffer. If it is setted None, - buffer size will be thread_num * 3. - Default: None + thread_num(None): Deprecated argument. It will be set by open_files + automatically. + buffer_size(None): Deprecated argument. It will be set by open_files + automatically. pass_num(int): Number of passes to run. - for_parallel(Bool): Set it as True if you are going to run - subsequent operators in parallel. - Default: True + is_test(bool|None): Whether `open_files` used for testing or not. If it + is used for testing, the order of data generated is same as the file + order. Otherwise, it is not guaranteed the order of data is same + between every epoch. [Default: False]. Returns: Variable: A Reader Variable via which we can get file data. @@ -567,15 +569,20 @@ def open_files(filenames, './data2.recordio'], shapes=[(3,224,224), (1)], lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=2, - buffer_size=2) + dtypes=['float32', 'int64']) # Via the reader, we can use 'read_file' layer to get data: image, label = fluid.layers.io.read_file(reader) """ - if buffer_size is None: - buffer_size = thread_num * 3 + if thread_num is not None: + print >> sys.stderr, "thread_num parameter of open_files is " \ + "deprecated. It will be ignored and set " \ + "automatically by open_files " + if buffer_size is not None: + print >> sys.stderr, "buffer_size parameter of open_files is " \ + "deprecated. It will be ignored and set " \ + "automatically by open_files " + if isinstance(filenames, basestring): filenames = [filenames] dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] @@ -589,17 +596,16 @@ def open_files(filenames, multi_file_reader_name = unique_name('multi_file_reader') startup_blk = default_startup_program().current_block() startup_reader = startup_blk.create_var(name=multi_file_reader_name) + attrs = { + 'shape_concat': shape_concat, + 'lod_levels': lod_levels, + 'ranks': ranks, + 'file_names': filenames + } + if is_test is not None: + attrs['is_test'] = is_test startup_blk.append_op( - type='open_files', - outputs={'Out': [startup_reader]}, - attrs={ - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'ranks': ranks, - 'file_names': filenames, - 'thread_num': thread_num, - 'buffer_size': buffer_size - }) + type='open_files', outputs={'Out': [startup_reader]}, attrs=attrs) startup_reader.desc.set_dtypes(dtypes) startup_reader.persistable = True diff --git a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py b/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py index 9425d472a4..8244617711 100644 --- a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py +++ b/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py @@ -31,7 +31,10 @@ def load_vocab(filename): # load word dict with paddle inner function -word_dict = load_vocab(sys.argv[1]) +if len(sys.argv) == 1: + word_dict = paddle.dataset.imdb.word_dict() +else: + word_dict = load_vocab(sys.argv[1]) word_dict[""] = len(word_dict) print "Dict dim = ", len(word_dict) diff --git a/python/paddle/fluid/tests/demo/text_classification/train.py b/python/paddle/fluid/tests/demo/text_classification/train.py index e408684c6e..281c2869d6 100644 --- a/python/paddle/fluid/tests/demo/text_classification/train.py +++ b/python/paddle/fluid/tests/demo/text_classification/train.py @@ -41,16 +41,14 @@ def network_cfg(is_train, pass_num=100): pass_num=pass_num, shapes=[[-1, 1], [-1, 1]], lod_levels=[1, 0], - dtypes=['int64', 'int64'], - thread_num=1) + dtypes=['int64', 'int64']) test_file_obj = fluid.layers.open_files( filenames=TEST_FILES, pass_num=1, shapes=[[-1, 1], [-1, 1]], lod_levels=[1, 0], - dtypes=['int64', 'int64'], - thread_num=1) + dtypes=['int64', 'int64']) if is_train: file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000) diff --git a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py index dbd510e64f..cb0ea96ff6 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py +++ b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py @@ -39,17 +39,17 @@ class TestMultipleReader(unittest.TestCase): copyfile('./mnist_0.recordio', './mnist_1.recordio') copyfile('./mnist_0.recordio', './mnist_2.recordio') - def main(self, thread_num): + def main(self, is_test=False): file_list = [ './mnist_0.recordio', './mnist_1.recordio', './mnist_2.recordio' ] with fluid.program_guard(fluid.Program(), fluid.Program()): data_files = fluid.layers.open_files( filenames=file_list, - thread_num=thread_num, shapes=[(-1, 784), (-1, 1)], lod_levels=[0, 0], - dtypes=['float32', 'int64']) + dtypes=['float32', 'int64'], + is_test=is_test) img, label = fluid.layers.read_file(data_files) if fluid.core.is_compiled_with_cuda(): @@ -71,6 +71,9 @@ class TestMultipleReader(unittest.TestCase): self.assertEqual(batch_count, self.num_batch * 3) def test_main(self): - self.main(thread_num=3) # thread number equals to file number - self.main(thread_num=10) # thread number is larger than file number - self.main(thread_num=2) # thread number is less than file number + self.main(is_test=False) + self.main(is_test=True) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py index a801d99aa1..3ab477a3e0 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py @@ -32,9 +32,7 @@ def simple_fc_net(use_feed): filenames=[MNIST_RECORDIO_FILE], shapes=[[-1, 784], [-1, 1]], lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=1, - for_parallel=True) + dtypes=['float32', 'int64']) reader = fluid.layers.io.double_buffer(reader) img, label = fluid.layers.read_file(reader) hidden = img @@ -60,9 +58,7 @@ def fc_with_batchnorm(use_feed): filenames=[MNIST_RECORDIO_FILE], shapes=[[-1, 784], [-1, 1]], lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=1, - for_parallel=True) + dtypes=['float32', 'int64']) reader = fluid.layers.io.double_buffer(reader) img, label = fluid.layers.read_file(reader) -- GitLab