diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 07097c7e75c6ce638549716cd6523f387cdefd92..bad9a6b2ba1cb2f61a1314c0bb59b9d35858d23f 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -168,7 +168,13 @@ void ThreadedSSAGraphExecutor::InsertFetchOps( for (size_t i = 0; i < fetch_tensors.size(); ++i) { auto &var_name = fetch_tensors[i]; - auto &vars = fetched_vars.at(var_name); + + auto fetched_var_it = fetched_vars.find(var_name); + PADDLE_ENFORCE(fetched_var_it != fetched_vars.end(), + "Cannot find fetched variable.(Perhaps the main_program " + "is not set to ParallelExecutor)"); + + auto &vars = fetched_var_it->second; auto *op = new FetchOpHandle(fetch_data, i, &local_scopes_); fetch_ops->emplace_back(op); diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc index 4b308abc290c10a8a5846672e719b503dfc79b21..3f72890a7cee1453585d50afa04fa62a9b059dc3 100644 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc @@ -48,9 +48,9 @@ class ShuffleReader : public framework::DecoratedReader { private: void ShutdownImpl() override { + reader_->Shutdown(); buffer_.clear(); iteration_pos_ = 0; - reader_->Shutdown(); } void StartImpl() override { diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 74cda3ad980a72d68b2faf81f97aae95c72cebfb..daeacdb8b4d540e94d1c03bd33b0bbdb024f958d 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -18,7 +18,6 @@ #include "ThreadPool.h" #include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/operators/reader/blocking_queue.h" -#include "paddle/fluid/operators/reader/buffered_reader.h" #include "paddle/fluid/operators/reader/reader_op_registry.h" namespace paddle { @@ -233,17 +232,12 @@ class OpenFilesOp : public framework::OperatorBase { container.reset(new OrderedReaderContainer()); } else { container.reset(new PreemptiveReaderContainer( - static_cast(Attr("thread_num")))); + std::min(file_names.size(), + static_cast(std::thread::hardware_concurrency())))); } - auto reader = - std::make_shared(file_names, std::move(container)); - auto buffer_size = Attr("buffer_size"); - if (buffer_size > 1) { - reader = framework::MakeDecoratedReader( - reader, platform::CPUPlace(), buffer_size); - } - out->Reset(reader); + out->Reset( + std::make_shared(file_names, std::move(container))); } }; @@ -259,8 +253,6 @@ class OpenFilesOpMaker : public FileReaderMakerBase { An OpenFilesOp creates a MultiFileReader, which is able to read data multi-threaded from multiple files. )DOC"); - AddAttr("thread_num", "Number of thread to read files."); - AddAttr("buffer_size", "The reading buffer of these files."); } }; diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index bfcf5ee7d87824e2c68da8ec935da8d77e7bab12..f4481e1df9d727c0725b617f2138a8dae1c16dd8 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -12,16 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import contextlib +import multiprocessing -from .. import core -from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program -from ..unique_name import generate as unique_name from control_flow import BlockGuard -from ..layer_helper import LayerHelper +from layer_function_generator import templatedoc +from .. import core from ..executor import global_scope -from layer_function_generator import generate_layer_fn, templatedoc -import sys -import multiprocessing +from ..framework import convert_np_dtype_to_dtype_, default_main_program, \ + default_startup_program +from ..layer_helper import LayerHelper +from ..unique_name import generate as unique_name __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv', @@ -448,7 +448,12 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): return monkey_patch_reader_methods(main_prog_var) -def py_reader(capacity, shapes, dtypes, lod_levels=None): +def py_reader(capacity, + shapes, + dtypes, + lod_levels=None, + name=None, + use_double_buffer=True): """ Create a reader and blocking queue for data feeding in Python @@ -461,10 +466,13 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None): using `close()` method when unused. Args: + use_double_buffer(bool): Whether use double buffer or not. capacity(int): The maximum capacity of the BlockingQueue. - shapes(list): List of tuples which declaring data shapes. - dtypes(list): List of strs which declaring data type. - lod_levels(list): List of ints which declaring data lod_level. + shapes(list|tuple): List of tuples which declaring data shapes. + dtypes(list|tuple): List of strs which declaring data type. + lod_levels(list|tuple): List of ints which declaring data lod_level. + name(basestring): The prefix Python queue name and Reader name. None will + be generated automatically. Returns: tuple(Variable, BlockingQueue): @@ -505,15 +513,23 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None): if lod_levels is None: lod_levels = [0] * len(shapes) - queue_name = unique_name('lod_tensor_blocking_queue') + if name is None: + queue_name = unique_name('lod_tensor_blocking_queue') + reader_name = unique_name('create_py_reader') + double_buffer_name = unique_name('double_buffer') + else: + queue_name = "_".join([name, "queue"]) + reader_name = "_".join([name, "reader"]) + double_buffer_name = "_".join([name, "double_buffer"]) + var = global_scope().var(queue_name) feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes) startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=unique_name('create_py_reader')) + startup_var = startup_blk.create_var(name=reader_name) startup_blk.append_op( type='create_py_reader', - inputs={'blocking_queue': queue_name}, + inputs={'blocking_queue': [queue_name]}, outputs={'Out': [startup_var]}, attrs={ 'shape_concat': shape_concat, @@ -527,7 +543,10 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None): main_prog_var = _copy_reader_var_(default_main_program().current_block(), startup_var) - return monkey_patch_reader_methods(main_prog_var), feed_queue + reader = monkey_patch_reader_methods(main_prog_var) + if use_double_buffer: + reader = double_buffer(reader, name=double_buffer_name) + return reader, feed_queue def open_files(filenames, diff --git a/python/paddle/fluid/tests/demo/pyreader.py b/python/paddle/fluid/tests/demo/pyreader.py new file mode 100644 index 0000000000000000000000000000000000000000..e2df7b870b47a5558b972e776f98ebfd1cefc57b --- /dev/null +++ b/python/paddle/fluid/tests/demo/pyreader.py @@ -0,0 +1,123 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid as fluid +import paddle.dataset.mnist as mnist +import paddle +import paddle.v2 +import threading +import numpy + + +def network(is_train): + reader, queue = fluid.layers.py_reader( + capacity=10, + shapes=((-1, 784), (-1, 1)), + dtypes=('float32', 'int64'), + name="train_reader" if is_train else "test_reader") + img, label = fluid.layers.read_file(reader) + + hidden = img + + for i in xrange(2): + hidden = fluid.layers.fc(input=hidden, size=100, act='tanh') + hidden = fluid.layers.dropout( + hidden, dropout_prob=0.5, is_test=not is_train) + + prediction = fluid.layers.fc(input=hidden, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=prediction, label=label) + return fluid.layers.mean(loss), queue, reader + + +def pipe_reader_to_queue(reader_creator, queue): + with fluid.program_guard(fluid.Program(), fluid.Program()): + feeder = fluid.DataFeeder( + feed_list=[ + fluid.layers.data( + name='img', dtype='float32', shape=[784]), + fluid.layers.data( + name='label', dtype='int64', shape=[1]) + ], + place=fluid.CPUPlace()) + + def __thread_main__(): + for data in feeder.decorate_reader( + reader_creator, multi_devices=False)(): + tmp = fluid.core.LoDTensorArray() + tmp.append(data['img']) + tmp.append(data['label']) + queue.push(tmp) + queue.close() + + th = threading.Thread(target=__thread_main__) + th.start() + return th + + +def main(): + train_prog = fluid.Program() + startup_prog = fluid.Program() + + with fluid.program_guard(train_prog, startup_prog): + with fluid.unique_name.guard(): + loss, train_queue, train_reader = network(True) + adam = fluid.optimizer.Adam(learning_rate=0.01) + adam.minimize(loss) + + test_prog = fluid.Program() + test_startup = fluid.Program() + with fluid.program_guard(test_prog, test_startup): + with fluid.unique_name.guard(): + test_loss, test_queue, test_reader = network(False) + + fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog) + fluid.Executor(fluid.CUDAPlace(0)).run(test_startup) + + trainer = fluid.ParallelExecutor( + use_cuda=True, loss_name=loss.name, main_program=train_prog) + + tester = fluid.ParallelExecutor( + use_cuda=True, share_vars_from=trainer, main_program=test_prog) + + for epoch_id in xrange(10): + train_data_thread = pipe_reader_to_queue( + paddle.batch(paddle.v2.reader.firstn(mnist.train(), 32), 64), + train_queue) + try: + while True: + print 'train_loss', numpy.array( + trainer.run(fetch_list=[loss.name])) + except fluid.core.EOFException: + print 'End of epoch', epoch_id + train_reader.reset() + train_data_thread.join() + + test_data_thread = pipe_reader_to_queue( + paddle.batch(mnist.test(), 32), test_queue) + try: + while True: + print 'test loss', numpy.array( + tester.run(fetch_list=[test_loss.name])) + except fluid.core.EOFException: + print 'End of testing' + test_reader.reset() + + test_data_thread.join() + break + del trainer + del tester + + +if __name__ == '__main__': + main()