提交 c6c5fa84 编写于 作者: M mindspore-ci-bot 提交者: Gitee

!2472 CacheOp branch infrastructure

Merge pull request !2472 from JesseKLee/cache_infra
......@@ -2,6 +2,8 @@ file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc"
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
add_library(utils OBJECT
arena.cc
buddy.cc
cache_pool.cc
circular_pool.cc
memory_pool.cc
cond_var.cc
......@@ -11,7 +13,11 @@ add_library(utils OBJECT
service.cc
services.cc
lock.cc
semaphore.cc
status.cc
storage_container.cc
storage_manager.cc
slice.cc
path.cc
wait_post.cc
sig_handler.cc)
......@@ -17,8 +17,10 @@
#define DATASET_UTIL_ALLOCATOR_H_
#include <cstdlib>
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>
#include "dataset/util/memory_pool.h"
namespace mindspore {
......@@ -84,6 +86,91 @@ class Allocator {
private:
std::shared_ptr<MemoryPool> pool_;
};
/// \brief A wrapper of unique_ptr with a custom allocator. It acts like std::lock_guard in that the memory is
/// released when the object goes out of scope.
/// \tparam T The type of object to be allocated
/// \tparam C Allocator. Defaults to std::allocator
template <typename T, typename C = std::allocator<T>>
class MemGuard {
 public:
  using allocator = C;
  MemGuard() : n_(0) {}
  // Initializers are listed in declaration order (alloc_, ptr_, n_).
  explicit MemGuard(allocator a) : alloc_(a), n_(0) {}
  // There is no copy constructor nor assignment operator because the memory is solely owned by this object.
  MemGuard(const MemGuard &) = delete;
  MemGuard &operator=(const MemGuard &) = delete;
  // On the other hand, we can support move construction. The source is left owning nothing and reporting size 0.
  MemGuard(MemGuard &&lhs) noexcept : alloc_(std::move(lhs.alloc_)), ptr_(std::move(lhs.ptr_)), n_(lhs.n_) {
    lhs.n_ = 0;
  }
  MemGuard &operator=(MemGuard &&lhs) noexcept {
    if (this != &lhs) {
      // Release whatever we currently own before taking over the other object's memory.
      this->deallocate();
      n_ = lhs.n_;
      alloc_ = std::move(lhs.alloc_);
      ptr_ = std::move(lhs.ptr_);
      lhs.n_ = 0;
    }
    return *this;
  }
  /// \brief Explicitly deallocate the memory if allocated
  void deallocate() {
    if (ptr_) {
      // Take the raw pointer back so the unique_ptr deleter is never invoked; the allocator frees the memory.
      auto *p = ptr_.release();
      if (!std::is_arithmetic<T>::value && std::is_destructible<T>::value) {
        for (size_t i = 0; i < n_; ++i) {
          p[i].~T();
        }
      }
      alloc_.deallocate(p, n_);
      n_ = 0;
    }
  }
  /// \brief Allocate memory (with emplace feature). Previous one will be released. If size is 0, no new memory is
  /// allocated.
  /// \param n Number of objects of type T to be allocated
  /// \tparam Args Extra arguments pass to the constructor of T
  /// \return Status::OK() on success, kOutOfMemory on std::bad_alloc, unexpected error on any other std::exception
  template <typename... Args>
  Status allocate(size_t n, Args &&... args) noexcept {
    try {
      deallocate();
      if (n > 0) {
        T *data = alloc_.allocate(n);
        // Arithmetic types are left unconstructed (uninitialized), as before.
        if (!std::is_arithmetic<T>::value) {
          size_t constructed = 0;
          try {
            for (; constructed < n; ++constructed) {
              std::allocator_traits<C>::construct(alloc_, &(data[constructed]), std::forward<Args>(args)...);
            }
          } catch (...) {
            // A constructor threw: destroy what was built and give the buffer back, otherwise it is leaked.
            while (constructed > 0) {
              data[--constructed].~T();
            }
            alloc_.deallocate(data, n);
            throw;
          }
        }
        ptr_ = std::unique_ptr<T[]>(data);
        n_ = n;
      }
    } catch (const std::bad_alloc &e) {
      return Status(StatusCode::kOutOfMemory);
    } catch (std::exception &e) {
      RETURN_STATUS_UNEXPECTED(e.what());
    }
    return Status::OK();
  }
  ~MemGuard() noexcept { deallocate(); }
  /// \brief Getter function
  /// \return The pointer to the memory allocated
  T *GetPointer() const { return ptr_.get(); }
  /// \brief Getter function
  /// \return The pointer to the memory allocated
  T *GetMutablePointer() { return ptr_.get(); }
  /// \brief Overload [] operator to access a particular element
  /// \param x index to the element. Must be less than number of element allocated.
  /// \return pointer to the x-th element
  T *operator[](size_t x) { return GetMutablePointer() + x; }
  /// \brief Overload [] operator to access a particular element
  /// \param x index to the element. Must be less than number of element allocated.
  /// \return pointer to the x-th element
  T *operator[](size_t x) const { return GetPointer() + x; }
  /// \brief Return how many bytes are allocated in total
  /// \return Number of bytes allocated in total
  size_t GetSizeInBytes() const { return n_ * sizeof(T); }

 private:
  allocator alloc_;
  std::unique_ptr<T[], std::function<void(T *)>> ptr_;
  size_t n_;
};
} // namespace dataset
} // namespace mindspore
......
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/util/buddy.h"
#include <iomanip>
#include <stdexcept>
#include "dataset/util/de_error.h"
#include "dataset/util/memory_pool.h"
#include "dataset/util/system_pool.h"
#include "./securec.h"
/// \brief Shift v left by n bit positions.
inline uint64_t BitLeftShift(uint64_t v, uint64_t n) {
  const uint64_t shifted = v << n;
  return shifted;
}
/// \brief Shift v right by n bit positions.
inline uint64_t BitRightShift(uint64_t v, uint64_t n) {
  const uint64_t shifted = v >> n;
  return shifted;
}
/// \brief Bitwise OR of the two operands.
inline uint64_t BitOr(uint64_t rhs, uint64_t lhs) {
  const uint64_t merged = rhs | lhs;
  return merged;
}
/// \brief Bitwise exclusive OR of the two operands.
inline uint64_t BitEx(uint64_t rhs, uint64_t lhs) {
  const uint64_t toggled = rhs ^ lhs;
  return toggled;
}
/// \brief Bitwise AND of the two operands.
inline uint64_t BitAnd(uint64_t rhs, uint64_t lhs) {
  const uint64_t common = rhs & lhs;
  return common;
}
namespace mindspore {
namespace dataset {
/// \brief Validate the configuration, compute the min/max sizes and carve the hint, count and bitmap arrays out of
/// one allocation obtained from DeMalloc.
/// \return Status::OK() on success; an unexpected-error Status if log_min_ or num_lvl_ is out of range, or whatever
/// DeMalloc reports on failure.
Status BuddySpace::Init() {
  const bool bad_log_min = (log_min_ < 0);
  if (bad_log_min) {
    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
                  "log_min must be positive : " + std::to_string(log_min_));
  }
  const bool bad_num_lvl = (num_lvl_ < 3 || num_lvl_ > 18);
  if (bad_num_lvl) {
    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
                  "num_lvl must be between 3 and 18 : " + std::to_string(num_lvl_));
  }
  min_ = BitLeftShift(1, log_min_);
  max_ = BitLeftShift(1, log_min_ + num_lvl_ - 1);
  // Layout of the single buffer: [hint_ : num_lvl_ entries][count_ : num_lvl_ ints][map_ : 2^(num_lvl_-3) bytes].
  const size_t count_begin = sizeof(rel_addr_t) * num_lvl_;
  const size_t map_begin = sizeof(int) * num_lvl_ + count_begin;
  const size_t total_bytes = sizeof(char) * BitLeftShift(1, num_lvl_ - 3) + map_begin;
  // NOTE(review): the trailing 'true' presumably requests zero-initialized memory; confirm against DeMalloc.
  RETURN_IF_NOT_OK(DeMalloc(total_bytes, &ptr_, true));
  char *base = reinterpret_cast<char *>(ptr_);
  hint_ = reinterpret_cast<rel_addr_t *>(ptr_);
  count_ = reinterpret_cast<int *>(base + count_begin);
  map_ = base + map_begin;
  // Start with a single free segment that covers the whole space.
  count_[num_lvl_ - 1] = 1;
  map_[0] = BitOr(MORE_BIT, num_lvl_ - 3);
  return Status::OK();
}
/// \brief Thread-safe allocation entry point; takes the mutex and delegates to AllocNoLock.
/// \param sz Requested size
/// \param desc Descriptor filled in by AllocNoLock on success
/// \param p Receives the allocated address on success
/// \return Status::OK() on success, kNoSpace if no segment could be found
Status BuddySpace::Alloc(const uint64_t sz, BSpaceDescriptor *desc, addr_t *p) noexcept {
  std::lock_guard<std::mutex> guard(mutex_);
  const addr_t result = AllocNoLock(sz, desc);
  if (result == NOSPACE) {
    return Status(StatusCode::kNoSpace, "BuddySpace full. Not an error. Please ignore.");
  }
  *p = result;
  return Status::OK();
}
/// \brief Allocate without taking the mutex. Converts the size to a block count, asks AllocBuddySeg for a segment
/// and, on success, fills in the descriptor.
/// \param sz Requested size (asserted to be at most max_)
/// \param desc Descriptor to fill in; untouched on failure
/// \return The address (relative address times min_) or NOSPACE
addr_t BuddySpace::AllocNoLock(const uint64_t sz, BSpaceDescriptor *desc) noexcept {
  DS_ASSERT(sz <= max_);
  const uint32_t blocks_needed = SizeToBlock(sz);
  const rel_addr_t seg = AllocBuddySeg(blocks_needed);
  if (seg == static_cast<rel_addr_t>(NOSPACE)) {
    return NOSPACE;
  }
  (void)memset_s(desc, sizeof(BSpaceDescriptor), 0, sizeof(BSpaceDescriptor));
  // Signature that FreeNoLock asserts on.
  desc->sig = static_cast<int>(0xDEADBEEF);
  desc->addr = seg;
  desc->req_size = blocks_needed;
  desc->blk_size = NextPowerOf2(blocks_needed);
  return static_cast<addr_t>(seg * min_);
}
void BuddySpace::FreeNoLock(const BSpaceDescriptor *desc) {
DS_ASSERT(desc->sig == 0XDEADBEEF);
rel_addr_t rel_addr = desc->addr;
size_t blk_size = desc->blk_size;
size_t req_size = desc->req_size;
FreeBuddySeg(rel_addr, blk_size, req_size);
}
/// \brief Thread-safe release; takes the mutex and delegates to FreeNoLock.
/// \param desc Descriptor of the allocation to release
void BuddySpace::Free(const BSpaceDescriptor *desc) {
  std::lock_guard<std::mutex> guard(mutex_);
  FreeNoLock(desc);
}
/// \brief Dump a human readable description of a BuddySpace: unit size, total size, number of levels, percent
/// free, the per-level free counts and then every segment found by walking the bitmap from address 0.
/// \param os Output stream
/// \param s BuddySpace to dump (read without taking its mutex)
/// \return os
std::ostream &operator<<(std::ostream &os, const BuddySpace &s) {
  os << "1 unit = " << s.GetMinSize() << "\n"
     << "Size of buddy space = " << s.GetMaxSize() << "\n"
     << "Number of levels = " << s.num_lvl_ << "\n\n"
     << "Percent free = " << s.PercentFree() << "\n"
     << "Dumping count array : "
     << "\n";
  for (int i = 0; i < s.num_lvl_; i++) {
    os << "[" << i << "] = " << s.count_[i] << " ";
    // Four entries per output line.
    if (((i + 1) % 4) == 0) {
      os << "\n";
    }
  }
  os << "\n";
  os << "Dumping allocation info:"
     << "\n";
  auto max_addr = static_cast<rel_addr_t>(BitLeftShift(1, s.num_lvl_ - 1));
  rel_addr_t addr = 0;
  while (addr < max_addr) {
    size_t sz = 0;
    BuddySpace::STATE st;
    s.GetBuddySegState(addr, &sz, &st);
    os << "Address : " << std::left << std::setw(8) << addr << " Size : " << std::setw(8) << sz << " State : "
       << ((st == BuddySpace::STATE::kAlloc) ? "ALLOC" : ((st == BuddySpace::STATE::kFree) ? "FREE" : "Unknown"))
       << "\n";
    if (sz == 0) {
      // GetBuddySegState leaves the size untouched for an empty slot; without this guard the walk would never
      // advance and the loop would spin forever.
      break;
    }
    addr += sz;
  }
  return os;
}
void BuddySpace::GetBuddySegState(const rel_addr_t rel_addr, size_t *rel_sz, STATE *st) const {
char byte;
int pos;
int offset;
uint64_t val = 0;
int shift;
pos = BitRightShift(rel_addr, 2);
offset = rel_addr % 4;
shift = offset * 2;
byte = map_[pos];
switch (offset) {
case 0:
val = byte;
break;
case 1:
case 3:
if (offset == 1) {
val = BitLeftShift(BitAnd(byte, 0x30), shift);
} else {
val = BitLeftShift(BitAnd(byte, 0x03), shift);
}
break;
case 2:
val = BitLeftShift(BitAnd(byte, 0x0F), shift);
break;
}
if (BitAnd(val, ONE_BIT)) {
*rel_sz = 1;
} else if (BitAnd(val, TWO_BIT)) {
*rel_sz = 2;
} else if (BitAnd(val, MORE_BIT)) {
log_t lg = BitAnd(val, 0x0F);
*rel_sz = BitLeftShift(1, lg + 2);
} else {
*st = STATE::kEmpty;
return;
}
*st = BitAnd(val, ALLOC_BIT) ? STATE::kAlloc : STATE::kFree;
}
/// \brief Write the bitmap entry for the segment starting at rel_addr and keep count_ / hint_ in step.
/// \param rel_addr Relative address (in units) of the segment
/// \param rel_sz Size of the segment in units
/// \param st New state: kAlloc, kFree or kEmpty (kEmpty clears the entry)
void BuddySpace::SetBuddySegState(rel_addr_t rel_addr, size_t rel_sz, STATE st) {
int clr;
int mask;
int pos;
int offset;
int val = 0;
int shift;
auto log_sz = static_cast<log_t>(Log2(rel_sz));
// Four units per map byte, two bits per unit.
pos = BitRightShift(rel_addr, 2);
offset = rel_addr % 4;
shift = offset * 2;
// Build the entry as if it started at the top of a byte; mask covers the bits the entry occupies
// (2 bits for size 1, 4 bits for size 2, the whole byte otherwise).
if (rel_sz == 1) {
val = ONE_BIT;
mask = 0xC0;
} else if (rel_sz == 2) {
val = TWO_BIT;
mask = 0xF0;
} else {
// Larger segments store log2(size) - 2 in the low bits next to MORE_BIT.
val = BitOr(log_sz - 2, MORE_BIT);
mask = 0xFF;
}
if (st == STATE::kAlloc) {
val = BitOr(val, ALLOC_BIT);
} else if (st == STATE::kFree) {
val = BitAnd(val, ~(static_cast<uint64_t>(ALLOC_BIT)));
} else if (st == STATE::kEmpty) {
val = 0;
}
// Clear the entry's bits at its real position inside the byte, then OR in the new value.
clr = static_cast<int>(~(BitRightShift(mask, shift)));
map_[pos] = static_cast<char>(BitAnd(map_[pos], clr));
map_[pos] = static_cast<char>(BitOr(map_[pos], BitRightShift(val, shift)));
// Maintain the per-level free count; kEmpty leaves the counts alone.
if (st == STATE::kAlloc) {
count_[log_sz]--;
} else if (st == STATE::kFree) {
count_[log_sz]++;
// Keep the hint pointing at the lowest address known to be free at this level.
if (rel_addr < hint_[log_sz]) {
hint_[log_sz] = rel_addr;
}
}
}
/// \brief Starting from the free segment at addr, repeatedly merge it with its buddy while the buddy is also free
/// and of the same size.
/// \param addr Relative address (in units) of a segment that has just been marked free
/// \param blk_sz Size of that segment in units
void BuddySpace::JoinBuddySeg(rel_addr_t addr, size_t blk_sz) {
  // The whole space is one segment of 2^(num_lvl_ - 1) units. A segment of that size has no buddy, and looking
  // one up would read one byte past the end of map_, so the loop must stop before it.
  const size_t top_sz = BitLeftShift(1, num_lvl_ - 1);
  while (blk_sz < top_sz) {
    // The buddy address differs from addr only in the bit equal to the block size.
    rel_addr_t buddy = BitEx(addr, blk_sz);
    size_t sz = 0;
    STATE st;
    GetBuddySegState(buddy, &sz, &st);
    if (st == STATE::kFree && sz == blk_sz) {
      auto log_sz = static_cast<log_t>(Log2(blk_sz));
      rel_addr_t left = (buddy < addr) ? buddy : addr;
      rel_addr_t right = left + blk_sz;
      DS_ASSERT(count_[log_sz] >= 2);
      // Both halves disappear from this level ...
      count_[log_sz] -= 2;
      SetBuddySegState(right, blk_sz, STATE::kEmpty);
      // ... and reappear as one free segment on the next level up (SetBuddySegState bumps that count).
      SetBuddySegState(left, BitLeftShift(blk_sz, 1), STATE::kFree);
      // The right half is no longer a segment start; redirect any lower-level hint that pointed at it.
      for (int i = 0; i < log_sz; i++) {
        if (hint_[i] == right) {
          hint_[i] = left;
        }
      }
      addr = left;
      blk_sz <<= 1u;
    } else {
      break;
    }
  }
}
/// \brief Split the free segment at addr (blk_sz units) level by level and mark only the leading ask_sz units as
/// allocated; the remaining pieces stay free.
/// \param addr Relative address of the segment
/// \param blk_sz Size of the segment in units
/// \param ask_sz Units actually wanted (asserted to be smaller than blk_sz)
void BuddySpace::TrimBuddySeg(rel_addr_t addr, size_t blk_sz, size_t ask_sz) {
  DS_ASSERT(ask_sz < blk_sz);
  size_t still_needed = ask_sz;
  int level = static_cast<int>(Log2(blk_sz));
  while (level > 0) {
    const size_t whole = BitLeftShift(1, level);
    const size_t half = BitRightShift(whole, 1);
    // The segment at this level is replaced by two free halves.
    count_[level]--;
    SetBuddySegState(addr, half, STATE::kFree);
    SetBuddySegState(addr + half, half, STATE::kFree);
    if (still_needed >= half) {
      // The left half is fully consumed; continue with the right half.
      SetBuddySegState(addr, half, STATE::kAlloc);
      still_needed -= half;
      if (still_needed == 0) {
        break;
      }
      addr += half;
    }
    --level;
  }
}
/// \brief Undo TrimBuddySeg: walk the same level-by-level split, mark every allocated piece free and join buddies
/// once the last piece has been released.
/// \param addr Relative address of the original segment
/// \param blk_sz Size of the original segment in units
/// \param ask_sz Units that were actually allocated (asserted to be smaller than blk_sz)
void BuddySpace::UnTrimBuddySeg(rel_addr_t addr, size_t blk_sz, size_t ask_sz) {
  DS_ASSERT(ask_sz < blk_sz);
  size_t left_to_free = ask_sz;
  int level = static_cast<int>(Log2(blk_sz));
  for (; level > 0; --level) {
    const size_t whole = BitLeftShift(1, level);
    const size_t half = BitRightShift(whole, 1);
    if (left_to_free < half) {
      // Nothing was allocated at this granularity; look at the next smaller one.
      continue;
    }
#ifdef DEBUG
    {
      size_t sz = 0;
      STATE st;
      GetBuddySegState(addr, &sz, &st);
      DS_ASSERT(sz == half_sz && st == STATE::kAlloc);
    }
#endif
    SetBuddySegState(addr, half, STATE::kFree);
    left_to_free -= half;
    if (left_to_free == 0) {
      JoinBuddySeg(addr, half);
      break;
    }
    addr += half;
  }
}
/// \brief Find a free segment able to hold req_size units and mark it (or its leading part) allocated.
/// \param req_size Number of units requested
/// \return Relative address of the segment, or NOSPACE cast to rel_addr_t if nothing was found
rel_addr_t BuddySpace::AllocBuddySeg(uint32_t req_size) noexcept {
uint32_t blk_size = NextPowerOf2(req_size);
int start_inx = static_cast<int>(Log2(blk_size));
bool found = false;
rel_addr_t ask_addr = 0;
auto max_addr = static_cast<rel_addr_t>(BitLeftShift(1, num_lvl_ - 1));
STATE st;
size_t sz = 0;
// Try the smallest level that fits first, then progressively larger ones.
for (int i = start_inx; !found && i < num_lvl_; i++) {
DS_ASSERT(count_[i] >= 0);
// No free segment recorded at this level.
if (count_[i] == 0) {
continue;
}
auto blk_sz = static_cast<size_t>(BitLeftShift(1, i));
// Start scanning at the hint for this level.
ask_addr = hint_[i];
while (ask_addr < max_addr && !found) {
GetBuddySegState(ask_addr, &sz, &st);
if (st == STATE::kFree && sz == blk_sz) {
found = true;
} else {
DS_ASSERT(st != STATE::kEmpty);
// Skip over the segment just inspected, by at least one block of the size being searched for.
ask_addr += ((sz > blk_sz) ? sz : blk_sz);
}
}
}
if (found) {
// sz is the size of the segment that was found; trim it if it is larger than requested.
if (sz > req_size) {
TrimBuddySeg(ask_addr, sz, req_size);
} else {
SetBuddySegState(ask_addr, sz, STATE::kAlloc);
hint_[start_inx] = ask_addr;
}
return ask_addr;
} else {
return static_cast<rel_addr_t>(NOSPACE);
}
}
/// \brief Release a segment. A segment allocated at its full block size is freed and joined directly; a trimmed
/// one is handed to UnTrimBuddySeg.
/// \param addr Relative address of the segment
/// \param blk_size Block size (power of two) of the segment in units
/// \param req_size Units that were actually requested
void BuddySpace::FreeBuddySeg(rel_addr_t addr, size_t blk_size, size_t req_size) {
  if (req_size != blk_size) {
    UnTrimBuddySeg(addr, blk_size, req_size);
    return;
  }
#ifdef DEBUG
  {
    size_t sz = 0;
    STATE st;
    GetBuddySegState(addr, &sz, &st);
  }
#endif
  SetBuddySegState(addr, blk_size, STATE::kFree);
  JoinBuddySeg(addr, blk_size);
}
/// \brief Compute the percentage of the space that is currently free.
/// \return Free units divided by total units, times 100, truncated to int
int BuddySpace::PercentFree() const {
  const uint64_t capacity_units = BitLeftShift(1, num_lvl_ - 1);
  uint64_t free_units = 0;
  // Go through the count array without lock
  for (int lvl = 0; lvl < num_lvl_; ++lvl) {
    const int cnt = count_[lvl];
    if (cnt != 0) {
      const uint64_t units_per_seg = BitLeftShift(1, lvl);
      free_units += (units_per_seg * cnt);
    }
  }
  const float ratio = static_cast<float>(free_units) / static_cast<float>(capacity_units);
  return static_cast<int>(ratio * 100);
}
BuddySpace::BuddySpace(int log_min, int num_lvl)
: hint_(nullptr),
count_(nullptr),
map_(nullptr),
log_min_(log_min),
num_lvl_(num_lvl),
min_(0),
max_(0),
ptr_(nullptr) {}
/// \brief Release the bookkeeping buffer and clear the pointers that referred into it.
BuddySpace::~BuddySpace() {
  // free() accepts a null pointer and does nothing, so no explicit check is needed.
  free(ptr_);
  map_ = nullptr;
  count_ = nullptr;
  hint_ = nullptr;
}
/// \brief Factory: construct a BuddySpace and initialize it.
/// \param out_bs Receives the new object on success; untouched on failure
/// \param log_min log2 of the minimum unit size
/// \param num_lvl Number of levels
/// \return kOutOfMemory if the object cannot be created, otherwise the Status returned by Init()
Status BuddySpace::CreateBuddySpace(std::unique_ptr<BuddySpace> *out_bs, int log_min, int num_lvl) {
  std::unique_ptr<BuddySpace> candidate(new (std::nothrow) BuddySpace(log_min, num_lvl));
  if (candidate == nullptr) {
    return Status(StatusCode::kOutOfMemory);
  }
  Status rc = candidate->Init();
  if (rc.IsOk()) {
    // Hand ownership to the caller; on failure 'candidate' deletes the object when it goes out of scope.
    out_bs->reset(candidate.release());
  }
  return rc;
}
} // namespace dataset
} // namespace mindspore
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_BUDDY_H_
#define DATASET_UTIL_BUDDY_H_
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include "dataset/util/status.h"
using addr_t = int64_t;
using rel_addr_t = int32_t;
using log_t = int;
#define ALLOC_BIT 0x80
#define ONE_BIT 0x40
#define TWO_BIT 0x20
#define MORE_BIT 0x10
#define NOSPACE ((addr_t)(-1))
namespace mindspore {
namespace dataset {
/// \brief Describes one allocation made from a BuddySpace; filled in on allocation and passed back to Free.
struct BSpaceDescriptor {
// Signature; the allocator stores 0xDEADBEEF here and asserts on it when freeing.
int32_t sig;
// Relative address of the segment, in units of the minimum size.
rel_addr_t addr;
// Number of units that were requested.
size_t req_size;
// Number of units of the block, i.e. req_size rounded up to a power of two.
size_t blk_size;
};
/// \brief A space manager built on the buddy system. Sizes are expressed in units of the minimum size; bookkeeping
/// is kept in a per-level hint array, a per-level free count array and a bitmap.
class BuddySpace {
 public:
  // STATE is a scoped enumeration (C++11). Do not take out the keyword 'class'.
  enum class STATE { kFree, kAlloc, kEmpty };

