未验证 提交 dbc54d2d 编写于 作者: K Kaipeng Deng 提交者: GitHub

[cherry pick] fix DataLoader memory leak (#34301)

* fix DataLoader memory leak. test=develop

* fix unittest abort. test=develop

* enable test_dataloader_dataset & skip GPU tensor break. test=develop
上级 5e3ae97b
...@@ -125,13 +125,6 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): ...@@ -125,13 +125,6 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
self._init_thread() self._init_thread()
# if user exit python program when dataloader is still
# iterating, resource may no release safely, so we
# add __del__ function to to CleanupFuncRegistrar
# to make sure __del__ is always called when program
# exit for resoure releasing safely
CleanupFuncRegistrar.register(self.__del__)
def _init_thread(self): def _init_thread(self):
self._var_names = [v.name for v in self._feed_list] self._var_names = [v.name for v in self._feed_list]
self._shapes = [v.shape for v in self._feed_list] self._shapes = [v.shape for v in self._feed_list]
...@@ -229,9 +222,7 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): ...@@ -229,9 +222,7 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
def _shutdown_thread(self): def _shutdown_thread(self):
if self._thread: if self._thread:
self._thread_done_event.set() self._thread_done_event.set()
if self._thread is not threading.current_thread(): self._thread = None
self._thread.join()
self._thread = None
# python2 compatibility # python2 compatibility
def next(self): def next(self):
...@@ -287,17 +278,6 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): ...@@ -287,17 +278,6 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
self._init_thread() self._init_thread()
self._shutdown = False self._shutdown = False
# if user exit python program when dataloader is still
# iterating, resource may no release safely, so we
# add _shutdown_on_exit function to to CleanupFuncRegistrar
# to make sure _try_shutdown_all is always called when program
# exit for resoure releasing safely
# worker join may hang for in _try_shutdown_all call in atexit
# for main process is in atexit state in some OS, so we add
# timeout=1 for shutdown function call in atexit, for shutdown
# function call in __del__, we keep it as it is
CleanupFuncRegistrar.register(self._shutdown_on_exit)
def _init_workers(self): def _init_workers(self):
# multiprocess worker and indice queue list initial as empty # multiprocess worker and indice queue list initial as empty
self._workers = [] self._workers = []
......
...@@ -66,7 +66,12 @@ class TestDatasetWithDiffOutputPlace(unittest.TestCase): ...@@ -66,7 +66,12 @@ class TestDatasetWithDiffOutputPlace(unittest.TestCase):
for image, label in loader: for image, label in loader:
self.assertTrue(image.place.is_gpu_place()) self.assertTrue(image.place.is_gpu_place())
self.assertTrue(label.place.is_cuda_pinned_place()) self.assertTrue(label.place.is_cuda_pinned_place())
break # FIXME(dkp): when input tensor is in GPU place and
# iteration break in the median, it seems the GPU
# tensor put into blocking_queue cannot be safely
# released and may cause ABRT/SEGV, this should
# be fixed
# break
def test_multi_process(self): def test_multi_process(self):
# DataLoader with multi-process mode is not supported on MacOs and Windows currently # DataLoader with multi-process mode is not supported on MacOs and Windows currently
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册