diff --git a/mindspore/ccsrc/dataset/util/CMakeLists.txt b/mindspore/ccsrc/dataset/util/CMakeLists.txt index b36d612435aba228500cda25ea239daaf5eb424a..96489add071f2d8072eadfc4eac26a67f5c3ed2a 100644 --- a/mindspore/ccsrc/dataset/util/CMakeLists.txt +++ b/mindspore/ccsrc/dataset/util/CMakeLists.txt @@ -2,6 +2,8 @@ file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc" set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) add_library(utils OBJECT arena.cc + buddy.cc + cache_pool.cc circular_pool.cc memory_pool.cc cond_var.cc @@ -11,7 +13,11 @@ add_library(utils OBJECT service.cc services.cc lock.cc + semaphore.cc status.cc + storage_container.cc + storage_manager.cc + slice.cc path.cc wait_post.cc sig_handler.cc) diff --git a/mindspore/ccsrc/dataset/util/allocator.h b/mindspore/ccsrc/dataset/util/allocator.h index ba6c7786df50ca3a19fd8694735a58c63406df0c..50a9cadbe3fb58cfe5a7e9f10e3c6ba104bbcdb5 100644 --- a/mindspore/ccsrc/dataset/util/allocator.h +++ b/mindspore/ccsrc/dataset/util/allocator.h @@ -17,8 +17,10 @@ #define DATASET_UTIL_ALLOCATOR_H_ #include +#include #include #include +#include #include "dataset/util/memory_pool.h" namespace mindspore { @@ -84,6 +86,91 @@ class Allocator { private: std::shared_ptr pool_; }; +/// \brief It is a wrapper of unique_ptr with a custom allocator and acts like std::lock_guard such that the memory will +/// be released when the object goes out of scope \tparam T The type of object to be allocated \tparam C Allocator. +/// Default to std::allocator +template > +class MemGuard { + public: + using allocator = C; + MemGuard() : n_(0) {} + explicit MemGuard(allocator a) : n_(0), alloc_(a) {} + // There is no copy constructor nor assignment operator because the memory is solely owned by this object. + MemGuard(const MemGuard &) = delete; + MemGuard &operator=(const MemGuard &) = delete; + // On the other hand, We can support move constructor + MemGuard(MemGuard &&lhs) noexcept : alloc_(std::move(lhs.alloc_)), ptr_(std::move(lhs.ptr_)), n_(lhs.n_) {} + MemGuard &operator=(MemGuard &&lhs) noexcept { + if (this != &lhs) { + this->deallocate(); + n_ = lhs.n_; + alloc_ = std::move(lhs.alloc_); + ptr_ = std::move(lhs.ptr_); + } + return *this; + } + /// \brief Explicitly deallocate the memory if allocated + void deallocate() { + if (ptr_) { + auto *p = ptr_.release(); + if (!std::is_arithmetic::value && std::is_destructible::value) { + for (auto i = 0; i < n_; ++i) { + p[i].~T(); + } + } + alloc_.deallocate(p, n_); + n_ = 0; + } + } + /// \brief Allocate memory (with emplace feature). Previous one will be released. If size is 0, no new memory is + /// allocated. + /// \param n Number of objects of type T to be allocated + /// \tparam Args Extra arguments pass to the constructor of T + template + Status allocate(size_t n, Args &&... args) noexcept { + try { + deallocate(); + if (n > 0) { + T *data = alloc_.allocate(n); + if (!std::is_arithmetic::value) { + for (auto i = 0; i < n; i++) { + std::allocator_traits::construct(alloc_, &(data[i]), std::forward(args)...); + } + } + ptr_ = std::unique_ptr(data); + n_ = n; + } + } catch (const std::bad_alloc &e) { + return Status(StatusCode::kOutOfMemory); + } catch (std::exception &e) { + RETURN_STATUS_UNEXPECTED(e.what()); + } + return Status::OK(); + } + ~MemGuard() noexcept { deallocate(); } + /// \brief Getter function + /// \return The pointer to the memory allocated + T *GetPointer() const { return ptr_.get(); } + /// \brief Getter function + /// \return The pointer to the memory allocated + T *GetMutablePointer() { return ptr_.get(); } + /// \brief Overload [] operator to access a particular element + /// \param x index to the element. Must be less than number of element allocated. + /// \return pointer to the x-th element + T *operator[](size_t x) { return GetMutablePointer() + x; } + /// \brief Overload [] operator to access a particular element + /// \param x index to the element. Must be less than number of element allocated. + /// \return pointer to the x-th element + T *operator[](size_t x) const { return GetPointer() + x; } + /// \brief Return how many bytes are allocated in total + /// \return Number of bytes allocated in total + size_t GetSizeInBytes() const { return n_ * sizeof(T); } + + private: + allocator alloc_; + std::unique_ptr> ptr_; + size_t n_; +}; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/buddy.cc b/mindspore/ccsrc/dataset/util/buddy.cc new file mode 100644 index 0000000000000000000000000000000000000000..3a14258419a7225a52a61a4f60e2027126c693f9 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/buddy.cc @@ -0,0 +1,388 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dataset/util/buddy.h" +#include +#include +#include "dataset/util/de_error.h" +#include "dataset/util/memory_pool.h" +#include "dataset/util/system_pool.h" +#include "./securec.h" + +inline uint64_t BitLeftShift(uint64_t v, uint64_t n) { return (v << n); } + +inline uint64_t BitRightShift(uint64_t v, uint64_t n) { return (v >> n); } + +inline uint64_t BitOr(uint64_t rhs, uint64_t lhs) { return rhs | lhs; } + +inline uint64_t BitEx(uint64_t rhs, uint64_t lhs) { return rhs ^ lhs; } + +inline uint64_t BitAnd(uint64_t rhs, uint64_t lhs) { return rhs & lhs; } + +namespace mindspore { +namespace dataset { +Status BuddySpace::Init() { + if (log_min_ < 0) { + return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, + "log_min must be positive : " + std::to_string(log_min_)); + } + if (num_lvl_ < 3 || num_lvl_ > 18) { + return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, + "num_lvl must be between 3 and 18 : " + std::to_string(num_lvl_)); + } + min_ = BitLeftShift(1, log_min_); + max_ = BitLeftShift(1, log_min_ + num_lvl_ - 1); + size_t offset_1 = sizeof(rel_addr_t) * num_lvl_; + size_t offset_2 = sizeof(int) * num_lvl_ + offset_1; + size_t offset_3 = sizeof(char) * BitLeftShift(1, num_lvl_ - 3) + offset_2; + RETURN_IF_NOT_OK(DeMalloc(offset_3, &ptr_, true)); + hint_ = reinterpret_cast(ptr_); + count_ = reinterpret_cast((reinterpret_cast(ptr_) + offset_1)); + map_ = reinterpret_cast(ptr_) + offset_2; + count_[num_lvl_ - 1] = 1; + map_[0] = BitOr(MORE_BIT, num_lvl_ - 3); + return Status::OK(); +} + +Status BuddySpace::Alloc(const uint64_t sz, BSpaceDescriptor *desc, addr_t *p) noexcept { + std::lock_guard lock(mutex_); + addr_t addr = AllocNoLock(sz, desc); + if (addr != NOSPACE) { + *p = addr; + return Status::OK(); + } else { + return Status(StatusCode::kNoSpace, "BuddySpace full. Not an error. Please ignore."); + } +} + +addr_t BuddySpace::AllocNoLock(const uint64_t sz, BSpaceDescriptor *desc) noexcept { + DS_ASSERT(sz <= max_); + uint32_t reqSize = SizeToBlock(sz); + rel_addr_t rel_addr = AllocBuddySeg(reqSize); + if (rel_addr != static_cast(NOSPACE)) { + (void)memset_s(desc, sizeof(BSpaceDescriptor), 0, sizeof(BSpaceDescriptor)); + desc->sig = static_cast(0xDEADBEEF); + desc->addr = rel_addr; + desc->req_size = reqSize; + desc->blk_size = NextPowerOf2(reqSize); + return static_cast(rel_addr * min_); + } else { + return NOSPACE; + } +} + +void BuddySpace::FreeNoLock(const BSpaceDescriptor *desc) { + DS_ASSERT(desc->sig == 0XDEADBEEF); + rel_addr_t rel_addr = desc->addr; + size_t blk_size = desc->blk_size; + size_t req_size = desc->req_size; + FreeBuddySeg(rel_addr, blk_size, req_size); +} + +void BuddySpace::Free(const BSpaceDescriptor *desc) { + std::lock_guard lock(mutex_); + return FreeNoLock(desc); +} + +std::ostream &operator<<(std::ostream &os, const BuddySpace &s) { + os << "1 unit = " << s.GetMinSize() << "\n" + << "Size of buddy space = " << s.GetMaxSize() << "\n" + << "Number of levels = " << s.num_lvl_ << "\n\n" + << "Percent free = " << s.PercentFree() << "\n" + << "Dumping count array : " + << "\n"; + for (int i = 0; i < s.num_lvl_; i++) { + os << "[" << i << "] = " << s.count_[i] << " "; + if (((i + 1) % 4) == 0) { + os << "\n"; + } + } + os << "\n"; + os << "Dumping allocation info:" + << "\n"; + auto max_addr = static_cast(BitLeftShift(1, s.num_lvl_ - 1)); + rel_addr_t addr = 0; + while (addr < max_addr) { + size_t sz = 0; + BuddySpace::STATE st; + s.GetBuddySegState(addr, &sz, &st); + os << "Address : " << std::left << std::setw(8) << addr << " Size : " << std::setw(8) << sz << " State : " + << ((st == BuddySpace::STATE::kAlloc) ? "ALLOC" : ((st == BuddySpace::STATE::kFree) ? "FREE" : "Unkonwn")) + << "\n"; + addr += sz; + } + return os; +} + +void BuddySpace::GetBuddySegState(const rel_addr_t rel_addr, size_t *rel_sz, STATE *st) const { + char byte; + int pos; + int offset; + uint64_t val = 0; + int shift; + pos = BitRightShift(rel_addr, 2); + offset = rel_addr % 4; + shift = offset * 2; + byte = map_[pos]; + switch (offset) { + case 0: + val = byte; + break; + case 1: + case 3: + if (offset == 1) { + val = BitLeftShift(BitAnd(byte, 0x30), shift); + } else { + val = BitLeftShift(BitAnd(byte, 0x03), shift); + } + break; + case 2: + val = BitLeftShift(BitAnd(byte, 0x0F), shift); + break; + } + if (BitAnd(val, ONE_BIT)) { + *rel_sz = 1; + } else if (BitAnd(val, TWO_BIT)) { + *rel_sz = 2; + } else if (BitAnd(val, MORE_BIT)) { + log_t lg = BitAnd(val, 0x0F); + *rel_sz = BitLeftShift(1, lg + 2); + } else { + *st = STATE::kEmpty; + return; + } + *st = BitAnd(val, ALLOC_BIT) ? STATE::kAlloc : STATE::kFree; +} + +void BuddySpace::SetBuddySegState(rel_addr_t rel_addr, size_t rel_sz, STATE st) { + int clr; + int mask; + int pos; + int offset; + int val = 0; + int shift; + auto log_sz = static_cast(Log2(rel_sz)); + pos = BitRightShift(rel_addr, 2); + offset = rel_addr % 4; + shift = offset * 2; + if (rel_sz == 1) { + val = ONE_BIT; + mask = 0xC0; + } else if (rel_sz == 2) { + val = TWO_BIT; + mask = 0xF0; + } else { + val = BitOr(log_sz - 2, MORE_BIT); + mask = 0xFF; + } + if (st == STATE::kAlloc) { + val = BitOr(val, ALLOC_BIT); + } else if (st == STATE::kFree) { + val = BitAnd(val, ~(static_cast(ALLOC_BIT))); + } else if (st == STATE::kEmpty) { + val = 0; + } + clr = static_cast(~(BitRightShift(mask, shift))); + map_[pos] = static_cast(BitAnd(map_[pos], clr)); + map_[pos] = static_cast(BitOr(map_[pos], BitRightShift(val, shift))); + if (st == STATE::kAlloc) { + count_[log_sz]--; + } else if (st == STATE::kFree) { + count_[log_sz]++; + if (rel_addr < hint_[log_sz]) { + hint_[log_sz] = rel_addr; + } + } +} + +void BuddySpace::JoinBuddySeg(rel_addr_t addr, size_t blk_sz) { + while (blk_sz < BitLeftShift(1, num_lvl_)) { + rel_addr_t buddy = BitEx(addr, blk_sz); + size_t sz = 0; + STATE st; + GetBuddySegState(buddy, &sz, &st); + if (st == STATE::kFree && sz == blk_sz) { + auto log_sz = static_cast(Log2(blk_sz)); + rel_addr_t left = (buddy < addr) ? buddy : addr; + rel_addr_t right = left + blk_sz; + DS_ASSERT(count_[log_sz] >= 2); + count_[log_sz] -= 2; + SetBuddySegState(right, blk_sz, STATE::kEmpty); + SetBuddySegState(left, BitLeftShift(blk_sz, 1), STATE::kFree); + for (int i = 0; i < log_sz; i++) { + if (hint_[i] == right) { + hint_[i] = left; + } + } + addr = left; + blk_sz <<= 1u; + } else { + break; + } + } +} + +void BuddySpace::TrimBuddySeg(rel_addr_t addr, size_t blk_sz, size_t ask_sz) { + DS_ASSERT(ask_sz < blk_sz); + uint32_t inx = Log2(blk_sz); + size_t remaining_sz = ask_sz; + for (int i = inx; i > 0; i--) { + size_t b_size = BitLeftShift(1, i); + size_t half_sz = BitRightShift(b_size, 1); + count_[i]--; + SetBuddySegState(addr, half_sz, STATE::kFree); + SetBuddySegState(addr + half_sz, half_sz, STATE::kFree); + if (remaining_sz >= half_sz) { + SetBuddySegState(addr, half_sz, STATE::kAlloc); + remaining_sz -= half_sz; + if (remaining_sz == 0) { + break; + } + addr += half_sz; + } + } +} + +void BuddySpace::UnTrimBuddySeg(rel_addr_t addr, size_t blk_sz, size_t ask_sz) { + DS_ASSERT(ask_sz < blk_sz); + uint32_t inx = Log2(blk_sz); + size_t remaining_sz = ask_sz; + for (int i = inx; i > 0; i--) { + size_t b_size = BitLeftShift(1, i); + size_t half_sz = BitRightShift(b_size, 1); + if (remaining_sz >= half_sz) { +#ifdef DEBUG + { + size_t sz = 0; + STATE st; + GetBuddySegState(addr, &sz, &st); + DS_ASSERT(sz == half_sz && st == STATE::kAlloc); + } +#endif + SetBuddySegState(addr, half_sz, STATE::kFree); + remaining_sz -= half_sz; + if (remaining_sz == 0) { + JoinBuddySeg(addr, half_sz); + break; + } + addr += half_sz; + } + } +} + +rel_addr_t BuddySpace::AllocBuddySeg(uint32_t req_size) noexcept { + uint32_t blk_size = NextPowerOf2(req_size); + int start_inx = static_cast(Log2(blk_size)); + bool found = false; + rel_addr_t ask_addr = 0; + auto max_addr = static_cast(BitLeftShift(1, num_lvl_ - 1)); + STATE st; + size_t sz = 0; + for (int i = start_inx; !found && i < num_lvl_; i++) { + DS_ASSERT(count_[i] >= 0); + if (count_[i] == 0) { + continue; + } + auto blk_sz = static_cast(BitLeftShift(1, i)); + ask_addr = hint_[i]; + while (ask_addr < max_addr && !found) { + GetBuddySegState(ask_addr, &sz, &st); + if (st == STATE::kFree && sz == blk_sz) { + found = true; + } else { + DS_ASSERT(st != STATE::kEmpty); + ask_addr += ((sz > blk_sz) ? sz : blk_sz); + } + } + } + if (found) { + if (sz > req_size) { + TrimBuddySeg(ask_addr, sz, req_size); + } else { + SetBuddySegState(ask_addr, sz, STATE::kAlloc); + hint_[start_inx] = ask_addr; + } + return ask_addr; + } else { + return static_cast(NOSPACE); + } +} + +void BuddySpace::FreeBuddySeg(rel_addr_t addr, size_t blk_size, size_t req_size) { + if (req_size == blk_size) { +#ifdef DEBUG + { + size_t sz = 0; + STATE st; + GetBuddySegState(addr, &sz, &st); + } +#endif + SetBuddySegState(addr, blk_size, STATE::kFree); + JoinBuddySeg(addr, blk_size); + } else { + UnTrimBuddySeg(addr, blk_size, req_size); + } +} + +int BuddySpace::PercentFree() const { + uint64_t total_free_sz = 0; + uint64_t max_sz_in_unit = BitLeftShift(1, num_lvl_ - 1); + // Go through the count array without lock + for (int i = 0; i < num_lvl_; i++) { + int cnt = count_[i]; + if (cnt == 0) { + continue; + } + uint64_t blk_sz = BitLeftShift(1, i); + total_free_sz += (blk_sz * cnt); + } + return static_cast(static_cast(total_free_sz) / static_cast(max_sz_in_unit) * 100); +} + +BuddySpace::BuddySpace(int log_min, int num_lvl) + : hint_(nullptr), + count_(nullptr), + map_(nullptr), + log_min_(log_min), + num_lvl_(num_lvl), + min_(0), + max_(0), + ptr_(nullptr) {} + +BuddySpace::~BuddySpace() { + if (ptr_ != nullptr) { + free(ptr_); + } + hint_ = nullptr; + count_ = nullptr; + map_ = nullptr; +} + +Status BuddySpace::CreateBuddySpace(std::unique_ptr *out_bs, int log_min, int num_lvl) { + Status rc; + auto bs = new (std::nothrow) BuddySpace(log_min, num_lvl); + if (bs == nullptr) { + return Status(StatusCode::kOutOfMemory); + } + rc = bs->Init(); + if (rc.IsOk()) { + (*out_bs).reset(bs); + } else { + delete bs; + } + return rc; +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/buddy.h b/mindspore/ccsrc/dataset/util/buddy.h new file mode 100644 index 0000000000000000000000000000000000000000..08c05cbbdbe3808a944c95e9297ce9a3f78d185c --- /dev/null +++ b/mindspore/ccsrc/dataset/util/buddy.h @@ -0,0 +1,133 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_UTIL_BUDDY_H_ +#define DATASET_UTIL_BUDDY_H_ + +#include +#include +#include +#include +#include +#include +#include "dataset/util/status.h" + +using addr_t = int64_t; +using rel_addr_t = int32_t; +using log_t = int; +#define ALLOC_BIT 0x80 +#define ONE_BIT 0x40 +#define TWO_BIT 0x20 +#define MORE_BIT 0x10 +#define NOSPACE ((addr_t)(-1)) +namespace mindspore { +namespace dataset { +struct BSpaceDescriptor { + int32_t sig; + rel_addr_t addr; + size_t req_size; + size_t blk_size; +}; + +class BuddySpace { + public: + // C++11 feature. Change STATE into a type safe class with + // the keyword. Don't take out the keyword 'class' + enum class STATE { kFree, kAlloc, kEmpty }; + + BuddySpace(const BuddySpace &) = delete; + + BuddySpace &operator=(const BuddySpace &) = delete; + + virtual ~BuddySpace(); + + Status Alloc(uint64_t sz, BSpaceDescriptor *desc, addr_t *) noexcept; + + void Free(const BSpaceDescriptor *desc); + + uint64_t GetMinSize() const { return min_; } + + uint64_t GetMaxSize() const { return max_; } + + int PercentFree() const; + + friend std::ostream &operator<<(std::ostream &os, const BuddySpace &s); + + static uint64_t NextPowerOf2(uint64_t n) { + if (n <= 1) { + return 1; + } + n = n - 1; + while (n & (n - 1)) { + n = n & (n - 1); + } + return n << 1; + } + + static uint32_t Log2(uint64_t n) { + uint32_t cnt = 0; + while (n >>= 1) { + cnt++; + } + return cnt; + } + + static Status CreateBuddySpace(std::unique_ptr *out_bs, int log_min = 15, int num_lvl = 18); + + private: + rel_addr_t *hint_; + int *count_; + char *map_; + int log_min_; + int num_lvl_; + uint64_t min_; + uint64_t max_; + void *ptr_; + std::mutex mutex_; + + explicit BuddySpace(int log_min = 15, int num_lvl = 18); + + Status Init(); + + addr_t AllocNoLock(const uint64_t sz, BSpaceDescriptor *desc) noexcept; + + void FreeNoLock(const BSpaceDescriptor *desc); + + uint32_t SizeToBlock(const uint64_t sz) const { + uint32_t reqSize = (sz / min_); + if (sz % min_) { + reqSize++; + } + return reqSize; + } + + void GetBuddySegState(const rel_addr_t rel_addr, size_t *rel_sz, STATE *st) const; + + void SetBuddySegState(rel_addr_t rel_addr, size_t rel_sz, STATE st); + + void JoinBuddySeg(rel_addr_t addr, size_t blk_sz); + + void TrimBuddySeg(rel_addr_t addr, size_t blk_sz, size_t ask_sz); + + void UnTrimBuddySeg(rel_addr_t addr, size_t blk_sz, size_t ask_sz); + + rel_addr_t AllocBuddySeg(uint32_t req_size) noexcept; + + void FreeBuddySeg(rel_addr_t addr, size_t blk_size, size_t req_size); +}; +} // namespace dataset +} // namespace mindspore + +#endif // DATASET_UTIL_BUDDY_H_ diff --git a/mindspore/ccsrc/dataset/util/cache_pool.cc b/mindspore/ccsrc/dataset/util/cache_pool.cc new file mode 100644 index 0000000000000000000000000000000000000000..92504cd06344e1878cc888639089b6ddd904764d --- /dev/null +++ b/mindspore/ccsrc/dataset/util/cache_pool.cc @@ -0,0 +1,202 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include "common/utils.h" +#include "dataset/util/cache_pool.h" +#include "dataset/util/services.h" + +namespace mindspore { +namespace dataset { +CachePool::CachePool(const value_allocator &alloc, const std::string &root) + : alloc_(alloc), root_(root), subfolder_(Services::GetUniqueID()), sm_(nullptr), tree_(nullptr) {} + +Status CachePool::DoServiceStart() { + tree_ = std::make_shared(); + // If we are given a disk path, set up the StorageManager + if (!root_.toString().empty()) { + Path spill = GetSpillPath(); + RETURN_IF_NOT_OK(spill.CreateDirectories()); + sm_ = std::make_shared(spill); + RETURN_IF_NOT_OK(sm_->ServiceStart()); + MS_LOG(INFO) << "CachePool will use disk folder: " << common::SafeCStr(spill.toString()); + } + return Status::OK(); +} +Status CachePool::DoServiceStop() { + Status rc; + Status rc2; + if (sm_ != nullptr) { + rc = sm_->ServiceStop(); + if (rc.IsError()) { + rc2 = rc; + } + } + sm_.reset(); + for (auto &bl : *tree_) { + if (bl.ptr != nullptr) { + alloc_.deallocate(bl.ptr, bl.sz); + } + } + tree_.reset(); + if (!root_.toString().empty()) { + Path spill = GetSpillPath(); + auto it = Path::DirIterator::OpenDirectory(&spill); + while (it->hasNext()) { + rc = it->next().Remove(); + if (rc.IsError() && rc2.IsOk()) { + rc2 = rc; + } + } + rc = spill.Remove(); + if (rc.IsError() && rc2.IsOk()) { + rc2 = rc; + } + } + return rc2; +} +CachePool::~CachePool() noexcept { (void)ServiceStop(); } +Status CachePool::Insert(const std::vector &buf, CachePool::key_type *key) { + DataLocator bl; + Status rc; + size_t sz = 0; + // We will consolidate all the slices into one piece. + for (auto &v : buf) { + sz += v.GetSize(); + } + bl.sz = sz; + try { + bl.ptr = alloc_.allocate(sz); + // We will do a piecewise copy. + WritableSlice dest(bl.ptr, bl.sz); + size_t pos = 0; + for (auto &v : buf) { + WritableSlice out(dest, pos); + rc = WritableSlice::Copy(&out, v); + if (rc.IsError()) { + break; + } + pos += v.GetSize(); + } + if (rc.IsError()) { + alloc_.deallocate(bl.ptr, sz); + bl.ptr = nullptr; + return rc; + } + } catch (std::bad_alloc &e) { + if (sm_ != nullptr) { + RETURN_IF_NOT_OK(sm_->Write(&bl.storage_key, buf)); + // We have an assumption 0 is not a valid key from the design of AutoIndexObj. + // Make sure it is not 0. + if (bl.storage_key == 0) { + RETURN_STATUS_UNEXPECTED("Key 0 is returned which is unexpected"); + } + } else { + return Status(StatusCode::kOutOfMemory, __LINE__, __FILE__); + } + } + rc = tree_->insert(bl, key); + if (rc.IsError() && bl.ptr != nullptr) { + alloc_.deallocate(bl.ptr, sz); + } + return rc; +} +Status CachePool::Read(CachePool::key_type key, WritableSlice *dest, size_t *bytesRead) const { + RETURN_UNEXPECTED_IF_NULL(dest); + auto r = tree_->Search(key); + if (r.second) { + auto &it = r.first; + if (it->ptr != nullptr) { + ReadableSlice src(it->ptr, it->sz); + RETURN_IF_NOT_OK(WritableSlice::Copy(dest, src)); + } else if (sm_ != nullptr) { + size_t expectedLength = 0; + RETURN_IF_NOT_OK(sm_->Read(it->storage_key, dest, &expectedLength)); + if (expectedLength != it->sz) { + MS_LOG(ERROR) << "Unexpected length. Read " << expectedLength << ". Expected " << it->sz << "." + << " Internal key: " << key << "\n"; + RETURN_STATUS_UNEXPECTED("Length mismatch. See log file for details."); + } + } + if (bytesRead != nullptr) { + *bytesRead = it->sz; + } + } else { + RETURN_STATUS_UNEXPECTED("Key not found"); + } + return Status::OK(); +} +const CachePool::value_allocator &CachePool::get_allocator() const { return alloc_; } +Path CachePool::GetSpillPath() const { + auto spill = Path(root_) / subfolder_; + return spill; +} +CachePool::CacheStat CachePool::GetStat() const { + CacheStat cs{0}; + for (auto &it : *tree_) { + if (it.ptr != nullptr) { + ++cs.num_mem_cached; + } else { + ++cs.num_disk_cached; + } + } + return cs; +} +Status CachePool::Spill(CachePool::DataLocator *dl) { + if (sm_ == nullptr) { + RETURN_STATUS_UNEXPECTED("No disk storage to spill"); + } + RETURN_UNEXPECTED_IF_NULL(dl); + RETURN_UNEXPECTED_IF_NULL(dl->ptr); + if (dl->storage_key == 0) { + ReadableSlice data(dl->ptr, dl->sz); + RETURN_IF_NOT_OK(sm_->Write(&dl->storage_key, {data})); + } + alloc_.deallocate(dl->ptr, dl->sz); + dl->ptr = nullptr; + return Status::OK(); +} +Status CachePool::Locate(CachePool::DataLocator *dl) { + RETURN_UNEXPECTED_IF_NULL(dl); + if (dl->ptr == nullptr) { + if (sm_ == nullptr) { + RETURN_STATUS_UNEXPECTED("No disk storage to locate the data"); + } + try { + dl->ptr = alloc_.allocate(dl->sz); + WritableSlice dest(dl->ptr, dl->sz); + Status rc = Read(dl->storage_key, &dest); + if (rc.IsError()) { + alloc_.deallocate(dl->ptr, dl->sz); + dl->ptr = nullptr; + return rc; + } + } catch (const std::bad_alloc &e) { + return Status(StatusCode::kOutOfMemory, __LINE__, __FILE__); + } + } + return Status::OK(); +} +size_t CachePool::GetSize(CachePool::key_type key) const { + auto r = tree_->Search(key); + if (r.second) { + auto &it = r.first; + return it->sz; + } else { + return 0; + } +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/cache_pool.h b/mindspore/ccsrc/dataset/util/cache_pool.h new file mode 100644 index 0000000000000000000000000000000000000000..d35617d0e4b679d50bc1e83cc2fe974590376368 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/cache_pool.h @@ -0,0 +1,139 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_UTIL_CACHE_POOL_H_ +#define DATASET_UTIL_CACHE_POOL_H_ + +#include +#include +#include +#include +#include "dataset/util/allocator.h" +#include "dataset/util/service.h" +#include "dataset/util/slice.h" +#include "dataset/util/storage_manager.h" +#include "dataset/util/auto_index.h" + +namespace mindspore { +namespace dataset { +/// \brief A CachePool provides service for backup/restore a buffer. A buffer can be represented in a form of vector of +/// ReadableSlice where all memory blocks will be copied to one contiguous block which can be in memory or spilled to +/// disk (if a disk directory is provided). Every buffer insert will return a generated key which can be used to +/// restore the buffer. +/// \see ReadableSlice +class CachePool : public Service { + public: + using base_type = uint8_t; + using pointer = base_type *; + using const_pointer = const base_type *; + using reference = base_type &; + using const_reference = const base_type &; + using value_allocator = Allocator; + + // An internal class to locate the whereabouts of a backed up buffer which can be either in + class DataLocator { + public: + DataLocator() : ptr(nullptr), sz(0), storage_key(0) {} + ~DataLocator() = default; + DataLocator(const DataLocator &other) = default; + DataLocator &operator=(const DataLocator &other) = default; + DataLocator(DataLocator &&other) noexcept { + ptr = other.ptr; + sz = other.sz; + storage_key = other.storage_key; + other.ptr = nullptr; + other.sz = 0; + other.storage_key = 0; + } + DataLocator &operator=(DataLocator &&other) noexcept { + if (&other != this) { + ptr = other.ptr; + sz = other.sz; + storage_key = other.storage_key; + other.ptr = nullptr; + other.sz = 0; + other.storage_key = 0; + } + return *this; + } + pointer ptr; + size_t sz; + StorageManager::key_type storage_key; + }; + + using data_index = AutoIndexObj; + using key_type = data_index::key_type; + using bl_alloc_type = typename value_allocator::template rebind::other; + + /// \brief Simple statistics returned from CachePool like how many elements are cached in memory and + /// how many elements are spilled to disk. + struct CacheStat { + int64_t num_mem_cached; + int64_t num_disk_cached; + }; + + /// \brief Constructor + /// \param alloc Allocator to allocate memory from + /// \param root Optional disk folder to spill + explicit CachePool(const value_allocator &alloc, const std::string &root = ""); + + CachePool(const CachePool &) = delete; + CachePool(CachePool &&) = delete; + CachePool &operator=(const CachePool &) = delete; + CachePool &operator=(CachePool &&) = delete; + ~CachePool() noexcept; + + Status DoServiceStart() override; + Status DoServiceStop() override; + + Path GetSpillPath() const; + + /// \brief Insert a sequence of ReadableSlice objects into the pool. + /// All memory blocks will be consolidated into one contiguous block and be cached in either memory or on disk. + /// \param[in] buf A sequence of ReadableSlice objects. + /// \param[out] key Generated key + /// \return Error code + Status Insert(const std::vector &buf, key_type *key); + /// \brief Restore a cached buffer (from memory or disk) + /// \param[in] key A previous key returned from Insert + /// \param[out] dest The cached buffer will be copied to this destination represented by a WritableSlice + /// \param[out] bytesRead Optional. Number of bytes read. + /// \return Error code + Status Read(key_type key, WritableSlice *dest, size_t *bytesRead = nullptr) const; + + Status Spill(DataLocator *dl); + + Status Locate(DataLocator *dl); + + size_t GetSize(key_type key) const; + + /// \brief Get statistics. + /// \return CacheStat object + CacheStat GetStat() const; + + const value_allocator &get_allocator() const; + + std::string MyName() const { return subfolder_; } + + private: + value_allocator alloc_; + Path root_; + const std::string subfolder_; + std::shared_ptr sm_; + std::shared_ptr tree_; +}; +} // namespace dataset +} // namespace mindspore +#endif diff --git a/mindspore/ccsrc/dataset/util/list.h b/mindspore/ccsrc/dataset/util/list.h index 5a08f4514e5d9f8afd2007cc359306efb10d16ad..a4c15daa0e44ed9cc40844f83f3458182695c0cd 100644 --- a/mindspore/ccsrc/dataset/util/list.h +++ b/mindspore/ccsrc/dataset/util/list.h @@ -106,6 +106,24 @@ struct List { ++count; } + // Insert elem2 before elem1 in the list. + virtual void InsertBefore(pointer elem1, pointer elem2) { + DS_ASSERT(elem1 != elem2); + Node &elem1_node = elem1->*node; + Node &elem2_node = elem2->*node; + elem2_node.next = elem1; + elem2_node.prev = elem1_node.prev; + if (elem1_node.prev != nullptr) { + Node &prev_node = elem1_node.prev->*node; + prev_node.next = elem2; + } + elem1_node.prev = elem2; + if (head == elem1) { + head = elem2; + } + ++count; + } + // Remove an element in the list virtual void Remove(pointer elem) noexcept { Node &elem_node = elem->*node; diff --git a/mindspore/ccsrc/dataset/util/memory_pool.h b/mindspore/ccsrc/dataset/util/memory_pool.h index 70876a81417141a3a517a62c0a2b30bb3e21ba80..ee1da3bda151b735d97d044d1db3977028dd9165 100644 --- a/mindspore/ccsrc/dataset/util/memory_pool.h +++ b/mindspore/ccsrc/dataset/util/memory_pool.h @@ -44,20 +44,6 @@ class MemoryPool { virtual ~MemoryPool() {} }; -// Used by unique_ptr -template -class Deleter { - public: - explicit Deleter(std::shared_ptr &mp) : mp_(mp) {} - - ~Deleter() = default; - - void operator()(T *ptr) const { mp_->Deallocate(ptr); } - - private: - std::shared_ptr mp_; -}; - Status DeMalloc(std::size_t s, void **p, bool); } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/path.cc b/mindspore/ccsrc/dataset/util/path.cc index c37fdc17f1d4b4fa5d4c347b07acdd544fee1ec6..59e5e5232c5e29dd87b2a6c3f3d8b0a51b27a74f 100644 --- a/mindspore/ccsrc/dataset/util/path.cc +++ b/mindspore/ccsrc/dataset/util/path.cc @@ -16,6 +16,8 @@ #include "dataset/util/path.h" #include +#include +#include #include #include #include @@ -26,7 +28,7 @@ namespace mindspore { namespace dataset { -#ifdef _WIN32 +#if defined(_WIN32) || defined(_WIN64) char Path::separator_ = '\\'; #else char Path::separator_ = '/'; @@ -132,7 +134,7 @@ Status Path::CreateDirectory() { #if defined(_WIN32) || defined(_WIN64) int rc = mkdir(common::SafeCStr(path_)); #else - int rc = mkdir(common::SafeCStr(path_), 0700); + int rc = mkdir(common::SafeCStr(path_), S_IRUSR | S_IWUSR | S_IXUSR); #endif if (rc) { std::ostringstream oss; @@ -182,6 +184,111 @@ Status Path::CreateDirectories() { return Status::OK(); } +Status Path::Remove() { + if (Exists()) { + if (IsDirectory()) { + errno_t err = rmdir(common::SafeCStr(path_)); + if (err == -1) { + std::ostringstream oss; + oss << "Unable to delete directory " << path_ << ". Errno = " << errno; + RETURN_STATUS_UNEXPECTED(oss.str()); + } + } else { + errno_t err = unlink(common::SafeCStr(path_)); + if (err == -1) { + std::ostringstream oss; + oss << "Unable to delete file " << path_ << ". Errno = " << errno; + RETURN_STATUS_UNEXPECTED(oss.str()); + } + } + } + return Status::OK(); +} + +Status Path::CreateFile(int *file_descriptor) { return OpenFile(file_descriptor, true); } + +Status Path::OpenFile(int *file_descriptor, bool create) { + int fd; + if (file_descriptor == nullptr) { + RETURN_STATUS_UNEXPECTED("null pointer"); + } + if (IsDirectory()) { + std::ostringstream oss; + oss << "Unable to create file " << path_ << " which is a directory."; + RETURN_STATUS_UNEXPECTED(oss.str()); + } + // Convert to canonical form. + if (strlen(common::SafeCStr(path_)) > PATH_MAX) { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + char canonical_path[PATH_MAX + 1] = {0x00}; +#if defined(_WIN32) || defined(_WIN64) + if (_fullpath(canonical_path, common::SafeCStr(path_), PATH_MAX) == nullptr) { +#else + if (realpath(common::SafeCStr(path_), canonical_path) == nullptr) { +#endif + if (errno == ENOENT && create) { + // File doesn't exist and we are to create it. Let's break it down. + auto file_part = Basename(); + auto parent_part = ParentPath(); +#if defined(_WIN32) || defined(_WIN64) + if (_fullpath(canonical_path, common::SafeCStr(parent_part), PATH_MAX) == nullptr) { +#else + if (realpath(common::SafeCStr(parent_part), canonical_path) == nullptr) { +#endif + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + auto cur_inx = strlen(canonical_path); + if ((cur_inx + file_part.length() + 1) > PATH_MAX) { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + canonical_path[cur_inx++] = separator_; + if (strncpy_s(canonical_path + cur_inx, PATH_MAX - cur_inx, common::SafeCStr(file_part), file_part.length()) != + EOK) { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + } else { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + } + if (create) { + fd = open(canonical_path, O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP); + } else { + fd = open(canonical_path, O_RDWR); + } + if (fd == -1) { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + *file_descriptor = fd; + return Status::OK(); +} + +Status Path::CloseFile(int fd) const { + if (close(fd) < 0) { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + return Status::OK(); +} + +Status Path::TruncateFile(int fd) const { + int rc; + rc = ftruncate(fd, 0); + if (rc == 0) { + return Status::OK(); + } else { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } +} + +std::string Path::Basename() { + std::size_t found = path_.find_last_of(separator_); + if (found != std::string::npos) { + return path_.substr(found + 1); + } else { + return path_; + } +} + std::shared_ptr Path::DirIterator::OpenDirectory(Path *f) { auto it = new (std::nothrow) DirIterator(f); @@ -208,7 +315,7 @@ Path::DirIterator::~DirIterator() { Path::DirIterator::DirIterator(Path *f) : dir_(f), dp_(nullptr), entry_(nullptr) { MS_LOG(DEBUG) << "Open directory " << f->toString() << "."; - dp_ = opendir(common::SafeCStr(f->toString())); + dp_ = opendir(f->toString().c_str()); } bool Path::DirIterator::hasNext() { @@ -225,5 +332,10 @@ bool Path::DirIterator::hasNext() { } Path Path::DirIterator::next() { return (*(this->dir_) / Path(entry_->d_name)); } + +std::ostream &operator<<(std::ostream &os, const Path &s) { + os << s.path_; + return os; +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/path.h b/mindspore/ccsrc/dataset/util/path.h index efe01a7d16cbebff7facc8c63da8e08734683e45..fbf65b8c236164533eecfa7aa2fa5cf4c087ff7d 100644 --- a/mindspore/ccsrc/dataset/util/path.h +++ b/mindspore/ccsrc/dataset/util/path.h @@ -90,6 +90,20 @@ class Path { std::string ParentPath(); + Status Remove(); + + Status CreateFile(int *fd); + + Status OpenFile(int *fd, bool create = false); + + Status CloseFile(int fd) const; + + Status TruncateFile(int fd) const; + + std::string Basename(); + + friend std::ostream &operator<<(std::ostream &os, const Path &s); + private: static char separator_; std::string path_; diff --git a/mindspore/ccsrc/dataset/util/semaphore.cc b/mindspore/ccsrc/dataset/util/semaphore.cc new file mode 100644 index 0000000000000000000000000000000000000000..36ddf5511d9316a6b822a0d05fb56dfd1f69d813 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/semaphore.cc @@ -0,0 +1,41 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dataset/util/semaphore.h" +#include "dataset/util/task_manager.h" + +namespace mindspore { +namespace dataset { +Status Semaphore::P() { + std::unique_lock lck(mutex_); + RETURN_IF_NOT_OK(wait_cond_.Wait(&lck, [this]() { return value_ > 0; })); + --value_; + return Status::OK(); +} +void Semaphore::V() { + std::unique_lock lck(mutex_); + ++value_; + wait_cond_.NotifyOne(); +} +int Semaphore::Peek() { + std::unique_lock lck(mutex_); + return value_; +} +Status Semaphore::Register(TaskGroup *vg) { return wait_cond_.Register(vg->GetIntrpService()); } +Status Semaphore::Deregister() { return (wait_cond_.Deregister()); } +void Semaphore::ResetIntrpState() { wait_cond_.ResetIntrpState(); } + +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/semaphore.h b/mindspore/ccsrc/dataset/util/semaphore.h new file mode 100644 index 0000000000000000000000000000000000000000..07b9e83e7fbc803510bcc77594cea75402049995 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/semaphore.h @@ -0,0 +1,54 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_UTIL_SEMAPHORE_H_ +#define DATASET_UTIL_SEMAPHORE_H_ + +#include "dataset/util/cond_var.h" + +namespace mindspore { +namespace dataset { +class TaskGroup; + +/// \brief A counting semaphore. There are two external functions P and V. P decrements the internal count and will be +/// blocked if the count is 0 (zero). V increments the internal count and wake up one of the waiters. +class Semaphore { + public: + /// \brief Constructor + /// \param init Initial value of the internal counter. + explicit Semaphore(int init) : value_(init) {} + + virtual ~Semaphore() {} + /// \brief Decrement the internal counter. Will be blocked if the value is 0. + /// \return Error code. Can get interrupt. + Status P(); + /// \brief Increment the internal counter. Wakeup on of the watiers if any. + void V(); + /// \brief Peek the internal value + /// \return The internal value + int Peek(); + Status Register(TaskGroup *vg); + Status Deregister(); + void ResetIntrpState(); + + private: + int value_; + + std::mutex mutex_; + CondVar wait_cond_; +}; +} // namespace dataset +} // namespace mindspore +#endif // DATASET_UTIL_SEMAPHORE_H_ diff --git a/mindspore/ccsrc/dataset/util/slice.cc b/mindspore/ccsrc/dataset/util/slice.cc new file mode 100644 index 0000000000000000000000000000000000000000..f1798b4f44a7b0153e386162f2064422e8e62e45 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/slice.cc @@ -0,0 +1,38 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +#include "dataset/util/slice.h" + +namespace mindspore { +namespace dataset { +WritableSlice::WritableSlice(const WritableSlice &src, off64_t offset, size_t len) : ReadableSlice(src, offset, len) { + mutable_data_ = static_cast(src.mutable_data_) + offset; +} +WritableSlice::WritableSlice(const WritableSlice &src, off64_t offset) + : WritableSlice(src, offset, src.GetSize() - offset) {} +Status WritableSlice::Copy(WritableSlice *dest, const ReadableSlice &src) { + RETURN_UNEXPECTED_IF_NULL(dest); + RETURN_UNEXPECTED_IF_NULL(dest->GetMutablePointer()); + if (dest->GetSize() <= 0) { + RETURN_STATUS_UNEXPECTED("Destination length is non-positive"); + } + auto err = memcpy_s(dest->GetMutablePointer(), dest->GetSize(), src.GetPointer(), src.GetSize()); + if (err) { + RETURN_STATUS_UNEXPECTED(std::to_string(err)); + } + return Status::OK(); +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/slice.h b/mindspore/ccsrc/dataset/util/slice.h new file mode 100644 index 0000000000000000000000000000000000000000..127df23cfabaffaa650294bde65095c770779220 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/slice.h @@ -0,0 +1,122 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_UTIL_SLICE_H_ +#define DATASET_UTIL_SLICE_H_ + +#include +#include +#include +#include "./securec.h" +#include "dataset/util/allocator.h" +#include "dataset/util/status.h" +namespace mindspore { +namespace dataset { +/// \brief A ReadableSlice wraps a const pointer in memory and its size. +/// \see WritableSlice for a non-const version +/// +class ReadableSlice { + public: + ReadableSlice() : ptr_(nullptr), sz_(0) {} + ReadableSlice(const void *ptr, size_t sz) : ptr_(ptr), sz_(sz) {} + ReadableSlice(const ReadableSlice &src, off64_t offset, size_t len) { + ptr_ = static_cast(src.GetPointer()) + offset; + sz_ = len; + } + ReadableSlice(const ReadableSlice &src, off64_t offset) : ReadableSlice(src, offset, src.sz_ - offset) {} + ReadableSlice(const ReadableSlice &lhs) { + ptr_ = lhs.ptr_; + sz_ = lhs.sz_; + } + ReadableSlice &operator=(const ReadableSlice &lhs) { + if (this != &lhs) { + ptr_ = lhs.ptr_; + sz_ = lhs.sz_; + } + return *this; + } + ReadableSlice(ReadableSlice &&lhs) noexcept { + if (this != &lhs) { + ptr_ = lhs.ptr_; + sz_ = lhs.sz_; + lhs.ptr_ = nullptr; + lhs.sz_ = 0; + } + } + ReadableSlice &operator=(ReadableSlice &&lhs) noexcept { + if (this != &lhs) { + ptr_ = lhs.ptr_; + sz_ = lhs.sz_; + lhs.ptr_ = nullptr; + lhs.sz_ = 0; + } + return *this; + } + /// \brief Getter function + /// \return Const version of the pointer + const void *GetPointer() const { return ptr_; } + /// \brief Getter function + /// \return Size of the slice + size_t GetSize() const { return sz_; } + bool empty() const { return ptr_ == nullptr; } + + private: + const void *ptr_; + size_t sz_; +}; +/// \brief A WritableSlice inherits from ReadableSlice to allow +/// one to write to the address pointed to by the pointer. +/// +class WritableSlice : public ReadableSlice { + public: + friend class StorageContainer; + /// \brief Default constructor + WritableSlice() : ReadableSlice(), mutable_data_(nullptr) {} + /// \brief This form of a constructor takes a pointer and its size. + WritableSlice(void *ptr, size_t sz) : ReadableSlice(ptr, sz), mutable_data_(ptr) {} + WritableSlice(const WritableSlice &src, off64_t offset, size_t len); + WritableSlice(const WritableSlice &src, off64_t offset); + WritableSlice(const WritableSlice &lhs) : ReadableSlice(lhs) { mutable_data_ = lhs.mutable_data_; } + WritableSlice &operator=(const WritableSlice &lhs) { + if (this != &lhs) { + mutable_data_ = lhs.mutable_data_; + ReadableSlice::operator=(lhs); + } + return *this; + } + WritableSlice(WritableSlice &&lhs) noexcept : ReadableSlice(std::move(lhs)) { + if (this != &lhs) { + mutable_data_ = lhs.mutable_data_; + lhs.mutable_data_ = nullptr; + } + } + WritableSlice &operator=(WritableSlice &&lhs) noexcept { + if (this != &lhs) { + mutable_data_ = lhs.mutable_data_; + lhs.mutable_data_ = nullptr; + ReadableSlice::operator=(std::move(lhs)); + } + return *this; + } + /// \brief Copy the content from one slice onto another. + static Status Copy(WritableSlice *dest, const ReadableSlice &src); + + private: + void *mutable_data_; + void *GetMutablePointer() { return mutable_data_; } +}; +} // namespace dataset +} // namespace mindspore +#endif // DATASET_UTIL_SLICE_H_ diff --git a/mindspore/ccsrc/dataset/util/storage_container.cc b/mindspore/ccsrc/dataset/util/storage_container.cc new file mode 100644 index 0000000000000000000000000000000000000000..96f5b45d0cc35815cb37beafaeab189ffc672425 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/storage_container.cc @@ -0,0 +1,164 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dataset/util/storage_container.h" + +#include +#include +#include +#include +#include "common/utils.h" +#include "dataset/util/de_error.h" +#include "dataset/util/path.h" +#include "dataset/util/status.h" +#include "utils/log_adapter.h" + +namespace mindspore { +namespace dataset { +Status StorageContainer::Create() { + RETURN_IF_NOT_OK(BuddySpace::CreateBuddySpace(&bs_)); + RETURN_IF_NOT_OK(cont_.CreateFile(&fd_)); + is_open_ = true; + MS_LOG(INFO) << "Container " << cont_ << " created"; + return Status::OK(); +} + +Status StorageContainer::Open() noexcept { + std::lock_guard lck(mutex_); + // Check again + if (!is_open_) { + RETURN_IF_NOT_OK(cont_.OpenFile(&fd_)); + is_open_ = true; + } + return Status::OK(); +} + +Status StorageContainer::Close() noexcept { + if (is_open_) { + std::lock_guard lck(mutex_); + // Check again + if (is_open_) { + RETURN_IF_NOT_OK(cont_.CloseFile(fd_)); + is_open_ = false; + fd_ = -1; + } + } + return Status::OK(); +} + +Status StorageContainer::Read(WritableSlice *dest, off64_t offset) const noexcept { + DS_ASSERT(is_open_); + RETURN_UNEXPECTED_IF_NULL(dest); + auto sz = dest->GetSize(); +#if defined(_WIN32) || defined(_WIN64) + // Doesn't seem there is any pread64 on mingw. + // So we will do a seek and then a read under + // a protection of mutex. + std::lock_guard lck(mutex_); + auto seek_err = lseek(fd_, offset, SEEK_SET); + if (seek_err < 0) { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + auto r_sz = read(fd_, dest->GetMutablePointer(), sz); +#else + auto r_sz = pread64(fd_, dest->GetMutablePointer(), sz, offset); +#endif + if (r_sz != sz) { + errno_t err = (r_sz == 0) ? EOF : errno; + RETURN_STATUS_UNEXPECTED(strerror(err)); + } + return Status::OK(); +} + +Status StorageContainer::Write(const ReadableSlice &dest, off64_t offset) const noexcept { + DS_ASSERT(is_open_); + auto sz = dest.GetSize(); +#if defined(_WIN32) || defined(_WIN64) + // Doesn't seem there is any pwrite64 on mingw. + // So we will do a seek and then a read under + // a protection of mutex. + std::lock_guard lck(mutex_); + auto seek_err = lseek(fd_, offset, SEEK_SET); + if (seek_err < 0) { + RETURN_STATUS_UNEXPECTED(strerror(errno)); + } + auto r_sz = write(fd_, dest.GetPointer(), sz); +#else + auto r_sz = pwrite64(fd_, dest.GetPointer(), sz, offset); +#endif + if (r_sz != sz) { + errno_t err = (r_sz == 0) ? EOF : errno; + RETURN_STATUS_UNEXPECTED(strerror(err)); + } + return Status::OK(); +} + +Status StorageContainer::Insert(const std::vector &buf, off64_t *offset) noexcept { + size_t sz = 0; + for (auto &v : buf) { + sz += v.GetSize(); + } + if (sz == 0) { + RETURN_STATUS_UNEXPECTED("Unexpected 0 length"); + } + if (sz > bs_->GetMaxSize()) { + RETURN_STATUS_UNEXPECTED("Request size too big"); + } + BSpaceDescriptor bspd{0}; + addr_t addr = 0; + RETURN_IF_NOT_OK(bs_->Alloc(sz, &bspd, &addr)); + *offset = static_cast(addr); + // We will do piecewise copy of the data to disk. + for (auto &v : buf) { + RETURN_IF_NOT_OK(Write(v, addr)); + addr += v.GetSize(); + } + return Status::OK(); +} + +Status StorageContainer::Truncate() const noexcept { + if (is_open_) { + RETURN_IF_NOT_OK(cont_.TruncateFile(fd_)); + MS_LOG(INFO) << "Container " << cont_ << " truncated"; + } + return Status::OK(); +} + +StorageContainer::~StorageContainer() noexcept { + (void)Truncate(); + (void)Close(); +} + +std::ostream &operator<<(std::ostream &os, const StorageContainer &s) { + os << "File path : " << s.cont_ << "\n" << *(s.bs_.get()); + return os; +} + +Status StorageContainer::CreateStorageContainer(std::shared_ptr *out_sc, const std::string &path) { + Status rc; + auto sc = new (std::nothrow) StorageContainer(path); + if (sc == nullptr) { + return Status(StatusCode::kOutOfMemory); + } + rc = sc->Create(); + if (rc.IsOk()) { + (*out_sc).reset(sc); + } else { + delete sc; + } + return rc; +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/storage_container.h b/mindspore/ccsrc/dataset/util/storage_container.h new file mode 100644 index 0000000000000000000000000000000000000000..07e41bd66a7ba7fc3080847acb4f8021cdd500c7 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/storage_container.h @@ -0,0 +1,79 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_UTIL_STORAGE_CONTAINER_H_ +#define DATASET_UTIL_STORAGE_CONTAINER_H_ + +#include +#include +#include +#include +#include +#include +#include "dataset/util/system_pool.h" +#include "dataset/util/buddy.h" +#include "dataset/util/path.h" +#include "dataset/util/slice.h" +#include "dataset/util/status.h" + +namespace mindspore { +namespace dataset { +class StorageManager; + +class StorageContainer { + public: + friend class StorageManager; + + ~StorageContainer() noexcept; + + StorageContainer(const StorageContainer &) = delete; + + StorageContainer &operator=(const StorageContainer &) = delete; + + friend std::ostream &operator<<(std::ostream &os, const StorageContainer &s); + + Status Open() noexcept; + + Status Close() noexcept; + + Status Insert(const std::vector &buf, off64_t *offset) noexcept; + + Status Write(const ReadableSlice &dest, off64_t offset) const noexcept; + + Status Read(WritableSlice *dest, off64_t offset) const noexcept; + + Status Truncate() const noexcept; + + bool IsOpen() const { return is_open_; } + + static Status CreateStorageContainer(std::shared_ptr *out_sc, const std::string &path); + + private: + mutable std::mutex mutex_; + Path cont_; + int fd_; + bool is_open_; + std::unique_ptr bs_; + + // Use the default value of BuddySpace + // which can map upto 4G of space. + explicit StorageContainer(const std::string &path) : cont_(path), fd_(-1), is_open_(false), bs_(nullptr) {} + + Status Create(); +}; +} // namespace dataset +} // namespace mindspore + +#endif // DATASET_UTIL_STORAGE_CONTAINER_H_ diff --git a/mindspore/ccsrc/dataset/util/storage_manager.cc b/mindspore/ccsrc/dataset/util/storage_manager.cc new file mode 100644 index 0000000000000000000000000000000000000000..8b7a6044e93dd44e285e50071fc9a907a93a1153 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/storage_manager.cc @@ -0,0 +1,167 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dataset/util/storage_manager.h" + +#include +#include +#include +#include +#include "common/utils.h" +#include "dataset/util/path.h" +#include "dataset/util/services.h" +#include "dataset/util//de_error.h" +#include "utils/log_adapter.h" + +namespace mindspore { +namespace dataset { +std::string StorageManager::GetBaseName(const std::string &prefix, int32_t file_id) { + std::ostringstream oss; + oss << prefix << std::setfill('0') << std::setw(5) << file_id; + return oss.str(); +} + +std::string StorageManager::ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix) { + std::string base_name = GetBaseName(prefix, file_id); + return (base_name + "." + suffix); +} + +Status StorageManager::AddOneContainer() { + const std::string kPrefix = "IMG"; + const std::string kSuffix = "LB"; + Path container_name = root_ / ConstructFileName(kPrefix, file_id_, kSuffix); + std::shared_ptr sc; + RETURN_IF_NOT_OK(StorageContainer::CreateStorageContainer(&sc, container_name.toString())); + containers_.push_back(sc); + file_id_++; + return Status::OK(); +} + +Status StorageManager::DoServiceStart() { + containers_.reserve(1000); + if (root_.IsDirectory()) { + RETURN_IF_NOT_OK(AddOneContainer()); + } else { + RETURN_STATUS_UNEXPECTED("Not a directory"); + } + return Status::OK(); +} + +Status StorageManager::Write(key_type *key, const std::vector &buf) { + RETURN_UNEXPECTED_IF_NULL(key); + size_t sz = 0; + for (auto &v : buf) { + sz += v.GetSize(); + } + if (sz == 0) { + RETURN_STATUS_UNEXPECTED("Unexpected 0 length"); + } + std::shared_ptr cont; + key_type out_key; + value_type out_value; + bool create_new_container = false; + do { + SharedLock lock_s(&rw_lock_); + size_t num_containers = containers_.size(); + if (create_new_container) { + // Upgrade to exclusvie lock. + lock_s.Upgrade(); + create_new_container = false; + // Check again if someone has already added a + // new container after we got the x lock + if (containers_.size() == num_containers) { + RETURN_IF_NOT_OK(AddOneContainer()); + } + // Refresh how many containers there are. + num_containers = containers_.size(); + // Downgrade back to shared lock + lock_s.Downgrade(); + } + if (num_containers == 0) { + RETURN_STATUS_UNEXPECTED("num_containers is zero"); + } + // Go to the last container to insert. + cont = containers_.at(num_containers - 1); + off64_t offset; + Status rc = cont->Insert(buf, &offset); + if (rc.IsNoSpace()) { + create_new_container = true; + } else if (rc.IsOk()) { + out_value = std::make_pair(num_containers - 1, std::make_pair(offset, sz)); + RETURN_IF_NOT_OK(index_.insert(out_value, &out_key)); + *key = out_key; + break; + } else { + return rc; + } + } while (true); + return Status::OK(); +} + +Status StorageManager::Read(StorageManager::key_type key, WritableSlice *dest, size_t *bytesRead) const { + RETURN_UNEXPECTED_IF_NULL(dest); + auto r = index_.Search(key); + if (r.second) { + auto &it = r.first; + value_type v = *it; + int container_inx = v.first; + off_t offset = v.second.first; + size_t sz = v.second.second; + if (dest->GetSize() < sz) { + std::string errMsg = "Destination buffer too small. Expect at least " + std::to_string(sz) + + " but length = " + std::to_string(dest->GetSize()); + RETURN_STATUS_UNEXPECTED(errMsg); + } + if (bytesRead != nullptr) { + *bytesRead = sz; + } + auto cont = containers_.at(container_inx); + RETURN_IF_NOT_OK(cont->Read(dest, offset)); + } else { + RETURN_STATUS_UNEXPECTED("Key not found"); + } + return Status::OK(); +} + +Status StorageManager::DoServiceStop() noexcept { + Status rc; + Status rc1; + for (auto const &p : containers_) { + // The destructor of StorageContainer is not called automatically until the use + // count drops to 0. But it is not always the case. We will do it ourselves. + rc = p.get()->Truncate(); + if (rc.IsError()) { + rc1 = rc; + } + } + containers_.clear(); + file_id_ = 0; + return rc1; +} + +StorageManager::StorageManager(const Path &root) : root_(root), file_id_(0), index_() {} + +StorageManager::~StorageManager() { (void)StorageManager::DoServiceStop(); } + +std::ostream &operator<<(std::ostream &os, const StorageManager &s) { + os << "Dumping all containers ..." + << "\n"; + for (auto const &p : s.containers_) { + os << *(p.get()); + } + return os; +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/storage_manager.h b/mindspore/ccsrc/dataset/util/storage_manager.h new file mode 100644 index 0000000000000000000000000000000000000000..075ac713d2c98c4f1e281738c3a1fda35edda4ea --- /dev/null +++ b/mindspore/ccsrc/dataset/util/storage_manager.h @@ -0,0 +1,76 @@ +/** + * Copyright 2019 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_UTIL_STORAGE_MANAGER_H_ +#define DATASET_UTIL_STORAGE_MANAGER_H_ + +#include +#include +#include +#include +#include +#include "dataset/util/allocator.h" +#include "dataset/util/auto_index.h" +#include "dataset/util/lock.h" +#include "dataset/util/memory_pool.h" +#include "dataset/util/path.h" +#include "dataset/util/service.h" +#include "dataset/util/slice.h" +#include "dataset/util/storage_container.h" + +using ListOfContainers = std::vector>; +namespace mindspore { +namespace dataset { +class StorageManager : public Service { + public: + using storage_index = AutoIndexObj>>; + using key_type = storage_index::key_type; + using value_type = storage_index::value_type; + + explicit StorageManager(const Path &); + + ~StorageManager() override; + + StorageManager(const StorageManager &) = delete; + + StorageManager &operator=(const StorageManager &) = delete; + + Status Write(key_type *out_key, const std::vector &buf); + + Status Read(key_type key, WritableSlice *dest, size_t *bytesRead) const; + + Status DoServiceStart() override; + + Status DoServiceStop() noexcept override; + + friend std::ostream &operator<<(std::ostream &os, const StorageManager &s); + + private: + Path root_; + ListOfContainers containers_; + int file_id_; + RWLock rw_lock_; + storage_index index_; + + std::string GetBaseName(const std::string &prefix, int32_t file_id); + + std::string ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix); + + Status AddOneContainer(); +}; +} // namespace dataset +} // namespace mindspore + +#endif // DATASET_UTIL_STORAGE_MANAGER_H_ diff --git a/mindspore/ccsrc/dataset/util/system_pool.h b/mindspore/ccsrc/dataset/util/system_pool.h index bd15ad11ddf977ae30318c12904974c3aedd6e90..286e30a615815121f3c2983c8150fc1f3b177a0b 100644 --- a/mindspore/ccsrc/dataset/util/system_pool.h +++ b/mindspore/ccsrc/dataset/util/system_pool.h @@ -19,8 +19,10 @@ #include #include #include +#include #include #include "./securec.h" +#include "dataset/util/allocator.h" #include "dataset/util/memory_pool.h" namespace mindspore { @@ -61,6 +63,11 @@ class SystemPool : public MemoryPool { uint64_t get_max_size() const override { return std::numeric_limits::max(); } int PercentFree() const override { return 100; } + + template + static Allocator GetAllocator() { + return Allocator(std::make_shared()); + } }; } // namespace dataset } // namespace mindspore