From eae34059bed92a2cdae4aab781a503ab5e4d0fe8 Mon Sep 17 00:00:00 2001 From: Kaipeng Deng Date: Mon, 26 Apr 2021 14:12:48 +0800 Subject: [PATCH] fix dataloader exit error (#32550) * fix dataloader exit error if user exit program when dataloader is still iterating. test=develop --- .../fluid/dataloader/dataloader_iter.py | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index 167c7987c55..52ab8369859 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -35,7 +35,7 @@ else: import paddle from .. import core, layers from ..framework import in_dygraph_mode -from ..multiprocess_utils import _set_SIGCHLD_handler, MP_STATUS_CHECK_INTERVAL +from ..multiprocess_utils import _set_SIGCHLD_handler, MP_STATUS_CHECK_INTERVAL, CleanupFuncRegistrar from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher from .batch_sampler import _InfiniteIterableSampler from .collate import default_collate_fn, default_convert_fn @@ -125,6 +125,13 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): self._init_thread() + # if the user exits the python program while the dataloader is + # still iterating, resources may not be released safely, so we + # register the __del__ function with CleanupFuncRegistrar + # to make sure __del__ is always called when the program + # exits, so that resources are released safely + CleanupFuncRegistrar.register(self.__del__) + def _init_thread(self): self._var_names = [v.name for v in self._feed_list] self._shapes = [v.shape for v in self._feed_list] @@ -178,13 +185,16 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): if not self._blocking_queue.push(array): break + if self._thread_done_event.is_set(): + break + self._blocking_queue.close() - self._thread = None + self._shutdown_thread() except StopIteration: self._blocking_queue.close() except Exception: 
self._blocking_queue.kill() - self._thread = None + self._shutdown_thread() logging.warning("DataLoader reader thread raised an exception.") six.reraise(*sys.exc_info()) @@ -216,6 +226,13 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): self._reader.shutdown() six.reraise(*sys.exc_info()) + def _shutdown_thread(self): + if self._thread: + self._thread_done_event.set() + if self._thread is not threading.current_thread(): + self._thread.join() + self._thread = None + # python2 compatibility def next(self): return self.__next__() @@ -225,6 +242,10 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): # need to release thread resources on unexpected exit if self._blocking_queue: self._blocking_queue.close() + # NOTE: the blocking queue must be closed first, because a + # blocking queue read may hang, in which case + # _thread_done_event cannot be checked + self._shutdown_thread() class _DataLoaderIterMultiProcess(_DataLoaderIterBase): @@ -266,6 +287,13 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): self._init_thread() self._shutdown = False + # if the user exits the python program while the dataloader is + # still iterating, resources may not be released safely, so we + # register the __del__ function with CleanupFuncRegistrar + # to make sure __del__ is always called when the program + # exits, so that resources are released safely + CleanupFuncRegistrar.register(self.__del__) + def _init_workers(self): # multiprocess worker and indice queue list initial as empty self._workers = [] -- GitLab