diff --git a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py index 051cb3b5bca8c74516693a13e96ff1c4f2a14dde..7fb2cb0090da57ae837d1f774518dd90a41df56c 100644 --- a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py +++ b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py @@ -14,6 +14,7 @@ import sys import unittest +import multiprocessing import numpy as np import paddle.fluid as fluid from paddle.fluid import core @@ -50,6 +51,14 @@ class TestDygraphDataLoaderProcess(unittest.TestCase): self.capacity = 2 def test_reader_process_loop(self): + # This unittest's memory mapped files needs to be cleaned manually + def __clear_process__(util_queue): + while True: + try: + util_queue.get_nowait() + except queue.Empty: + break + with fluid.dygraph.guard(): loader = fluid.io.DataLoader.from_generator( capacity=self.batch_num + 1, use_multiprocess=True) @@ -58,8 +67,16 @@ class TestDygraphDataLoaderProcess(unittest.TestCase): places=fluid.CPUPlace()) loader._data_queue = queue.Queue(self.batch_num + 1) loader._reader_process_loop() + # For clean memory mapped files + util_queue = multiprocessing.Queue(self.batch_num + 1) for _ in range(self.batch_num): - loader._data_queue.get(timeout=10) + data = loader._data_queue.get(timeout=10) + util_queue.put(data) + + # Clean up memory mapped files + clear_process = multiprocessing.Process( + target=__clear_process__, args=(util_queue, )) + clear_process.start() def test_reader_process_loop_simple_none(self): def none_sample_genarator(batch_num):