提交 15995e76 编写于 作者: H Hongze Cheng

more code

上级 37554543
......@@ -61,12 +61,14 @@ struct SVBufPoolNode {
};
struct SVBufPool {
SVBufPool* freeNext;
SVBufPool* recycleNext;
SVBufPool* recyclePrev;
SVBufPool* freeNext;
SVBufPool* recycleNext;
SVBufPool* recyclePrev;
SVnode* pVnode;
TdThreadSpinlock* lock;
int32_t id;
volatile int32_t nRef;
TdThreadSpinlock* lock;
int64_t size;
uint8_t* ptr;
SVBufPoolNode* pTail;
......
......@@ -344,6 +344,7 @@ struct SVnode {
SVBufPool* onCommit;
SVBufPool* recycleHead;
SVBufPool* recycleTail;
SVBufPool* onRecycle;
SMeta* pMeta;
SSma* pSma;
......
......@@ -16,7 +16,7 @@
#include "vnd.h"
/* ------------------------ STRUCTURES ------------------------ */
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) {
SVBufPool *pPool;
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
......@@ -24,6 +24,14 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memset(pPool, 0, sizeof(SVBufPool));
pPool->pVnode = pVnode;
pPool->id = id;
pPool->ptr = pPool->node.data;
pPool->pTail = &pPool->node;
pPool->node.prev = NULL;
pPool->node.pnext = &pPool->pTail;
pPool->node.size = size;
if (VND_IS_RSMA(pVnode)) {
pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock));
......@@ -42,16 +50,6 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
pPool->lock = NULL;
}
pPool->freeNext = NULL;
pPool->pVnode = pVnode;
pPool->nRef = 0;
pPool->size = 0;
pPool->ptr = pPool->node.data;
pPool->pTail = &pPool->node;
pPool->node.prev = NULL;
pPool->node.pnext = &pPool->pTail;
pPool->node.size = size;
*ppPool = pPool;
return 0;
}
......@@ -71,7 +69,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
// create pool
if (vnodeBufPoolCreate(pVnode, size, &pVnode->aBufPool[i])) {
if (vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i])) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
vnodeCloseBufPool(pVnode);
return -1;
......@@ -206,34 +204,54 @@ void vnodeBufPoolRef(SVBufPool *pPool) {
}
void vnodeBufPoolUnRef(SVBufPool *pPool) {
if (pPool == NULL) {
return;
}
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
if (nRef == 0) {
SVnode *pVnode = pPool->pVnode;
vnodeBufPoolReset(pPool);
taosThreadMutexLock(&pVnode->mutex);
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
if (pPool->node.size != size) {
SVBufPool *pPoolT = NULL;
if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) {
vWarn("vgId:%d, try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode),
pPool->node.size, size, tstrerror(errno));
} else {
vnodeBufPoolDestroy(pPool);
pPool = pPoolT;
vDebug("vgId:%d, change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size);
}
if (pPool == NULL) return;
if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) return;
SVnode *pVnode = pPool->pVnode;
taosThreadMutexLock(&pVnode->mutex);
// remove from recycle list or on-recycle position
if (pVnode->onRecycle == pPool) {
pVnode->onRecycle = NULL;
} else {
if (pPool->recyclePrev) {
pPool->recyclePrev->recycleNext = pPool->recycleNext;
} else {
pVnode->recycleHead = pPool->recycleNext;
}
pPool->freeNext = pVnode->freeList;
pVnode->freeList = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
if (pPool->recycleNext) {
pPool->recycleNext->recyclePrev = pPool->recyclePrev;
} else {
pVnode->recycleTail = pPool->recyclePrev;
}
pPool->recyclePrev = pPool->recycleNext = NULL;
}
taosThreadMutexUnlock(&pVnode->mutex);
// change the pool size if need
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
if (pPool->node.size != size) {
SVBufPool *pNewPool = NULL;
if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) {
vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s",
TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno));
} else {
vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id,
pPool->node.size, size);
vnodeBufPoolDestroy(pPool);
pPool = pNewPool;
pVnode->aBufPool[pPool->id] = pPool;
}
}
// add to free list
vnodeBufPoolReset(pPool);
pPool->freeNext = pVnode->freeList;
pVnode->freeList = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
taosThreadMutexUnlock(&pVnode->mutex);
}
......@@ -25,7 +25,34 @@ static int vnodeCommitImpl(SCommitInfo *pInfo);
static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
int32_t code = 0;
ASSERT(0); // TODO
if (pVnode->onRecycle) {
vDebug("vgId:%d buffer pool %p of id %d is on recycling", TD_VID(pVnode), pVnode->onRecycle, pVnode->onRecycle->id);
goto _exit;
}
if (pVnode->recycleHead) {
vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
pVnode->recycleHead->id);
// pop the header buffer pool for recycling
pVnode->onRecycle = pVnode->recycleHead;
if (pVnode->recycleHead == pVnode->recycleTail) {
pVnode->recycleHead = pVnode->recycleTail = NULL;
} else {
pVnode->recycleHead = pVnode->recycleHead->recycleNext;
pVnode->recycleHead->recyclePrev = NULL;
}
pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL;
{
// TODO: do recycle the buffer pool
ASSERT(0);
}
} else {
vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode));
}
_exit:
return code;
}
......@@ -36,7 +63,8 @@ int vnodeBegin(SVnode *pVnode) {
int32_t nTry = 0;
while (++nTry) {
if (pVnode->freeList) {
vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p", TD_VID(pVnode), nTry, pVnode->freeList);
vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
pVnode->freeList->id);
pVnode->inUse = pVnode->freeList;
pVnode->inUse->nRef = 1;
......@@ -259,8 +287,11 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
code = smaPrepareAsyncCommit(pVnode->pSma);
if (code) goto _exit;
vnodeBufPoolUnRef(pVnode->inUse);
taosThreadMutexLock(&pVnode->mutex);
ASSERT(pVnode->onCommit == NULL);
pVnode->onCommit = pVnode->inUse;
pVnode->inUse = NULL;
taosThreadMutexUnlock(&pVnode->mutex);
_exit:
if (code) {
......@@ -272,19 +303,51 @@ _exit:
return code;
}
static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0;
SCommitInfo *pInfo = (SCommitInfo *)arg;
SVnode *pVnode = pInfo->pVnode;
// commit
code = vnodeCommitImpl(pInfo);
if (code) goto _exit;
// recycle buffer pool
SVBufPool *pPool = pVnode->onCommit;
taosThreadMutexLock(&pVnode->mutex);
pVnode->onCommit = NULL;
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
if (nRef == 0) {
// add to free list
vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id);
vnodeBufPoolReset(pPool);
pPool->freeNext = pVnode->freeList;
pVnode->freeList = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
} else {
// add to recycle list
vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id);
if (pVnode->recycleTail == NULL) {
pPool->recyclePrev = pPool->recycleNext = NULL;
pVnode->recycleHead = pVnode->recycleTail = pPool;
} else {
pPool->recyclePrev = pVnode->recycleTail;
pPool->recycleNext = NULL;
pVnode->recycleTail->recycleNext = pPool;
pVnode->recycleTail = pPool;
}
}
taosThreadMutexUnlock(&pVnode->mutex);
_exit:
// end commit
tsem_post(&pInfo->pVnode->canCommit);
tsem_post(&pVnode->canCommit);
taosMemoryFree(pInfo);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册