From 826c607e0ce93ac15ab52852278fa461e7d60e65 Mon Sep 17 00:00:00 2001 From: Kaipeng Deng Date: Thu, 2 Jul 2020 10:56:08 +0800 Subject: [PATCH] Fix test multiprocess dataloader static (#25287) * fix test_multiprocess_dataloader_static random fail. test=develop --- python/paddle/fluid/dataloader/dataloader_iter.py | 12 ++++++++++-- .../test_multiprocess_dataloader_dynamic.py | 7 +------ .../unittests/test_multiprocess_dataloader_static.py | 12 ++++-------- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index ac6e05248f..6753c18da4 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -128,8 +128,10 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): self._need_check_feed = [ v.desc.need_check_feed() for v in self._feed_list ] + # if only 1 place, do not need to keep order self._blocking_queue = core.init_lod_tensor_blocking_queue( - core.Variable(), self._blocking_queue_capacity, True) + core.Variable(), self._blocking_queue_capacity, + len(self._places) > 1) self._reader = core.create_py_reader( self._blocking_queue, self._var_names, self._shapes, self._dtypes, self._need_check_feed, self._places, self._use_buffer_reader, True) @@ -280,8 +282,9 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): self._need_check_feed = [ v.desc.need_check_feed() for v in self._feed_list ] + # if only 1 place, do not need to keep order self._blocking_queue = core.init_lod_tensor_blocking_queue( - core.Variable(), self._outstanding_capacity, True) + core.Variable(), self._outstanding_capacity, len(self._places) > 1) self._reader = core.create_py_reader( self._blocking_queue, self._var_names, self._shapes, self._dtypes, self._need_check_feed, self._places, self._use_buffer_reader, True) @@ -442,6 +445,11 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): # get data again data = self._data_queue.get(timeout=self._timeout) except Exception as e: + # check if thread done event set when waiting data + if self._thread_done_event.is_set(): + continue + + # check failed workers failed_workers = [] for i, w in enumerate(self._workers): if self._worker_status[i] and not w.is_alive(): diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py index 6af273faf3..0706eb53d5 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dynamic.py @@ -28,12 +28,7 @@ from paddle.fluid.dygraph.nn import Linear from paddle.fluid.dygraph.base import to_variable from test_multiprocess_dataloader_static import RandomDataset, prepare_places - -EPOCH_NUM = 5 -BATCH_SIZE = 16 -IMAGE_SIZE = 784 -SAMPLE_NUM = 400 -CLASS_NUM = 10 +from test_multiprocess_dataloader_static import EPOCH_NUM, BATCH_SIZE, IMAGE_SIZE, SAMPLE_NUM, CLASS_NUM class SimpleFCNet(fluid.dygraph.Layer): diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py index 6b5996f328..38497f91fc 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py @@ -25,10 +25,10 @@ import numpy as np import paddle.fluid as fluid from paddle.io import Dataset, BatchSampler, DataLoader -EPOCH_NUM = 5 -BATCH_SIZE = 16 -IMAGE_SIZE = 784 -SAMPLE_NUM = 400 +EPOCH_NUM = 3 +BATCH_SIZE = 8 +IMAGE_SIZE = 32 +SAMPLE_NUM = 100 CLASS_NUM = 10 @@ -157,10 +157,6 @@ class TestStaticDataLoader(unittest.TestCase): return ret def test_main(self): - # FIXME(dkp): disable for random fail in Py35 cloud, - # should be fixed ASAP - if sys.version[:3] == '3.5': - return for p in prepare_places(True): results = [] for num_workers in [0, 2]: -- GitLab