  // Not copyable.
  BuddySpace(const BuddySpace &) = delete;
  BuddySpace &operator=(const BuddySpace &) = delete;
  virtual ~BuddySpace();

  /// \brief Allocate sz bytes. On success the descriptor is filled in and the address is returned through the
  /// last argument.
  Status Alloc(uint64_t sz, BSpaceDescriptor *desc, addr_t *) noexcept;
  /// \brief Release an allocation described by desc.
  void Free(const BSpaceDescriptor *desc);
  /// \brief Size of one unit.
  uint64_t GetMinSize() const { return min_; }
  /// \brief Size of the whole space.
  uint64_t GetMaxSize() const { return max_; }
  /// \brief Percentage of the space that is free.
  int PercentFree() const;
  friend std::ostream &operator<<(std::ostream &os, const BuddySpace &s);

  /// \brief Smallest power of two that is greater than or equal to n (1 for n <= 1).
  static uint64_t NextPowerOf2(uint64_t n) {
    if (n <= 1) {
      return 1;
    }
    // Reduce n - 1 to its highest set bit by clearing the lowest set bit until one remains, then double it.
    uint64_t top = n - 1;
    while ((top & (top - 1)) != 0) {
      top &= (top - 1);
    }
    return top << 1;
  }
  /// \brief Floor of log2(n); yields 0 for n == 0 and n == 1.
  static uint32_t Log2(uint64_t n) {
    uint32_t cnt = 0;
    for (n >>= 1; n != 0; n >>= 1) {
      ++cnt;
    }
    return cnt;
  }
  /// \brief Factory function; the constructor itself is private.
  static Status CreateBuddySpace(std::unique_ptr<BuddySpace> *out_bs, int log_min = 15, int num_lvl = 18);

