diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 08f6d66a6524f6cf44fa9abd86df90874b039b53..8a82f8e05f1cce465e3c856a04e482360e57cbb3 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -13,13 +13,15 @@
 # limitations under the License.
 import contextlib
 import multiprocessing
+import threading
 
+from ..data_feeder import DataFeeder
 from control_flow import BlockGuard
 from layer_function_generator import templatedoc
 from .. import core
 from ..executor import global_scope
 from ..framework import convert_np_dtype_to_dtype_, default_main_program, \
-    default_startup_program
+    default_startup_program, program_guard, Program
 from ..layer_helper import LayerHelper
 from ..unique_name import generate as unique_name
 
@@ -550,7 +552,86 @@ def py_reader(capacity,
         # py_reader.
         double_buffer_reader.reset = reader.reset
         reader = double_buffer_reader
-    return reader, feed_queue
+
+    # monkey patch py_reader special methods
+    reader.queue = feed_queue
+    current_reset_method = reader.reset
+    reader.thread = None
+    reader.tensor_provider = None
+
+    def start_provide_thread(func):
+        # Run func() on a background thread: each list of items it yields
+        # is packed into a LoDTensorArray and pushed onto feed_queue; the
+        # queue is closed once func() is exhausted.
+        def __provider_thread__():
+            for tensors in func():
+                array = core.LoDTensorArray()
+                for item in tensors:
+                    if not isinstance(item, core.LoDTensor):
+                        # Wrap anything that is not already a LoDTensor.
+                        tmp = core.LoDTensor()
+                        tmp.set(item, core.CPUPlace())
+                        item = tmp
+
+                    array.append(item)
+
+                feed_queue.push(array)
+            feed_queue.close()
+
+        reader.thread = threading.Thread(target=__provider_thread__)
+        reader.thread.start()
+
+    def __set_tensor_provider__(func):
+        # Store the provider on reader.tensor_provider, the attribute that
+        # __reset__ checks; writing it anywhere else leaves that attribute
+        # None and the provider thread is never restarted after a reset.
+        reader.tensor_provider = func
+        start_provide_thread(reader.tensor_provider)
+
+    def __set_paddle_reader__(reader):
+        # Convert a paddle reader into a tensor provider via a DataFeeder
+        # built from the enclosing dtypes/shapes/lod_levels. Note that the
+        # `reader` argument (the paddle reader) shadows the py_reader
+        # object inside this function only.
+        with program_guard(Program(), Program()):
+            # Fresh Program objects are used here, presumably to keep the
+            # feed variables out of the caller's programs.
+            feed_list = []
+            counter = 0
+            for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
+                name = str(counter)
+                feed_list.append(
+                    data(
+                        name=name,
+                        dtype=dtype,
+                        shape=shape,
+                        lod_level=lod_level))
+                counter += 1
+
+            feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
+
+            reader = feeder.decorate_reader(reader, multi_devices=False)
+
+        def __tensor_provider__():
+            for data in reader():
+                yield [data[str(idx)] for idx in xrange(counter)]
+
+        __set_tensor_provider__(__tensor_provider__)
+
+    def __reset__():
+        # Run the original reset first, then wait for the old provider
+        # thread to finish before starting a new one.
+        current_reset_method()
+        if reader.thread is not None and reader.tensor_provider is not None:
+            reader.thread.join()
+            # restart provider thread.
+            start_provide_thread(reader.tensor_provider)
+
+    reader.reset = __reset__
+    reader.decorate_tensor_provider = __set_tensor_provider__
+    reader.decorate_paddle_reader = __set_paddle_reader__
+
+    return reader
 
 
 def open_files(filenames,
diff --git a/python/paddle/fluid/tests/demo/pyreader.py b/python/paddle/fluid/tests/demo/pyreader.py
index e2df7b870b47a5558b972e776f98ebfd1cefc57b..3185df07db2d151b956e6aba3e373c1af7a2af14 100644
--- a/python/paddle/fluid/tests/demo/pyreader.py
+++ b/python/paddle/fluid/tests/demo/pyreader.py
@@ -12,16 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import paddle.fluid as fluid -import paddle.dataset.mnist as mnist +import numpy + import paddle +import paddle.dataset.mnist as mnist +import paddle.fluid as fluid import paddle.v2 -import threading -import numpy def network(is_train): - reader, queue = fluid.layers.py_reader( + reader = fluid.layers.py_reader( capacity=10, shapes=((-1, 784), (-1, 1)), dtypes=('float32', 'int64'), @@ -37,32 +37,7 @@ def network(is_train): prediction = fluid.layers.fc(input=hidden, size=10, act='softmax') loss = fluid.layers.cross_entropy(input=prediction, label=label) - return fluid.layers.mean(loss), queue, reader - - -def pipe_reader_to_queue(reader_creator, queue): - with fluid.program_guard(fluid.Program(), fluid.Program()): - feeder = fluid.DataFeeder( - feed_list=[ - fluid.layers.data( - name='img', dtype='float32', shape=[784]), - fluid.layers.data( - name='label', dtype='int64', shape=[1]) - ], - place=fluid.CPUPlace()) - - def __thread_main__(): - for data in feeder.decorate_reader( - reader_creator, multi_devices=False)(): - tmp = fluid.core.LoDTensorArray() - tmp.append(data['img']) - tmp.append(data['label']) - queue.push(tmp) - queue.close() - - th = threading.Thread(target=__thread_main__) - th.start() - return th + return fluid.layers.mean(loss), reader def main(): @@ -71,7 +46,7 @@ def main(): with fluid.program_guard(train_prog, startup_prog): with fluid.unique_name.guard(): - loss, train_queue, train_reader = network(True) + loss, train_reader = network(True) adam = fluid.optimizer.Adam(learning_rate=0.01) adam.minimize(loss) @@ -79,7 +54,7 @@ def main(): test_startup = fluid.Program() with fluid.program_guard(test_prog, test_startup): with fluid.unique_name.guard(): - test_loss, test_queue, test_reader = network(False) + test_loss, test_reader = network(False) fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog) fluid.Executor(fluid.CUDAPlace(0)).run(test_startup) @@ -90,10 +65,13 @@ def main(): tester = fluid.ParallelExecutor( use_cuda=True, 
share_vars_from=trainer, main_program=test_prog) + train_reader.decorate_paddle_reader( + paddle.v2.reader.shuffle( + paddle.batch(mnist.train(), 256), buf_size=8192)) + + test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 256)) + for epoch_id in xrange(10): - train_data_thread = pipe_reader_to_queue( - paddle.batch(paddle.v2.reader.firstn(mnist.train(), 32), 64), - train_queue) try: while True: print 'train_loss', numpy.array( @@ -101,10 +79,7 @@ def main(): except fluid.core.EOFException: print 'End of epoch', epoch_id train_reader.reset() - train_data_thread.join() - test_data_thread = pipe_reader_to_queue( - paddle.batch(mnist.test(), 32), test_queue) try: while True: print 'test loss', numpy.array( @@ -113,11 +88,6 @@ def main(): print 'End of testing' test_reader.reset() - test_data_thread.join() - break - del trainer - del tester - if __name__ == '__main__': main()