diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 59bba315f45972d06967d2418586131d2c1283be..52d8a75ee06b10de97ebd488d3a9a2ee439bacea 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -415,6 +415,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528) #define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) #define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530) +#define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 70dbadeeea6228a11003b9bc65477d37f611f009..fbc78e11043a24371db8a51a3e6ba14218ea3622 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -211,7 +211,7 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode); +int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e8771f15b51e8f2994cab796b3f762b4d4458c66..2069ee070c77c8f4626148e0a270760dc39784fa 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -108,7 +108,7 @@ void vnodeBufPoolRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); -int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, void* pQHandle, _query_reseek_func_t reseekFn); +int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode); int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode); // meta diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index f661bf5ddc33aba6c030d6d002ac53f17a29110a..8d87db9a15a061090fee405ff12eb9e1dfbe4548 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -749,13 +749,13 @@ _exit: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - vnodeBufPoolRegisterQuery(pMemTable->pPool, pQHandle, reseek); + vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode); _exit: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b52c75f158a47d9f89228067543e9ba6d480bec3..e3dfaaa6672e9e5874b917a99f3f25f5720f43cb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4620,46 +4620,67 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsd SVersionRange* pRange = &pReader->verRange; // alloc - *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); - if (*ppSnap == NULL) { + STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap)); + if (pSnap == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } // lock - code = taosThreadRwlockRdlock(&pTsdb->rwLock); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - goto _exit; - } + taosThreadRwlockRdlock(&pTsdb->rwLock); // take snapshot if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { - tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode); - (*ppSnap)->pMem = pTsdb->mem; + pSnap->pMem = pTsdb->mem; + pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); + if (pSnap->pNode == NULL) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pSnap->pNode->pQHandle = pReader; + pSnap->pNode->reseek = reseek; + + tsdbRefMemTable(pTsdb->mem, pSnap->pNode); } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { - tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode); - (*ppSnap)->pIMem = pTsdb->imem; + pSnap->pIMem = pTsdb->imem; + pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); + if (pSnap->pINode == NULL) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pSnap->pINode->pQHandle = pReader; + pSnap->pINode->reseek = reseek; + + tsdbRefMemTable(pTsdb->imem, pSnap->pINode); } // fs - code = tsdbFSRef(pTsdb, &(*ppSnap)->fs); + code = tsdbFSRef(pTsdb, &pSnap->fs); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _exit; } // unlock - code = taosThreadRwlockUnlock(&pTsdb->rwLock); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - goto _exit; - } + taosThreadRwlockUnlock(&pTsdb->rwLock); tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); + _exit: + if (code) { + *ppSnap = NULL; + if (pSnap) { + if (pSnap->pNode) taosMemoryFree(pSnap->pNode); + if (pSnap->pINode) taosMemoryFree(pSnap->pINode); + taosMemoryFree(pSnap); + } + } else { + *ppSnap = pSnap; + } return code; } @@ -4676,6 +4697,8 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { } tsdbFSUnref(pTsdb, &pSnap->fs); + if (pSnap->pNode) taosMemoryFree(pSnap->pNode); + if (pSnap->pINode) taosMemoryFree(pSnap->pINode); taosMemoryFree(pSnap); } tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 00706ad12d7981f6b774d66a0f63228e917ce371..d05f12641371ca57d6de87ad566358668af5a96b 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -215,12 +215,12 @@ void vnodeBufPoolRef(SVBufPool *pPool) { void vnodeBufPoolUnRef(SVBufPool *pPool) { if (pPool == NULL) return; - if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) return; - SVnode *pVnode = pPool->pVnode; taosThreadMutexLock(&pVnode->mutex); + if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; + // remove from recycle list or on-recycle position if (pVnode->onRecycle == pPool) { pVnode->onRecycle = NULL; @@ -262,20 +262,14 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { pVnode->freeList = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); +_exit: taosThreadMutexUnlock(&pVnode->mutex); + return; } -int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, void *pQHandle, _query_reseek_func_t reseekFn) { +int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { int32_t code = 0; - SQueryNode *pQNode = taosMemoryMalloc(sizeof(*pQNode)); - if (pQNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - pQNode->pQHandle = pQHandle; - pQNode->reseek = reseekFn; - taosThreadMutexLock(&pPool->mutex); pQNode->pNext = pPool->qList.pNext; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8cc50876cb60daf26e51226cadfe45f308512010..00ea7fbd05349445f7c2e8e2530c3817cfc67ce5 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -26,43 +26,57 @@ static int vnodeCommitImpl(SCommitInfo *pInfo); static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { int32_t code = 0; - if (pVnode->onRecycle) { - vDebug("vgId:%d buffer pool %p of id %d is on recycling", TD_VID(pVnode), pVnode->onRecycle, pVnode->onRecycle->id); - goto _exit; - } - - if (pVnode->recycleHead) { - vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, - pVnode->recycleHead->id); - - // pop the header buffer pool for recycling - pVnode->onRecycle = pVnode->recycleHead; - if (pVnode->recycleHead == pVnode->recycleTail) { - pVnode->recycleHead = pVnode->recycleTail = NULL; + if (pVnode->onRecycle == NULL) { + if (pVnode->recycleHead == NULL) { + vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); + goto _exit; } else { - pVnode->recycleHead = pVnode->recycleHead->recycleNext; - pVnode->recycleHead->recyclePrev = NULL; + vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + pVnode->recycleHead->id); + + pVnode->onRecycle = pVnode->recycleHead; + if (pVnode->recycleHead == pVnode->recycleTail) { + pVnode->recycleHead = pVnode->recycleTail = NULL; + } else { + pVnode->recycleHead = pVnode->recycleHead->recycleNext; + pVnode->recycleHead->recyclePrev = NULL; + } + pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL; } - pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL; + } - // do recycle the buffer pool - SVBufPool *pPool = pVnode->onRecycle; + // do recycle the buffer pool + SVBufPool *pPool = pVnode->onRecycle; + vDebug("vgId:%d buffer pool %p of id %d on recycle", TD_VID(pVnode), pPool, pPool->id); - taosThreadMutexLock(&pPool->mutex); + taosThreadMutexLock(&pPool->mutex); - SQueryNode *pNode = pPool->qList.pNext; - while (pNode != &pPool->qList) { - // TODO: refact/finish here - pNode->reseek(pNode->pQHandle); + SQueryNode *pNode = pPool->qList.pNext; + while (pNode != &pPool->qList) { + int32_t rc = pNode->reseek(pNode->pQHandle); + if (rc == 0) { + SQueryNode *pTNode = pNode->pNext; + pNode->pNext->ppNext = pNode->ppNext; + *pNode->ppNext = pNode->pNext; + pPool->nQuery--; + pNode = pTNode; + } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { pNode = pNode->pNext; + } else { + taosThreadMutexUnlock(&pPool->mutex); + code = rc; + goto _exit; } - - taosThreadMutexUnlock(&pPool->mutex); - } else { - vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); } + taosThreadMutexUnlock(&pPool->mutex); + + // TODO: if (pPool->nQuery == 0) add to free list + _exit: + if (code) { + vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code)); + } return code; } int vnodeBegin(SVnode *pVnode) { @@ -267,6 +281,12 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { tsem_wait(&pVnode->canCommit); + taosThreadMutexLock(&pVnode->mutex); + ASSERT(pVnode->onCommit == NULL); + pVnode->onCommit = pVnode->inUse; + pVnode->inUse = NULL; + taosThreadMutexUnlock(&pVnode->mutex); + pVnode->state.commitTerm = pVnode->state.applyTerm; pInfo->info.config = pVnode->config; @@ -296,12 +316,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { code = smaPrepareAsyncCommit(pVnode->pSma); if (code) goto _exit; - taosThreadMutexLock(&pVnode->mutex); - ASSERT(pVnode->onCommit == NULL); - pVnode->onCommit = pVnode->inUse; - pVnode->inUse = NULL; - taosThreadMutexUnlock(&pVnode->mutex); - _exit: if (code) { vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino, @@ -323,12 +337,12 @@ static int32_t vnodeCommitTask(void *arg) { if (code) goto _exit; // recycle buffer pool - SVBufPool *pPool = pVnode->onCommit; - taosThreadMutexLock(&pVnode->mutex); + SVBufPool *pPool = pVnode->onCommit; + int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); + pVnode->onCommit = NULL; - int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); if (nRef == 0) { // add to free list vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); @@ -337,7 +351,7 @@ static int32_t vnodeCommitTask(void *arg) { pPool->freeNext = pVnode->freeList; pVnode->freeList = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); - } else { + } else if (nRef > 0) { // add to recycle list vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id); @@ -350,6 +364,8 @@ static int32_t vnodeCommitTask(void *arg) { pVnode->recycleTail->recycleNext = pPool; pVnode->recycleTail = pPool; } + } else { + ASSERT(0); } taosThreadMutexUnlock(&pVnode->mutex); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a142bb76d7a39edff4075850dd90fa6e2c348442..bab3edc870bb888bf2ad6030ff102c88daa05be6 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -320,6 +320,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subsc TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool") TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped") TAOS_DEFINE_ERROR(TSDB_CODE_VND_DUP_REQUEST, "Duplicate write request") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_QUERY_BUSY, "Query busy") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")