 private:
  rel_addr_t *hint_;  // per-level address at which the search for a free segment starts
  int *count_;        // per-level number of free segments
  char *map_;         // bitmap, two bits per unit
  int log_min_;       // log2 of the unit size
  int num_lvl_;       // number of levels
  uint64_t min_;      // unit size
  uint64_t max_;      // size of the whole space
  void *ptr_;         // the single buffer that hint_, count_ and map_ point into
  std::mutex mutex_;  // taken by Alloc and Free

  explicit BuddySpace(int log_min = 15, int num_lvl = 18);
  Status Init();
  addr_t AllocNoLock(const uint64_t sz, BSpaceDescriptor *desc) noexcept;
  void FreeNoLock(const BSpaceDescriptor *desc);
  /// \brief Number of units needed to hold sz bytes (rounded up).
  uint32_t SizeToBlock(const uint64_t sz) const {
    uint32_t units = (sz / min_);
    const bool has_remainder = (sz % min_) != 0;
    if (has_remainder) {
      ++units;
    }
    return units;
  }
  void GetBuddySegState(const rel_addr_t rel_addr, size_t *rel_sz, STATE *st) const;
  void SetBuddySegState(rel_addr_t rel_addr, size_t rel_sz, STATE st);
  void JoinBuddySeg(rel_addr_t addr, size_t blk_sz);
  void TrimBuddySeg(rel_addr_t addr, size_t blk_sz, size_t ask_sz);
  void UnTrimBuddySeg(rel_addr_t addr, size_t blk_sz, size_t ask_sz);
  rel_addr_t AllocBuddySeg(uint32_t req_size) noexcept;
  void FreeBuddySeg(rel_addr_t addr, size_t blk_size, size_t req_size);
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_UTIL_BUDDY_H_
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include "common/utils.h"
#include "dataset/util/cache_pool.h"
#include "dataset/util/services.h"
namespace mindspore {
namespace dataset {
/// \brief Store the allocator and the optional spill root, and pick a unique sub-folder name. The storage manager
/// and the index are created later by DoServiceStart.
/// \param alloc Allocator to allocate memory from
/// \param root Disk folder to spill to; may be empty
CachePool::CachePool(const value_allocator &alloc, const std::string &root)
    : alloc_(alloc),
      root_(root),
      subfolder_(Services::GetUniqueID()),
      sm_(nullptr),
      tree_(nullptr) {
}
/// \brief Create the index and, when a disk path was given, the spill directory and the StorageManager.
/// \return Status::OK() or the first error from creating the directory / starting the storage manager
Status CachePool::DoServiceStart() {
  tree_ = std::make_shared<data_index>();
  const bool no_disk = root_.toString().empty();
  if (no_disk) {
    return Status::OK();
  }
  // We are given a disk path, so set up the StorageManager on it.
  Path spill_dir = GetSpillPath();
  RETURN_IF_NOT_OK(spill_dir.CreateDirectories());
  sm_ = std::make_shared<StorageManager>(spill_dir);
  RETURN_IF_NOT_OK(sm_->ServiceStart());
  MS_LOG(INFO) << "CachePool will use disk folder: " << common::SafeCStr(spill_dir.toString());
  return Status::OK();
}
/// \brief Stop the storage manager, give back every in-memory buffer and remove the spill directory.
/// Cleanup always runs to the end; the first error encountered is the one returned.
/// \return Status::OK() or the first error encountered
Status CachePool::DoServiceStop() {
  Status rc;
  Status rc2;
  if (sm_ != nullptr) {
    rc = sm_->ServiceStop();
    if (rc.IsError()) {
      rc2 = rc;
    }
  }
  sm_.reset();
  // tree_ is only created by DoServiceStart; do not dereference it if that never happened.
  if (tree_ != nullptr) {
    for (auto &bl : *tree_) {
      if (bl.ptr != nullptr) {
        alloc_.deallocate(bl.ptr, bl.sz);
      }
    }
    tree_.reset();
  }
  if (!root_.toString().empty()) {
    Path spill = GetSpillPath();
    auto it = Path::DirIterator::OpenDirectory(&spill);
    // A null iterator (e.g. the directory is missing) means there is nothing to iterate over.
    while (it != nullptr && it->hasNext()) {
      rc = it->next().Remove();
      if (rc.IsError() && rc2.IsOk()) {
        rc2 = rc;
      }
    }
    rc = spill.Remove();
    if (rc.IsError() && rc2.IsOk()) {
      rc2 = rc;
    }
  }
  return rc2;
}
/// \brief Stop the service on destruction; the returned Status is deliberately ignored.
CachePool::~CachePool() noexcept {
  (void)ServiceStop();
}
/// \brief Consolidate the slices into one contiguous buffer and register it in the index. If memory cannot be
/// allocated the data is written through the StorageManager instead (when one exists).
/// \param[in] buf A sequence of ReadableSlice objects
/// \param[out] key Generated key
/// \return Error code
Status CachePool::Insert(const std::vector<ReadableSlice> &buf, CachePool::key_type *key) {
  DataLocator bl;
  // Total size of all pieces.
  size_t total = 0;
  for (const auto &piece : buf) {
    total += piece.GetSize();
  }
  bl.sz = total;
  try {
    bl.ptr = alloc_.allocate(total);
    // Piecewise copy into the consolidated buffer.
    WritableSlice dest(bl.ptr, bl.sz);
    size_t pos = 0;
    for (const auto &piece : buf) {
      WritableSlice out(dest, pos);
      Status copy_rc = WritableSlice::Copy(&out, piece);
      if (copy_rc.IsError()) {
        alloc_.deallocate(bl.ptr, total);
        bl.ptr = nullptr;
        return copy_rc;
      }
      pos += piece.GetSize();
    }
  } catch (std::bad_alloc &e) {
    if (sm_ == nullptr) {
      return Status(StatusCode::kOutOfMemory, __LINE__, __FILE__);
    }
    RETURN_IF_NOT_OK(sm_->Write(&bl.storage_key, buf));
    // We have an assumption 0 is not a valid key from the design of AutoIndexObj.
    // Make sure it is not 0.
    if (bl.storage_key == 0) {
      RETURN_STATUS_UNEXPECTED("Key 0 is returned which is unexpected");
    }
  }
  Status rc = tree_->insert(bl, key);
  const bool must_release = rc.IsError() && bl.ptr != nullptr;
  if (must_release) {
    alloc_.deallocate(bl.ptr, total);
  }
  return rc;
}
/// \brief Copy a cached buffer into dest, from memory if it is resident or otherwise through the StorageManager.
/// \param[in] key A key returned from Insert
/// \param[out] dest Destination slice; must not be null
/// \param[out] bytesRead Optional; receives the size recorded for the buffer
/// \return Error code; an unexpected error if the key is unknown or the length read from disk does not match
Status CachePool::Read(CachePool::key_type key, WritableSlice *dest, size_t *bytesRead) const {
  RETURN_UNEXPECTED_IF_NULL(dest);
  auto r = tree_->Search(key);
  if (!r.second) {
    RETURN_STATUS_UNEXPECTED("Key not found");
  }
  auto &it = r.first;
  const bool in_memory = (it->ptr != nullptr);
  if (in_memory) {
    ReadableSlice src(it->ptr, it->sz);
    RETURN_IF_NOT_OK(WritableSlice::Copy(dest, src));
  } else if (sm_ != nullptr) {
    size_t expectedLength = 0;
    RETURN_IF_NOT_OK(sm_->Read(it->storage_key, dest, &expectedLength));
    if (expectedLength != it->sz) {
      MS_LOG(ERROR) << "Unexpected length. Read " << expectedLength << ". Expected " << it->sz << "."
                    << " Internal key: " << key << "\n";
      RETURN_STATUS_UNEXPECTED("Length mismatch. See log file for details.");
    }
  }
  if (bytesRead != nullptr) {
    *bytesRead = it->sz;
  }
  return Status::OK();
}
/// \brief Accessor for the allocator this pool allocates from.
const CachePool::value_allocator &CachePool::get_allocator() const {
  return alloc_;
}
/// \brief Build the spill directory path: the root folder joined with this pool's unique sub-folder.
Path CachePool::GetSpillPath() const {
  return Path(root_) / subfolder_;
}
/// \brief Count how many indexed buffers are resident in memory and how many are not.
/// \return CacheStat with both counters
CachePool::CacheStat CachePool::GetStat() const {
  CacheStat stat{0, 0};
  for (auto &entry : *tree_) {
    // An entry without a memory pointer is counted as cached on disk.
    if (entry.ptr == nullptr) {
      ++stat.num_disk_cached;
    } else {
      ++stat.num_mem_cached;
    }
  }
  return stat;
}
/// \brief Move an in-memory buffer out of memory. The data is written through the StorageManager unless it already
/// has a storage key, then the memory is released.
/// \param dl Locator of the buffer; both dl and dl->ptr must be non-null
/// \return Error code
Status CachePool::Spill(CachePool::DataLocator *dl) {
  if (sm_ == nullptr) {
    RETURN_STATUS_UNEXPECTED("No disk storage to spill");
  }
  RETURN_UNEXPECTED_IF_NULL(dl);
  RETURN_UNEXPECTED_IF_NULL(dl->ptr);
  const bool already_on_disk = (dl->storage_key != 0);
  if (!already_on_disk) {
    ReadableSlice data(dl->ptr, dl->sz);
    RETURN_IF_NOT_OK(sm_->Write(&dl->storage_key, {data}));
  }
  alloc_.deallocate(dl->ptr, dl->sz);
  dl->ptr = nullptr;
  return Status::OK();
}
/// \brief Make sure the buffer described by dl is resident in memory, reading it back from disk if necessary.
/// \param dl Locator of the buffer; on success dl->ptr points at the data
/// \return Status::OK() if the buffer is (now) in memory; kOutOfMemory if memory cannot be allocated; an
/// unexpected error if there is no disk storage or the read fails
Status CachePool::Locate(CachePool::DataLocator *dl) {
  RETURN_UNEXPECTED_IF_NULL(dl);
  if (dl->ptr == nullptr) {
    if (sm_ == nullptr) {
      RETURN_STATUS_UNEXPECTED("No disk storage to locate the data");
    }
    try {
      dl->ptr = alloc_.allocate(dl->sz);
      WritableSlice dest(dl->ptr, dl->sz);
      // dl->storage_key is a StorageManager key, so the data must be fetched from the StorageManager directly.
      // Going through CachePool::Read would look the value up in the CachePool index, which is keyed differently.
      size_t bytes_read = 0;
      Status rc = sm_->Read(dl->storage_key, &dest, &bytes_read);
      if (rc.IsOk() && bytes_read != dl->sz) {
        rc = Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
                    "Length mismatch while restoring a spilled buffer");
      }
      if (rc.IsError()) {
        alloc_.deallocate(dl->ptr, dl->sz);
        dl->ptr = nullptr;
        return rc;
      }
    } catch (const std::bad_alloc &e) {
      return Status(StatusCode::kOutOfMemory, __LINE__, __FILE__);
    }
  }
  return Status::OK();
}
/// \brief Look up the size recorded for a key.
/// \param key A key returned from Insert
/// \return The recorded size, or 0 if the key is not found
size_t CachePool::GetSize(CachePool::key_type key) const {
  auto r = tree_->Search(key);
  if (!r.second) {
    return 0;
  }
  auto &it = r.first;
  return it->sz;
}
} // namespace dataset
} // namespace mindspore
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_CACHE_POOL_H_
#define DATASET_UTIL_CACHE_POOL_H_
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "dataset/util/allocator.h"
#include "dataset/util/service.h"
#include "dataset/util/slice.h"
#include "dataset/util/storage_manager.h"
#include "dataset/util/auto_index.h"
namespace mindspore {
namespace dataset {
/// \brief A CachePool provides a backup/restore service for buffers. A buffer is given as a vector of
/// ReadableSlice; all of its memory blocks are copied into one contiguous block that lives either in memory or,
/// if a disk directory is provided, on disk. Every insert returns a generated key that can later be used to
/// restore the buffer.
/// \see ReadableSlice
class CachePool : public Service {
 public:
  using base_type = uint8_t;
  using pointer = base_type *;
  using const_pointer = const base_type *;
  using reference = base_type &;
  using const_reference = const base_type &;
  using value_allocator = Allocator<base_type>;

