diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index ac6e05248f72d5a0499138586b25f6f35c4822af..6753c18da464926dbc5c18d71046cd95b88d441b 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -128,8 +128,10 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): self._need_check_feed = [ v.desc.need_check_feed() for v in self._feed_list ] + # if only 1 place, do not need to keep order self._blocking_queue = core.init_lod_tensor_blocking_queue( - core.Variable(), self._blocking_queue_capacity, True) + core.Variable(), self._blocking_queue_capacity, + len(self._places) > 1) self._reader = core.create_py_reader( self._blocking_queue, self._var_names, self._shapes, self._dtypes, self._need_check_feed, self._places, self._use_buffer_reader, True) @@ -280,8 +282,9 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): self._need_check_feed = [ v.desc.need_check_feed() for v in self._feed_list ] + # if only 1 place, do not need to keep order self._blocking_queue = core.init_lod_tensor_blocking_queue( - core.Variable(), self._outstanding_capacity, True) + core.Variable(), self._outstanding_capacity, len(self._places) > 1) self._reader = core.create_py_reader( self._blocking_queue, self._var_names, self._shapes, self._dtypes, self._need_check_feed, self._places, self._use_buffer_reader, True) @@ -442,6 +445,11 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): # get data again data = self._data_queue.get(timeout=self._timeout) except Exception as e: + # check if thread done event set when waiting data + if self._thread_done_event.is_set(): + continue + + # check failed workers failed_workers = [] for i, w in enumerate(self._workers): if self._worker_status[i] and not w.is_alive(): diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py index 6af273faf3942f19e0612811f644f6606417394c..0706eb53d537da58a5a248e060759b748b30af19 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py @@ -28,12 +28,7 @@ from paddle.fluid.dygraph.nn import Linear from paddle.fluid.dygraph.base import to_variable from test_multiprocess_dataloader_static import RandomDataset, prepare_places - -EPOCH_NUM = 5 -BATCH_SIZE = 16 -IMAGE_SIZE = 784 -SAMPLE_NUM = 400 -CLASS_NUM = 10 +from test_multiprocess_dataloader_static import EPOCH_NUM, BATCH_SIZE, IMAGE_SIZE, SAMPLE_NUM, CLASS_NUM class SimpleFCNet(fluid.dygraph.Layer): diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py index 6b5996f3283e5e31dd640088cba1dbdabbd40ef9..38497f91fc18847e40efa691a65c2a7adc20e51c 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py @@ -25,10 +25,10 @@ import numpy as np import paddle.fluid as fluid from paddle.io import Dataset, BatchSampler, DataLoader -EPOCH_NUM = 5 -BATCH_SIZE = 16 -IMAGE_SIZE = 784 -SAMPLE_NUM = 400 +EPOCH_NUM = 3 +BATCH_SIZE = 8 +IMAGE_SIZE = 32 +SAMPLE_NUM = 100 CLASS_NUM = 10 @@ -157,10 +157,6 @@ class TestStaticDataLoader(unittest.TestCase): return ret def test_main(self): - # FIXME(dkp): disable for random fail in Py35 cloud, - # should be fixed ASAP - if sys.version[:3] == '3.5': - return for p in prepare_places(True): results = [] for num_workers in [0, 2]: