未验证 提交 3fdc105f 编写于 作者: Z zhangbo9674 提交者: GitHub

Refine munmap freq for RefcountedMemoryMapAllocation (#49691)

* refine munmap freq for ref_cnt_mmap_allocator

* add shm reuse logic

* fix compile bug

* fix compile bug

* fix bug of file refcount

* fix compile bug

* fix compile bug

* refine code for delete shm case

* polish code

* refine shm cache pool size setting logic

* set buffer is 2

* refine shm cache size logic

* refine max shm cache

* refine shm cache size
上级 18745e6f
......@@ -26,6 +26,8 @@
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"
DECLARE_bool(use_shm_cache);
namespace paddle {
namespace memory {
namespace allocation {
......@@ -111,20 +113,33 @@ void AllocateMemoryMap(
std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename,
int flags,
size_t size) {
size_t size,
int buffer_id) {
int fd = -1;
void *base_ptr = nullptr;
if (buffer_id == -1) {
AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd);
VLOG(4) << "Create and mmap a new shm: " << filename;
} else {
base_ptr = MemoryMapAllocationPool::Instance().GetById(buffer_id).mmap_ptr_;
VLOG(4) << "Get a cached shm " << filename;
}
void *aliged_base_ptr =
static_cast<void *>(static_cast<char *>(base_ptr) + mmap_alignment);
return std::make_shared<RefcountedMemoryMapAllocation>(
aliged_base_ptr, size, filename, flags, fd);
aliged_base_ptr, size, filename, flags, fd, buffer_id);
}
RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
void *ptr, size_t size, std::string ipc_name, int fd, int flags)
void *ptr,
size_t size,
std::string ipc_name,
int fd,
int flags,
int buffer_id)
: MemoryMapAllocation(ptr, size, ipc_name, fd, flags) {
// must reset base ptr first.
buffer_id_ = buffer_id;
resetBaseptr();
initializeRefercount();
}
......@@ -165,25 +180,40 @@ void RefcountedMemoryMapAllocation::initializeRefercount() {
}
void RefcountedMemoryMapAllocation::close() {
VLOG(4) << "Close a RefcountedMemoryMapAllocation: " << ipc_name_;
if (closed_) {
return;
}
closed_ = true;
void *data = map_ptr_;
CountInfo *info = reinterpret_cast<CountInfo *>(data);
if (--info->refcount == 0) {
--info->refcount;
if (FLAGS_use_shm_cache && buffer_id_ != -1) {
return;
} else {
if (FLAGS_use_shm_cache &&
MemoryMapAllocationPool::Instance().BufferSize() <
static_cast<size_t>(
MemoryMapAllocationPool::Instance().MaxPoolSize())) {
MemoryMapAllocationPool::Instance().Insert(MemoryMapInfo(
flags_, map_size_ - mmap_alignment, ipc_name_, map_ptr_));
} else {
if (info->refcount == 0 &&
shm_open(ipc_name_.c_str(), O_RDWR, (mode_t)0600) != -1) {
shm_unlink(ipc_name_.c_str());
VLOG(6) << "shm_unlink file: " << ipc_name_;
}
PADDLE_ENFORCE_NE(
munmap(map_ptr_, map_size_),
PADDLE_ENFORCE_NE(munmap(map_ptr_, map_size_),
-1,
platform::errors::Unavailable("could not unmap the shared memory file: ",
platform::errors::Unavailable(
"could not unmap the shared memory file: ",
strerror(errno),
" (",
errno,
")"));
}
}
}
MemoryMapWriterAllocation::~MemoryMapWriterAllocation() {
......@@ -299,6 +329,67 @@ void MemoryMapFdSet::Clear() {
MemoryMapFdSet::~MemoryMapFdSet() { Clear(); }
MemoryMapAllocationPool *MemoryMapAllocationPool::pool_ = nullptr;
void MemoryMapAllocationPool::Insert(const MemoryMapInfo &memory_map) {
std::lock_guard<std::mutex> guard(mtx_);
memory_map_allocations_.push_back(memory_map);
VLOG(4) << this << "Intsert a new shm: " << memory_map.file_name_;
}
int MemoryMapAllocationPool::FindFromCache(const int &flag,
const size_t &data_size,
const std::string &file_name,
bool check_refcount) {
std::lock_guard<std::mutex> guard(mtx_);
for (size_t idx = 0; idx < memory_map_allocations_.size(); idx++) {
if (memory_map_allocations_.at(idx).flags_ == flag &&
memory_map_allocations_.at(idx).data_size_ == data_size) {
if (file_name == "" ||
memory_map_allocations_.at(idx).file_name_ == file_name) {
if (!check_refcount || reinterpret_cast<CountInfo *>(
memory_map_allocations_.at(idx).mmap_ptr_)
->refcount == 0) {
VLOG(4) << "Match at: " << idx;
return idx;
}
}
}
}
return -1;
}
const MemoryMapInfo &MemoryMapAllocationPool::GetById(int id) {
std::lock_guard<std::mutex> guard(mtx_);
return memory_map_allocations_.at(id);
}
void MemoryMapAllocationPool::SetMaxPoolSize(const int &size) {
max_pool_size_ = size;
VLOG(4) << this << "Set max pool size is: " << max_pool_size_;
}
void MemoryMapAllocationPool::Clear() {
std::lock_guard<std::mutex> guard(mtx_);
for (auto mmap : memory_map_allocations_) {
int rlt = shm_unlink(mmap.file_name_.c_str());
if (rlt == 0) {
VLOG(4) << "MemoryMapAllocationPool: clear " << mmap.file_name_;
}
PADDLE_ENFORCE_NE(munmap(mmap.mmap_ptr_, mmap.data_size_ + mmap_alignment),
-1,
platform::errors::Unavailable(
"could not unmap the shared memory file: ",
strerror(errno),
" (",
errno,
")"));
}
memory_map_allocations_.clear();
}
MemoryMapAllocationPool::~MemoryMapAllocationPool() { Clear(); }
} // namespace allocation
} // namespace memory
} // namespace paddle
......
......@@ -75,8 +75,12 @@ class MemoryMapAllocation : public Allocation {
class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
public:
RefcountedMemoryMapAllocation(
void *ptr, size_t size, std::string ipc_name, int flags, int fd);
RefcountedMemoryMapAllocation(void *ptr,
size_t size,
std::string ipc_name,
int flags,
int fd,
int buffer_id = -1);
void incref();
int decref();
......@@ -84,6 +88,7 @@ class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
virtual ~RefcountedMemoryMapAllocation() { close(); }
protected:
int buffer_id_ = -1;
void initializeRefercount();
void resetBaseptr();
};
......@@ -94,7 +99,8 @@ void AllocateMemoryMap(
std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename,
int flags,
size_t size);
size_t size,
int buffer_id = -1);
class MemoryMapWriterAllocation : public Allocation {
public:
......@@ -153,6 +159,68 @@ class MemoryMapFdSet {
std::mutex mtx_;
};
class MemoryMapInfo {
public:
explicit MemoryMapInfo(int flags,
size_t data_size,
std::string file_name,
void *mmap_ptr)
: flags_(flags),
data_size_(data_size),
file_name_(file_name),
mmap_ptr_(mmap_ptr) {}
int flags_ = 0;
size_t data_size_ = 0;
std::string file_name_;
void *mmap_ptr_ = nullptr;
};
/* Note(zhangbo):
MemoryMapAllocationPool is used to cache and reuse shm, thus reducing munmap in
dataloader. The munmap(shm_mmap_ptr) instruction in
RefcountedMemoryMapAllocation::close() function may block other threads of the
process. Therefore, the logic of shm cache and reuse is designed: the shm
created by the _share_filename process will be cached and reused according to
the data_size of shm, thus eliminating the problem of munmap blocking other
threads
*/
class MemoryMapAllocationPool {
public:
static MemoryMapAllocationPool &Instance() {
if (pool_ == nullptr) {
pool_ = new MemoryMapAllocationPool();
}
return *pool_;
}
void Insert(const MemoryMapInfo &memory_map);
int FindFromCache(const int &flag,
const size_t &data_size,
const std::string &file_name = "",
bool check_refcount = true);
const MemoryMapInfo &GetById(int id);
size_t BufferSize() { return memory_map_allocations_.size(); }
void Clear();
void SetMaxPoolSize(const int &size);
int MaxPoolSize() { return max_pool_size_; }
~MemoryMapAllocationPool();
private:
MemoryMapAllocationPool() = default;
static MemoryMapAllocationPool *pool_;
std::vector<MemoryMapInfo> memory_map_allocations_;
int max_pool_size_ = 0;
std::mutex mtx_;
};
} // namespace allocation
} // namespace memory
} // namespace paddle
......
......@@ -624,6 +624,11 @@ void BindImperative(py::module *m_ptr) {
m.def("_cleanup_mmap_fds",
[]() { memory::allocation::MemoryMapFdSet::Instance().Clear(); });
m.def("_set_max_memory_map_allocation_pool_size", [](int32_t size) {
memory::allocation::MemoryMapAllocationPool::Instance().SetMaxPoolSize(
size);
});
#endif
m.def("start_imperative_gperf_profiler",
......
......@@ -182,6 +182,7 @@ limitations under the License. */
#include "pybind11/stl.h"
DECLARE_bool(use_mkldnn);
DECLARE_bool(use_shm_cache);
// disable auto conversion to list in Python
PYBIND11_MAKE_OPAQUE(paddle::framework::LoDTensorArray);
......@@ -910,9 +911,16 @@ void BindTensor(pybind11::module &m) { // NOLINT
int flags = memory::allocation::MAPPED_SHAREDMEM |
memory::allocation::MAPPED_EXCLUSIVE;
std::string handle = memory::allocation::GetIPCName();
int find_id = -1;
if (FLAGS_use_shm_cache) {
find_id = memory::allocation::MemoryMapAllocationPool::Instance().FindFromCache(flags, data_size); // NOLINT
}
if (find_id != -1) {
handle = memory::allocation::MemoryMapAllocationPool::Instance().GetById(find_id).file_name_; // NOLINT
}
auto shared_holder =
memory::allocation::AllocateRefcountedMemoryMapAllocation(
handle, flags, data_size);
handle, flags, data_size, find_id);
// copy data & reset holder
if (platform::is_cuda_pinned_place(holder->place())) {
......@@ -961,10 +969,13 @@ void BindTensor(pybind11::module &m) { // NOLINT
size_t size = t[1].cast<size_t>();
int flags = memory::allocation::MAPPED_SHAREDMEM |
memory::allocation::MAPPED_NOCREATE;
int find_id = -1;
if (FLAGS_use_shm_cache) {
find_id = memory::allocation::MemoryMapAllocationPool::Instance().FindFromCache(flags, size, ipc_name, /*check_refcount*/ false); // NOLINT
}
auto shared_holder =
memory::allocation::AllocateRefcountedMemoryMapAllocation(
ipc_name, flags, size);
ipc_name, flags, size, find_id);
// 3. Rebuild Tensor
tensor.ResetHolderWithType(
......
......@@ -1193,3 +1193,16 @@ PADDLE_DEFINE_EXPORTED_int32(cudnn_cache_saturation_count, 1, "");
PADDLE_DEFINE_EXPORTED_bool(trt_ibuilder_cache,
false,
"Add a persistent ibuilder.");
/**
* mmap_allocator related FLAG
* Name: use_shm_cache
* Since Version: 2.5.0
* Value Range: bool, default=true
* Example:
* Note: . If True, mmap_allocator will cache shm file to decrease munmap
* operation.
*/
PADDLE_DEFINE_EXPORTED_bool(use_shm_cache,
true,
"Use shm cache in mmap_allocator.");
......@@ -317,6 +317,7 @@ try:
from .libpaddle import _array_to_share_memory_tensor
from .libpaddle import _cleanup_mmap_fds
from .libpaddle import _remove_tensor_list_mmap_fds
from .libpaddle import _set_max_memory_map_allocation_pool_size
except Exception as e:
if has_paddle_dy_lib:
sys.stderr.write(
......
......@@ -20,6 +20,7 @@ import numbers
import logging
import itertools
import threading
import warnings
import numpy as np
from collections import namedtuple
from paddle.fluid.framework import (
......@@ -406,6 +407,20 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
self._base_seed = np.random.randint(low=0, high=sys.maxsize)
# Note(zhangbo): shm_buffer_size is used for MemoryMapAllocationPool.
# MemoryMapAllocationPool is used to cache and reuse shm, thus reducing munmap in dataloader.
# For more details, please see: paddle/fluid/memory/allocation/mmap_allocator.h
try:
self._worker_shm_buffer_size = (2 + 1) * len(self._dataset[0])
except:
self._worker_shm_buffer_size = 0
warnings.warn(
"Setting the shm cache buffer size to 0, equivalent to not using the shm cache policy."
)
self._main_thread_shm_buffer_size = (
(self._worker_shm_buffer_size) * 2 * self._num_workers
)
# init workers and indices queues and put 2 indices in each indices queue
self._init_workers()
for _ in range(self._outstanding_capacity):
......@@ -450,6 +465,7 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
self._num_workers,
self._use_shared_memory,
self._base_seed,
self._worker_shm_buffer_size,
),
)
worker.daemon = True
......@@ -481,6 +497,9 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
self._blocking_queue = core.init_lod_tensor_blocking_queue(
core.Variable(), self._outstanding_capacity, len(self._places) > 1
)
core._set_max_memory_map_allocation_pool_size(
self._main_thread_shm_buffer_size
)
self._reader = core.create_py_reader(
self._blocking_queue,
self._var_names,
......
......@@ -275,6 +275,7 @@ def _worker_loop(
num_workers,
use_shared_memory,
base_seed,
shm_cahce_size=0,
):
try:
# NOTE: [ mmap files clear ] When the child process exits unexpectedly,
......@@ -286,6 +287,8 @@ def _worker_loop(
# set signal handler
core._set_process_signal_handler()
core._set_max_memory_map_allocation_pool_size(shm_cahce_size)
# set different numpy seed for each worker
try:
import numpy as np
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册