  // An internal class that records where a backed up buffer is: in memory (ptr) or on disk (storage_key).
  class DataLocator {
   public:
    DataLocator() : ptr(nullptr), sz(0), storage_key(0) {}
    ~DataLocator() = default;
    DataLocator(const DataLocator &other) = default;
    DataLocator &operator=(const DataLocator &other) = default;
    // Moving copies the three fields and resets the source to the default (empty) state.
    DataLocator(DataLocator &&other) noexcept : ptr(other.ptr), sz(other.sz), storage_key(other.storage_key) {
      other.ptr = nullptr;
      other.sz = 0;
      other.storage_key = 0;
    }
    DataLocator &operator=(DataLocator &&other) noexcept {
      if (this == &other) {
        return *this;
      }
      ptr = other.ptr;
      sz = other.sz;
      storage_key = other.storage_key;
      other.ptr = nullptr;
      other.sz = 0;
      other.storage_key = 0;
      return *this;
    }
    pointer ptr;                           // in-memory copy, or nullptr
    size_t sz;                             // size of the buffer in bytes
    StorageManager::key_type storage_key;  // key in the StorageManager; 0 means none
  };

  using data_index = AutoIndexObj<DataLocator>;
  using key_type = data_index::key_type;
  using bl_alloc_type = typename value_allocator::template rebind<DataLocator>::other;

  /// \brief Simple statistics returned from CachePool like how many elements are cached in memory and
  /// how many elements are spilled to disk.
  struct CacheStat {
    int64_t num_mem_cached;
    int64_t num_disk_cached;
  };

  /// \brief Constructor
  /// \param alloc Allocator to allocate memory from
  /// \param root Optional disk folder to spill
  explicit CachePool(const value_allocator &alloc, const std::string &root = "");

  // Neither copyable nor movable.
  CachePool(const CachePool &) = delete;
  CachePool(CachePool &&) = delete;
  CachePool &operator=(const CachePool &) = delete;
  CachePool &operator=(CachePool &&) = delete;
  ~CachePool() noexcept;

  Status DoServiceStart() override;
  Status DoServiceStop() override;

  /// \brief Path of the folder used for spilling (root joined with this pool's sub-folder).
  Path GetSpillPath() const;

  /// \brief Insert a sequence of ReadableSlice objects into the pool.
  /// All memory blocks will be consolidated into one contiguous block and be cached in either memory or on disk.
  /// \param[in] buf A sequence of ReadableSlice objects.
  /// \param[out] key Generated key
  /// \return Error code
  Status Insert(const std::vector<ReadableSlice> &buf, key_type *key);

  /// \brief Restore a cached buffer (from memory or disk)
  /// \param[in] key A previous key returned from Insert
  /// \param[out] dest The cached buffer will be copied to this destination represented by a WritableSlice
  /// \param[out] bytesRead Optional. Number of bytes read.
  /// \return Error code
  Status Read(key_type key, WritableSlice *dest, size_t *bytesRead = nullptr) const;

