提交 e89b4648 编写于 作者: J Jesse Lee

Memory pool clean up

上级 0c5f7377
......@@ -18,8 +18,7 @@
namespace mindspore {
namespace dataset {
CachedSharedMemoryArena::CachedSharedMemoryArena(int32_t port, size_t val_in_GB)
: Arena::Arena(val_in_GB * 1024), port_(port), shmid_(-1) {}
: ptr_(nullptr), val_in_GB_(val_in_GB), port_(port), shmid_(-1) {}
CachedSharedMemoryArena::~CachedSharedMemoryArena() {
#if CACHE_LOCAL_CLIENT
if (this->ptr_ != nullptr && this->ptr_ != reinterpret_cast<void *>(-1)) {
......@@ -54,18 +53,18 @@ Status CachedSharedMemoryArena::CreateArena(std::unique_ptr<CachedSharedMemoryAr
RETURN_STATUS_UNEXPECTED(errMsg);
}
auto access_mode = S_IRUSR | S_IWUSR | S_IROTH | S_IWOTH | S_IRGRP | S_IWGRP;
ba->shmid_ = shmget(shm_key, ba->size_in_bytes_, IPC_CREAT | IPC_EXCL | access_mode);
// Value is in GB. Convert into bytes.
int64_t sz = val_in_GB * 1073741824L;
ba->shmid_ = shmget(shm_key, sz, IPC_CREAT | IPC_EXCL | access_mode);
if (ba->shmid_) {
ba->ptr_ = shmat(ba->shmid_, nullptr, 0);
if (ba->ptr_ == reinterpret_cast<void *>(-1)) {
RETURN_STATUS_UNEXPECTED("Shared memory attach failed. Errno " + std::to_string(errno));
}
ba->impl_ = std::make_unique<ArenaImpl>(ba->ptr_, sz);
} else {
RETURN_STATUS_UNEXPECTED("Shared memory creation failed. Errno " + std::to_string(errno));
}
uint64_t num_blks = ba->size_in_bytes_ / ARENA_BLK_SZ;
MS_LOG(DEBUG) << "Size of memory pool is " << num_blks << ", number of blocks of size is " << ARENA_BLK_SZ << ".";
ba->tr_.Insert(0, num_blks);
#endif
return Status::OK();
}
......
......@@ -17,14 +17,18 @@
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_ARENA_H_
#include <memory>
#include <mutex>
#include <string>
#include "minddata/dataset/util/arena.h"
#include "minddata/dataset/engine/cache/cache_common.h"
namespace mindspore {
namespace dataset {
/// This is a derived class of Arena but resides in shared memory
class CachedSharedMemoryArena : public Arena {
class CachedSharedMemoryArena : public MemoryPool {
public:
// Disable copy and assignment constructor
CachedSharedMemoryArena(const CachedSharedMemoryArena &) = delete;
CachedSharedMemoryArena &operator=(const CachedSharedMemoryArena &) = delete;
~CachedSharedMemoryArena() override;
/// \brief Create an Arena in shared memory
/// \param[out] p_ba Pointer to a unique_ptr
......@@ -39,11 +43,41 @@ class CachedSharedMemoryArena : public Arena {
/// in the client. So instead we will return an address relative
/// to the base address of the shared memory where we attach to.
/// \return Base address of the shared memory.
const void *SharedMemoryBaseAddr() const { return this->ptr_; }
const void *SharedMemoryBaseAddr() const { return impl_->get_base_addr(); }
/// As a derived class of MemoryPool, we have to implement the following
/// But we simply transfer the call to the implementation class
Status Allocate(size_t size, void **pVoid) override {
std::unique_lock<std::mutex> lock(mux_);
return impl_->Allocate(size, pVoid);
}
Status Reallocate(void **pVoid, size_t old_sz, size_t new_sz) override {
std::unique_lock<std::mutex> lock(mux_);
return impl_->Reallocate(pVoid, old_sz, new_sz);
}
void Deallocate(void *pVoid) override {
std::unique_lock<std::mutex> lock(mux_);
impl_->Deallocate(pVoid);
}
uint64_t get_max_size() const override { return impl_->get_max_size(); }
int PercentFree() const override {
std::unique_lock<std::mutex> lock(mux_);
return impl_->PercentFree();
}
/// \brief Dump the memory allocation block.
friend std::ostream &operator<<(std::ostream &os, const CachedSharedMemoryArena &s) {
os << *(s.impl_);
return os;
}
private:
mutable std::mutex mux_;
void *ptr_;
int32_t val_in_GB_;
int32_t port_;
int shmid_;
std::unique_ptr<ArenaImpl> impl_;
/// Private constructor. Not to be called directly.
CachedSharedMemoryArena(int32_t port, size_t val_in_GB);
};
......
......@@ -40,6 +40,7 @@ class Allocator {
using reference = T &;
using const_reference = const T &;
using size_type = uint64_t;
using difference_type = std::ptrdiff_t;
template <typename U>
struct rebind {
......@@ -86,8 +87,30 @@ class Allocator {
private:
std::shared_ptr<MemoryPool> pool_;
};
/// \brief It is a wrapper of unique_ptr with a custom allocator and acts like std::lock_guard such that the memory will
/// be released when the object goes out of scope
/// \brief It is a wrapper of unique_ptr with a custom Allocator class defined above
template <typename T, typename... Args>
Status MakeUnique(std::unique_ptr<T[], std::function<void(T *)>> *out, Allocator<T> alloc, size_t n, Args &&... args) {
RETURN_UNEXPECTED_IF_NULL(out);
CHECK_FAIL_RETURN_UNEXPECTED(n > 0, "size must be positive");
T *data = alloc.allocate(n);
if (!std::is_arithmetic<T>::value) {
for (auto i = 0; i < n; i++) {
std::allocator_traits<Allocator<T>>::construct(alloc, &(data[i]), std::forward<Args>(args)...);
}
}
auto deleter = [](T *p, Allocator<T> f_alloc, size_t f_n) {
if (!std::is_arithmetic<T>::value && std::is_destructible<T>::value) {
for (auto i = 0; i < f_n; ++i) {
std::allocator_traits<Allocator<T>>::destroy(f_alloc, &p[i]);
}
}
f_alloc.deallocate(p, f_n);
};
*out = std::unique_ptr<T[], std::function<void(T *)>>(data, std::bind(deleter, std::placeholders::_1, alloc, n));
return Status::OK();
}
/// \brief It is a wrapper of the above custom unique_ptr with some additional methods
/// \tparam T The type of object to be allocated
/// \tparam C Allocator. Default to std::allocator
template <typename T, typename C = std::allocator<T>>
......@@ -113,14 +136,7 @@ class MemGuard {
/// \brief Explicitly deallocate the memory if allocated
void deallocate() {
if (ptr_) {
auto *p = ptr_.release();
if (!std::is_arithmetic<T>::value && std::is_destructible<T>::value) {
for (auto i = 0; i < n_; ++i) {
p[i].~T();
}
}
alloc_.deallocate(p, n_);
n_ = 0;
ptr_.reset();
}
}
/// \brief Allocate memory (with emplace feature). Previous one will be released. If size is 0, no new memory is
......@@ -129,24 +145,9 @@ class MemGuard {
/// \tparam Args Extra arguments pass to the constructor of T
template <typename... Args>
Status allocate(size_t n, Args &&... args) noexcept {
try {
deallocate();
if (n > 0) {
T *data = alloc_.allocate(n);
if (!std::is_arithmetic<T>::value) {
for (auto i = 0; i < n; i++) {
std::allocator_traits<C>::construct(alloc_, &(data[i]), std::forward<Args>(args)...);
}
}
ptr_ = std::unique_ptr<T[]>(data);
n_ = n;
}
} catch (const std::bad_alloc &e) {
return Status(StatusCode::kOutOfMemory);
} catch (std::exception &e) {
RETURN_STATUS_UNEXPECTED(e.what());
}
return Status::OK();
deallocate();
n_ = n;
return MakeUnique(&ptr_, alloc_, n, std::forward<Args>(args)...);
}
~MemGuard() noexcept { deallocate(); }
/// \brief Getter function
......@@ -170,7 +171,7 @@ class MemGuard {
private:
size_t n_;
allocator alloc_;
std::unique_ptr<T[]> ptr_;
std::unique_ptr<T[], std::function<void(T *)>> ptr_;
};
} // namespace dataset
} // namespace mindspore
......
......@@ -33,21 +33,19 @@ struct MemHdr {
*hdr = *tmp;
}
};
Status Arena::Init() {
RETURN_IF_NOT_OK(DeMalloc(size_in_MB_ * 1048576L, &ptr_, false));
ArenaImpl::ArenaImpl(void *ptr, size_t sz) : size_in_bytes_(sz), ptr_(ptr) {
// Divide the memory into blocks. Ignore the last partial block.
uint64_t num_blks = size_in_bytes_ / ARENA_BLK_SZ;
MS_LOG(DEBUG) << "Size of memory pool is " << num_blks << ", number of blocks of size is " << ARENA_BLK_SZ << ".";
tr_.Insert(0, num_blks);
return Status::OK();
}
Status Arena::Allocate(size_t n, void **p) {
Status ArenaImpl::Allocate(size_t n, void **p) {
if (n == 0) {
*p = nullptr;
return Status::OK();
}
std::unique_lock<std::mutex> lck(mux_);
// Round up n to 1K block
uint64_t req_size = static_cast<uint64_t>(n) + ARENA_WALL_OVERHEAD_SZ;
if (req_size > this->get_max_size()) {
......@@ -64,7 +62,6 @@ Status Arena::Allocate(size_t n, void **p) {
if (size > reqBlk) {
tr_.Insert(addr + reqBlk, size - reqBlk);
}
lck.unlock();
char *q = static_cast<char *>(ptr_) + addr * ARENA_BLK_SZ;
MemHdr::setHdr(q, addr, reqBlk);
*p = get_user_addr(q);
......@@ -74,14 +71,24 @@ Status Arena::Allocate(size_t n, void **p) {
return Status::OK();
}
void Arena::Deallocate(void *p) {
std::pair<std::pair<uint64_t, uint64_t>, bool> ArenaImpl::FindPrevBlk(uint64_t addr) {
for (auto &it : tr_) {
if (it.key + it.priority == addr) {
return std::make_pair(std::make_pair(it.key, it.priority), true);
} else if (it.key > addr) {
break;
}
}
return std::make_pair(std::make_pair(0, 0), false);
}
void ArenaImpl::Deallocate(void *p) {
auto *q = get_base_addr(p);
MemHdr hdr(0, 0);
MemHdr::getHdr(q, &hdr);
MS_ASSERT(hdr.sig == 0xDEADBEEF);
// We are going to insert a free block back to the treap. But first, check if we can combine
// with the free blocks before and after to form a bigger block.
std::unique_lock<std::mutex> lck(mux_);
// Query if we have a free block after us.
auto nextBlk = tr_.Search(hdr.addr + hdr.blk_size);
if (nextBlk.second) {
......@@ -101,7 +108,61 @@ void Arena::Deallocate(void *p) {
tr_.Insert(hdr.addr, hdr.blk_size);
}
Status Arena::Reallocate(void **pp, size_t old_sz, size_t new_sz) {
bool ArenaImpl::BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz) {
uint64_t size = old_sz;
// The logic is very much identical to Deallocate. We will see if we can combine with the blocks before and after.
auto next_blk = tr_.Search(*addr + old_sz);
if (next_blk.second) {
size += next_blk.first.priority;
if (size >= new_sz) {
// In this case, we can just enlarge the block without doing any moving.
tr_.DeleteKey(next_blk.first.key);
// Return unused back to the tree.
if (size > new_sz) {
tr_.Insert(*addr + new_sz, size - new_sz);
}
}
return true;
}
// If we still get here, we have to look at the block before us.
auto result = FindPrevBlk(*addr);
if (result.second) {
// We can combine with this block together with the next block (if any)
size += result.first.second;
*addr = result.first.first;
if (size >= new_sz) {
// We can combine with this block together with the next block (if any)
tr_.DeleteKey(*addr);
if (next_blk.second) {
tr_.DeleteKey(next_blk.first.key);
}
// Return unused back to the tree.
if (size > new_sz) {
tr_.Insert(*addr + new_sz, size - new_sz);
}
return true;
}
}
return false;
}
Status ArenaImpl::FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz) {
MS_ASSERT(pp);
MS_ASSERT(*pp);
void *p = nullptr;
void *q = *pp;
RETURN_IF_NOT_OK(Allocate(new_sz, &p));
errno_t err = memmove_s(p, new_sz, q, old_sz);
if (err) {
RETURN_STATUS_UNEXPECTED("Error from memmove: " + std::to_string(err));
}
*pp = p;
// Free the old one.
Deallocate(q);
return Status::OK();
}
Status ArenaImpl::Reallocate(void **pp, size_t old_sz, size_t new_sz) {
MS_ASSERT(pp);
MS_ASSERT(*pp);
uint64_t actual_size = static_cast<uint64_t>(new_sz) + ARENA_WALL_OVERHEAD_SZ;
......@@ -114,7 +175,6 @@ Status Arena::Reallocate(void **pp, size_t old_sz, size_t new_sz) {
MemHdr hdr(0, 0);
MemHdr::getHdr(oldHdr, &hdr);
MS_ASSERT(hdr.sig == 0xDEADBEEF);
std::unique_lock<std::mutex> lck(mux_);
if (hdr.blk_size > req_blk) {
// Refresh the header with the new smaller size.
MemHdr::setHdr(oldHdr, hdr.addr, req_blk);
......@@ -142,42 +202,12 @@ Status Arena::Reallocate(void **pp, size_t old_sz, size_t new_sz) {
*pp = get_user_addr(newHdr);
return Status::OK();
}
// If we reach here, allocate a new block and simply move the content from the old to the new place.
// Unlock since allocate will grab the lock again.
lck.unlock();
return FreeAndAlloc(pp, old_sz, new_sz);
}
return Status::OK();
}
std::ostream &operator<<(std::ostream &os, const Arena &s) {
for (auto &it : s.tr_) {
os << "Address : " << it.key << ". Size : " << it.priority << "\n";
}
return os;
}
Arena::Arena(size_t val_in_MB) : ptr_(nullptr), size_in_MB_(val_in_MB), size_in_bytes_(val_in_MB * 1048576L) {}
Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB) {
if (p_ba == nullptr) {
RETURN_STATUS_UNEXPECTED("p_ba is null");
}
Status rc;
auto ba = new (std::nothrow) Arena(val_in_MB);
if (ba == nullptr) {
return Status(StatusCode::kOutOfMemory);
}
rc = ba->Init();
if (rc.IsOk()) {
(*p_ba).reset(ba);
} else {
delete ba;
}
return rc;
}
int Arena::PercentFree() const {
int ArenaImpl::PercentFree() const {
uint64_t sz = 0;
for (auto &it : tr_) {
sz += it.priority;
......@@ -186,70 +216,42 @@ int Arena::PercentFree() const {
return static_cast<int>(ratio * 100.0);
}
uint64_t Arena::get_max_size() const { return (size_in_bytes_ - ARENA_WALL_OVERHEAD_SZ); }
std::pair<std::pair<uint64_t, uint64_t>, bool> Arena::FindPrevBlk(uint64_t addr) {
for (auto &it : tr_) {
if (it.key + it.priority == addr) {
return std::make_pair(std::make_pair(it.key, it.priority), true);
} else if (it.key > addr) {
break;
}
uint64_t ArenaImpl::SizeToBlk(uint64_t sz) {
uint64_t req_blk = sz / ARENA_BLK_SZ;
if (sz % ARENA_BLK_SZ) {
++req_blk;
}
return std::make_pair(std::make_pair(0, 0), false);
return req_blk;
}
bool Arena::BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz) {
uint64_t size = old_sz;
// The logic is very much identical to Deallocate. We will see if we can combine with the blocks before and after.
auto next_blk = tr_.Search(*addr + old_sz);
if (next_blk.second) {
size += next_blk.first.priority;
if (size >= new_sz) {
// In this case, we can just enlarge the block without doing any moving.
tr_.DeleteKey(next_blk.first.key);
// Return unused back to the tree.
if (size > new_sz) {
tr_.Insert(*addr + new_sz, size - new_sz);
}
}
return true;
std::ostream &operator<<(std::ostream &os, const ArenaImpl &s) {
for (auto &it : s.tr_) {
os << "Address : " << it.key << ". Size : " << it.priority << "\n";
}
// If we still get here, we have to look at the block before us.
auto result = FindPrevBlk(*addr);
if (result.second) {
// We can combine with this block together with the next block (if any)
size += result.first.second;
*addr = result.first.first;
if (size >= new_sz) {
// We can combine with this block together with the next block (if any)
tr_.DeleteKey(*addr);
if (next_blk.second) {
tr_.DeleteKey(next_blk.first.key);
}
// Return unused back to the tree.
if (size > new_sz) {
tr_.Insert(*addr + new_sz, size - new_sz);
}
return true;
}
return os;
}
Status Arena::Init() {
try {
auto sz = size_in_MB_ * 1048576L;
mem_ = std::make_unique<uint8_t[]>(sz);
impl_ = std::make_unique<ArenaImpl>(mem_.get(), sz);
} catch (std::bad_alloc &e) {
return Status(StatusCode::kOutOfMemory);
}
return false;
return Status::OK();
}
Status Arena::FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz) {
MS_ASSERT(pp);
MS_ASSERT(*pp);
void *p = nullptr;
void *q = *pp;
RETURN_IF_NOT_OK(Allocate(new_sz, &p));
errno_t err = memmove_s(p, new_sz, q, old_sz);
if (err) {
RETURN_STATUS_UNEXPECTED("Error from memmove: " + std::to_string(err));
Arena::Arena(size_t val_in_MB) : size_in_MB_(val_in_MB) {}
Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB) {
RETURN_UNEXPECTED_IF_NULL(p_ba);
auto ba = new (std::nothrow) Arena(val_in_MB);
if (ba == nullptr) {
return Status(StatusCode::kOutOfMemory);
}
*pp = p;
// Free the old one.
Deallocate(q);
(*p_ba).reset(ba);
RETURN_IF_NOT_OK(ba->Init());
return Status::OK();
}
} // namespace dataset
......
......@@ -41,63 +41,111 @@ namespace dataset {
///
/// When a block of memory is freed. It is joined with the blocks before and after (if they are available) to
/// form a bigger block.
class Arena : public MemoryPool {
public:
Arena(const Arena &) = delete;
Arena &operator=(const Arena &) = delete;
~Arena() override {
if (ptr_ != nullptr) {
free(ptr_);
ptr_ = nullptr;
}
}
/// At the lowest level, we don't really care where the memory is coming from.
/// This allows other class to make use of Arena method and override the origin of the
/// memory, say from some unix shared memory instead.
/// \note Implementation class is not thread safe. Caller needs to ensure proper serialization
class ArenaImpl {
public:
/// Constructor
/// \param ptr The start of the memory address
/// \param sz Size of the memory block we manage
ArenaImpl(void *ptr, size_t sz);
~ArenaImpl() { ptr_ = nullptr; }
/// \brief Allocate a sub block
/// \param n Size requested
/// \param p pointer to where the result is stored
/// \return Status object.
Status Allocate(size_t n, void **p);
/// \brief Enlarge or shrink a sub block
/// \param old_sz Original size
/// \param new_sz New size
/// \return Status object
Status Reallocate(void **, size_t old_sz, size_t new_sz);
/// \brief Free a sub block
/// \param Address of the block to be freed.
void Deallocate(void *);
/// \brief Calculate % free of the memory
/// \return Percent free
int PercentFree() const;
/// \brief What is the maximum we can support in allocate.
/// \return Max value
uint64_t get_max_size() const { return (size_in_bytes_ - ARENA_WALL_OVERHEAD_SZ); }
/// \brief Get the start of the address. Read only
/// \return Start of the address block
const void *get_base_addr() const { return ptr_; }
Status Allocate(size_t n, void **p) override;
static uint64_t SizeToBlk(uint64_t sz);
friend std::ostream &operator<<(std::ostream &os, const ArenaImpl &s);
Status Reallocate(void **, size_t old_sz, size_t new_sz) override;
private:
size_t size_in_bytes_;
Treap<uint64_t, uint64_t> tr_;
void *ptr_;
void Deallocate(void *) override;
void *get_user_addr(void *base_addr) const { return reinterpret_cast<char *>(base_addr) + ARENA_WALL_OVERHEAD_SZ; }
void *get_base_addr(void *user_addr) const { return reinterpret_cast<char *>(user_addr) - ARENA_WALL_OVERHEAD_SZ; }
std::pair<std::pair<uint64_t, uint64_t>, bool> FindPrevBlk(uint64_t addr);
bool BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz);
Status FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz);
};
uint64_t get_max_size() const override;
/// \brief This version of Arena allocates from private memory
class Arena : public MemoryPool {
public:
// Disable copy and assignment constructor
Arena(const Arena &) = delete;
Arena &operator=(const Arena &) = delete;
~Arena() override = default;
static uint64_t SizeToBlk(uint64_t sz) {
uint64_t req_blk = sz / ARENA_BLK_SZ;
if (sz % ARENA_BLK_SZ) {
++req_blk;
}
return req_blk;
/// As a derived class of MemoryPool, we have to implement the following.
/// But we simply transfer the call to the implementation class
Status Allocate(size_t size, void **pVoid) override {
std::unique_lock<std::mutex> lock(mux_);
return impl_->Allocate(size, pVoid);
}
Status Reallocate(void **pVoid, size_t old_sz, size_t new_sz) override {
std::unique_lock<std::mutex> lock(mux_);
return impl_->Reallocate(pVoid, old_sz, new_sz);
}
void Deallocate(void *pVoid) override {
std::unique_lock<std::mutex> lock(mux_);
impl_->Deallocate(pVoid);
}
uint64_t get_max_size() const override { return impl_->get_max_size(); }
int PercentFree() const override {
std::unique_lock<std::mutex> lock(mux_);
return impl_->PercentFree();
}
int PercentFree() const override;
const void *get_base_addr() const { return ptr_; }
/// \return Return the start of the memory block
const void *get_base_addr() const { return impl_->get_base_addr(); }
friend std::ostream &operator<<(std::ostream &os, const Arena &s);
/// \brief Dump the memory allocation block.
friend std::ostream &operator<<(std::ostream &os, const Arena &s) {
os << *(s.impl_);
return os;
}
/// The only method to create an arena.
static Status CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB = 4096);
protected:
std::mutex mux_;
Treap<uint64_t, uint64_t> tr_;
void *ptr_;
mutable std::mutex mux_;
std::unique_ptr<ArenaImpl> impl_;
std::unique_ptr<uint8_t[]> mem_;
size_t size_in_MB_;
size_t size_in_bytes_;
explicit Arena(size_t val_in_MB = 4096);
std::pair<std::pair<uint64_t, uint64_t>, bool> FindPrevBlk(uint64_t addr);
Status Init();
bool BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz);
Status FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz);
void *get_user_addr(void *base_addr) const { return reinterpret_cast<char *>(base_addr) + ARENA_WALL_OVERHEAD_SZ; }
void *get_base_addr(void *user_addr) const { return reinterpret_cast<char *>(user_addr) - ARENA_WALL_OVERHEAD_SZ; }
};
} // namespace dataset
} // namespace mindspore
......
......@@ -47,10 +47,16 @@ Status BuddySpace::Init() {
size_t offset_1 = sizeof(rel_addr_t) * num_lvl_;
size_t offset_2 = sizeof(int) * num_lvl_ + offset_1;
size_t offset_3 = sizeof(char) * BitLeftShift(1, num_lvl_ - 3) + offset_2;
RETURN_IF_NOT_OK(DeMalloc(offset_3, &ptr_, true));
hint_ = reinterpret_cast<rel_addr_t *>(ptr_);
count_ = reinterpret_cast<int *>((reinterpret_cast<char *>(ptr_) + offset_1));
map_ = reinterpret_cast<char *>(ptr_) + offset_2;
try {
mem_ = std::make_unique<uint8_t[]>(offset_3);
} catch (const std::bad_alloc &e) {
return Status(StatusCode::kOutOfMemory);
}
(void)memset_s(mem_.get(), offset_3, 0, offset_3);
auto ptr = mem_.get();
hint_ = reinterpret_cast<rel_addr_t *>(ptr);
count_ = reinterpret_cast<int *>((reinterpret_cast<char *>(ptr) + offset_1));
map_ = reinterpret_cast<char *>(ptr) + offset_2;
count_[num_lvl_ - 1] = 1;
map_[0] = BitOr(MORE_BIT, num_lvl_ - 3);
return Status::OK();
......@@ -352,19 +358,9 @@ int BuddySpace::PercentFree() const {
}
BuddySpace::BuddySpace(int log_min, int num_lvl)
: hint_(nullptr),
count_(nullptr),
map_(nullptr),
log_min_(log_min),
num_lvl_(num_lvl),
min_(0),
max_(0),
ptr_(nullptr) {}
: hint_(nullptr), count_(nullptr), map_(nullptr), log_min_(log_min), num_lvl_(num_lvl), min_(0), max_(0) {}
BuddySpace::~BuddySpace() {
if (ptr_ != nullptr) {
free(ptr_);
}
hint_ = nullptr;
count_ = nullptr;
map_ = nullptr;
......
......@@ -94,7 +94,7 @@ class BuddySpace {
int num_lvl_;
uint64_t min_;
uint64_t max_;
void *ptr_;
std::unique_ptr<uint8_t[]> mem_;
std::mutex mutex_;
explicit BuddySpace(int log_min = 15, int num_lvl = 18);
......
......@@ -33,18 +33,6 @@
namespace mindspore {
namespace dataset {
template <typename T>
struct is_shared_ptr : public std::false_type {};
template <typename T>
struct is_shared_ptr<std::shared_ptr<T>> : public std::true_type {};
template <typename T>
struct is_unique_ptr : public std::false_type {};
template <typename T>
struct is_unique_ptr<std::unique_ptr<T>> : public std::true_type {};
// A simple thread safe queue using a fixed size array
template <typename T>
class Queue {
......@@ -55,44 +43,25 @@ class Queue {
using reference = T &;
using const_reference = const T &;
void Init() {
if (sz_ > 0) {
// We allocate a block of memory and then call the default constructor for each slot. Maybe simpler to call
// new[] but we want to control where the memory is allocated from.
arr_ = alloc_.allocate(sz_);
for (uint64_t i = 0; i < sz_; i++) {
std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[i]));
}
}
}
explicit Queue(int sz)
: sz_(sz),
arr_(nullptr),
head_(0),
tail_(0),
my_name_(Services::GetUniqueID()),
alloc_(Services::GetInstance().GetServiceMemPool()) {
Init();
MS_LOG(DEBUG) << "Create Q with uuid " << my_name_ << " of size " << sz_ << ".";
}
virtual ~Queue() {
ResetQue();
if (arr_) {
// Simply free the pointer. Since there is nothing in the queue. We don't want to invoke the destructor
// of T in each slot.
alloc_.deallocate(arr_);
arr_ = nullptr;
: sz_(sz), arr_(Services::GetAllocator<T>()), head_(0), tail_(0), my_name_(Services::GetUniqueID()) {
Status rc = arr_.allocate(sz);
if (rc.IsError()) {
MS_LOG(ERROR) << "Fail to create a queue.";
std::terminate();
} else {
MS_LOG(DEBUG) << "Create Q with uuid " << my_name_ << " of size " << sz_ << ".";
}
}
int size() const {
int v = tail_ - head_;
virtual ~Queue() { ResetQue(); }
size_t size() const {
size_t v = tail_ - head_;
return (v >= 0) ? v : 0;
}
int capacity() const { return sz_; }
size_t capacity() const { return sz_; }
bool empty() const { return head_ == tail_; }
......@@ -104,8 +73,8 @@ class Queue {
// Block when full
Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); });
if (rc.IsOk()) {
uint32_t k = tail_++ % sz_;
arr_[k] = ele;
auto k = tail_++ % sz_;
*(arr_[k]) = ele;
empty_cv_.NotifyAll();
_lock.unlock();
} else {
......@@ -119,8 +88,8 @@ class Queue {
// Block when full
Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); });
if (rc.IsOk()) {
uint32_t k = tail_++ % sz_;
arr_[k] = std::forward<T>(ele);
auto k = tail_++ % sz_;
*(arr_[k]) = std::forward<T>(ele);
empty_cv_.NotifyAll();
_lock.unlock();
} else {
......@@ -135,8 +104,8 @@ class Queue {
// Block when full
Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); });
if (rc.IsOk()) {
uint32_t k = tail_++ % sz_;
new (&(arr_[k])) T(std::forward<Ts>(args)...);
auto k = tail_++ % sz_;
new (arr_[k]) T(std::forward<Ts>(args)...);
empty_cv_.NotifyAll();
_lock.unlock();
} else {
......@@ -151,20 +120,8 @@ class Queue {
// Block when empty
Status rc = empty_cv_.Wait(&_lock, [this]() -> bool { return !empty(); });
if (rc.IsOk()) {
uint32_t k = head_++ % sz_;
*p = std::move(arr_[k]);
if (std::is_destructible<T>::value) {
// std::move above only changes arr_[k] from rvalue to lvalue.
// The real implementation of move constructor depends on T.
// It may be compiler generated or user defined. But either case
// the result of arr_[k] is still a valid object of type T, and
// we will not keep any extra copy in the queue.
arr_[k].~T();
// For gcc 9, an extra fix is needed here to clear the memory content
// of arr_[k] because this slot can be reused by another Add which can
// do another std::move. We have seen SEGV here in this case.
std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[k]));
}
auto k = head_++ % sz_;
*p = std::move(*(arr_[k]));
full_cv_.NotifyAll();
_lock.unlock();
} else {
......@@ -175,15 +132,15 @@ class Queue {
void ResetQue() noexcept {
std::unique_lock<std::mutex> _lock(mux_);
// If there are elements in the queue, invoke its destructor one by one.
if (!empty() && std::is_destructible<T>::value) {
for (uint64_t i = head_; i < tail_; i++) {
uint32_t k = i % sz_;
arr_[k].~T();
}
}
for (uint64_t i = 0; i < sz_; i++) {
std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[i]));
// If there are elements in the queue, drain them. We won't call PopFront directly
// because we have got the lock already. We will deadlock if we call PopFront
for (auto i = head_; i < tail_; ++i) {
auto k = i % sz_;
auto val = std::move(*(arr_[k]));
// Let val go out of scope and its destructor will be invoked automatically.
// But our compiler may complain val is not in use. So let's do some useless
// stuff.
MS_LOG(DEBUG) << "Address of val: " << &val;
}
empty_cv_.ResetIntrpState();
full_cv_.ResetIntrpState();
......@@ -202,15 +159,14 @@ class Queue {
}
private:
uint64_t sz_;
pointer arr_;
uint64_t head_;
uint64_t tail_;
size_t sz_;
MemGuard<T, Allocator<T>> arr_;
size_t head_;
size_t tail_;
std::string my_name_;
std::mutex mux_;
CondVar empty_cv_;
CondVar full_cv_;
Allocator<T> alloc_;
};
// A container of queues with [] operator accessors. Basically this is a wrapper over of a vector of queues
......@@ -237,7 +193,7 @@ class QueueList {
return Status::OK();
}
int size() const { return queue_list_.size(); }
auto size() const { return queue_list_.size(); }
std::unique_ptr<Queue<T>> &operator[](const int index) { return queue_list_[index]; }
......
......@@ -15,7 +15,9 @@
*/
#include <string>
#include "minddata/dataset/util/allocator.h"
#include "minddata/dataset/util/arena.h"
#include "minddata/dataset/util/system_pool.h"
#include "common/common.h"
#include "utils/log_adapter.h"
......@@ -27,11 +29,10 @@ class MindDataTestArena : public UT::Common {
};
TEST_F(MindDataTestArena, TestALLFunction) {
TEST_F(MindDataTestArena, Test1) {
std::shared_ptr<Arena> mp;
Status rc = Arena::CreateArena(&mp);
ASSERT_TRUE(rc.IsOk());
std::shared_ptr<Arena> arena = std::dynamic_pointer_cast<Arena>(mp);
std::vector<void *> v;
srand(time(NULL));
......@@ -46,3 +47,25 @@ TEST_F(MindDataTestArena, TestALLFunction) {
}
MS_LOG(DEBUG) << *mp;
}
TEST_F(MindDataTestArena, Test2) {
std::shared_ptr<Arena> arena;
Status rc = Arena::CreateArena(&arena);
std::shared_ptr<MemoryPool> mp = std::static_pointer_cast<MemoryPool>(arena);
auto alloc = Allocator<int>(mp);
ASSERT_TRUE(rc.IsOk());
std::vector<int, Allocator<int>> v(alloc);
v.reserve(1000);
for (auto i = 0; i < 1000; ++i) {
v.push_back(i);
}
// Test copy
std::vector<int, Allocator<int>> w(v, SystemPool::GetAllocator<int>());
auto val = w.at(10);
EXPECT_EQ(val, 10);
// Test move
std::vector<int, Allocator<int>> s(std::move(v), SystemPool::GetAllocator<int>());
val = s.at(100);
EXPECT_EQ(val, 100);
EXPECT_EQ(v.size(), 0);
}
......@@ -50,6 +50,13 @@ class RefCount {
v_ = o.v_;
return *this;
}
RefCount(RefCount &&o) : v_(std::move(o.v_)) {}
RefCount &operator=(RefCount &&o) {
if (&o != this) {
v_ = std::move(o.v_);
}
return *this;
}
std::shared_ptr<int> v_;
};
......@@ -148,8 +155,9 @@ TEST_F(MindDataTestQueue, Test4) { test4(); }
TEST_F(MindDataTestQueue, Test5) {
test4();
// Assume we have run Test4. The destructor of the RefCount should be called 4 times.
// One for a. One for b. One for line 125 when we pop. One for the stale element in the queue.
ASSERT_EQ(gRefCountDestructorCalled, 4);
// One for a. One for b. One for the stale element in the queue. 3 more for
// the one in the queue (but they are empty).
ASSERT_EQ(gRefCountDestructorCalled, 6);
}
TEST_F(MindDataTestQueue, Test6) {
......@@ -169,70 +177,3 @@ TEST_F(MindDataTestQueue, Test6) {
MS_LOG(INFO) << "Popped value " << *pepped_value << " from queue index " << chosen_queue_index;
ASSERT_EQ(*pepped_value, 99);
}
using namespace std::chrono;
template <typename QueueType, typename PayloadType>
void Perf(int n, int p, std::string name) {
auto payload = std::vector<PayloadType>(n, PayloadType(p));
auto queue = QueueType(n);
auto t0 = high_resolution_clock::now();
auto check = 0;
for (int i = 0; i < queue.capacity(); i++) {
queue.Add(PayloadType(p));
}
check = queue.size();
for (int i = 0; i < queue.capacity(); i++) {
queue.PopFront(&payload[i]);
}
auto t1 = high_resolution_clock::now();
std::cout << name << " queue filled size: " << queue.size() << " " << check << std::endl;
auto t2 = high_resolution_clock::now();
for (int i = 0; i < queue.capacity(); i++) {
queue.Add(PayloadType(p));
}
check = queue.size();
for (int i = 0; i < queue.capacity(); i++) {
queue.PopFront(&payload[i]);
}
auto t3 = high_resolution_clock::now();
auto d = duration_cast<milliseconds>(t3 - t2 + t1 - t0).count();
std::cout << name << " queue emptied size: " << queue.size() << " " << check << std::endl;
std::cout << name << " "
<< " ran in " << d << "ms" << std::endl;
}
template <typename QueueType, typename PayloadType>
void Fuzz(int n, int p, std::string name) {
std::mt19937 gen(1);
auto payload = std::vector<PayloadType>(n, PayloadType(p));
auto queue = QueueType(n);
auto dist = std::uniform_int_distribution<int>(0, 2);
std::cout << "###" << std::endl;
for (auto i = 0; i < n; i++) {
auto v = dist(gen);
if (v == 0 && queue.size() < n - 1) {
queue.Add(std::move(payload[i]));
}
if (v == 1 && queue.size() > 0) {
queue.PopFront(&payload[i]);
} else {
queue.Reset();
}
}
std::cout << name << " fuzz ran " << queue.size() << std::endl;
}
TEST_F(MindDataTestQueue, TestPerf) {
try {
int kSz = 1000000;
// std::cout << "enter size" << std::endl;
// std::cin >> kSz;
Perf<Queue<std::vector<int>>, std::vector<int>>(kSz, 1, "old queue, vector of size 1");
} catch (const std::exception &e) {
std::cout << e.what() << std::endl;
}
std::cout << "Test Reset" << std::endl;
std::cout << "Enter fuzz size" << std::endl;
int fs = 1000;
// std::cin >> fs;
Fuzz<Queue<std::vector<int>>, std::vector<int>>(fs, 1, "New queue");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册