提交 c704496f 编写于 作者: H Hongze Cheng

more refact meta

上级 07720702
......@@ -5,8 +5,6 @@ target_sources(
PRIVATE
# vnode
"src/vnd/vnodeOpen.c"
# "src/vnd/vnodeArenaMAImpl.c"
# "src/vnd/vnodeBufferPool.c"
"src/vnd/vnodeBufPool.c"
"src/vnd/vnodeCfg.c"
"src/vnd/vnodeCommit.c"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
static SVArenaNode *vArenaNodeNew(uint64_t capacity);
static void vArenaNodeFree(SVArenaNode *pNode);
SVMemAllocator *vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize) {
SVMemAllocator *pVMA = (SVMemAllocator *)taosMemoryMalloc(sizeof(*pVMA));
if (pVMA == NULL) {
return NULL;
}
pVMA->capacity = capacity;
pVMA->ssize = ssize;
pVMA->lsize = lsize;
TD_SLIST_INIT(&(pVMA->nlist));
pVMA->pNode = vArenaNodeNew(capacity);
if (pVMA->pNode == NULL) {
taosMemoryFree(pVMA);
return NULL;
}
TD_SLIST_PUSH(&(pVMA->nlist), pVMA->pNode);
return pVMA;
}
void vmaDestroy(SVMemAllocator *pVMA) {
if (pVMA) {
while (TD_SLIST_NELES(&(pVMA->nlist)) > 1) {
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
TD_SLIST_POP(&(pVMA->nlist));
vArenaNodeFree(pNode);
}
taosMemoryFree(pVMA);
}
}
void vmaReset(SVMemAllocator *pVMA) {
while (TD_SLIST_NELES(&(pVMA->nlist)) > 1) {
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
TD_SLIST_POP(&(pVMA->nlist));
vArenaNodeFree(pNode);
}
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
pNode->ptr = pNode->data;
}
void *vmaMalloc(SVMemAllocator *pVMA, uint64_t size) {
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
void * ptr;
if (pNode->size < POINTER_DISTANCE(pNode->ptr, pNode->data) + size) {
uint64_t capacity = TMAX(pVMA->ssize, size);
pNode = vArenaNodeNew(capacity);
if (pNode == NULL) {
// TODO: handle error
return NULL;
}
TD_SLIST_PUSH(&(pVMA->nlist), pNode);
}
ptr = pNode->ptr;
pNode->ptr = POINTER_SHIFT(ptr, size);
return ptr;
}
void vmaFree(SVMemAllocator *pVMA, void *ptr) {
// TODO
}
bool vmaIsFull(SVMemAllocator *pVMA) {
SVArenaNode *pNode = TD_SLIST_HEAD(&(pVMA->nlist));
return (TD_SLIST_NELES(&(pVMA->nlist)) > 1) ||
(pNode->size < POINTER_DISTANCE(pNode->ptr, pNode->data) + pVMA->lsize);
}
/* ------------------------ STATIC METHODS ------------------------ */
static SVArenaNode *vArenaNodeNew(uint64_t capacity) {
SVArenaNode *pNode = NULL;
pNode = (SVArenaNode *)taosMemoryMalloc(sizeof(*pNode) + capacity);
if (pNode == NULL) {
return NULL;
}
pNode->size = capacity;
pNode->ptr = pNode->data;
return pNode;
}
static void vArenaNodeFree(SVArenaNode *pNode) {
if (pNode) {
taosMemoryFree(pNode);
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
/* ------------------------ STRUCTURES ------------------------ */
#define VNODE_BUF_POOL_SHARDS 3
struct SVBufPool {
TdThreadMutex mutex;
TdThreadCond hasFree;
TD_DLIST(SVMemAllocator) free;
TD_DLIST(SVMemAllocator) incycle;
SVMemAllocator *inuse;
// MAF for submodules to use
SMemAllocatorFactory *pMAF;
};
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA);
int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity;
if ((pVnode->pBufPool = (SVBufPool *)taosMemoryCalloc(1, sizeof(SVBufPool))) == NULL) {
/* TODO */
return -1;
}
TD_DLIST_INIT(&(pVnode->pBufPool->free));
TD_DLIST_INIT(&(pVnode->pBufPool->incycle));
pVnode->pBufPool->inuse = NULL;
// TODO
capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS;
for (int i = 0; i < VNODE_BUF_POOL_SHARDS; i++) {
SVMemAllocator *pVMA = vmaCreate(capacity, pVnode->config.ssize, pVnode->config.lsize);
if (pVMA == NULL) {
// TODO: handle error
return -1;
}
TD_DLIST_APPEND(&(pVnode->pBufPool->free), pVMA);
}
pVnode->pBufPool->pMAF = (SMemAllocatorFactory *)taosMemoryMalloc(sizeof(SMemAllocatorFactory));
if (pVnode->pBufPool->pMAF == NULL) {
// TODO: handle error
return -1;
}
pVnode->pBufPool->pMAF->impl = pVnode;
pVnode->pBufPool->pMAF->create = vBufPoolCreateMA;
pVnode->pBufPool->pMAF->destroy = vBufPoolDestroyMA;
return 0;
}
void vnodeCloseBufPool(SVnode *pVnode) {
if (pVnode->pBufPool) {
taosMemoryFreeClear(pVnode->pBufPool->pMAF);
vmaDestroy(pVnode->pBufPool->inuse);
while (true) {
SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pVnode->pBufPool->incycle));
if (pVMA == NULL) break;
TD_DLIST_POP(&(pVnode->pBufPool->incycle), pVMA);
vmaDestroy(pVMA);
}
while (true) {
SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pVnode->pBufPool->free));
if (pVMA == NULL) break;
TD_DLIST_POP(&(pVnode->pBufPool->free), pVMA);
vmaDestroy(pVMA);
}
taosMemoryFree(pVnode->pBufPool);
pVnode->pBufPool = NULL;
}
}
int vnodeBufPoolSwitch(SVnode *pVnode) {
SVMemAllocator *pvma = pVnode->pBufPool->inuse;
pVnode->pBufPool->inuse = NULL;
if (pvma) {
TD_DLIST_APPEND(&(pVnode->pBufPool->incycle), pvma);
}
return 0;
}
int vnodeBufPoolRecycle(SVnode *pVnode) {
SVBufPool * pBufPool = pVnode->pBufPool;
SVMemAllocator *pvma = TD_DLIST_HEAD(&(pBufPool->incycle));
if (pvma == NULL) return 0;
// ASSERT(pvma != NULL);
TD_DLIST_POP(&(pBufPool->incycle), pvma);
vmaReset(pvma);
TD_DLIST_APPEND(&(pBufPool->free), pvma);
return 0;
}
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
SVBufPool *pBufPool = pVnode->pBufPool;
if (pBufPool->inuse == NULL) {
while (true) {
// TODO: add sem_wait and sem_post
pBufPool->inuse = TD_DLIST_HEAD(&(pBufPool->free));
if (pBufPool->inuse) {
TD_DLIST_POP(&(pBufPool->free), pBufPool->inuse);
break;
} else {
// tsem_wait(&(pBufPool->hasFree));
}
}
}
return vmaMalloc(pBufPool->inuse, size);
}
bool vnodeBufPoolIsFull(SVnode *pVnode) {
if (pVnode->pBufPool->inuse == NULL) return false;
return vmaIsFull(pVnode->pBufPool->inuse);
}
SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode) { return pVnode->pBufPool->pMAF; }
/* ------------------------ STATIC METHODS ------------------------ */
typedef struct {
SVnode * pVnode;
SVMemAllocator *pVMA;
} SVMAWrapper;
static FORCE_INLINE void *vmaMaloocCb(SMemAllocator *pMA, uint64_t size) {
SVMAWrapper *pWrapper = (SVMAWrapper *)(pMA->impl);
return vmaMalloc(pWrapper->pVMA, size);
}
// TODO: Add atomic operations here
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF) {
SMemAllocator *pMA;
SVnode * pVnode = (SVnode *)(pMAF->impl);
SVMAWrapper * pWrapper;
pMA = (SMemAllocator *)taosMemoryCalloc(1, sizeof(*pMA) + sizeof(SVMAWrapper));
if (pMA == NULL) {
return NULL;
}
pVnode->pBufPool->inuse->_ref.val++;
pWrapper = POINTER_SHIFT(pMA, sizeof(*pMA));
pWrapper->pVnode = pVnode;
pWrapper->pVMA = pVnode->pBufPool->inuse;
pMA->impl = pWrapper;
TD_MA_MALLOC_FUNC(pMA) = vmaMaloocCb;
return pMA;
}
static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) {
SVMAWrapper * pWrapper = (SVMAWrapper *)(pMA->impl);
SVnode * pVnode = pWrapper->pVnode;
SVMemAllocator *pVMA = pWrapper->pVMA;
taosMemoryFree(pMA);
if (--pVMA->_ref.val == 0) {
TD_DLIST_POP(&(pVnode->pBufPool->incycle), pVMA);
vmaReset(pVMA);
TD_DLIST_APPEND(&(pVnode->pBufPool->free), pVMA);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册