diff --git a/paddle/fluid/memory/allocation/mmap_allocator.cc b/paddle/fluid/memory/allocation/mmap_allocator.cc index 557a9cf333a3b95caf60065c5a337e989493f61e..55529b58aeb5f1cb4efe6aedd9c582b5f174e4c3 100644 --- a/paddle/fluid/memory/allocation/mmap_allocator.cc +++ b/paddle/fluid/memory/allocation/mmap_allocator.cc @@ -26,6 +26,8 @@ #include "glog/logging.h" #include "paddle/fluid/platform/enforce.h" +DECLARE_bool(use_shm_cache); + namespace paddle { namespace memory { namespace allocation { @@ -111,20 +113,33 @@ void AllocateMemoryMap( std::shared_ptr AllocateRefcountedMemoryMapAllocation(std::string filename, int flags, - size_t size) { + size_t size, + int buffer_id) { int fd = -1; void *base_ptr = nullptr; - AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd); + if (buffer_id == -1) { + AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd); + VLOG(4) << "Create and mmap a new shm: " << filename; + } else { + base_ptr = MemoryMapAllocationPool::Instance().GetById(buffer_id).mmap_ptr_; + VLOG(4) << "Get a cached shm " << filename; + } void *aliged_base_ptr = static_cast(static_cast(base_ptr) + mmap_alignment); return std::make_shared( - aliged_base_ptr, size, filename, flags, fd); + aliged_base_ptr, size, filename, flags, fd, buffer_id); } RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation( - void *ptr, size_t size, std::string ipc_name, int fd, int flags) + void *ptr, + size_t size, + std::string ipc_name, + int fd, + int flags, + int buffer_id) : MemoryMapAllocation(ptr, size, ipc_name, fd, flags) { // must reset base ptr first. 
+ buffer_id_ = buffer_id; resetBaseptr(); initializeRefercount(); } @@ -165,25 +180,40 @@ void RefcountedMemoryMapAllocation::initializeRefercount() { } void RefcountedMemoryMapAllocation::close() { + VLOG(4) << "Close a RefcountedMemoryMapAllocation: " << ipc_name_; if (closed_) { return; } closed_ = true; void *data = map_ptr_; CountInfo *info = reinterpret_cast(data); - if (--info->refcount == 0) { - shm_unlink(ipc_name_.c_str()); - VLOG(6) << "shm_unlink file: " << ipc_name_; + --info->refcount; + if (FLAGS_use_shm_cache && buffer_id_ != -1) { + return; + } else { + if (FLAGS_use_shm_cache && + MemoryMapAllocationPool::Instance().BufferSize() < + static_cast( + MemoryMapAllocationPool::Instance().MaxPoolSize())) { + MemoryMapAllocationPool::Instance().Insert(MemoryMapInfo( + flags_, map_size_ - mmap_alignment, ipc_name_, map_ptr_)); + } else { + if (info->refcount == 0 && + shm_open(ipc_name_.c_str(), O_RDWR, (mode_t)0600) != -1) { + shm_unlink(ipc_name_.c_str()); + VLOG(6) << "shm_unlink file: " << ipc_name_; + } + + PADDLE_ENFORCE_NE(munmap(map_ptr_, map_size_), + -1, + platform::errors::Unavailable( + "could not unmap the shared memory file: ", + strerror(errno), + " (", + errno, + ")")); + } } - - PADDLE_ENFORCE_NE( - munmap(map_ptr_, map_size_), - -1, - platform::errors::Unavailable("could not unmap the shared memory file: ", - strerror(errno), - " (", - errno, - ")")); } MemoryMapWriterAllocation::~MemoryMapWriterAllocation() { @@ -299,6 +329,67 @@ void MemoryMapFdSet::Clear() { MemoryMapFdSet::~MemoryMapFdSet() { Clear(); } +MemoryMapAllocationPool *MemoryMapAllocationPool::pool_ = nullptr; + +void MemoryMapAllocationPool::Insert(const MemoryMapInfo &memory_map) { + std::lock_guard guard(mtx_); + memory_map_allocations_.push_back(memory_map); + VLOG(4) << this << "Intsert a new shm: " << memory_map.file_name_; +} + +int MemoryMapAllocationPool::FindFromCache(const int &flag, + const size_t &data_size, + const std::string &file_name, + bool 
check_refcount) { + std::lock_guard guard(mtx_); + for (size_t idx = 0; idx < memory_map_allocations_.size(); idx++) { + if (memory_map_allocations_.at(idx).flags_ == flag && + memory_map_allocations_.at(idx).data_size_ == data_size) { + if (file_name == "" || + memory_map_allocations_.at(idx).file_name_ == file_name) { + if (!check_refcount || reinterpret_cast( + memory_map_allocations_.at(idx).mmap_ptr_) + ->refcount == 0) { + VLOG(4) << "Match at: " << idx; + return idx; + } + } + } + } + return -1; +} + +const MemoryMapInfo &MemoryMapAllocationPool::GetById(int id) { + std::lock_guard guard(mtx_); + return memory_map_allocations_.at(id); +} + +void MemoryMapAllocationPool::SetMaxPoolSize(const int &size) { + max_pool_size_ = size; + VLOG(4) << this << "Set max pool size is: " << max_pool_size_; +} + +void MemoryMapAllocationPool::Clear() { + std::lock_guard guard(mtx_); + for (auto mmap : memory_map_allocations_) { + int rlt = shm_unlink(mmap.file_name_.c_str()); + if (rlt == 0) { + VLOG(4) << "MemoryMapAllocationPool: clear " << mmap.file_name_; + } + PADDLE_ENFORCE_NE(munmap(mmap.mmap_ptr_, mmap.data_size_ + mmap_alignment), + -1, + platform::errors::Unavailable( + "could not unmap the shared memory file: ", + strerror(errno), + " (", + errno, + ")")); + } + memory_map_allocations_.clear(); +} + +MemoryMapAllocationPool::~MemoryMapAllocationPool() { Clear(); } + } // namespace allocation } // namespace memory } // namespace paddle diff --git a/paddle/fluid/memory/allocation/mmap_allocator.h b/paddle/fluid/memory/allocation/mmap_allocator.h index 3fc5d2d1891f245c0096d9a49b76121a2b13ce16..412e3a3545769dc09542b4fd921fc1d44bae2de5 100644 --- a/paddle/fluid/memory/allocation/mmap_allocator.h +++ b/paddle/fluid/memory/allocation/mmap_allocator.h @@ -75,8 +75,12 @@ class MemoryMapAllocation : public Allocation { class RefcountedMemoryMapAllocation : public MemoryMapAllocation { public: - RefcountedMemoryMapAllocation( - void *ptr, size_t size, std::string ipc_name, 
int flags, int fd); + RefcountedMemoryMapAllocation(void *ptr, + size_t size, + std::string ipc_name, + int flags, + int fd, + int buffer_id = -1); void incref(); int decref(); @@ -84,6 +88,7 @@ class RefcountedMemoryMapAllocation : public MemoryMapAllocation { virtual ~RefcountedMemoryMapAllocation() { close(); } protected: + int buffer_id_ = -1; void initializeRefercount(); void resetBaseptr(); }; @@ -94,7 +99,8 @@ void AllocateMemoryMap( std::shared_ptr AllocateRefcountedMemoryMapAllocation(std::string filename, int flags, - size_t size); + size_t size, + int buffer_id = -1); class MemoryMapWriterAllocation : public Allocation { public: @@ -153,6 +159,68 @@ class MemoryMapFdSet { std::mutex mtx_; }; +class MemoryMapInfo { + public: + explicit MemoryMapInfo(int flags, + size_t data_size, + std::string file_name, + void *mmap_ptr) + : flags_(flags), + data_size_(data_size), + file_name_(file_name), + mmap_ptr_(mmap_ptr) {} + + int flags_ = 0; + size_t data_size_ = 0; + std::string file_name_; + void *mmap_ptr_ = nullptr; +}; + +/* Note(zhangbo): +MemoryMapAllocationPool is used to cache and reuse shm, thus reducing munmap in +dataloader. The munmap(shm_mmap_ptr) instruction in +RefcountedMemoryMapAllocation::close() function may block other threads of the +process. 
Therefore, the logic of shm cache and reuse is designed: the shm +created by the _share_filename process will be cached and reused according to +the data_size of shm, thus eliminating the problem of munmap blocking other +threads +*/ +class MemoryMapAllocationPool { + public: + static MemoryMapAllocationPool &Instance() { + if (pool_ == nullptr) { + pool_ = new MemoryMapAllocationPool(); + } + return *pool_; + } + + void Insert(const MemoryMapInfo &memory_map); + + int FindFromCache(const int &flag, + const size_t &data_size, + const std::string &file_name = "", + bool check_refcount = true); + + const MemoryMapInfo &GetById(int id); + + size_t BufferSize() { return memory_map_allocations_.size(); } + + void Clear(); + + void SetMaxPoolSize(const int &size); + + int MaxPoolSize() { return max_pool_size_; } + + ~MemoryMapAllocationPool(); + + private: + MemoryMapAllocationPool() = default; + static MemoryMapAllocationPool *pool_; + std::vector memory_map_allocations_; + int max_pool_size_ = 0; + std::mutex mtx_; +}; + } // namespace allocation } // namespace memory } // namespace paddle diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc index 9262fec62b989d3643904cc33658a68f795c5f74..e01044720571d088899a8e60b02b6bd8bb5304c1 100644 --- a/paddle/fluid/pybind/imperative.cc +++ b/paddle/fluid/pybind/imperative.cc @@ -624,6 +624,11 @@ void BindImperative(py::module *m_ptr) { m.def("_cleanup_mmap_fds", []() { memory::allocation::MemoryMapFdSet::Instance().Clear(); }); + + m.def("_set_max_memory_map_allocation_pool_size", [](int32_t size) { + memory::allocation::MemoryMapAllocationPool::Instance().SetMaxPoolSize( + size); + }); #endif m.def("start_imperative_gperf_profiler", diff --git a/paddle/fluid/pybind/tensor.cc b/paddle/fluid/pybind/tensor.cc index 8739b32965b0dea4e163091f6cbed321ba9590d4..4bdde24f431bc830171ea4d3c02d1064da67926b 100644 --- a/paddle/fluid/pybind/tensor.cc +++ b/paddle/fluid/pybind/tensor.cc @@ -182,6 +182,7 @@ 
limitations under the License. */ #include "pybind11/stl.h" DECLARE_bool(use_mkldnn); +DECLARE_bool(use_shm_cache); // disable auto conversion to list in Python PYBIND11_MAKE_OPAQUE(paddle::framework::LoDTensorArray); @@ -910,9 +911,16 @@ void BindTensor(pybind11::module &m) { // NOLINT int flags = memory::allocation::MAPPED_SHAREDMEM | memory::allocation::MAPPED_EXCLUSIVE; std::string handle = memory::allocation::GetIPCName(); + int find_id = -1; + if (FLAGS_use_shm_cache) { + find_id = memory::allocation::MemoryMapAllocationPool::Instance().FindFromCache(flags, data_size); // NOLINT + } + if (find_id != -1) { + handle = memory::allocation::MemoryMapAllocationPool::Instance().GetById(find_id).file_name_; // NOLINT + } auto shared_holder = memory::allocation::AllocateRefcountedMemoryMapAllocation( - handle, flags, data_size); + handle, flags, data_size, find_id); // copy data & reset holder if (platform::is_cuda_pinned_place(holder->place())) { @@ -961,10 +969,13 @@ void BindTensor(pybind11::module &m) { // NOLINT size_t size = t[1].cast(); int flags = memory::allocation::MAPPED_SHAREDMEM | memory::allocation::MAPPED_NOCREATE; - + int find_id = -1; + if (FLAGS_use_shm_cache) { + find_id = memory::allocation::MemoryMapAllocationPool::Instance().FindFromCache(flags, size, ipc_name, /*check_refcount*/ false); // NOLINT + } auto shared_holder = memory::allocation::AllocateRefcountedMemoryMapAllocation( - ipc_name, flags, size); + ipc_name, flags, size, find_id); // 3. 
Rebuild Tensor tensor.ResetHolderWithType( diff --git a/paddle/phi/core/flags.cc b/paddle/phi/core/flags.cc index 43da2ecb7bb6e1ff532020bd636c80276cfa128d..8b5a78575d220d7d04c7f1046237d79053f8cb30 100644 --- a/paddle/phi/core/flags.cc +++ b/paddle/phi/core/flags.cc @@ -1193,3 +1193,16 @@ PADDLE_DEFINE_EXPORTED_int32(cudnn_cache_saturation_count, 1, ""); PADDLE_DEFINE_EXPORTED_bool(trt_ibuilder_cache, false, "Add a persistent ibuilder."); + +/** + * mmap_allocator related FLAG + * Name: use_shm_cache + * Since Version: 2.5.0 + * Value Range: bool, default=true + * Example: + * Note: If True, mmap_allocator will cache shm files to decrease munmap + * operations. + */ +PADDLE_DEFINE_EXPORTED_bool(use_shm_cache, + true, + "Use shm cache in mmap_allocator."); diff --git a/python/paddle/fluid/core.py b/python/paddle/fluid/core.py index 09e079ca58327a3610e29ce4b85f4f563e86e63c..771caa4ef3c4fa822dd40d0077b90e778bbbb5d6 100644 --- a/python/paddle/fluid/core.py +++ b/python/paddle/fluid/core.py @@ -317,6 +317,7 @@ try: from .libpaddle import _array_to_share_memory_tensor from .libpaddle import _cleanup_mmap_fds from .libpaddle import _remove_tensor_list_mmap_fds + from .libpaddle import _set_max_memory_map_allocation_pool_size except Exception as e: if has_paddle_dy_lib: sys.stderr.write( diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index fc1effbd89c7afd704a3b6bb657dc9c11d395312..c7c49c794a1017b1788a489cde6030d4ca374c1b 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -20,6 +20,7 @@ import numbers import logging import itertools import threading +import warnings import numpy as np from collections import namedtuple from paddle.fluid.framework import ( @@ -406,6 +407,20 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): self._base_seed = np.random.randint(low=0, high=sys.maxsize) + # Note(zhangbo): shm_buffer_size is used for 
MemoryMapAllocationPool. + # MemoryMapAllocationPool is used to cache and reuse shm, thus reducing munmap in dataloader. + # For more details, please see: paddle/fluid/memory/allocation/mmap_allocator.h + try: + self._worker_shm_buffer_size = (2 + 1) * len(self._dataset[0]) + except: + self._worker_shm_buffer_size = 0 + warnings.warn( + "Setting the shm cache buffer size to 0, equivalent to not using the shm cache policy." + ) + self._main_thread_shm_buffer_size = ( + (self._worker_shm_buffer_size) * 2 * self._num_workers + ) + # init workers and indices queues and put 2 indices in each indices queue self._init_workers() for _ in range(self._outstanding_capacity): @@ -450,6 +465,7 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): self._num_workers, self._use_shared_memory, self._base_seed, + self._worker_shm_buffer_size, ), ) worker.daemon = True @@ -481,6 +497,9 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): self._blocking_queue = core.init_lod_tensor_blocking_queue( core.Variable(), self._outstanding_capacity, len(self._places) > 1 ) + core._set_max_memory_map_allocation_pool_size( + self._main_thread_shm_buffer_size + ) self._reader = core.create_py_reader( self._blocking_queue, self._var_names, diff --git a/python/paddle/fluid/dataloader/worker.py b/python/paddle/fluid/dataloader/worker.py index e63a5d5f9344142626cf6f8093e509183b92d732..f486e80d746ea7de44750abea89b1ebece6f62f3 100644 --- a/python/paddle/fluid/dataloader/worker.py +++ b/python/paddle/fluid/dataloader/worker.py @@ -275,6 +275,7 @@ def _worker_loop( num_workers, use_shared_memory, base_seed, + shm_cahce_size=0, ): try: # NOTE: [ mmap files clear ] When the child process exits unexpectedly, @@ -286,6 +287,8 @@ def _worker_loop( # set signal handler core._set_process_signal_handler() + core._set_max_memory_map_allocation_pool_size(shm_cahce_size) + # set different numpy seed for each worker try: import numpy as np