diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 34cdac52d33000cfb87a97a1486abe7a4a583bbd..03788b41cb2db1283e0e4d628266a7c02e82c079 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -13,13 +13,14 @@ # limitations under the License. import contextlib -from .. import core -from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program -from ..unique_name import generate as unique_name from control_flow import BlockGuard -from ..layer_helper import LayerHelper +from layer_function_generator import templatedoc +from .. import core from ..executor import global_scope -from layer_function_generator import generate_layer_fn, templatedoc +from ..framework import convert_np_dtype_to_dtype_, default_main_program, \ + default_startup_program +from ..layer_helper import LayerHelper +from ..unique_name import generate as unique_name __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv', @@ -446,7 +447,7 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): return monkey_patch_reader_methods(main_prog_var) -def py_reader(capacity, shapes, dtypes, lod_levels=None): +def py_reader(capacity, shapes, dtypes, lod_levels=None, name=None): """ Create a reader and blocking queue for data feeding in Python @@ -460,9 +461,11 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None): Args: capacity(int): The maximum capacity of the BlockingQueue. - shapes(list): List of tuples which declaring data shapes. - dtypes(list): List of strs which declaring data type. - lod_levels(list): List of ints which declaring data lod_level. + shapes(list|tuple): List of tuples which declaring data shapes. + dtypes(list|tuple): List of strs which declaring data type. + lod_levels(list|tuple): List of ints which declaring data lod_level. + name(basestring): The prefix Python queue name and Reader name. None will + be generated automatically. 
Returns: tuple(Variable, BlockingQueue): @@ -503,12 +506,18 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None): if lod_levels is None: lod_levels = [0] * len(shapes) - queue_name = unique_name('lod_tensor_blocking_queue') + if name is None: + queue_name = unique_name('lod_tensor_blocking_queue') + reader_name = unique_name('create_py_reader') + else: + queue_name = "_".join([name, "queue"]) + reader_name = "_".join([name, "reader"]) + var = global_scope().var(queue_name) feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes) startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=unique_name('create_py_reader')) + startup_var = startup_blk.create_var(name=reader_name) startup_blk.append_op( type='create_py_reader', inputs={'blocking_queue': queue_name}, diff --git a/python/paddle/fluid/tests/demo/pyreader.py b/python/paddle/fluid/tests/demo/pyreader.py new file mode 100644 index 0000000000000000000000000000000000000000..a7550fd1f13948d2279c4de2ee0f69af56701c84 --- /dev/null +++ b/python/paddle/fluid/tests/demo/pyreader.py @@ -0,0 +1,97 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Demo: feed MNIST batches into fluid programs through ``py_reader`` queues.

A train network and a test network are built, each reading its input from a
named ``py_reader``. Background threads convert MNIST batches to tensors and
push them onto the corresponding queues while the main thread runs the
trainer.
"""
from __future__ import print_function

import threading

import numpy
import paddle
import paddle.dataset.mnist as mnist
import paddle.fluid as fluid


def network(is_train):
    """Build a small MLP classifier whose inputs come from a ``py_reader``.

    Args:
        is_train(bool): When true the reader is named "train_reader" and
            dropout runs in training mode; otherwise the reader is named
            "test_reader" and dropout is built with ``is_test=True``.

    Returns:
        tuple: ``(mean_loss, queue)`` where ``mean_loss`` is the mean
            cross-entropy loss variable and ``queue`` is the feeding queue
            returned by ``fluid.layers.py_reader``.
    """
    reader, queue = fluid.layers.py_reader(
        capacity=10,
        shapes=((-1, 784), (-1, 1)),
        dtypes=('float32', 'int64'),
        name="train_reader" if is_train else "test_reader")
    img, label = fluid.layers.read_file(fluid.layers.double_buffer(reader))

    hidden = img

    # Two identical tanh hidden layers, each followed by dropout.
    for _ in range(2):
        hidden = fluid.layers.fc(input=hidden, size=100, act='tanh')
        hidden = fluid.layers.dropout(
            hidden, dropout_prob=0.5, is_test=not is_train)

    prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
    loss = fluid.layers.cross_entropy(input=prediction, label=label)
    return fluid.layers.mean(loss), queue


def pipe_reader_to_queue(reader_creator, queue):
    """Start a thread that pushes every batch of ``reader_creator`` to ``queue``.

    Each batch is converted by a ``fluid.DataFeeder`` (built in throwaway
    programs so the caller's programs are not touched), packed as
    ``[img, label]`` into a ``LoDTensorArray`` and pushed. The queue is closed
    once the reader is exhausted.

    Args:
        reader_creator: Batched reader creator yielding (img, label) samples.
        queue: Feeding queue obtained from ``fluid.layers.py_reader``.

    Returns:
        threading.Thread: The already started feeder thread.
    """
    with fluid.program_guard(fluid.Program(), fluid.Program()):
        feeder = fluid.DataFeeder(
            feed_list=[
                fluid.layers.data(
                    name='img', dtype='float32', shape=[784]),
                fluid.layers.data(
                    name='label', dtype='int64', shape=[1])
            ],
            place=fluid.CPUPlace())

        def __thread_main__():
            for data in feeder.decorate_reader(
                    reader_creator, multi_devices=False)():
                tmp = fluid.core.LoDTensorArray()
                tmp.append(data['img'])
                tmp.append(data['label'])
                queue.push(tmp)
            queue.close()

        th = threading.Thread(target=__thread_main__)
        # NOTE(review): the queue has a fixed capacity, so push() presumably
        # blocks once it is full -- confirm. main() never joins these threads
        # and does not drain the queues, so a non-daemon feeder stuck in
        # push() would keep the interpreter alive after main() returns.
        th.daemon = True
        th.start()
        return th


def main():
    """Build the train/test programs and run ten short training epochs.

    Every epoch starts one feeder thread for the train queue and one for the
    test queue, runs the trainer once and prints the fetched loss. A
    ``fluid.core.EOFException`` raised by the run is reported as the end of
    the epoch.
    """
    train_prog = fluid.Program()
    startup_prog = fluid.Program()

    with fluid.program_guard(train_prog, startup_prog):
        with fluid.unique_name.guard():
            loss, train_queue = network(True)
            adam = fluid.optimizer.Adam(learning_rate=0.01)
            adam.minimize(loss)

    test_prog = fluid.Program()
    # The test network gets its own throwaway startup program.
    with fluid.program_guard(test_prog, fluid.Program()):
        with fluid.unique_name.guard():
            test_loss, test_queue = network(False)

    fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)

    trainer = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
    # NOTE(review): the tester is constructed but never run in this demo.
    tester = fluid.ParallelExecutor(use_cuda=True, share_vars_from=trainer)

    for epoch_id in range(10):
        pipe_reader_to_queue(paddle.batch(mnist.train(), 32), train_queue)
        pipe_reader_to_queue(paddle.batch(mnist.test(), 32), test_queue)
        # NOTE(review): only a single trainer step is executed per epoch, so
        # the EOF branch is presumably never reached -- confirm the intent.
        try:
            print('train_loss', numpy.array(trainer.run(fetch_list=[loss.name])))
        except fluid.core.EOFException:
            print('End of epoch', epoch_id)


if __name__ == '__main__':
    main()