From 7d8d573453d6fbb890a5ac386bcc4666e1cda7b1 Mon Sep 17 00:00:00 2001 From: Chen Weihang Date: Mon, 2 Mar 2020 14:24:14 +0800 Subject: [PATCH] Speed up dygraph DataLoader based on shared memory and LoDTensor serialization (#22541) * add lodtensor share memory & serialization, test=develop * fix windows compile error, test=develop * deal vartype pickle & fix unittest matching error message, test=develop * update timeout variable name, test=develop * refactor memory map implement, test=develop * clear mmap file discripter when exit unexpectedly, test=develop * remove the child process fd in advance, test=develop * remove mmap fds after Queue.put in child process, test=develop * add hard unittests for register exit func, test=develop * fix python2 compatibility problem in unittest, test=develop * fix exception unittest error, test=develop * polish code based review comment, test=develop --- paddle/fluid/memory/allocation/CMakeLists.txt | 5 + .../fluid/memory/allocation/mmap_allocator.cc | 142 +++++++++++++ .../fluid/memory/allocation/mmap_allocator.h | 90 ++++++++ .../memory/allocation/mmap_allocator_test.cc | 54 +++++ paddle/fluid/pybind/CMakeLists.txt | 1 + paddle/fluid/pybind/imperative.cc | 93 +++++++-- paddle/fluid/pybind/pybind.cc | 52 +++++ python/paddle/fluid/core.py | 6 + python/paddle/fluid/reader.py | 197 +++++++++++++----- .../fluid/tests/unittests/CMakeLists.txt | 2 + .../unittests/test_imperative_data_loader.py | 2 +- .../test_imperative_data_loader_exception.py | 6 +- .../test_imperative_data_loader_exit_func.py | 84 ++++++++ .../test_imperative_data_loader_fds_clear.py | 78 +++++++ .../test_imperative_data_loader_process.py | 5 +- 15 files changed, 745 insertions(+), 72 deletions(-) create mode 100644 paddle/fluid/memory/allocation/mmap_allocator.cc create mode 100644 paddle/fluid/memory/allocation/mmap_allocator.h create mode 100644 paddle/fluid/memory/allocation/mmap_allocator_test.cc create mode 100644 python/paddle/fluid/tests/unittests/test_imperative_data_loader_exit_func.py create mode 100644 python/paddle/fluid/tests/unittests/test_imperative_data_loader_fds_clear.py diff --git a/paddle/fluid/memory/allocation/CMakeLists.txt b/paddle/fluid/memory/allocation/CMakeLists.txt index dc3d9a1f56e..dc26c19cbc8 100644 --- a/paddle/fluid/memory/allocation/CMakeLists.txt +++ b/paddle/fluid/memory/allocation/CMakeLists.txt @@ -66,3 +66,8 @@ cc_test(allocator_facade_frac_flags_test SRCS allocator_facade_frac_flags_test.c cc_library(auto_growth_best_fit_allocator SRCS auto_growth_best_fit_allocator.cc DEPS allocator aligned_allocator) cc_test(auto_growth_best_fit_allocator_facade_test SRCS auto_growth_best_fit_allocator_facade_test.cc DEPS cpu_allocator auto_growth_best_fit_allocator) cc_test(auto_growth_best_fit_allocator_test SRCS auto_growth_best_fit_allocator_test.cc DEPS auto_growth_best_fit_allocator) + +if(NOT WIN32) + cc_library(mmap_allocator SRCS mmap_allocator.cc DEPS allocator) + cc_test(mmap_allocator_test SRCS mmap_allocator_test.cc DEPS mmap_allocator allocator) +endif(NOT WIN32) diff --git a/paddle/fluid/memory/allocation/mmap_allocator.cc b/paddle/fluid/memory/allocation/mmap_allocator.cc new file mode 100644 index 00000000000..0ef084bafd0 --- /dev/null +++ b/paddle/fluid/memory/allocation/mmap_allocator.cc @@ -0,0 +1,142 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef _WIN32 + +#include "paddle/fluid/memory/allocation/mmap_allocator.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace paddle { +namespace memory { +namespace allocation { + +MemoryMapWriterAllocation::~MemoryMapWriterAllocation() { + PADDLE_ENFORCE_NE( + munmap(this->ptr(), this->size()), -1, + platform::errors::Unavailable("could not unmap the shared memory file %s", + this->ipc_name())); +} + +MemoryMapReaderAllocation::~MemoryMapReaderAllocation() { + PADDLE_ENFORCE_NE( + munmap(this->ptr(), this->size()), -1, + platform::errors::Unavailable("could not unmap the shared memory file %s", + this->ipc_name())); + /* Here we do not pay attention to the result of shm_unlink, + because the memory mapped file may have been cleared due to the + MemoryMapFdSet::Clear() */ + shm_unlink(this->ipc_name().c_str()); + MemoryMapFdSet::Instance().Remove(this->ipc_name()); + VLOG(3) << "~MemoryMapReaderAllocation: " << this->ipc_name(); +} + +std::string GetIPCName() { + static std::random_device rd; + std::string handle = "/paddle_"; +#ifdef _WIN32 + handle += std::to_string(GetCurrentProcessId()); +#else + handle += std::to_string(getpid()); +#endif + handle += "_"; + handle += std::to_string(rd()); + return std::move(handle); +} + +std::shared_ptr AllocateMemoryMapWriterAllocation( + size_t size) { + const std::string &ipc_name = GetIPCName(); + int flags = O_RDWR | O_CREAT; + + int fd = shm_open(ipc_name.c_str(), flags, 0644); + PADDLE_ENFORCE_NE( + fd, -1, platform::errors::Unavailable("File descriptor %s open failed", + ipc_name.c_str())); + PADDLE_ENFORCE_EQ(ftruncate(fd, size), 0, + platform::errors::Unavailable( + "Fruncate a file to a specified length failed!")); + + void *ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + PADDLE_ENFORCE_NE(ptr, MAP_FAILED, + platform::errors::Unavailable( + "Memory map failed when create shared memory.")); + close(fd); + + return std::make_shared(ptr, size, ipc_name); +} + +std::shared_ptr RebuildMemoryMapReaderAllocation( + const std::string &ipc_name, size_t size) { + int fd = shm_open(ipc_name.c_str(), O_RDONLY, 0644); + PADDLE_ENFORCE_NE( + fd, -1, platform::errors::Unavailable("File descriptor %s open failed", + ipc_name.c_str())); + + void *ptr = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0); + PADDLE_ENFORCE_NE(ptr, MAP_FAILED, + platform::errors::Unavailable( + "Memory map failed when rebuild shared memory.")); + close(fd); + return std::make_shared(ptr, size, ipc_name); +} + +MemoryMapFdSet &MemoryMapFdSet::Instance() { // NOLINT + static MemoryMapFdSet set; + return set; +} + +void MemoryMapFdSet::Insert(const std::string &ipc_name) { + std::lock_guard guard(mtx_); + fd_set_.emplace(ipc_name); + VLOG(3) << "PID: " << getpid() << ", MemoryMapFdSet: insert " << ipc_name + << ", set size: " << fd_set_.size(); +} + +void MemoryMapFdSet::Remove(const std::string &ipc_name) { + std::lock_guard guard(mtx_); + fd_set_.erase(ipc_name); + VLOG(3) << "PID: " << getpid() << ", MemoryMapFdSet: erase " << ipc_name + << ", set size: " << fd_set_.size(); +} + +void MemoryMapFdSet::Clear() { + VLOG(3) << "PID: " << getpid() << ", MemoryMapFdSet: set size - " + << fd_set_.size(); + std::lock_guard guard(mtx_); + for (auto fd : fd_set_) { + int rlt = shm_unlink(fd.c_str()); + if (rlt == 0) { + VLOG(3) << "PID: " << getpid() << ", MemoryMapFdSet: clear " << fd; + } + } + fd_set_.clear(); +} + +MemoryMapFdSet::~MemoryMapFdSet() { Clear(); } + +} // namespace allocation +} // namespace memory +} // namespace paddle + +#endif diff --git a/paddle/fluid/memory/allocation/mmap_allocator.h b/paddle/fluid/memory/allocation/mmap_allocator.h new file mode 100644 index 00000000000..3f91e5c4278 --- /dev/null +++ b/paddle/fluid/memory/allocation/mmap_allocator.h @@ -0,0 +1,90 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#ifndef _WIN32 + +#include +#include // NOLINT +#include +#include +#include + +#include "paddle/fluid/memory/allocation/allocator.h" + +namespace paddle { +namespace memory { +namespace allocation { + +class MemoryMapWriterAllocation : public Allocation { + public: + explicit MemoryMapWriterAllocation(void *ptr, size_t size, + std::string ipc_name) + : Allocation(ptr, size, platform::CPUPlace()), + ipc_name_(std::move(ipc_name)) {} + + inline const std::string &ipc_name() const { return ipc_name_; } + + ~MemoryMapWriterAllocation() override; + + private: + std::string ipc_name_; +}; + +class MemoryMapReaderAllocation : public Allocation { + public: + explicit MemoryMapReaderAllocation(void *ptr, size_t size, + std::string ipc_name) + : Allocation(ptr, size, platform::CPUPlace()), + ipc_name_(std::move(ipc_name)) {} + + inline const std::string &ipc_name() const { return ipc_name_; } + + ~MemoryMapReaderAllocation() override; + + private: + std::string ipc_name_; +}; + +std::shared_ptr AllocateMemoryMapWriterAllocation( + size_t size); + +std::shared_ptr RebuildMemoryMapReaderAllocation( + const std::string &ipc_name, size_t size); + +class MemoryMapFdSet { + public: + static MemoryMapFdSet &Instance(); // NOLINT + + void Insert(const std::string &ipc_name); + + void Remove(const std::string &ipc_name); + + void Clear(); + + ~MemoryMapFdSet(); + + private: + MemoryMapFdSet() = default; + + std::unordered_set fd_set_; + std::mutex mtx_; +}; + +} // namespace allocation +} // namespace memory +} // namespace paddle + +#endif diff --git a/paddle/fluid/memory/allocation/mmap_allocator_test.cc b/paddle/fluid/memory/allocation/mmap_allocator_test.cc new file mode 100644 index 00000000000..5b66920be2a --- /dev/null +++ b/paddle/fluid/memory/allocation/mmap_allocator_test.cc @@ -0,0 +1,54 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef _WIN32 + +#include "paddle/fluid/memory/allocation/mmap_allocator.h" + +#include + +#include "gtest/gtest.h" + +namespace paddle { +namespace memory { +namespace allocation { + +TEST(MemoryMapAllocation, test_allocation_base) { + size_t data_size = 4UL * 1024; + // 1. allocate writer holader + auto mmap_writer_holder = AllocateMemoryMapWriterAllocation(data_size); + std::string ipc_name = mmap_writer_holder->ipc_name(); + // 2. write data + auto* writer_ptr = static_cast(mmap_writer_holder->ptr()); + for (int32_t i = 0; i < 1024; ++i) { + writer_ptr[i] = i; + } + // 3. create child process + pid_t fpid = fork(); + if (fpid == 0) { + // 4. rebuild reader holder + auto mmap_reader_holder = + RebuildMemoryMapReaderAllocation(ipc_name, data_size); + auto* reader_ptr = static_cast(mmap_reader_holder->ptr()); + for (int32_t i = 0; i < 1024; ++i) { + ASSERT_EQ(reader_ptr[i], i); + } + } +} + +} // namespace allocation +} // namespace memory +} // namespace paddle + +#endif diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 320951debb7..87dceb1850f 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -9,6 +9,7 @@ endif() if(NOT WIN32) set(PYBIND_DEPS ${PYBIND_DEPS} data_loader) + set(PYBIND_DEPS ${PYBIND_DEPS} mmap_allocator) if (WITH_NCCL) set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context) endif() diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc index beeae143f5f..c9f455d66e2 100644 --- a/paddle/fluid/pybind/imperative.cc +++ b/paddle/fluid/pybind/imperative.cc @@ -31,6 +31,7 @@ limitations under the License. */ #include "paddle/fluid/imperative/profiler.h" #include "paddle/fluid/imperative/tracer.h" #include "paddle/fluid/imperative/type_defs.h" +#include "paddle/fluid/memory/allocation/mmap_allocator.h" #include "paddle/fluid/pybind/op_function.h" #include "paddle/fluid/pybind/pybind_boost_headers.h" #include "paddle/fluid/pybind/tensor_py.h" @@ -220,6 +221,85 @@ void BindImperative(py::module *m_ptr) { BindOpFunctions(&m); +#ifndef _WIN32 + // Dygraph DataLoader signal handler + m.def("_set_process_pid", [](int64_t key, pid_t pid) { + imperative::SetLoadProcessPID(key, pid); + }); + m.def("_erase_process_pid", + [](int64_t key) { imperative::EraseLoadProcessPID(key); }); + m.def("_set_process_signal_handler", + []() { imperative::SetLoadProcessSignalHandler(); }); + m.def("_throw_error_if_process_failed", + []() { imperative::ThrowErrorIfLoadProcessFailed(); }); + + // Dygraph DataLoader reader process & thread related functions + m.def( + "_convert_to_tensor_list", + [](py::object &obj) -> py::list { + // 0. input data check + PADDLE_ENFORCE( + py::isinstance(obj) || py::isinstance(obj), + platform::errors::InvalidArgument( + "The batch data read into DataLoader is illegal." + "Expected data type is tuple or list, but received %s", + obj.get_type())); + py::list batch = py::cast(obj); + py::list tensors; + for (size_t i = 0; i < batch.size(); ++i) { + // 1. cast to python array + auto array = batch[i].cast(); + PADDLE_ENFORCE_NE( + string::Sprintf("%s", array.dtype()).compare("object"), 0, + platform::errors::InvalidArgument( + "Faild to convert input data to a regular ndarray.\n * " + "Usually this means the input data contains nested " + "lists with different lengths.\n * Check the reader " + "function passed to 'set_(sample/sample_list/batch)" + "_generator' to locate the data causes this issue.")); + // 2. construcct LoDTensor + framework::LoDTensor t; + SetTensorFromPyArray(&t, array, + platform::CPUPlace(), true); + // 3. allocate shared memory + void *data_ptr = t.data(); + size_t data_size = t.numel() * framework::SizeOfType(t.type()); + auto shared_writer_holder = + memory::allocation::AllocateMemoryMapWriterAllocation(data_size); + // 4. maintain mmap fd set & backup ipc_name + const std::string &ipc_name = shared_writer_holder->ipc_name(); + memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name); + // 5. copy data & reset holder + memory::Copy(platform::CPUPlace(), shared_writer_holder->ptr(), + platform::CPUPlace(), data_ptr, data_size); + t.ResetHolder(shared_writer_holder); + // 6. append to result list + tensors.append(t); + } + return tensors; + }, + py::return_value_policy::take_ownership); + + m.def("_remove_tensor_list_mmap_fds", [](py::list &tensor_list) { + for (size_t i = 0; i < tensor_list.size(); ++i) { + auto t = tensor_list[i].cast(); + auto *mmap_writer_allocation = + dynamic_cast( + t.Holder().get()); + PADDLE_ENFORCE_NOT_NULL( + mmap_writer_allocation, + platform::errors::NotFound("The shared memory of LoDTensor in " + "DataLoader's child process has been " + "released.")); + memory::allocation::MemoryMapFdSet::Instance().Remove( + mmap_writer_allocation->ipc_name()); + } + }); + + m.def("_cleanup_mmap_fds", + []() { memory::allocation::MemoryMapFdSet::Instance().Clear(); }); +#endif + py::class_ backward_strategy( m, "BackwardStrategy", R"DOC( @@ -277,19 +357,6 @@ void BindImperative(py::module *m_ptr) { imperative::SetCurrentTracer(tracer); }); -#ifndef _WIN32 - // Dygraph DataLoader signal handler - m.def("_set_process_pid", [](int64_t key, pid_t pid) { - imperative::SetLoadProcessPID(key, pid); - }); - m.def("_erase_process_pid", - [](int64_t key) { imperative::EraseLoadProcessPID(key); }); - m.def("_set_process_signal_handler", - []() { imperative::SetLoadProcessSignalHandler(); }); - m.def("_throw_error_if_process_failed", - []() { imperative::ThrowErrorIfLoadProcessFailed(); }); -#endif - py::class_>( m, "VarBase", R"DOC()DOC") diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 03078a767b9..a0cf35ea299 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -47,6 +47,7 @@ limitations under the License. */ #include "paddle/fluid/framework/version.h" #include "paddle/fluid/imperative/layer.h" #include "paddle/fluid/memory/allocation/allocator_strategy.h" +#include "paddle/fluid/memory/allocation/mmap_allocator.h" #include "paddle/fluid/operators/activation_op.h" #include "paddle/fluid/operators/py_func_op.h" #include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" @@ -860,7 +861,58 @@ PYBIND11_MODULE(core_noavx, m) { } dst.set_lod(self.lod()); return dst; +#ifdef _WIN32 }); +#else + }) + .def(py::pickle( + [](const LoDTensor &t) { // __getstate__ + auto holder = t.Holder(); + PADDLE_ENFORCE_EQ( + platform::is_cpu_place(holder->place()), true, + platform::errors::PreconditionNotMet( + "LoDTensor is not on CPU." + "Now only LoDTensor on CPU can be serialized.")); + auto* mmap_writer_allocation = + dynamic_cast( + holder.get()); + PADDLE_ENFORCE_NOT_NULL(mmap_writer_allocation, + platform::errors::PreconditionNotMet( + "LoDTensor is not in shared memory." + "Now only LoDTensor on shared memory can be serialized.")); + int type_idx = static_cast(t.type()); + + return py::make_tuple(mmap_writer_allocation->ipc_name(), + mmap_writer_allocation->size(), + type_idx, vectorize(t.dims()), t.lod()); + }, + [](py::tuple t) { // __setstate__ + if (t.size() != 5) + throw std::runtime_error("Invalid LoDTensor state!"); + + // 1. Create a new C++ instance + LoDTensor tensor; + + // 2. Rebuild Allocation + const std::string &ipc_name = t[0].cast(); + size_t size = t[1].cast(); + auto shared_reader_holder = + memory::allocation::RebuildMemoryMapReaderAllocation( + ipc_name, size); + + // 3. Maintain global fd set + VLOG(3) << "LoDTensor ipc name: " << ipc_name; + memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name); + + // 4. Rebuild LoDTensor + tensor.ResetHolderWithType(shared_reader_holder, + static_cast(t[2].cast())); + tensor.Resize(make_ddim(t[3].cast>())); + tensor.set_lod(t[4].cast()); + + return tensor; + })); +#endif py::class_(m, "SelectedRows") .def("__init__", diff --git a/python/paddle/fluid/core.py b/python/paddle/fluid/core.py index 067e733094e..71ed95a9067 100644 --- a/python/paddle/fluid/core.py +++ b/python/paddle/fluid/core.py @@ -189,6 +189,9 @@ if avx_supported(): from .core_avx import _erase_process_pid from .core_avx import _set_process_signal_handler from .core_avx import _throw_error_if_process_failed + from .core_avx import _convert_to_tensor_list + from .core_avx import _cleanup_mmap_fds + from .core_avx import _remove_tensor_list_mmap_fds except Exception as e: if has_avx_core: raise e @@ -230,6 +233,9 @@ if load_noavx: from .core_noavx import _erase_process_pid from .core_noavx import _set_process_signal_handler from .core_noavx import _throw_error_if_process_failed + from .core_noavx import _convert_to_tensor_list + from .core_noavx import _cleanup_mmap_fds + from .core_noavx import _remove_tensor_list_mmap_fds except Exception as e: if has_noavx_core: sys.stderr.write( diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index b6723bfe905..ef99b82b02b 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -27,6 +27,8 @@ import logging from .dataset import DatasetBase, InMemoryDataset ### Dygraph DataLoader configs ### +import atexit +import os import multiprocessing import signal # NOTE: queue has a different name in python2 and python3 @@ -34,9 +36,13 @@ if sys.version_info[0] == 2: import Queue as queue else: import queue -# NOTE: [ avoid hanging ] These value is used in getting data from another process -QUEUE_GET_TIMEOUT = 5 -MAX_GET_FAILED_TIME = 12 +# NOTE: [ avoid hanging & failed quickly ] These value is used in getting data from another process +QUEUE_GET_TIMEOUT = 60 + +# NOTE: [ mmap files clear ] If there is still data in the multiprocess queue when the main process finishes reading, +# the data in the queue needs to be popped. Then the LoDTensor read by the main process +# from the child process will automatically clear the memory-mapped file. +multiprocess_queue_set = set() __all__ = ['PyReader', 'DataLoader'] @@ -58,6 +64,82 @@ def _convert_places(places): return ret +def _clear_multiprocess_queue_set(): + global multiprocess_queue_set + for data_queue in multiprocess_queue_set: + while True: + try: + data_queue.get_nowait() + except queue.Empty: + break + + +# NOTE: main process clear function at exit +def _cleanup(): + # NOTE: inter-process Queue shared memory objects clear function + _clear_multiprocess_queue_set() + # NOTE: main process memory map files clear funciton + core._cleanup_mmap_fds() + + +# NOTE used for register a function to be executed at interpreter exit. +class CleanupFuncRegistrar(): + # Record the cleanup functions that have been executed + _executed_func_set = set() + # Record the cleanup functions that have been registered + _registered_func_set = set() + + @classmethod + def register(cls, function, signals=[signal.SIGTERM]): + def _func_exectuor(): + if function not in cls._executed_func_set: + try: + function() + finally: + cls._executed_func_set.add(function) + + def _func_register(function): + if not callable(function): + raise TypeError("%s is not callable object." % (function)) + # check function object whether hash-able + set([function]) + if function not in cls._registered_func_set: + atexit.register(_func_exectuor) + cls._registered_func_set.add(function) + + def _signal_handler(signum=None, frame=None): + _func_exectuor() + if signum is not None: + if signum == signal.SIGINT: + raise KeyboardInterrupt + sys.exit(signum) + + def _signal_register(signals): + signals = set(signals) + for sig in signals: + orig_handler = signal.signal(sig, _signal_handler) + if orig_handler not in (signal.SIG_DFL, signal.SIG_IGN): + if (sig == signal.SIGINT and + orig_handler is signal.default_int_handler): + continue + if orig_handler not in cls._registered_func_set: + atexit.register(orig_handler) + cls._registered_func_set.add(orig_handler) + + # deal with signals + _signal_register(signals) + # deal with function + _func_register(function) + + +# NOTE: [ mmap files clear ] When the main process exits unexpectedly, the remaining +# shared memory objects in the inter-process Queue and the main process (mostly in the +# BlockingQueue) may not be completely released, resulting in the corresponding +# memory-mapped file remaining on the disk (/dev/shm), so register this function +# to clean up shared memory objects in these two queues before the python interpreter exits. +CleanupFuncRegistrar.register(_cleanup) + + class DataLoaderBase(object): def __init__(self): self._places = None @@ -383,6 +465,16 @@ class DygraphGeneratorLoader(DataLoaderBase): def iterable(self): return self._iterable + def _clear_and_remove_data_queue(self): + if self._data_queue is not None: + while True: + try: + self._data_queue.get_nowait() + except queue.Empty: + break + global multiprocess_queue_set + multiprocess_queue_set.remove(self._data_queue) + def _wait_thread_ends(self): thread = self._thread if thread is not None: @@ -392,12 +484,25 @@ class DygraphGeneratorLoader(DataLoaderBase): def _wait_process_ends(self): process = self._process if process is not None: - self._data_queue.cancel_join_thread() - self._data_queue.close() process.join() # erase process id core._erase_process_pid(id(self)) + def _set_child_signal_handler(self): + core._set_process_pid(id(self), self._process.pid) + current_handler = signal.getsignal(signal.SIGCHLD) + if not callable(current_handler): + current_handler = None + + def __handler__(signum, frame): + # NOTE: Here the signum is SIGDHLD, when the child process exits, this handler + # will be called whenever the child process exits normally or abnormally. + core._throw_error_if_process_failed() + if current_handler is not None: + current_handler(signum, frame) + + signal.signal(signal.SIGCHLD, __handler__) + def _init_iterable(self): self._wait_thread_ends() if self._use_multiprocess: @@ -414,8 +519,13 @@ class DygraphGeneratorLoader(DataLoaderBase): def _start(self): if self._use_multiprocess: - # Set data_queue and process + # clear old _data_queue and remove it from multiprocess_queue_set + self._clear_and_remove_data_queue() + # set data_queue and process self._data_queue = multiprocessing.Queue(self._capacity) + # add _data_queue into global queue set + global multiprocess_queue_set + multiprocess_queue_set.add(self._data_queue) self._process = multiprocessing.Process( target=self._reader_process_loop) self._process.daemon = True @@ -473,28 +583,13 @@ class DygraphGeneratorLoader(DataLoaderBase): " to locate the data causes this issue.\n\t* Please consider using " "'fluid.create_lod_tensor' to convert it to a LoD-Tensor.") - def _set_child_signal_handler(self): - core._set_process_pid(id(self), self._process.pid) - current_handler = signal.getsignal(signal.SIGCHLD) - if not callable(current_handler): - current_handler = None - - def __handler__(signum, frame): - core._throw_error_if_process_failed() - if current_handler is not None: - current_handler(signum, frame) - - signal.signal(signal.SIGCHLD, __handler__) - def _exit_thread_expectedly(self): self._thread_done_event.set() self._blocking_queue.close() - self._data_queue.close() def _exit_thread_unexpectedly(self): self._thread_done_event.set() self._blocking_queue.kill() - self._data_queue.close() logging.error("DataLoader reader thread raised an exception!") def _reader_process_loop(self): @@ -502,23 +597,29 @@ class DygraphGeneratorLoader(DataLoaderBase): # set signal handler core._set_process_signal_handler() - for sample in self._batch_reader(): - if sample is None: - raise ValueError( - "Sample in reader is None. Please check whether your dataset is valid." - ) - self._data_queue.put(sample) + # child process clear function at exit + def _cleanup(): + # clear memory map files in child process + core._cleanup_mmap_fds() + + # NOTE: [ mmap files clear ] When the child process exits unexpectedly, + # some shared memory objects may have been applied for but have not yet + # been put into the inter-process Queue. This part of the object needs + # to be cleaned up when the process ends. + CleanupFuncRegistrar.register(_cleanup) + + for batch in self._batch_reader(): + tensor_list = core._convert_to_tensor_list(batch) + self._data_queue.put(tensor_list) + core._remove_tensor_list_mmap_fds(tensor_list) self._data_queue.put(None) except KeyboardInterrupt: # NOTE: Main process will raise KeyboardInterrupt anyways, ignore it in child process pass except: - self._data_queue.cancel_join_thread() - self._data_queue.close() six.reraise(*sys.exc_info()) def _reader_thread_loop_with_process(self): - get_sample_try_time = 0 while not self._thread_done_event.is_set(): try: # NOTE: [ avoid hanging ] Even with carefully designed data dependencies @@ -526,33 +627,25 @@ class DygraphGeneratorLoader(DataLoaderBase): # still happen when data in queue is corrupted (e.g., due to # Queue.cancel_join_thread or unexpected exit). So we set a timeout whenever # we try to get data from `data_queue` - sample = self._data_queue.get(timeout=QUEUE_GET_TIMEOUT) - get_sample_try_time = 0 + # NOTE: [ avoid failed quickly ] Here, the time setting of QUEUE_GET_TIMEOUT + # is relatively long, currently it is 60 seconds, because in some models, + # if the reader child process starts with a heavy burden, the child process + # has no enough time to put the data in the queue when the main process + # start trying to get data from queue. At this time, the child thread needs + # to wait slightly longer + tensor_list = self._data_queue.get(timeout=QUEUE_GET_TIMEOUT) except queue.Empty: - get_sample_try_time += 1 - if get_sample_try_time > MAX_GET_FAILED_TIME: - self._exit_thread_unexpectedly() - raise RuntimeError( - "DataLoader reader thread has not read data for a long time (60s)." - ) - else: - # NOTE: [ avoid failed quickly ] Sometimes if the reader child process has a heavy burden, - # the child process has no enough time to put the data in the queue when the main process - # start trying to get data from queue. At this time, failure to read data should not be - # counted as a fatal error, there should be a certain number of attempts. - continue + self._exit_thread_unexpectedly() + raise RuntimeError( + "DataLoader reader thread has not read data for a long time (60s)." + ) if not self._thread_done_event.is_set(): - if sample is not None: + if tensor_list is not None: try: array = core.LoDTensorArray() - for item in sample: - if not isinstance(item, core.LoDTensor): - self._check_input_array(item) - tmp = core.LoDTensor() - tmp.set(item, core.CPUPlace()) - item = tmp - array.append(item) + for tensor in tensor_list: + array.append(tensor) if not self._blocking_queue.push(array): self._blocking_queue.close() except: diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 3a503396ea3..7bd2eabaf53 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -200,6 +200,8 @@ if (APPLE OR WIN32) list(REMOVE_ITEM TEST_OPS test_imperative_data_loader) list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_exception) list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_process) + list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_fds_clear) + list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_exit_func) list(REMOVE_ITEM TEST_OPS test_imperative_signal_handler) endif() diff --git a/python/paddle/fluid/tests/unittests/test_imperative_data_loader.py b/python/paddle/fluid/tests/unittests/test_imperative_data_loader.py index 50a7e21609b..eabcdc4b7c7 100644 --- a/python/paddle/fluid/tests/unittests/test_imperative_data_loader.py +++ b/python/paddle/fluid/tests/unittests/test_imperative_data_loader.py @@ -44,7 +44,7 @@ def batch_generator_creator(batch_size, batch_num): return __reader__ -class TestDygraphhDataLoader(unittest.TestCase): +class TestDygraphDataLoader(unittest.TestCase): def setUp(self): self.batch_size = 8 self.batch_num = 4 diff --git a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_exception.py b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_exception.py index ca3995d602e..d317d943dca 100644 --- a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_exception.py +++ b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_exception.py @@ -27,7 +27,7 @@ def get_random_images_and_labels(image_shape, label_shape): return image, label -class TestDygraphhDataLoaderWithException(unittest.TestCase): +class TestDygraphDataLoaderWithException(unittest.TestCase): def setUp(self): self.batch_size = 8 self.batch_num = 4 @@ -63,7 +63,7 @@ class TestDygraphhDataLoaderWithException(unittest.TestCase): exception = ex self.assertIsNotNone(exception) - def test_multi_process_with_thread_expection(self): + def test_multi_process_with_process_expection(self): def error_sample_genarator(batch_num): def __reader__(): for _ in range(batch_num): @@ -81,8 +81,6 @@ class TestDygraphhDataLoaderWithException(unittest.TestCase): for _ in loader(): print("test_multi_process_with_thread_expection") except core.EnforceNotMet as ex: - self.assertIn("Blocking queue is killed", - cpt.get_exception_message(ex)) exception = ex self.assertIsNotNone(exception) diff --git a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_exit_func.py b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_exit_func.py new file mode 100644 index 00000000000..ba98a343a45 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_exit_func.py @@ -0,0 +1,84 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import signal +import unittest +import multiprocessing +import time + +import paddle.compat as cpt + +if sys.version_info[0] == 2: + import Queue as queue +else: + import queue + +from paddle.fluid.reader import multiprocess_queue_set, _cleanup, CleanupFuncRegistrar + +# NOTE: These special functions cannot be detected by the existing coverage mechanism, +# so the following unittests are added for these internal functions. + + +class TestDygraphDataLoaderCleanUpFunc(unittest.TestCase): + def setUp(self): + self.capacity = 10 + + def test_clear_queue_set(self): + test_queue = queue.Queue(self.capacity) + global multiprocess_queue_set + multiprocess_queue_set.add(test_queue) + for i in range(0, self.capacity): + test_queue.put(i) + _cleanup() + + +class TestRegisterExitFunc(unittest.TestCase): + # This function does not need to be implemented in this case + def none_func(self): + pass + + def test_not_callable_func(self): + exception = None + try: + CleanupFuncRegistrar.register(5) + except TypeError as ex: + self.assertIn("is not callable", cpt.get_exception_message(ex)) + exception = ex + self.assertIsNotNone(exception) + + def test_old_handler_for_sigint(self): + CleanupFuncRegistrar.register( + function=self.none_func, signals=[signal.SIGINT]) + + def test_signal_wrapper_by_sigchld(self): + # This function does not need to be implemented in this case + def __test_process__(): + pass + + CleanupFuncRegistrar.register( + function=self.none_func, signals=[signal.SIGCHLD]) + + exception = None + try: + test_process = multiprocessing.Process(target=__test_process__) + test_process.start() + time.sleep(3) + except SystemExit as ex: + exception = ex + self.assertIsNotNone(exception) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_fds_clear.py b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_fds_clear.py new file mode 100644 index 00000000000..664d4078d2c --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_fds_clear.py @@ -0,0 +1,78 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import unittest +import numpy as np +import paddle.fluid as fluid +from paddle.fluid import core + + +def get_random_images_and_labels(image_shape, label_shape): + image = np.random.random(size=image_shape).astype('float32') + label = np.random.random(size=label_shape).astype('int64') + return image, label + + +def batch_generator_creator(batch_size, batch_num): + def __reader__(): + for _ in range(batch_num): + batch_image, batch_label = get_random_images_and_labels( + [batch_size, 784], [batch_size, 1]) + yield batch_image, batch_label + + return __reader__ + + +class TestDygraphDataLoaderMmapFdsClear(unittest.TestCase): + def setUp(self): + self.batch_size = 8 + self.batch_num = 100 + self.epoch_num = 2 + self.capacity = 50 + + def prepare_data_loader(self): + loader = fluid.io.DataLoader.from_generator( + capacity=self.capacity, use_multiprocess=True) + loader.set_batch_generator( + batch_generator_creator(self.batch_size, self.batch_num), + places=fluid.CPUPlace()) + return loader + + def run_one_epoch_with_break(self, loader): + for step_id, data in enumerate(loader()): + image, label = data + relu = fluid.layers.relu(image) + self.assertEqual(image.shape, [self.batch_size, 784]) + self.assertEqual(label.shape, [self.batch_size, 1]) + self.assertEqual(relu.shape, [self.batch_size, 784]) + if step_id == 30: + break + + def test_data_loader_break(self): + with fluid.dygraph.guard(): + loader = self.prepare_data_loader() + for _ in range(self.epoch_num): + self.run_one_epoch_with_break(loader) + break + + def test_data_loader_continue_break(self): + with fluid.dygraph.guard(): + loader = self.prepare_data_loader() + for _ in range(self.epoch_num): + self.run_one_epoch_with_break(loader) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py index 8ca14607419..051cb3b5bca 100644 --- a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py +++ b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py @@ -16,6 +16,7 @@ import sys import unittest import numpy as np import paddle.fluid as fluid +from paddle.fluid import core if sys.version_info[0] == 2: import Queue as queue @@ -41,7 +42,7 @@ def batch_generator_creator(batch_size, batch_num): # NOTE: coverage CI can't cover child process code, so need these test. # Here test child process loop function in main process -class TestDygraphhDataLoaderProcess(unittest.TestCase): +class TestDygraphDataLoaderProcess(unittest.TestCase): def setUp(self): self.batch_size = 8 self.batch_num = 4 @@ -77,7 +78,7 @@ class TestDygraphhDataLoaderProcess(unittest.TestCase): exception = None try: loader._reader_process_loop() - except AttributeError as ex: + except core.EnforceNotMet as ex: exception = ex self.assertIsNotNone(exception) -- GitLab