  /// \brief Write an in-memory buffer to disk (if not there yet) and release its memory.
  Status Spill(DataLocator *dl);
  /// \brief Bring a buffer back into memory if it is not resident.
  Status Locate(DataLocator *dl);
  /// \brief Size recorded for a key, or 0 if the key is unknown.
  size_t GetSize(key_type key) const;

  /// \brief Get statistics.
  /// \return CacheStat object
  CacheStat GetStat() const;

  const value_allocator &get_allocator() const;

  /// \brief Name of this pool, which is the name of its sub-folder.
  std::string MyName() const { return subfolder_; }

 private:
  value_allocator alloc_;
  Path root_;
  const std::string subfolder_;
  std::shared_ptr<StorageManager> sm_;
  std::shared_ptr<data_index> tree_;
};
} // namespace dataset
} // namespace mindspore
#endif
......@@ -106,6 +106,24 @@ struct List {
++count;
}
// Link elem2 into the list immediately in front of elem1. elem2 becomes the new head
// when elem1 was the head. The two elements must be distinct.
virtual void InsertBefore(pointer elem1, pointer elem2) {
  DS_ASSERT(elem1 != elem2);
  Node<T> &anchor = elem1->*node;
  Node<T> &fresh = elem2->*node;
  auto before = anchor.prev;
  // Wire the new element between 'before' and elem1.
  fresh.prev = before;
  fresh.next = elem1;
  if (before != nullptr) {
    (before->*node).next = elem2;
  }
  anchor.prev = elem2;
  if (elem1 == head) {
    head = elem2;
  }
  ++count;
}
// Remove an element in the list
virtual void Remove(pointer elem) noexcept {
Node<T> &elem_node = elem->*node;
......
......@@ -44,20 +44,6 @@ class MemoryPool {
virtual ~MemoryPool() {}
};
// Deleter functor for use with unique_ptr. It keeps a shared reference to a MemoryPool
// and hands the pointer back to that pool via Deallocate.
template <typename T>
class Deleter {
 public:
  explicit Deleter(std::shared_ptr<MemoryPool> &mp) : mp_(mp) {
  }

  ~Deleter() = default;

  // Called with the pointer that is to be released.
  void operator()(T *ptr) const {
    mp_->Deallocate(ptr);
  }

 private:
  std::shared_ptr<MemoryPool> mp_;
};
Status DeMalloc(std::size_t s, void **p, bool);
} // namespace dataset
} // namespace mindspore
......
......@@ -16,6 +16,8 @@
#include "dataset/util/path.h"
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <new>
#include <sstream>
#include <utility>
......@@ -26,7 +28,7 @@
namespace mindspore {
namespace dataset {
#ifdef _WIN32
#if defined(_WIN32) || defined(_WIN64)
char Path::separator_ = '\\';
#else
char Path::separator_ = '/';
......@@ -132,7 +134,7 @@ Status Path::CreateDirectory() {
#if defined(_WIN32) || defined(_WIN64)
int rc = mkdir(common::SafeCStr(path_));
#else
int rc = mkdir(common::SafeCStr(path_), 0700);
int rc = mkdir(common::SafeCStr(path_), S_IRUSR | S_IWUSR | S_IXUSR);
#endif
if (rc) {
std::ostringstream oss;
......@@ -182,6 +184,111 @@ Status Path::CreateDirectories() {
return Status::OK();
}
/// \brief Delete this path if it exists: rmdir for a directory, unlink otherwise.
/// \return Status::OK() if the path does not exist or was removed; an unexpected error carrying errno otherwise
Status Path::Remove() {
  if (!Exists()) {
    return Status::OK();
  }
  if (IsDirectory()) {
    errno_t err = rmdir(common::SafeCStr(path_));
    if (err == -1) {
      std::ostringstream msg;
      msg << "Unable to delete directory " << path_ << ". Errno = " << errno;
      RETURN_STATUS_UNEXPECTED(msg.str());
    }
    return Status::OK();
  }
  errno_t err = unlink(common::SafeCStr(path_));
  if (err == -1) {
    std::ostringstream msg;
    msg << "Unable to delete file " << path_ << ". Errno = " << errno;
    RETURN_STATUS_UNEXPECTED(msg.str());
  }
  return Status::OK();
}
/// \brief Create (or truncate) the file and open it; shorthand for OpenFile with create set to true.
/// \param file_descriptor Receives the open file descriptor
Status Path::CreateFile(int *file_descriptor) {
  const bool create = true;
  return OpenFile(file_descriptor, create);
}
/// \brief Open the file for read/write after converting the path to canonical form.
/// \param file_descriptor Receives the open file descriptor; must not be null
/// \param create If true the file is created if missing and truncated if present
/// \return Status::OK() on success; an unexpected error describing the failure otherwise
Status Path::OpenFile(int *file_descriptor, bool create) {
  if (file_descriptor == nullptr) {
    RETURN_STATUS_UNEXPECTED("null pointer");
  }
  if (IsDirectory()) {
    std::ostringstream oss;
    oss << "Unable to create file " << path_ << " which is a directory.";
    RETURN_STATUS_UNEXPECTED(oss.str());
  }
  // Convert to canonical form.
  if (strlen(common::SafeCStr(path_)) > PATH_MAX) {
    // No system call has failed here, so errno says nothing useful; describe the real problem.
    std::ostringstream oss;
    oss << "Path " << path_ << " is longer than the maximum of " << PATH_MAX << " characters.";
    RETURN_STATUS_UNEXPECTED(oss.str());
  }
  char canonical_path[PATH_MAX + 1] = {0x00};
#if defined(_WIN32) || defined(_WIN64)
  if (_fullpath(canonical_path, common::SafeCStr(path_), PATH_MAX) == nullptr) {
#else
  if (realpath(common::SafeCStr(path_), canonical_path) == nullptr) {
#endif
    if (errno == ENOENT && create) {
      // File doesn't exist and we are to create it. Let's break it down:
      // canonicalize the parent folder and append the file name to it.
      auto file_part = Basename();
      auto parent_part = ParentPath();
#if defined(_WIN32) || defined(_WIN64)
      if (_fullpath(canonical_path, common::SafeCStr(parent_part), PATH_MAX) == nullptr) {
#else
      if (realpath(common::SafeCStr(parent_part), canonical_path) == nullptr) {
#endif
        RETURN_STATUS_UNEXPECTED(strerror(errno));
      }
      auto cur_inx = strlen(canonical_path);
      if ((cur_inx + file_part.length() + 1) > PATH_MAX) {
        // Length check failed; errno is stale at this point.
        std::ostringstream oss;
        oss << "Canonical form of " << path_ << " is longer than the maximum of " << PATH_MAX << " characters.";
        RETURN_STATUS_UNEXPECTED(oss.str());
      }
      canonical_path[cur_inx++] = separator_;
      if (strncpy_s(canonical_path + cur_inx, PATH_MAX - cur_inx, common::SafeCStr(file_part), file_part.length()) !=
          EOK) {
        // strncpy_s reports failure through its return value, not through errno.
        std::ostringstream oss;
        oss << "Unable to build the canonical path for " << path_ << ".";
        RETURN_STATUS_UNEXPECTED(oss.str());
      }
    } else {
      RETURN_STATUS_UNEXPECTED(strerror(errno));
    }
  }
  int fd;
  if (create) {
    fd = open(canonical_path, O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP);
  } else {
    fd = open(canonical_path, O_RDWR);
  }
  if (fd == -1) {
    RETURN_STATUS_UNEXPECTED(strerror(errno));
  }
  *file_descriptor = fd;
  return Status::OK();
}
/// \brief Close a file descriptor.
/// \param fd File descriptor to close
/// \return Status::OK() on success; an unexpected error with the errno text if close() reports failure
Status Path::CloseFile(int fd) const {
  const int rc = close(fd);
  if (rc < 0) {
    RETURN_STATUS_UNEXPECTED(strerror(errno));
  }
  return Status::OK();
}
/// \brief Truncate an open file to length zero.
/// \param fd File descriptor of the file
/// \return Status::OK() on success; an unexpected error with the errno text otherwise
Status Path::TruncateFile(int fd) const {
  if (ftruncate(fd, 0) != 0) {
    RETURN_STATUS_UNEXPECTED(strerror(errno));
  }
  return Status::OK();
}
/// \brief Return the part of the path after the last separator, or the whole path if it contains no separator.
std::string Path::Basename() {
  const std::size_t last_sep = path_.find_last_of(separator_);
  if (last_sep == std::string::npos) {
    return path_;
  }
  return path_.substr(last_sep + 1);
}
std::shared_ptr<Path::DirIterator> Path::DirIterator::OpenDirectory(Path *f) {
auto it = new (std::nothrow) DirIterator(f);
......@@ -208,7 +315,7 @@ Path::DirIterator::~DirIterator() {
/// \brief Construct an iterator over the directory named by the given path.
/// \param f The directory path; it is stored in dir_ and its string form is passed to opendir().
/// \note The directory stream is opened exactly once. Calling opendir() twice and overwriting dp_ would leak the
/// first stream handle.
Path::DirIterator::DirIterator(Path *f) : dir_(f), dp_(nullptr), entry_(nullptr) {
  MS_LOG(DEBUG) << "Open directory " << f->toString() << ".";
  dp_ = opendir(f->toString().c_str());
}
bool Path::DirIterator::hasNext() {
......@@ -225,5 +332,10 @@ bool Path::DirIterator::hasNext() {
}
/// \brief Build the path of the current directory entry.
/// \return The iterated directory path joined (via operator/) with the name of the entry currently held in entry_.
Path Path::DirIterator::next() { return (*(this->dir_) / Path(entry_->d_name)); }
/// \brief Stream the stored path string of a Path.
/// \param os Output stream to write to.
/// \param s The path to print.
/// \return The same output stream, to allow chaining.
std::ostream &operator<<(std::ostream &os, const Path &s) { return os << s.path_; }
} // namespace dataset
} // namespace mindspore
......@@ -90,6 +90,20 @@ class Path {
std::string ParentPath();
Status Remove();
Status CreateFile(int *fd);
Status OpenFile(int *fd, bool create = false);
Status CloseFile(int fd) const;
Status TruncateFile(int fd) const;
std::string Basename();
friend std::ostream &operator<<(std::ostream &os, const Path &s);
private:
static char separator_;
std::string path_;
......
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/util/semaphore.h"
#include "dataset/util/task_manager.h"
namespace mindspore {
namespace dataset {
/// \brief Wait until the counter is positive, then decrement it.
/// \return Status::OK() after the decrement; any error returned by the condition variable wait is propagated
/// unchanged and the counter is left untouched in that case.
Status Semaphore::P() {
std::unique_lock<std::mutex> lck(mutex_);
// Wait (under the mutex) until the predicate value_ > 0 holds.
RETURN_IF_NOT_OK(wait_cond_.Wait(&lck, [this]() { return value_ > 0; }));
--value_;
return Status::OK();
}
/// \brief Increment the counter and notify one waiter, all while holding the mutex.
void Semaphore::V() {
  std::lock_guard<std::mutex> guard(mutex_);
  value_ += 1;
  wait_cond_.NotifyOne();
}
/// \brief Read the counter while holding the mutex.
/// \return The value of the counter at the time of the call.
int Semaphore::Peek() {
  std::lock_guard<std::mutex> guard(mutex_);
  const int snapshot = value_;
  return snapshot;
}
/// \brief Register the internal condition variable with the interrupt service of the given task group.
/// \param vg Task group whose GetIntrpService() result is handed to the condition variable. Dereferenced without a
/// null check.
/// \return Whatever the condition variable's Register() returns.
Status Semaphore::Register(TaskGroup *vg) { return wait_cond_.Register(vg->GetIntrpService()); }
/// \brief Forward to the internal condition variable's Deregister().
/// \return Whatever the condition variable's Deregister() returns.
Status Semaphore::Deregister() { return (wait_cond_.Deregister()); }
/// \brief Forward to the internal condition variable's ResetIntrpState().
void Semaphore::ResetIntrpState() { wait_cond_.ResetIntrpState(); }
} // namespace dataset
} // namespace mindspore
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_SEMAPHORE_H_
#define DATASET_UTIL_SEMAPHORE_H_
#include "dataset/util/cond_var.h"
namespace mindspore {
namespace dataset {
class TaskGroup;
/// \brief A counting semaphore. There are two external functions P and V. P decrements the internal count and will be
/// blocked if the count is 0 (zero). V increments the internal count and wake up one of the waiters.
class Semaphore {
public:
/// \brief Constructor
/// \param init Initial value of the internal counter.
explicit Semaphore(int init) : value_(init) {}
virtual ~Semaphore() {}
/// \brief Decrement the internal counter. Will be blocked if the value is 0.
/// \return Error code. Can get interrupt.
Status P();
/// \brief Increment the internal counter. Wake up one of the waiters, if any.
void V();
/// \brief Peek the internal value
/// \return The internal value
int Peek();
/// \brief Register the internal condition variable with the interrupt service of a task group.
/// \param vg The task group providing the interrupt service.
/// \return Error code from the condition variable.
Status Register(TaskGroup *vg);
/// \brief Deregister the internal condition variable.
/// \return Error code from the condition variable.
Status Deregister();
/// \brief Reset the interrupt state of the internal condition variable.
void ResetIntrpState();
private:
// Current count; read and modified only while mutex_ is held.
int value_;
// Protects value_ and is the mutex used for waiting on wait_cond_.
std::mutex mutex_;
// Condition variable on which P() waits and which V() notifies.
CondVar wait_cond_;
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_UTIL_SEMAPHORE_H_
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/util/slice.h"
namespace mindspore {
namespace dataset {
/// \brief Create a sub-slice of an existing writable slice.
/// \param src The slice to carve the new one out of.
/// \param offset Byte offset from the start of src. No bounds check is performed here.
/// \param len Size recorded for the new slice. No bounds check is performed here.
WritableSlice::WritableSlice(const WritableSlice &src, off64_t offset, size_t len)
    : ReadableSlice(src, offset, len), mutable_data_(static_cast<char *>(src.mutable_data_) + offset) {}
/// \brief Create a sub-slice that starts at offset and runs to the end of src.
/// \param src The slice to carve the new one out of.
/// \param offset Byte offset from the start of src. The length is computed as src.GetSize() - offset without any
/// range check, so an offset larger than the size wraps around.
WritableSlice::WritableSlice(const WritableSlice &src, off64_t offset)
: WritableSlice(src, offset, src.GetSize() - offset) {}
/// \brief Copy the bytes of a readable slice into a writable slice.
/// \param dest Destination slice. It, and its mutable pointer, must not be null, and its size must be non-zero.
/// \param src Source slice. All src.GetSize() bytes are handed to memcpy_s.
/// \return Status::OK() on success. A non-zero memcpy_s return code is reported as an error whose message is that
/// code converted to text.
Status WritableSlice::Copy(WritableSlice *dest, const ReadableSlice &src) {
  RETURN_UNEXPECTED_IF_NULL(dest);
  RETURN_UNEXPECTED_IF_NULL(dest->GetMutablePointer());
  const auto capacity = dest->GetSize();
  if (capacity <= 0) {
    RETURN_STATUS_UNEXPECTED("Destination length is non-positive");
  }
  const auto rc = memcpy_s(dest->GetMutablePointer(), capacity, src.GetPointer(), src.GetSize());
  if (rc != 0) {
    RETURN_STATUS_UNEXPECTED(std::to_string(rc));
  }
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_SLICE_H_
#define DATASET_UTIL_SLICE_H_
#include <unistd.h>
#include <cstddef>
#include <utility>
#include "./securec.h"
#include "dataset/util/allocator.h"
#include "dataset/util/status.h"
namespace mindspore {
namespace dataset {
/// \brief A ReadableSlice wraps a const pointer in memory and its size.
/// \see WritableSlice for a non-const version
///
class ReadableSlice {
 public:
  /// \brief Default constructor. Produces an empty slice (null pointer, zero size).
  ReadableSlice() : ptr_(nullptr), sz_(0) {}

  /// \brief Wrap an existing memory region. The slice does not own the memory.
  ReadableSlice(const void *ptr, size_t sz) : ptr_(ptr), sz_(sz) {}

  /// \brief Create a sub-slice of src starting at the given byte offset with the given length.
  /// No bounds check is performed.
  ReadableSlice(const ReadableSlice &src, off64_t offset, size_t len)
      : ptr_(static_cast<const char *>(src.GetPointer()) + offset), sz_(len) {}

  /// \brief Create a sub-slice of src starting at the given byte offset and running to the end of src.
  /// The length is computed as the size of src minus offset, without a range check.
  ReadableSlice(const ReadableSlice &src, off64_t offset) : ReadableSlice(src, offset, src.sz_ - offset) {}

  /// \brief Copy constructor. Both slices refer to the same memory afterwards.
  ReadableSlice(const ReadableSlice &lhs) : ptr_(lhs.ptr_), sz_(lhs.sz_) {}

  /// \brief Copy assignment. Both slices refer to the same memory afterwards.
  ReadableSlice &operator=(const ReadableSlice &lhs) {
    if (this == &lhs) {
      return *this;
    }
    ptr_ = lhs.ptr_;
    sz_ = lhs.sz_;
    return *this;
  }

  /// \brief Move constructor. The source becomes an empty slice.
  ReadableSlice(ReadableSlice &&lhs) noexcept : ptr_(lhs.ptr_), sz_(lhs.sz_) {
    lhs.ptr_ = nullptr;
    lhs.sz_ = 0;
  }

  /// \brief Move assignment. The source becomes an empty slice; self-assignment is a no-op.
  ReadableSlice &operator=(ReadableSlice &&lhs) noexcept {
    if (this == &lhs) {
      return *this;
    }
    ptr_ = lhs.ptr_;
    sz_ = lhs.sz_;
    lhs.ptr_ = nullptr;
    lhs.sz_ = 0;
    return *this;
  }

  /// \brief Getter function
  /// \return Const version of the pointer
  const void *GetPointer() const { return ptr_; }

  /// \brief Getter function
  /// \return Size of the slice
  size_t GetSize() const { return sz_; }

  /// \brief Tell whether the slice points at nothing.
  /// \return True when the wrapped pointer is null (the size is not consulted).
  bool empty() const { return ptr_ == nullptr; }

 private:
  const void *ptr_;
  size_t sz_;
};
/// \brief A WritableSlice inherits from ReadableSlice to allow
/// one to write to the address pointed to by the pointer.
///
class WritableSlice : public ReadableSlice {
 public:
  // StorageContainer needs the private GetMutablePointer() accessor.
  friend class StorageContainer;

  /// \brief Default constructor. Produces an empty slice.
  WritableSlice() : ReadableSlice(), mutable_data_(nullptr) {}

  /// \brief This form of a constructor takes a pointer and its size. The memory is not owned by the slice.
  WritableSlice(void *ptr, size_t sz) : ReadableSlice(ptr, sz), mutable_data_(ptr) {}

  /// \brief Create a sub-slice of src starting at the given byte offset with the given length.
  WritableSlice(const WritableSlice &src, off64_t offset, size_t len);

  /// \brief Create a sub-slice of src starting at the given byte offset and running to the end of src.
  WritableSlice(const WritableSlice &src, off64_t offset);

  /// \brief Copy constructor. Both slices refer to the same memory afterwards.
  WritableSlice(const WritableSlice &lhs) : ReadableSlice(lhs), mutable_data_(lhs.mutable_data_) {}

  /// \brief Copy assignment. Both slices refer to the same memory afterwards.
  WritableSlice &operator=(const WritableSlice &lhs) {
    if (this == &lhs) {
      return *this;
    }
    mutable_data_ = lhs.mutable_data_;
    ReadableSlice::operator=(lhs);
    return *this;
  }

  /// \brief Move constructor. The source becomes an empty slice.
  /// The base-class move only resets the base members, so lhs.mutable_data_ is still intact when it is read here.
  WritableSlice(WritableSlice &&lhs) noexcept : ReadableSlice(std::move(lhs)), mutable_data_(lhs.mutable_data_) {
    lhs.mutable_data_ = nullptr;
  }

  /// \brief Move assignment. The source becomes an empty slice; self-assignment is a no-op.
  WritableSlice &operator=(WritableSlice &&lhs) noexcept {
    if (this == &lhs) {
      return *this;
    }
    mutable_data_ = lhs.mutable_data_;
    lhs.mutable_data_ = nullptr;
    ReadableSlice::operator=(std::move(lhs));
    return *this;
  }

  /// \brief Copy the content from one slice onto another.
  static Status Copy(WritableSlice *dest, const ReadableSlice &src);

 private:
  // Non-const view of the memory this slice refers to.
  void *mutable_data_;
  void *GetMutablePointer() { return mutable_data_; }
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_UTIL_SLICE_H_
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/util/storage_container.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <vector>
#include "common/utils.h"
#include "dataset/util/de_error.h"
#include "dataset/util/path.h"
#include "dataset/util/status.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace dataset {
/// \brief Set up the container: create its BuddySpace and then its backing file.
/// \return Status::OK() on success; the first failing step's error otherwise. The container is marked open only
/// after both steps succeed.
Status StorageContainer::Create() {
RETURN_IF_NOT_OK(BuddySpace::CreateBuddySpace(&bs_));
// On success fd_ holds the descriptor of the newly created file.
RETURN_IF_NOT_OK(cont_.CreateFile(&fd_));
is_open_ = true;
MS_LOG(INFO) << "Container " << cont_ << " created";
return Status::OK();
}
/// \brief Open the backing file if it is not open already.
/// \return Status::OK() when the container is (or already was) open; the OpenFile error otherwise.
Status StorageContainer::Open() noexcept {
  std::lock_guard<std::mutex> lck(mutex_);
  // Nothing to do when the file is already open.
  if (is_open_) {
    return Status::OK();
  }
  RETURN_IF_NOT_OK(cont_.OpenFile(&fd_));
  is_open_ = true;
  return Status::OK();
}
/// \brief Close the backing file if it is open.
/// \return Status::OK() when the container is (or already was) closed; the CloseFile error otherwise, in which case
/// the open flag and descriptor are left unchanged.
Status StorageContainer::Close() noexcept {
  // First look at the flag without taking the mutex.
  if (!is_open_) {
    return Status::OK();
  }
  std::lock_guard<std::mutex> lck(mutex_);
  // Check again now that the mutex is held.
  if (is_open_) {
    RETURN_IF_NOT_OK(cont_.CloseFile(fd_));
    is_open_ = false;
    fd_ = -1;
  }
  return Status::OK();
}
/// \brief Fill the destination slice with bytes read from the container file.
/// \param dest Destination slice; exactly dest->GetSize() bytes are requested. Must not be null.
/// \param offset Byte offset in the file at which to start reading.
/// \return Status::OK() when the full number of bytes was read. A failed system call is reported with the
/// strerror(errno) text. A short read (including end of file) is reported with an explicit message, because errno is
/// not meaningful in that case.
Status StorageContainer::Read(WritableSlice *dest, off64_t offset) const noexcept {
  DS_ASSERT(is_open_);
  RETURN_UNEXPECTED_IF_NULL(dest);
  auto sz = dest->GetSize();
#if defined(_WIN32) || defined(_WIN64)
  // Doesn't seem there is any pread64 on mingw.
  // So we will do a seek and then a read under
  // a protection of mutex.
  std::lock_guard<std::mutex> lck(mutex_);
  auto seek_err = lseek(fd_, offset, SEEK_SET);
  if (seek_err < 0) {
    RETURN_STATUS_UNEXPECTED(strerror(errno));
  }
  auto r_sz = read(fd_, dest->GetMutablePointer(), sz);
#else
  auto r_sz = pread64(fd_, dest->GetMutablePointer(), sz, offset);
#endif
  if (r_sz < 0) {
    // The system call itself failed, so errno describes the cause.
    RETURN_STATUS_UNEXPECTED(strerror(errno));
  }
  if (static_cast<size_t>(r_sz) != sz) {
    // Fewer bytes than requested. errno is not set for a short read, so spell the problem out.
    std::string errMsg =
      "Short read. Expect " + std::to_string(sz) + " bytes but got " + std::to_string(r_sz) + " bytes";
    RETURN_STATUS_UNEXPECTED(errMsg);
  }
  return Status::OK();
}
/// \brief Write the bytes of a slice into the container file.
/// \param dest The slice holding the bytes to write (despite its name it is the source of the data); all
/// dest.GetSize() bytes are written.
/// \param offset Byte offset in the file at which to start writing.
/// \return Status::OK() when the full number of bytes was written. A failed system call is reported with the
/// strerror(errno) text. A short write is reported with an explicit message, because errno is not meaningful in that
/// case.
Status StorageContainer::Write(const ReadableSlice &dest, off64_t offset) const noexcept {
  DS_ASSERT(is_open_);
  auto sz = dest.GetSize();
#if defined(_WIN32) || defined(_WIN64)
  // Doesn't seem there is any pwrite64 on mingw.
  // So we will do a seek and then a write under
  // a protection of mutex.
  std::lock_guard<std::mutex> lck(mutex_);
  auto seek_err = lseek(fd_, offset, SEEK_SET);
  if (seek_err < 0) {
    RETURN_STATUS_UNEXPECTED(strerror(errno));
  }
  auto r_sz = write(fd_, dest.GetPointer(), sz);
#else
  auto r_sz = pwrite64(fd_, dest.GetPointer(), sz, offset);
#endif
  if (r_sz < 0) {
    // The system call itself failed, so errno describes the cause.
    RETURN_STATUS_UNEXPECTED(strerror(errno));
  }
  if (static_cast<size_t>(r_sz) != sz) {
    // Fewer bytes than requested. errno is not set for a short write, so spell the problem out.
    std::string errMsg =
      "Short write. Expect " + std::to_string(sz) + " bytes but wrote " + std::to_string(r_sz) + " bytes";
    RETURN_STATUS_UNEXPECTED(errMsg);
  }
  return Status::OK();
}
/// \brief Allocate room for a list of slices and write them back to back into the container file.
/// \param buf The slices to store. Their combined size must be non-zero and must not exceed bs_->GetMaxSize().
/// \param[out] offset Receives the file offset at which the first slice was written. Must not be null.
/// \return Status::OK() on success. Errors from the space allocation or from any individual write are propagated;
/// space that was already allocated is not released here when a write fails.
Status StorageContainer::Insert(const std::vector<ReadableSlice> &buf, off64_t *offset) noexcept {
  // Validate the out-parameter up front, before any space is allocated.
  RETURN_UNEXPECTED_IF_NULL(offset);
  size_t sz = 0;
  for (const auto &v : buf) {
    sz += v.GetSize();
  }
  if (sz == 0) {
    RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
  }
  if (sz > bs_->GetMaxSize()) {
    RETURN_STATUS_UNEXPECTED("Request size too big");
  }
  BSpaceDescriptor bspd{0};
  addr_t addr = 0;
  RETURN_IF_NOT_OK(bs_->Alloc(sz, &bspd, &addr));
  *offset = static_cast<off64_t>(addr);
  // We will do piecewise copy of the data to disk.
  for (const auto &v : buf) {
    RETURN_IF_NOT_OK(Write(v, addr));
    addr += v.GetSize();
  }
  return Status::OK();
}
/// \brief Truncate the backing file, but only when the container is open.
/// \return Status::OK() when the container is closed or the truncation succeeded; the TruncateFile error otherwise.
Status StorageContainer::Truncate() const noexcept {
  if (!is_open_) {
    return Status::OK();
  }
  RETURN_IF_NOT_OK(cont_.TruncateFile(fd_));
  MS_LOG(INFO) << "Container " << cont_ << " truncated";
  return Status::OK();
}
/// \brief Destructor. Truncates the backing file and then closes it; both results are deliberately ignored.
StorageContainer::~StorageContainer() noexcept {
// Truncate must run first: it is a no-op once the container is closed.
(void)Truncate();
(void)Close();
}
/// \brief Print the container's file path followed by its BuddySpace.
/// \param os Output stream to write to.
/// \param s The container to print. Its BuddySpace pointer is dereferenced without a null check.
/// \return The same output stream, to allow chaining.
std::ostream &operator<<(std::ostream &os, const StorageContainer &s) {
  os << "File path : " << s.cont_ << "\n";
  os << *s.bs_;
  return os;
}
/// \brief Factory: build a StorageContainer for the given path and run Create() on it.
/// \param[out] out_sc Receives the new container on success and is left untouched on failure. Must not be null.
/// \param path Path string passed to the StorageContainer constructor.
/// \return Status::OK() on success, StatusCode::kOutOfMemory when the allocation fails, or the error from Create().
Status StorageContainer::CreateStorageContainer(std::shared_ptr<StorageContainer> *out_sc, const std::string &path) {
  RETURN_UNEXPECTED_IF_NULL(out_sc);
  // The constructor is private, so allocate directly. The unique_ptr destroys the object on every failure path.
  std::unique_ptr<StorageContainer> sc(new (std::nothrow) StorageContainer(path));
  if (sc == nullptr) {
    return Status(StatusCode::kOutOfMemory);
  }
  Status rc = sc->Create();
  if (rc.IsOk()) {
    // Hand ownership over to the caller's shared_ptr.
    (*out_sc).reset(sc.release());
  }
  return rc;
}
} // namespace dataset
} // namespace mindspore
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_STORAGE_CONTAINER_H_
#define DATASET_UTIL_STORAGE_CONTAINER_H_
#include <limits.h>
#include <unistd.h>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "dataset/util/system_pool.h"
#include "dataset/util/buddy.h"
#include "dataset/util/path.h"
#include "dataset/util/slice.h"
#include "dataset/util/status.h"
namespace mindspore {
namespace dataset {
class StorageManager;
/// \brief A single file on disk together with a BuddySpace that hands out offsets within it.
/// Instances are created through CreateStorageContainer(); the constructor is private.
class StorageContainer {
public:
friend class StorageManager;
/// \brief Destructor. Truncates and closes the backing file.
~StorageContainer() noexcept;
StorageContainer(const StorageContainer &) = delete;
StorageContainer &operator=(const StorageContainer &) = delete;
friend std::ostream &operator<<(std::ostream &os, const StorageContainer &s);
/// \brief Open the backing file if it is not already open.
Status Open() noexcept;
/// \brief Close the backing file if it is open.
Status Close() noexcept;
/// \brief Allocate space for all the slices and write them consecutively.
/// \param buf The slices to write.
/// \param[out] offset File offset of the first byte written.
Status Insert(const std::vector<ReadableSlice> &buf, off64_t *offset) noexcept;
/// \brief Write one slice at the given file offset.
Status Write(const ReadableSlice &dest, off64_t offset) const noexcept;
/// \brief Read dest->GetSize() bytes from the given file offset into dest.
Status Read(WritableSlice *dest, off64_t offset) const noexcept;
/// \brief Truncate the backing file if the container is open.
Status Truncate() const noexcept;
/// \brief Tell whether the backing file is currently open.
bool IsOpen() const { return is_open_; }
/// \brief Factory function: construct a container for the given path and create its file.
static Status CreateStorageContainer(std::shared_ptr<StorageContainer> *out_sc, const std::string &path);
private:
// Mutable so that the const Read/Write can lock it.
mutable std::mutex mutex_;
// Path of the backing file.
Path cont_;
// File descriptor of the backing file; -1 when not open.
int fd_;
bool is_open_;
std::unique_ptr<BuddySpace> bs_;
// Use the default value of BuddySpace
// which can map up to 4G of space.
explicit StorageContainer(const std::string &path) : cont_(path), fd_(-1), is_open_(false), bs_(nullptr) {}
// Creates the BuddySpace and the backing file, and marks the container open.
Status Create();
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_UTIL_STORAGE_CONTAINER_H_
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/util/storage_manager.h"
#include <iomanip>
#include <sstream>
#include <stdexcept>
#include <utility>
#include "common/utils.h"
#include "dataset/util/path.h"
#include "dataset/util/services.h"
#include "dataset/util//de_error.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace dataset {
/// \brief Build a file base name from a prefix and a numeric id.
/// \param prefix Text placed in front of the id.
/// \param file_id Id appended after the prefix, padded with '0' to a minimum width of 5.
/// \return The concatenated name.
std::string StorageManager::GetBaseName(const std::string &prefix, int32_t file_id) {
  std::ostringstream name;
  name << prefix;
  name << std::setfill('0') << std::setw(5) << file_id;
  return name.str();
}
/// \brief Build a file name of the form <base name>.<suffix>.
/// \param prefix Prefix passed to GetBaseName().
/// \param file_id Id passed to GetBaseName().
/// \param suffix Text appended after the dot.
/// \return The complete file name.
std::string StorageManager::ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix) {
  std::string file_name = GetBaseName(prefix, file_id);
  file_name += ".";
  file_name += suffix;
  return file_name;
}
/// \brief Create one more container file under root_ and append it to containers_.
/// The file is named from the prefix "IMG", the current file_id_ and the suffix "LB"; file_id_ is advanced only
/// after the container was created successfully.
/// \return Status::OK() on success; the error from CreateStorageContainer otherwise.
Status StorageManager::AddOneContainer() {
  const std::string kPrefix = "IMG";
  const std::string kSuffix = "LB";
  const std::string file_name = ConstructFileName(kPrefix, file_id_, kSuffix);
  Path container_name = root_ / file_name;
  std::shared_ptr<StorageContainer> sc;
  RETURN_IF_NOT_OK(StorageContainer::CreateStorageContainer(&sc, container_name.toString()));
  containers_.push_back(sc);
  ++file_id_;
  return Status::OK();
}
/// \brief Service start hook: reserve room for 1000 container handles and create the first container.
/// \return Status::OK() on success, an error when root_ is not a directory, or the error from AddOneContainer().
Status StorageManager::DoServiceStart() {
  containers_.reserve(1000);
  if (!root_.IsDirectory()) {
    RETURN_STATUS_UNEXPECTED("Not a directory");
  }
  RETURN_IF_NOT_OK(AddOneContainer());
  return Status::OK();
}
/// \brief Store a list of slices in the last container and record where they went in the index.
/// \param[out] key Receives the index key under which the data was registered. Must not be null.
/// \param buf The slices to store; their combined size must be non-zero.
/// \return Status::OK() on success. When the last container reports no space, a new container is added and the
/// insert is retried; any other error is returned to the caller.
Status StorageManager::Write(key_type *key, const std::vector<ReadableSlice> &buf) {
RETURN_UNEXPECTED_IF_NULL(key);
size_t sz = 0;
for (auto &v : buf) {
sz += v.GetSize();
}
if (sz == 0) {
RETURN_STATUS_UNEXPECTED("Unexpected 0 length");
}
std::shared_ptr<StorageContainer> cont;
key_type out_key;
value_type out_value;
bool create_new_container = false;
do {
// The lock object is scoped to one loop iteration, so it is re-created on every retry.
SharedLock lock_s(&rw_lock_);
size_t num_containers = containers_.size();
if (create_new_container) {
// Upgrade to exclusive lock.
lock_s.Upgrade();
create_new_container = false;
// Check again if someone has already added a
// new container after we got the x lock
// NOTE(review): num_containers was sampled in this iteration, not in the one whose insert failed, so a container
// added by another thread between the two iterations is not noticed here -- confirm whether that is intended.
if (containers_.size() == num_containers) {
RETURN_IF_NOT_OK(AddOneContainer());
}
// Refresh how many containers there are.
num_containers = containers_.size();
// Downgrade back to shared lock
lock_s.Downgrade();
}
if (num_containers == 0) {
RETURN_STATUS_UNEXPECTED("num_containers is zero");
}
// Go to the last container to insert.
cont = containers_.at(num_containers - 1);
off64_t offset;
Status rc = cont->Insert(buf, &offset);
if (rc.IsNoSpace()) {
// The container is full: loop again and add a new one under the exclusive lock.
create_new_container = true;
} else if (rc.IsOk()) {
// Record (container index, (offset, total size)) in the index; the index hands back the key.
out_value = std::make_pair(num_containers - 1, std::make_pair(offset, sz));
RETURN_IF_NOT_OK(index_.insert(out_value, &out_key));
*key = out_key;
break;
} else {
return rc;
}
} while (true);
return Status::OK();
}
/// \brief Look up a key in the index and read the stored bytes into the destination slice.
/// \param key Key returned by an earlier Write().
/// \param dest Destination slice. Must not be null and must be at least as large as the stored data.
/// \param[out] bytesRead Optional; when not null it receives the stored size, and it is set before the actual
/// container read is attempted.
/// \return Status::OK() on success; an error when the key is unknown, the destination is too small or the container
/// read fails.
Status StorageManager::Read(StorageManager::key_type key, WritableSlice *dest, size_t *bytesRead) const {
  RETURN_UNEXPECTED_IF_NULL(dest);
  auto r = index_.Search(key);
  if (!r.second) {
    RETURN_STATUS_UNEXPECTED("Key not found");
  }
  // The index entry is (container index, (offset, size)).
  auto &it = r.first;
  value_type v = *it;
  const int container_inx = v.first;
  const off_t offset = v.second.first;
  const size_t sz = v.second.second;
  if (dest->GetSize() < sz) {
    std::string errMsg = "Destination buffer too small. Expect at least " + std::to_string(sz) +
                         " but length = " + std::to_string(dest->GetSize());
    RETURN_STATUS_UNEXPECTED(errMsg);
  }
  if (bytesRead != nullptr) {
    *bytesRead = sz;
  }
  auto cont = containers_.at(container_inx);
  RETURN_IF_NOT_OK(cont->Read(dest, offset));
  return Status::OK();
}
/// \brief Service stop hook: truncate every container, drop all container handles and reset the file id.
/// \return The last error produced by a Truncate() call, or a default-constructed Status when none failed.
Status StorageManager::DoServiceStop() noexcept {
  Status last_error;
  for (const auto &container : containers_) {
    // The destructor of StorageContainer is not called automatically until the use
    // count drops to 0. But it is not always the case. We will do it ourselves.
    Status rc = container->Truncate();
    if (rc.IsError()) {
      last_error = rc;
    }
  }
  containers_.clear();
  file_id_ = 0;
  return last_error;
}
/// \brief Constructor. Remembers the root path and starts with file id 0 and an empty index; no container is
/// created here.
/// \param root Path under which container files are created.
StorageManager::StorageManager(const Path &root) : root_(root), file_id_(0), index_() {}
/// \brief Destructor. Runs DoServiceStop() and ignores its result. The call is qualified so that exactly this
/// class's implementation is invoked.
StorageManager::~StorageManager() { (void)StorageManager::DoServiceStop(); }
/// \brief Print a header line followed by every container held by the manager.
/// \param os Output stream to write to.
/// \param s The storage manager to dump.
/// \return The same output stream, to allow chaining.
std::ostream &operator<<(std::ostream &os, const StorageManager &s) {
  os << "Dumping all containers ..."
     << "\n";
  for (const auto &container : s.containers_) {
    os << *container;
  }
  return os;
}
} // namespace dataset
} // namespace mindspore
/**
* Copyright 2019 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_STORAGE_MANAGER_H_
#define DATASET_UTIL_STORAGE_MANAGER_H_
#include <unistd.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "dataset/util/allocator.h"
#include "dataset/util/auto_index.h"
#include "dataset/util/lock.h"
#include "dataset/util/memory_pool.h"
#include "dataset/util/path.h"
#include "dataset/util/service.h"
#include "dataset/util/slice.h"
#include "dataset/util/storage_container.h"
// List of shared handles to storage containers. Note that this alias is declared at global scope, outside the
// mindspore::dataset namespace.
using ListOfContainers = std::vector<std::shared_ptr<mindspore::dataset::StorageContainer>>;
namespace mindspore {
namespace dataset {
/// \brief A Service that stores byte slices in a growing list of StorageContainer files under a root directory and
/// keeps an index from key to (container index, (offset, size)).
class StorageManager : public Service {
public:
// Index value layout: (container index, (offset within the container, size in bytes)).
using storage_index = AutoIndexObj<std::pair<int, std::pair<off_t, size_t>>>;
using key_type = storage_index::key_type;
using value_type = storage_index::value_type;
/// \brief Constructor taking the root path under which container files are created.
explicit StorageManager(const Path &);
/// \brief Destructor. Stops the service.
~StorageManager() override;
StorageManager(const StorageManager &) = delete;
StorageManager &operator=(const StorageManager &) = delete;
/// \brief Store the slices and return the key under which they were indexed.
/// \param[out] out_key Key to be passed to Read() later.
/// \param buf The slices to store.
Status Write(key_type *out_key, const std::vector<ReadableSlice> &buf);
/// \brief Read back the data stored under a key.
/// \param key Key returned by Write().
/// \param dest Destination slice; must be large enough for the stored data.
/// \param[out] bytesRead Optional; receives the stored size.
Status Read(key_type key, WritableSlice *dest, size_t *bytesRead) const;
/// \brief Create the first container; fails when the root is not a directory.
Status DoServiceStart() override;
/// \brief Truncate and drop all containers.
Status DoServiceStop() noexcept override;
friend std::ostream &operator<<(std::ostream &os, const StorageManager &s);
private:
// Directory under which container files are created.
Path root_;
// All containers created so far; new data goes to the last one.
ListOfContainers containers_;
// Id used to name the next container file.
int file_id_;
// Taken shared by Write() and upgraded to exclusive while a new container is added.
RWLock rw_lock_;
storage_index index_;
std::string GetBaseName(const std::string &prefix, int32_t file_id);
std::string ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix);
Status AddOneContainer();
};
} // namespace dataset
} // namespace mindspore
#endif // DATASET_UTIL_STORAGE_MANAGER_H_
......@@ -19,8 +19,10 @@
#include <cstddef>
#include <cstdlib>
#include <limits>
#include <memory>
#include <new>
#include "./securec.h"
#include "dataset/util/allocator.h"
#include "dataset/util/memory_pool.h"
namespace mindspore {
......@@ -61,6 +63,11 @@ class SystemPool : public MemoryPool {
// Reports the largest value representable in uint64_t as the maximum size.
uint64_t get_max_size() const override { return std::numeric_limits<uint64_t>::max(); }
// Always reports 100.
int PercentFree() const override { return 100; }
/// \brief Build an Allocator for type T on top of a newly created SystemPool.
/// \tparam T The type of object the allocator hands out.
/// \return An Allocator<T> constructed from a std::make_shared<SystemPool>() instance; every call creates a new pool
/// object.
template <typename T>
static Allocator<T> GetAllocator() {
return Allocator<T>(std::make_shared<SystemPool>());
}
};
} // namespace dataset
} // namespace mindspore
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册