diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index 833776f56eef0ffb2ae5e963919f0482bcd511b8..0f31ca1a94326956ae5e6dffd582daedeb55a9e3 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -33,6 +33,8 @@ class PyReader : public framework::FileReader { if (!success) out->clear(); } + ~PyReader() { queue_->Close(); } + void Shutdown() override { queue_->Close(); } void Start() override { queue_->ReOpen(); } diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 8a82f8e05f1cce465e3c856a04e482360e57cbb3..0bf9f46cf72281449b273651b613f3b57fddbdd1 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -558,6 +558,7 @@ def py_reader(capacity, current_reset_method = reader.reset reader.thread = None reader.tensor_provider = None + reader.exited = False def start_provide_thread(func): def __provider_thread__(): @@ -571,17 +572,20 @@ def py_reader(capacity, array.append(item) + if reader.exited: + break feed_queue.push(array) + if reader.exited: + break feed_queue.close() reader.thread = threading.Thread(target=__provider_thread__) reader.thread.start() def __set_tensor_provider__(func): - reader._tensor_provider = func - start_provide_thread(reader._tensor_provider) + reader.tensor_provider = func - def __set_paddle_reader__(reader): + def __set_paddle_reader__(paddle_reader): with program_guard(Program(), Program()): feed_list = [] counter = 0 @@ -596,25 +600,29 @@ def py_reader(capacity, counter += 1 feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace()) - - reader = feeder.decorate_reader(reader, multi_devices=False) + paddle_reader = feeder.decorate_reader( + paddle_reader, multi_devices=False) def __tensor_provider__(): - for data in reader(): - yield [data[str(idx)] for idx in xrange(counter)] + for slots in paddle_reader(): + yield [slots[str(idx)] for idx in xrange(counter)] __set_tensor_provider__(__tensor_provider__) def __reset__(): current_reset_method() if reader.thread is not None and reader.tensor_provider is not None: + reader.exited = True reader.thread.join() - # restart provider thread. - start_provide_thread(reader.tensor_provider) + reader.exited = False + + def __start__(): + start_provide_thread(reader.tensor_provider) reader.reset = __reset__ reader.decorate_tensor_provider = __set_tensor_provider__ reader.decorate_paddle_reader = __set_paddle_reader__ + reader.start = __start__ return reader diff --git a/python/paddle/fluid/tests/demo/pyreader.py b/python/paddle/fluid/tests/demo/pyreader.py index 3185df07db2d151b956e6aba3e373c1af7a2af14..3a7dbf8106268582c9c771c7ab9e2a7be7a2f313 100644 --- a/python/paddle/fluid/tests/demo/pyreader.py +++ b/python/paddle/fluid/tests/demo/pyreader.py @@ -67,11 +67,12 @@ def main(): train_reader.decorate_paddle_reader( paddle.v2.reader.shuffle( - paddle.batch(mnist.train(), 256), buf_size=8192)) + paddle.batch(mnist.train(), 512), buf_size=8192)) - test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 256)) + test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512)) for epoch_id in xrange(10): + train_reader.start() try: while True: print 'train_loss', numpy.array( @@ -80,6 +81,7 @@ def main(): print 'End of epoch', epoch_id train_reader.reset() + test_reader.start() try: while True: print 'test loss', numpy.array(