// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#ifdef BCLOUD
#include <base/atomicops.h>
#include <base/logging.h>
#else
#include <butil/atomicops.h>
#include <butil/logging.h>
#endif

#include <execinfo.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include <new>
#include <sstream>
#include <string>

// container_of(ptr, type, member) is assumed to be provided, directly or
// transitively, by the butil/base headers above.

namespace im {
namespace fugue {

#ifdef BCLOUD
namespace butil = base;
#endif

namespace lockfree {

// A push-only Treiber stack. T must expose a T* next member. Nodes are never
// popped individually; release() detaches the whole list at once.
template <class T>
class PushOnlyStack {
 public:
  PushOnlyStack() { _head.store(NULL, butil::memory_order_relaxed); }

  void push(T* node) {
    T* head = _head.load(butil::memory_order_relaxed);
    node->next = head;
    while (!_head.compare_exchange_weak(
        head, node, butil::memory_order_relaxed)) {
      node->next = head;
    }
  }

  // Detaches and returns the whole list; the stack becomes empty.
  T* release() { return _head.exchange(NULL, butil::memory_order_relaxed); }

 private:
  butil::atomic<T*> _head;
};

template <class T>
struct FreeListNode {
  uint64_t id;
  uint64_t next;
  T data;
};

// A fixed-capacity lock-free free list. Slots are lazily allocated from the
// heap the first time they are used, then recycled through a versioned
// (ABA-safe) Treiber stack.
template <class T, int CAP>
class FreeList {
 public:
  typedef FreeListNode<T> Node;
  static const uint64_t EMPTY = 0xFFFFFFFFFFFFFFFF;

  T* get() {
    uint64_t head = _head.load(butil::memory_order_acquire);
    if (head == EMPTY) {
      return new_node();
    }

    Node* node = address(head);
    while (!_head.compare_exchange_weak(
        head, node->next, butil::memory_order_acquire)) {
      if (head == EMPTY) {
        return new_node();
      }
      node = address(head);
    }

    return &node->data;
  }

  void put(T* value) {
    Node* node = container_of(value, Node, data);
    uint64_t head = _head.load(butil::memory_order_acquire);

    // Bump the version in the high 32 bits so stale CAS attempts fail.
    node->id += (1UL << 32);
    node->next = head;

    // NOTE: we MUST pass a temp var *head* to compare_exchange_weak, because
    // Boost.Atomic may update *expected* even on success; std::atomic does
    // not have this limitation.
    while (!_head.compare_exchange_weak(
        head, node->id, butil::memory_order_release)) {
      node->next = head;
    }
  }

  template <class F>
  void unsafe_foreach() {
    uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed);
    for (uint32_t i = 0; i < used_blk_cnt; ++i) {
      F()(&_node[i]->data);
    }
  }

  // Returns the used size in KB by summing the offset field of every
  // allocated element (T must expose an offset member; this member function
  // is only instantiated where it does).
  uint32_t real_used_size() const {
    uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed);
    uint64_t used_bytes = 0;
    for (uint32_t i = 0; i < used_blk_cnt; ++i) {
      used_bytes += _node[i]->data.offset;
    }
    return used_bytes >> 10;
  }

  uint32_t allocate_blocks() const {
    return _slot_index.load(butil::memory_order_relaxed);
  }

  uint32_t free_blocks() const {
    uint64_t head = _head.load(butil::memory_order_relaxed);
    uint32_t size = 0;
    while (head != FreeList::EMPTY) {
      const Node* head_ptr = address(head);
      head = head_ptr->next;
      ++size;
    }
    return size;
  }

  void reset() {
    _head.store(FreeList::EMPTY, butil::memory_order_relaxed);
    _slot_index.store(0, butil::memory_order_relaxed);
  }

  FreeList() {
    for (int i = 0; i < CAP; ++i) {
      _node[i] = NULL;
    }
    reset();
  }

 private:
  uint32_t slot(uint64_t id) const { return static_cast<uint32_t>(id); }

  T* new_node() {
    uint32_t index = _slot_index.fetch_add(1, butil::memory_order_relaxed);
    if (index >= CAP) {
      return NULL;
    }

    if (_node[index] != NULL) {
      return &(_node[index]->data);
    }

    Node* node = reinterpret_cast<Node*>(malloc(sizeof(Node)));
    if (node == NULL) {
      return NULL;
    }
    new (node) Node;
    node->id = index;
    _node[index] = node;
    return &node->data;
  }
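
  // Node-id layout, as implied by slot() and put(): the low 32 bits of id
  // index into _node[], and the high 32 bits hold a version counter that
  // put() advances by (1UL << 32) on every release. _head stores this
  // versioned id rather than a raw slot index, so the CAS in get() fails if
  // the same slot was popped and re-pushed in between (the classic ABA
  // hazard of lock-free stacks). For example:
  //
  //   id = 0x0000000300000001  ->  slot 1, released 3 times so far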
  Node* address(uint64_t id) { return _node[slot(id)]; }
  const Node* address(uint64_t id) const { return _node[slot(id)]; }

  butil::atomic<uint64_t> _head;
  butil::atomic<uint32_t> _slot_index;
  Node* _node[CAP];
};
}  // namespace lockfree

namespace memory {

// A 2 MB arena chunk; all small allocations are carved out of these.
struct Block {
  static const int BLOCK_SIZE = 2 * 1024 * 1024;
  char data[BLOCK_SIZE];
};

// Process-wide free list of Blocks, shared by every Region.
class GlobalBlockFreeList {
 public:
  static const int MAX_BLOCK_COUNT = 32 * 1024;
  typedef lockfree::FreeList<Block, MAX_BLOCK_COUNT> type;
  static type* instance() {
    static type singleton;
    return &singleton;
  }
};

// A Block plus the offset of its first free byte.
struct BlockReference {
  BlockReference() : offset(0), block(NULL) {
    // do nothing
  }

  void reset() {
    offset = 0;
    block = NULL;
  }

  uint32_t offset;
  Block* block;
};

class Region {
 public:
  struct GlobalPut {
    void operator()(BlockReference* block_ref) {
      if (block_ref->block != NULL) {
        GlobalBlockFreeList::instance()->put(block_ref->block);
      }
      block_ref->reset();
    }
  };

  struct BigNode {
    BigNode* next;
    char data[0];
  };

  ~Region() {
    reset();
    delete[] _big_mem_start;
    _big_mem_start = NULL;
  }

  // Returns a human-readable usage summary. The original returned
  // oss.str().c_str(), a pointer into a temporary that dangles as soon as
  // the expression ends; returning std::string by value fixes that.
  std::string debug_str() const {
    uint32_t alloc_blocks = _free_blocks.allocate_blocks();
    uint32_t free_blocks = _free_blocks.free_blocks();
    uint32_t used_mem_kb = _free_blocks.real_used_size();
    uint32_t big_buf_size = _big_mem_size.load(butil::memory_order_relaxed);
    uint32_t big_buf_count = _big_mem_count.load(butil::memory_order_relaxed);
    uint32_t mlc_mem_size = _mlc_mem_size.load(butil::memory_order_relaxed);
    uint32_t mlc_mem_count = _mlc_mem_count.load(butil::memory_order_relaxed);

    std::ostringstream oss;
    oss << "[alloc_blks:" << alloc_blocks << ",free_blks:" << free_blocks
        << ",used_mem_kb:" << used_mem_kb
        << ",big_mem_kb:" << (big_buf_size >> 10)
        << ",big_buf_cnt:" << big_buf_count
        << ",mlc_mem_kb:" << (mlc_mem_size >> 10)
        << ",mlc_cnt:" << mlc_mem_count << "]";

    return oss.str();
  }

  Region();

  void init();

  void reset();

  BlockReference* get();

  void* malloc(size_t size);

  void put(BlockReference* block);

  static const int MAX_BLOCK_COUNT = 1024;
  static const int BIG_MEM_THRESHOLD = 256 * 1024;
  static const int MLC_MEM_THRESHOLD = 4 * 1024 * 1024;
  static const int COUNTER_SIZE = MLC_MEM_THRESHOLD / BIG_MEM_THRESHOLD + 1;

 private:
  lockfree::FreeList<BlockReference, MAX_BLOCK_COUNT> _free_blocks;
  lockfree::PushOnlyStack<BigNode> _big_nodes;

  butil::atomic<uint32_t> _big_mem_size;
  butil::atomic<uint32_t> _big_mem_count;

  char* _big_mem_start;
  uint32_t _big_mem_capacity;

  butil::atomic<uint32_t> _mlc_mem_size;
  butil::atomic<uint32_t> _mlc_mem_count;
};
}  // namespace memory
}  // namespace fugue

// A thread-local bump allocator over 2 MB Blocks borrowed from a Region.
class Mempool {
 public:
  void* malloc(size_t size) {
    size = _align(size);
    if (size <= _free_size) {
      void* p = _free_cursor;
      _free_size -= size;
      _free_cursor += size;
      return p;
    }

    return malloc_from_region(size);
  }

  void free(void* p, size_t size) {
    // Big allocations are owned by the Region and reclaimed on its reset;
    // per-pointer free is a no-op for them.
    if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) {
      return;
    }

    // Only the most recent allocation can be handed back, by rewinding the
    // cursor; anything else waits for release_block()/Region::reset().
    if (_free_cursor - size == static_cast<char*>(p)) {
      size_t down_aligned = _down_align(size);
      _free_cursor -= down_aligned;
      _free_size += down_aligned;
    }
  }

  void* realloc(void* old_data, size_t old_size, size_t new_size) {
    if (old_size >= new_size) {
      return old_data;
    }

    size_t required = new_size - old_size;
    if (_free_cursor == static_cast<char*>(old_data) + old_size) {
      // old_data is the most recent allocation: try to grow it in place.
      if (_free_size >= required) {
        _free_cursor += required;
        _free_size -= required;
        return old_data;
      } else {
        _free_cursor = static_cast<char*>(old_data);
        _free_size += old_size;
      }
    }

    void* p = this->malloc_from_region(new_size);
    if (p != NULL) {
      memcpy(p, old_data, old_size);
      return p;
    }

    return NULL;
  }

  explicit Mempool(fugue::memory::Region* blocks)
      : _free_size(0), _free_cursor(NULL), _blocks(blocks) {
    _block = NULL;
  }

  ~Mempool() { release_block(); }
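
  // Allocation mechanics in brief (restating the code above, not adding new
  // behavior): small requests are bumped out of the current 2 MB Block, and
  // free()/realloc() can only give memory back when they touch the bytes
  // immediately below _free_cursor. A sketch, assuming a 64-bit build where
  // ALIGN_SIZE is 8 and sizes are passed back already aligned:
  //
  //   a = malloc(100);   // cursor advances by 104 (rounded up to 8)
  //   b = malloc(50);    // cursor advances by 56
  //   free(a, 104);      // a is not at the cursor: ignored
  //   free(b, 56);       // b is the latest allocation: cursor rewinds 56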
  void release_block() {
    if (_block) {
      _block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
      _blocks->put(_block);
    }

    _free_size = 0;
    _free_cursor = NULL;
    _block = NULL;
  }

 private:
  void* malloc_from_region(size_t size) {
    if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) {
      return _blocks->malloc(size);
    }

    while (true) {
      fugue::memory::BlockReference* block = _blocks->get();
      if (block == NULL) {
        return NULL;
      }

      uint32_t free_size = fugue::memory::Block::BLOCK_SIZE - block->offset;
      if (size <= free_size) {
        // Record how much of the old block we consumed before switching.
        if (_block) {
          _block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
        }

        char* p = block->block->data + block->offset;
        _free_size = free_size - size;
        _free_cursor = p + size;
        _block = block;
        return p;
      }
    }
  }

  static const int ALIGN_SIZE = sizeof(void*);

  // Round size up / down to the pointer-size boundary.
  inline size_t _align(size_t size) const {
    return (size + (ALIGN_SIZE - 1)) & ~(ALIGN_SIZE - 1);
  }

  inline size_t _down_align(size_t size) const {
    return size & ~(ALIGN_SIZE - 1);
  }

  size_t _free_size;
  char* _free_cursor;

  fugue::memory::Region* _blocks;
  fugue::memory::BlockReference* _block;
};

extern __thread Mempool* g_mempool;

class mempool {
 public:
  virtual void* malloc(size_t size) = 0;
  virtual void free(void* p, size_t size) = 0;
  inline virtual ~mempool() {}
};

// Forwards every call to the thread-local g_mempool.
class GlobalMempool : public mempool {
 public:
  GlobalMempool() {
    // do nothing;
  }

  virtual ~GlobalMempool() {
    // do nothing;
  }

  static GlobalMempool* instance() {
    static GlobalMempool singleton;
    return &singleton;
  }

  void reset(Mempool* mempool) { g_mempool = mempool; }

  void* malloc(size_t size) { return g_mempool->malloc(size); }

  void* realloc(void* old_data, size_t old_size, size_t new_size) {
    return g_mempool->realloc(old_data, old_size, new_size);
  }

  void free(void* p, size_t s) { g_mempool->free(p, s); }

  void clear() { g_mempool->release_block(); }

  Mempool* get() { return g_mempool; }
};

// RAII helper: installs a Mempool over the given Region as the thread-local
// allocator for the lifetime of the guard, then restores the previous one.
class MempoolGuard {
 public:
  explicit MempoolGuard(fugue::memory::Region* region) : _mempool(region) {
    acquire();
  }

  void acquire() {
    _saved_mempool = g_mempool;
    g_mempool = &_mempool;
  }

  void release() {
    _mempool.release_block();
    g_mempool = _saved_mempool;
  }

  ~MempoolGuard() { release(); }

 private:
  Mempool _mempool;
  Mempool* _saved_mempool;
};

inline std::string print_trace() {
  static const int BT_BUF_SIZE = 400;
  std::stringstream debug_stream;
  void* buffer[BT_BUF_SIZE];
  int nptrs = backtrace(buffer, BT_BUF_SIZE);
  char** strings = backtrace_symbols(buffer, nptrs);
  if (strings == NULL) {
    return debug_stream.str();
  }

  for (int j = 0; j < nptrs; j++) {
    debug_stream << strings[j] << "\t";
  }
  // backtrace_symbols allocates the array with malloc; the original leaked it.
  free(strings);

  return debug_stream.str();
}
}  // namespace im
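
// Usage sketch (illustrative only; the Region/thread wiring below is an
// assumption, not part of this header): a request thread scopes its
// allocations with MempoolGuard, so everything it mallocs is bump-allocated
// out of the Region and handed back in one shot when the guard dies.
//
//   im::fugue::memory::Region region;
//   region.init();
//   {
//     im::MempoolGuard guard(&region);
//     void* buf = im::GlobalMempool::instance()->malloc(1024);
//     // ... use buf; no per-object free required ...
//   }  // guard returns the current block to the region
//   region.reset();  // recycles every block the request used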