diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index fafb146ad868c217a4b102a94d96ed9aeb829e4d..e0d0e9408b1dde6edf29b895a4c3c916105de646 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -83,6 +83,8 @@ struct SVBufPool { int32_t vnodeOpenBufPool(SVnode* pVnode); int32_t vnodeCloseBufPool(SVnode* pVnode); void vnodeBufPoolReset(SVBufPool* pPool); +void vnodeBufPoolAddToFreeList(SVBufPool* pPool); +int32_t vnodeBufPoolRecycle(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index d05f12641371ca57d6de87ad566358668af5a96b..2aff4a19273f50476fc9f18b70784c68c118a515 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -212,6 +212,33 @@ void vnodeBufPoolRef(SVBufPool *pPool) { ASSERT(nRef > 0); } +void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { + SVnode *pVnode = pPool->pVnode; + + int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; + if (pPool->node.size != size) { + SVBufPool *pNewPool = NULL; + if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) { + vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", + TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno)); + } else { + vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, + pPool->node.size, size); + + vnodeBufPoolDestroy(pPool); + pPool = pNewPool; + pVnode->aBufPool[pPool->id] = pPool; + } + } + + // add to free list + vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); + vnodeBufPoolReset(pPool); + pPool->freeNext = pVnode->freeList; + pVnode->freeList = pPool; + taosThreadCondSignal(&pVnode->poolNotEmpty); +} + void vnodeBufPoolUnRef(SVBufPool *pPool) { if (pPool == NULL) return; @@ -221,7 +248,7 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; - // remove from recycle list or on-recycle position + // remove from recycle queue or on-recycle position if (pVnode->onRecycle == pPool) { pVnode->onRecycle = NULL; } else { @@ -239,28 +266,7 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { pPool->recyclePrev = pPool->recycleNext = NULL; } - // change the pool size if need - int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; - if (pPool->node.size != size) { - SVBufPool *pNewPool = NULL; - if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) { - vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", - TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno)); - } else { - vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, - pPool->node.size, size); - - vnodeBufPoolDestroy(pPool); - pPool = pNewPool; - pVnode->aBufPool[pPool->id] = pPool; - } - } - - // add to free list - vnodeBufPoolReset(pPool); - pPool->freeNext = pVnode->freeList; - pVnode->freeList = pPool; - taosThreadCondSignal(&pVnode->poolNotEmpty); + vnodeBufPoolAddToFreeList(pPool); _exit: taosThreadMutexUnlock(&pVnode->mutex); @@ -295,6 +301,48 @@ int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { taosThreadMutexUnlock(&pPool->mutex); +_exit: + return code; +} + +int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { + int32_t code = 0; + + bool canRecycle; + SVnode *pVnode = pPool->pVnode; + + vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); + + taosThreadMutexLock(&pPool->mutex); + + SQueryNode *pNode = pPool->qList.pNext; + while (pNode != &pPool->qList) { + int32_t rc = pNode->reseek(pNode->pQHandle); + if (rc == 0) { + SQueryNode *pTNode = pNode->pNext; + pNode->pNext->ppNext = pNode->ppNext; + *pNode->ppNext = pNode->pNext; + pPool->nQuery--; + pNode = pTNode; + } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { + pNode = pNode->pNext; + } else { + taosThreadMutexUnlock(&pPool->mutex); + code = rc; + goto _exit; + } + } + + canRecycle = (pPool->nQuery == 0); + + taosThreadMutexUnlock(&pPool->mutex); + + if (canRecycle) { + ASSERT(atomic_load_32(&pPool->nRef) == 0); + pVnode->onRecycle = NULL; + vnodeBufPoolAddToFreeList(pPool); + } + _exit: return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8657b80bdeca437d4f319019378392c039872f5c..d3fe8e6cc52247fe4c9483af364bf3af2189d462 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -31,7 +31,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); goto _exit; } else { - vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, pVnode->recycleHead->id); pVnode->onRecycle = pVnode->recycleHead; @@ -45,33 +45,8 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { } } - // do recycle the buffer pool - SVBufPool *pPool = pVnode->onRecycle; - vDebug("vgId:%d buffer pool %p of id %d on recycle", TD_VID(pVnode), pPool, pPool->id); - - taosThreadMutexLock(&pPool->mutex); - - SQueryNode *pNode = pPool->qList.pNext; - while (pNode != &pPool->qList) { - int32_t rc = pNode->reseek(pNode->pQHandle); - if (rc == 0) { - SQueryNode *pTNode = pNode->pNext; - pNode->pNext->ppNext = pNode->ppNext; - *pNode->ppNext = pNode->pNext; - pPool->nQuery--; - pNode = pTNode; - } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { - pNode = pNode->pNext; - } else { - taosThreadMutexUnlock(&pPool->mutex); - code = rc; - goto _exit; - } - } - - taosThreadMutexUnlock(&pPool->mutex); - - // TODO: if (pPool->nQuery == 0) add to free list + code = vnodeBufPoolRecycle(pVnode->onRecycle); + if (code) goto _exit; _exit: if (code) { @@ -360,17 +335,7 @@ _exit: return code; } -static int32_t vnodeCommitTask(void *arg) { - int32_t code = 0; - - SCommitInfo *pInfo = (SCommitInfo *)arg; - SVnode *pVnode = pInfo->pVnode; - - // commit - code = vnodeCommitImpl(pInfo); - if (code) goto _exit; - - // recycle buffer pool +static void vnodeReturnBufPool(SVnode *pVnode) { taosThreadMutexLock(&pVnode->mutex); SVBufPool *pPool = pVnode->onCommit; @@ -378,16 +343,9 @@ static int32_t vnodeCommitTask(void *arg) { pVnode->onCommit = NULL; if (nRef == 0) { - // add to free list - vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); - - vnodeBufPoolReset(pPool); - pPool->freeNext = pVnode->freeList; - pVnode->freeList = pPool; - taosThreadCondSignal(&pVnode->poolNotEmpty); + vnodeBufPoolAddToFreeList(pPool); } else if (nRef > 0) { - // add to recycle list - vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); if (pVnode->recycleTail == NULL) { pPool->recyclePrev = pPool->recycleNext = NULL; @@ -403,6 +361,18 @@ static int32_t vnodeCommitTask(void *arg) { } taosThreadMutexUnlock(&pVnode->mutex); +} +static int32_t vnodeCommitTask(void *arg) { + int32_t code = 0; + + SCommitInfo *pInfo = (SCommitInfo *)arg; + SVnode *pVnode = pInfo->pVnode; + + // commit + code = vnodeCommitImpl(pInfo); + if (code) goto _exit; + + vnodeReturnBufPool(pVnode); _exit: // end commit