提交 1fb36f54 编写于 作者: H HexToString

fix memoryPool

上级 dfd7f014
......@@ -19,6 +19,11 @@ namespace baidu {
namespace paddle_serving {
namespace predictor {
// Why do we need MempoolRegion?
// Because the resources have to be released,
// we need to keep both the Mempool and the Region together.
// Mempool is a wrapper class that lets us use memory more safely.
// Region is the RAII class.
struct MempoolRegion {
MempoolRegion(im::fugue::memory::Region* region, im::Mempool* mempool)
: _region(region), _mempool(mempool) {}
......
......@@ -16,6 +16,7 @@
namespace im {
// NOTE: `g_mempool` is not used anywhere.
__thread Mempool* g_mempool = NULL;
namespace fugue {
......@@ -28,38 +29,47 @@ void Region::init() {
}
// Hands back everything this Region has given out and zeroes its usage
// counters.
// NOTE(review): this listing looks like a rendered diff without +/- markers;
// the `_free_blocks`, `GlobalPut`, `_big_nodes` and `_mlc_mem_*` lines appear
// to be the pre-rename versions of the lines that follow them - confirm.
void Region::reset() {
// release memory allocated from GlobalMempool
_free_blocks.unsafe_foreach<GlobalPut>();
_free_blocks.reset();
// run PutBlockByReference, which returns each Block borrowed from
// BlockFreeList, then reset the reference free list itself
_blockReference_FreeList.unsafe_foreach<PutBlockByReference>();
_blockReference_FreeList.reset();
// release memory obtained from ::malloc
BigNode* head = _big_nodes.release();
// detach the whole BigNode list and free it node by node
BigNode* head = _bigNode_Stack.releaseAndGetHeadPtr();
while (head) {
// read the link before the node is freed
BigNode* next = head->next;
::free(head);
head = next;
}
_mlc_mem_size.store(0, butil::memory_order_relaxed);
_mlc_mem_count.store(0, butil::memory_order_relaxed);
// no BigNode is left, so its size/count statistics restart at zero
_total_bigNode_size.store(0, butil::memory_order_relaxed);
_total_bigNode_count.store(0, butil::memory_order_relaxed);
// clear the large buffer
// Only the used size and count of the large buffer are cleared here; the
// buffer itself is not released.
// It is presumably freed in the destructor (not visible here) - confirm.
_big_mem_size.store(0, butil::memory_order_relaxed);
_big_mem_count.store(0, butil::memory_order_relaxed);
}
// Takes a BlockReference from this Region's reference free list and makes
// sure it points at a Block before returning it.
BlockReference* Region::get() {
BlockReference* ref = _free_blocks.get();
// A BlockReference is constructed with block == NULL, so the first time a
// reference is handed out its block is NULL.
// A reference that came back through put() presumably still carries its
// block, so it is not NULL then.
// NOTE(review): ref is dereferenced below without a NULL check - confirm
// that FreeList::get() can never return NULL.
BlockReference* ref = _blockReference_FreeList.get();
if (ref->block == NULL) {
// no Block attached yet: start at offset 0 and borrow a Block from the
// global block free list
ref->offset = 0;
ref->block = GlobalBlockFreeList::instance()->get();
ref->block = BlockFreeList::instance()->get();
}
return ref;
}
// NOTE(review): the one-line definition below appears to be the pre-rename
// version of the definition that follows it - confirm.
void Region::put(BlockReference* block) { _free_blocks.put(block); }
// Gives a BlockReference back to this Region.
// This does not return the Block to the BlockFreeList;
// the reference only goes back to _blockReference_FreeList.
// The next call to get() is expected to hand out this same BlockReference*
// (the one that was just put) from the head of the list.
void Region::put(BlockReference* blockReference) {
_blockReference_FreeList.put(blockReference);
}
void* Region::malloc(size_t size) {
if (size < MLC_MEM_THRESHOLD) {
if (size < BIGNODE_MEM_THRESHOLD) {
uint32_t offset =
_big_mem_size.fetch_add(size, butil::memory_order_relaxed);
if (offset + size < _big_mem_capacity) {
......@@ -68,22 +78,22 @@ void* Region::malloc(size_t size) {
}
}
_mlc_mem_size.fetch_add(size, butil::memory_order_relaxed);
_mlc_mem_count.fetch_add(1, butil::memory_order_relaxed);
// if size>= BIGNODE_MEM_THRESHOLD or the _big_mem_capacity is used up.
_total_bigNode_size.fetch_add(size, butil::memory_order_relaxed);
_total_bigNode_count.fetch_add(1, butil::memory_order_relaxed);
BigNode* node = reinterpret_cast<BigNode*>(::malloc(sizeof(BigNode) + size));
_big_nodes.push(node);
_bigNode_Stack.push(node);
return node->data;
}
// Builds an empty Region: every usage counter is zero and no large buffer is
// attached yet.
Region::Region() {
// nothing has been taken from the large buffer so far
_big_mem_size.store(0, butil::memory_order_relaxed);
_big_mem_count.store(0, butil::memory_order_relaxed);
// the large buffer is not allocated here; presumably init() sets it up
// (its body is not visible here) - confirm
_big_mem_start = NULL;
_big_mem_capacity = 0;
// NOTE(review): the `_mlc_mem_*` lines appear to be the pre-rename versions
// of the `_total_bigNode_*` lines - confirm.
_mlc_mem_size.store(0, butil::memory_order_relaxed);
_mlc_mem_count.store(0, butil::memory_order_relaxed);
// no BigNode has been allocated so far
_total_bigNode_size.store(0, butil::memory_order_relaxed);
_total_bigNode_count.store(0, butil::memory_order_relaxed);
}
} // namespace memory
} // namespace fugue
......
......@@ -38,6 +38,16 @@ namespace butil = base;
namespace lockfree {
/*
struct BigNode {
BigNode* next;
char data[0];
};
*/
// The template parameter T is BigNode,
// which is a node of a linked list of variable-length memory chunks.
// _head is an atomic T* (BigNode*) that always points to the head node of the
// stack, so PushOnlyStack is essentially that head pointer plus a few member
// functions.
template <class T>
class PushOnlyStack {
public:
......@@ -52,12 +62,20 @@ class PushOnlyStack {
}
}
T* release() { return _head.exchange(NULL, butil::memory_order_relaxed); }
T* releaseAndGetHeadPtr() {
return _head.exchange(NULL, butil::memory_order_relaxed);
}
private:
butil::atomic<T*> _head;
};
// T can be class Block or class BlockReference.
// class Block is 2 MB of memory.
// class BlockReference holds a Block* pointer (plus an offset),
// so the main member of FreeListNode is either 2 MB of data or a Block*
// pointer.
// 'id' is the index of the node itself in a FreeList.
// 'next' is the index of the next FreeListNode<T> in a FreeList.
template <class T>
struct FreeListNode {
uint64_t id;
......@@ -65,18 +83,34 @@ struct FreeListNode {
T data;
};
// T can be class Block or class BlockReference.
// CAP means capacity.
// The main member of FreeList is FreeListNode<T>* [CAP].
// FreeList doesn't release the block data; it is only an array of
// FreeListNode<T>* pointers.
template <class T, int CAP>
class FreeList {
public:
typedef FreeListNode<T> Node;
static const uint64_t EMPTY = 0xFFFFFFFFFFFFFFFF;
// get the head Node`s member data ptr(T*)
T* get() {
uint64_t head = _head.load(butil::memory_order_acquire);
if (head == EMPTY) {
return new_node();
}
// _head is an atomic value that holds the index of the head node.
// 'head' is a local snapshot of _head.
// By now _head may no longer be equal to 'head',
// because another thread may have changed _head in the meantime.
/* compare_exchange_weak:
When the current value is equal to the expected value, replace the current
value with the desired value and return true.
When the current value is not equal to the expected value, overwrite the
expected value with the current value and return false.
*/
Node* node = address(head);
while (!_head.compare_exchange_weak(
head, node->next, butil::memory_order_acquire)) {
......@@ -89,10 +123,23 @@ class FreeList {
}
void put(T* value) {
/*
container_of:
given a pointer to a member of an object,
recovers the pointer to the object that contains it.
For example:
Node has a member declared as 'T data' ('data' is the member name).
'value' (a T*) points to the 'data' member of some Node,
so container_of(value, Node, data) yields the Node* that contains it.
*/
Node* node = container_of(value, Node, data);
uint64_t head = _head.load(butil::memory_order_acquire);
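// Bump the version stored in the upper bits of the id.
// node->id is 64 bits wide, while the slot index is 32 bits.
// address() uses slot = static_cast<uint32_t>(node->id), i.e. only the low
// 32 bits, and adding (1UL << 32) leaves those bits unchanged.
// TODO(review): confirm that the upper 32 bits are only a version tag that
// distinguishes a recycled node from a newly created one.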
node->id += (1UL << 32);
node->next = head;
......@@ -105,6 +152,9 @@ class FreeList {
}
}
// F is a callable class, e.g. Region::PutBlockByReference.
// In effect, F is the "put" operation:
// it is applied to the used Blocks / BlockReferences so they can be reused.
template <class F>
void unsafe_foreach() {
uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed);
......@@ -119,14 +169,15 @@ class FreeList {
for (uint32_t i = 0; i < used_blk_cnt; ++i) {
used_bytes += _node[i]->data.offset;
}
// used_bytes/1024 = KB
return used_bytes >> 10;
}
uint32_t allocate_blocks() const {
uint32_t get_number_of_allocate_blocks() const {
return _slot_index.load(butil::memory_order_relaxed);
}
uint32_t free_blocks() const {
uint32_t get_number_of_free_blocks() const {
uint64_t head = _head.load(butil::memory_order_relaxed);
uint32_t size = 0;
while (head != FreeList::EMPTY) {
......@@ -183,21 +234,13 @@ class FreeList {
namespace memory {
// A fixed-size chunk of raw memory: BLOCK_SIZE (2 MB) bytes.
// NOTE(review): the two BLOCK_SIZE lines appear to be the before/after
// versions of one declaration in this diff - confirm.
struct Block {
static const int BLOCK_SIZE = 2 * 1024 * 1024;
static const int BLOCK_SIZE = 2 * 1024 * 1024; // 2MB
// the raw storage handed out to users of the block
char data[BLOCK_SIZE];
};
// Holder for one shared FreeList<Block, MAX_BLOCK_COUNT>.
// NOTE(review): this appears to be the pre-change class that BlockFreeList
// replaces in this diff - confirm.
class GlobalBlockFreeList {
public:
// capacity of the shared list: 32 * 1024 Blocks
static const int MAX_BLOCK_COUNT = 32 * 1024;
typedef lockfree::FreeList<Block, MAX_BLOCK_COUNT> type;
// returns the address of a function-local static list, so every caller
// gets the same object
static type* instance() {
static type singleton;
return &singleton;
}
};
// Block* and offset
struct BlockReference {
BlockReference() : offset(0), block(NULL) {
// do nothing
......@@ -212,17 +255,41 @@ struct BlockReference {
Block* block;
};
// Holder for the single, shared FreeList<Block, MAX_BLOCK_COUNT>.
// A FreeList is an array of FreeListNode<Block>* pointers; a Block (2 MB) is
// presumably only created when get() is called - confirm against
// FreeList::new_node().
// The list is a function-local static, so all callers share one object and
// the Blocks are treated as global memory that is never released.
// NOTE(review): thread-safe initialization of the static relies on C++11
// semantics - confirm the build standard.
// At most MAX_BLOCK_COUNT (32 * 1024) Blocks can exist.
class BlockFreeList {
public:
static const int MAX_BLOCK_COUNT = 32 * 1024;
typedef lockfree::FreeList<Block, MAX_BLOCK_COUNT> BlockFreeListType;
// Returns the shared list; it is constructed on the first call.
static BlockFreeListType* instance() {
static BlockFreeListType shared_list;
BlockFreeListType* const list_ptr = &shared_list;
return list_ptr;
}
};
// _big_mem_capacity: the size of one large buffer that is owned by the Region.
// _bigNode_Stack: a list of BigNodes (variable-length memory) owned by the
// Region; their number is unlimited.
// _blockReference_FreeList: a FreeList of references to Blocks (2 MB each);
// the Blocks themselves are owned by the global singleton BlockFreeList.
// A Region can borrow up to 1024 Blocks from BlockFreeList.
class Region {
public:
struct GlobalPut {
struct PutBlockByReference {
void operator()(BlockReference* block_ref) {
if (block_ref->block != NULL) {
GlobalBlockFreeList::instance()->put(block_ref->block);
BlockFreeList::instance()->put(block_ref->block);
}
block_ref->reset();
}
};
// this is a variable length memory node.
struct BigNode {
BigNode* next;
char data[0];
......@@ -235,13 +302,16 @@ class Region {
}
char const* debug_str() const {
uint32_t alloc_blocks = _free_blocks.allocate_blocks();
uint32_t free_blocks = _free_blocks.free_blocks();
uint32_t used_mem_mb = _free_blocks.real_used_size();
uint32_t alloc_blocks =
_blockReference_FreeList.get_number_of_allocate_blocks();
uint32_t free_blocks = _blockReference_FreeList.get_number_of_free_blocks();
uint32_t used_mem_mb = _blockReference_FreeList.real_used_size();
uint32_t big_buf_size = _big_mem_size.load(butil::memory_order_relaxed);
uint32_t big_buf_count = _big_mem_count.load(butil::memory_order_relaxed);
uint32_t mlc_mem_size = _mlc_mem_size.load(butil::memory_order_relaxed);
uint32_t mlc_mem_count = _mlc_mem_count.load(butil::memory_order_relaxed);
uint32_t mlc_mem_size =
_total_bigNode_size.load(butil::memory_order_relaxed);
uint32_t mlc_mem_count =
_total_bigNode_count.load(butil::memory_order_relaxed);
std::ostringstream oss;
oss << "[alloc_blks:" << alloc_blocks << ",free_blks:" << free_blocks
......@@ -264,25 +334,34 @@ class Region {
void* malloc(size_t size);
void put(BlockReference* block);
void put(BlockReference* blockReference);
static const int MAX_BLOCK_COUNT = 1024;
static const int BIG_MEM_THRESHOLD = 256 * 1024;
static const int MLC_MEM_THRESHOLD = 4 * 1024 * 1024;
static const int COUNTER_SIZE = MLC_MEM_THRESHOLD / BIG_MEM_THRESHOLD + 1;
static const int MAX_BLOCK_COUNT = 1024; // each Block is 2MB
static const int BIG_MEM_THRESHOLD =
2 * 1024 *
1024; // 2MB,means when you need less than 2M, get memory from Block.
static const int BIGNODE_MEM_THRESHOLD = 4 * 1024 * 1024; // 4MB
static const int COUNTER_SIZE =
BIGNODE_MEM_THRESHOLD / BIG_MEM_THRESHOLD + 1; // this is not used
private:
lockfree::FreeList<BlockReference, MAX_BLOCK_COUNT> _free_blocks;
lockfree::PushOnlyStack<BigNode> _big_nodes;
lockfree::FreeList<BlockReference, MAX_BLOCK_COUNT> _blockReference_FreeList;
// _total_bigNode_size is the total size of BigNodeStack.
// _total_bigNode_count is the total count of BigNodeStack.
// BigNode is variable length memory.
lockfree::PushOnlyStack<BigNode> _bigNode_Stack;
butil::atomic<uint32_t> _total_bigNode_size;
butil::atomic<uint32_t> _total_bigNode_count;
// '_big_mem_start' points to a single big memory belong to Region.
// _big_mem_capacity is the size of single big memory.
// _big_mem_size is the already used size.
// _big_mem_count is the used count.
char* _big_mem_start;
uint32_t _big_mem_capacity; // 32M
butil::atomic<uint32_t> _big_mem_size;
butil::atomic<uint32_t> _big_mem_count;
char* _big_mem_start;
uint32_t _big_mem_capacity;
butil::atomic<uint32_t> _mlc_mem_size;
butil::atomic<uint32_t> _mlc_mem_count;
};
} // namespace memory
} // namespace fugue
......@@ -291,6 +370,8 @@ class Mempool {
public:
void* malloc(size_t size) {
size = _align(size);
// On the first call _free_size is still 0, so a non-zero request does not
// enter this if statement.
// It is entered whenever the current block still has enough free space.
if (size <= _free_size) {
void* p = _free_cursor;
_free_size -= size;
......@@ -302,11 +383,16 @@ class Mempool {
}
void free(void* p, size_t size) {
if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) {
// size > Block size (2 MB):
// such memory is managed by the Region, so there is nothing to release here.
if (size > fugue::memory::Region::BIG_MEM_THRESHOLD) {
return;
}
// memory in Block,update the pointer.
if (_free_cursor - size == static_cast<char*>(p)) {
// for example, you need to release -(8+1)bytes
// you can only release -8bytes,cause -(8+2)byte is used by other.
size_t down_aligned = _down_align(size);
_free_cursor -= down_aligned;
_free_size += down_aligned;
......@@ -314,6 +400,7 @@ class Mempool {
}
void* realloc(void* old_data, size_t old_size, size_t new_size) {
// Return the pointer directly and reuse it without expansion.
if (old_size >= new_size) {
return old_data;
}
......@@ -325,11 +412,19 @@ class Mempool {
_free_size -= required;
return old_data;
} else {
// old_data is going to be copied to another location,
// so _free_cursor is rolled back, which makes the memory used by old_data
// available again.
_free_cursor = static_cast<char*>(old_data);
_free_size += old_size;
}
}
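// The memory returned here may have been allocated separately by the Region
// (via malloc).
// It may also come from a Block. For example: new_size = 1M, old_data starts
// at 1.2M inside the block, and old_size = 0.5M.
// Then _free_size = 0.3M and new_size < 2M, but required = 1 - 0.5 > 0.3,
// so what gets allocated is a Block, and that Block is not used perfectly.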
void* p = this->malloc_from_region(new_size);
if (p != NULL) {
memcpy(p, old_data, old_size);
......@@ -339,58 +434,70 @@ class Mempool {
return NULL;
}
// Creates a pool that allocates from the given Region.
// The pool starts without a current block: free size 0, no cursor, no block
// reference.
// NOTE(review): the constructor taking `blocks` appears to be the pre-rename
// version of the one taking `region` - confirm.
explicit Mempool(fugue::memory::Region* blocks)
: _free_size(0), _free_cursor(NULL), _blocks(blocks) {
_block = NULL;
explicit Mempool(fugue::memory::Region* region)
: _free_size(0), _free_cursor(NULL), _region(region) {
_blockReference = NULL;
}
// Hands the current block reference (if any) back via release_block().
~Mempool() { release_block(); }
// Gives the current block reference back to the Region and leaves the pool
// without a current block.
// NOTE(review): the `_block` / `_blocks` lines appear to be the pre-rename
// versions of the `_blockReference` / `_region` lines - confirm.
void release_block() {
if (_block) {
_block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
_blocks->put(_block);
if (_blockReference) {
// record how many bytes of the block are in use before handing it back
_blockReference->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
_region->put(_blockReference);
}
// with no current block, nothing can be served from the cursor
_free_size = 0;
_free_cursor = NULL;
_block = NULL;
_blockReference = NULL;
}
private:
// Gets `size` bytes from the Region: requests above BIG_MEM_THRESHOLD go to
// Region::malloc, everything else is carved out of a Block obtained through
// Region::get().
// Returns NULL if Region::get() yields NULL.
// NOTE(review): this listing interleaves the pre-change lines (`_blocks`,
// `_block`, `block`, `>=`) with their replacements (`_region`,
// `_blockReference`, `blockReference`, `>`) - confirm.
void* malloc_from_region(size_t size) {
if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) {
return _blocks->malloc(size);
// if size is greater than BIG_MEM_THRESHOLD, use _region->malloc;
// otherwise take the memory from a Block.
if (size > fugue::memory::Region::BIG_MEM_THRESHOLD) {
return _region->malloc(size);
}
while (true) {
fugue::memory::BlockReference* block = _blocks->get();
if (block == NULL) {
fugue::memory::BlockReference* blockReference = _region->get();
// NOTE(review): Region::get() dereferences its result before returning,
// so this check may never see NULL - confirm.
if (blockReference == NULL) {
return NULL;
}
uint32_t free_size = fugue::memory::Block::BLOCK_SIZE - block->offset;
// bytes still unused in the block that was just obtained
uint32_t free_size =
fugue::memory::Block::BLOCK_SIZE - blockReference->offset;
// If the request cannot be satisfied, the while loop runs again, and the
// block fetched in this iteration inevitably becomes an orphaned value.
if (size <= free_size) {
if (_block) {
_block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
// The line above tries to update the offset of the previous node, but
// that node has already become an orphaned value that cannot be used
// again; it can only wait to be returned.
// The node should be put back into the queue: reaching this branch
// proves that the block from get() can definitely be returned.
// So the previous, not fully used block should have its offset updated
// and be handed back; it may be usable next time, so it is not wasted.
if (_blockReference) {
_blockReference->offset =
fugue::memory::Block::BLOCK_SIZE - _free_size;
_region->put(_blockReference);
}
char* p = block->block->data + block->offset;
// carve the request out of the new block and make it the current one
char* p = blockReference->block->data + blockReference->offset;
_free_size = free_size - size;
_free_cursor = p + size;
_block = block;
_blockReference = blockReference;
return p;
}
}
return _blocks->malloc(size);
// Never executed (the loop above only exits through return); it is kept
// for the sake of syntax.
return _region->malloc(size);
}
static const int ALIGN_SIZE = sizeof(void*);
// Rounds size up to the next multiple of ALIGN_SIZE; a size that is already
// a multiple is returned unchanged. With 8-byte pointers, (8+1) becomes
// (8+8) bytes.
inline size_t _align(size_t size) const {
const size_t mask = ALIGN_SIZE - 1;
return (size + mask) & ~mask;
}
// Rounds size down to a multiple of ALIGN_SIZE by dropping the remainder.
// With 8-byte pointers, (8+1) becomes (8+0) bytes.
inline size_t _down_align(size_t size) const {
const size_t remainder = size & (ALIGN_SIZE - 1);
return size - remainder;
}
......@@ -398,18 +505,32 @@ class Mempool {
size_t _free_size;
char* _free_cursor;
fugue::memory::Region* _blocks;
fugue::memory::BlockReference* _block;
fugue::memory::Region* _region;
fugue::memory::BlockReference* _blockReference;
};
// Use a thread-local key instead of __thread.
// It is not referenced.
/*
extern __thread Mempool* g_mempool;
*/
// class mempool is an interface.
// It is not necessary at all.
/*
class mempool {
public:
virtual void* malloc(size_t size) = 0;
virtual void free(void* p, size_t size) = 0;
inline virtual ~mempool() {}
};
*/
// GlobalMempool is a Singleton-RAII class.
// Its purpose is to manage the thread-local pointer 'g_mempool' (class
// Mempool*).
// It is not referenced, so it is useless.
/*
class GlobalMempool : public mempool {
public:
GlobalMempool() {
......@@ -439,7 +560,13 @@ class GlobalMempool : public mempool {
Mempool* get() { return g_mempool; }
};
*/
// MempoolGuard is a RAII class.
// Its purpose is to manage the thread-local pointer 'g_mempool' (class
// Mempool*).
// It is not referenced, so it is useless.
/*
class MempoolGuard {
public:
explicit MempoolGuard(fugue::memory::Region* region) : _mempool(region) {
......@@ -462,7 +589,7 @@ class MempoolGuard {
Mempool _mempool;
Mempool* _saved_mempool;
};
*/
inline std::string print_trace() {
static const int BT_BUF_SIZE = 400;
std::stringstream debug_stream;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册