未验证 提交 eae34059 编写于 作者: K Kaipeng Deng 提交者: GitHub

fix dataloader exit error (#32550)

* fix dataloader exit error if user exit program when dataloader is still iterating. test=develop
上级 202b0eaf
......@@ -35,7 +35,7 @@ else:
import paddle
from .. import core, layers
from ..framework import in_dygraph_mode
from ..multiprocess_utils import _set_SIGCHLD_handler, MP_STATUS_CHECK_INTERVAL
from ..multiprocess_utils import _set_SIGCHLD_handler, MP_STATUS_CHECK_INTERVAL, CleanupFuncRegistrar
from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher
from .batch_sampler import _InfiniteIterableSampler
from .collate import default_collate_fn, default_convert_fn
......@@ -125,6 +125,13 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
self._init_thread()
# if user exit python program when dataloader is still
# iterating, resource may no release safely, so we
# add __del__ function to to CleanupFuncRegistrar
# to make sure __del__ is always called when program
# exit for resoure releasing safely
CleanupFuncRegistrar.register(self.__del__)
def _init_thread(self):
self._var_names = [v.name for v in self._feed_list]
self._shapes = [v.shape for v in self._feed_list]
......@@ -178,13 +185,16 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
if not self._blocking_queue.push(array):
break
if self._thread_done_event.is_set():
break
self._blocking_queue.close()
self._thread = None
self._shutdown_thread()
except StopIteration:
self._blocking_queue.close()
except Exception:
self._blocking_queue.kill()
self._thread = None
self._shutdown_thread()
logging.warning("DataLoader reader thread raised an exception.")
six.reraise(*sys.exc_info())
......@@ -216,6 +226,13 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
self._reader.shutdown()
six.reraise(*sys.exc_info())
def _shutdown_thread(self):
if self._thread:
self._thread_done_event.set()
if self._thread is not threading.current_thread():
self._thread.join()
self._thread = None
# python2 compatibility
def next(self):
return self.__next__()
......@@ -225,6 +242,10 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
# need to release thread resources on unexpected exit
if self._blocking_queue:
self._blocking_queue.close()
# NOTE: blocking queue should be closed firstly for
# blocking queue read may hang and _thread_done_event
# cannot be checked
self._shutdown_thread()
class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
......@@ -266,6 +287,13 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
self._init_thread()
self._shutdown = False
# if user exit python program when dataloader is still
# iterating, resource may no release safely, so we
# add __del__ function to to CleanupFuncRegistrar
# to make sure __del__ is always called when program
# exit for resoure releasing safely
CleanupFuncRegistrar.register(self.__del__)
def _init_workers(self):
# multiprocess worker and indice queue list initial as empty
self._workers = []
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册