提交 bff8c216 编写于 作者: H Hongze Cheng

more code

上级 bb2ed08a
...@@ -209,10 +209,10 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in ...@@ -209,10 +209,10 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode);
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter // STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
......
...@@ -105,11 +105,11 @@ void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); ...@@ -105,11 +105,11 @@ void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size); void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolFree(SVBufPool* pPool, void* p);
void vnodeBufPoolRef(SVBufPool* pPool); void vnodeBufPoolRef(SVBufPool* pPool);
void vnodeBufPoolUnRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool, bool proactive);
int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo);
int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode); int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode);
int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode); void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive);
// meta // meta
typedef struct SMCtbCursor SMCtbCursor; typedef struct SMCtbCursor SMCtbCursor;
......
...@@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) { ...@@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
pTsdb->imem = NULL; pTsdb->imem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable, NULL); tsdbUnrefMemTable(pMemTable, NULL, true);
goto _exit; goto _exit;
} }
...@@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) { ...@@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) {
// unlock // unlock
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
if (pMemTable) { if (pMemTable) {
tsdbUnrefMemTable(pMemTable, NULL); tsdbUnrefMemTable(pMemTable, NULL, true);
} }
_exit: _exit:
......
...@@ -76,9 +76,9 @@ _err: ...@@ -76,9 +76,9 @@ _err:
return code; return code;
} }
void tsdbMemTableDestroy(SMemTable *pMemTable) { void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
if (pMemTable) { if (pMemTable) {
vnodeBufPoolUnRef(pMemTable->pPool); vnodeBufPoolUnRef(pMemTable->pPool, proactive);
taosMemoryFree(pMemTable->aBucket); taosMemoryFree(pMemTable->aBucket);
taosMemoryFree(pMemTable); taosMemoryFree(pMemTable);
} }
...@@ -761,16 +761,15 @@ _exit: ...@@ -761,16 +761,15 @@ _exit:
return code; return code;
} }
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
int32_t code = 0; int32_t code = 0;
if (pNode) { if (pNode) {
vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode); vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
} }
int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (atomic_sub_fetch_32(&pMemTable->nRef, 1) == 0) {
if (nRef == 0) { tsdbMemTableDestroy(pMemTable, proactive);
tsdbMemTableDestroy(pMemTable);
} }
return code; return code;
......
...@@ -88,7 +88,7 @@ _err: ...@@ -88,7 +88,7 @@ _err:
int tsdbClose(STsdb **pTsdb) { int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) { if (*pTsdb) {
taosThreadRwlockWrlock(&(*pTsdb)->rwLock); taosThreadRwlockWrlock(&(*pTsdb)->rwLock);
tsdbMemTableDestroy((*pTsdb)->mem); tsdbMemTableDestroy((*pTsdb)->mem, true);
(*pTsdb)->mem = NULL; (*pTsdb)->mem = NULL;
taosThreadRwlockUnlock(&(*pTsdb)->rwLock); taosThreadRwlockUnlock(&(*pTsdb)->rwLock);
......
...@@ -4719,11 +4719,11 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { ...@@ -4719,11 +4719,11 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) {
if (pSnap) { if (pSnap) {
if (pSnap->pMem) { if (pSnap->pMem) {
tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode); tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, true);
} }
if (pSnap->pIMem) { if (pSnap->pIMem) {
tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode); tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, true);
} }
tsdbFSUnref(pTsdb, &pSnap->fs); tsdbFSUnref(pTsdb, &pSnap->fs);
......
...@@ -239,12 +239,12 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { ...@@ -239,12 +239,12 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
taosThreadCondSignal(&pVnode->poolNotEmpty); taosThreadCondSignal(&pVnode->poolNotEmpty);
} }
void vnodeBufPoolUnRef(SVBufPool *pPool) { void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
if (pPool == NULL) return; if (pPool == NULL) return;
SVnode *pVnode = pPool->pVnode; SVnode *pVnode = pPool->pVnode;
taosThreadMutexLock(&pVnode->mutex); if (proactive) taosThreadMutexLock(&pVnode->mutex);
if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit;
...@@ -252,6 +252,8 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { ...@@ -252,6 +252,8 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
if (pVnode->onRecycle == pPool) { if (pVnode->onRecycle == pPool) {
pVnode->onRecycle = NULL; pVnode->onRecycle = NULL;
} else { } else {
ASSERT(proactive);
if (pPool->recyclePrev) { if (pPool->recyclePrev) {
pPool->recyclePrev->recycleNext = pPool->recycleNext; pPool->recyclePrev->recycleNext = pPool->recycleNext;
} else { } else {
...@@ -269,7 +271,7 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { ...@@ -269,7 +271,7 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
vnodeBufPoolAddToFreeList(pPool); vnodeBufPoolAddToFreeList(pPool);
_exit: _exit:
taosThreadMutexUnlock(&pVnode->mutex); if (proactive) taosThreadMutexUnlock(&pVnode->mutex);
return; return;
} }
...@@ -290,25 +292,21 @@ _exit: ...@@ -290,25 +292,21 @@ _exit:
return code; return code;
} }
int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pPool->mutex); if (proactive) taosThreadMutexLock(&pPool->mutex);
pQNode->pNext->ppNext = pQNode->ppNext; pQNode->pNext->ppNext = pQNode->ppNext;
*pQNode->ppNext = pQNode->pNext; *pQNode->ppNext = pQNode->pNext;
pPool->nQuery--; pPool->nQuery--;
taosThreadMutexUnlock(&pPool->mutex); if (proactive) taosThreadMutexUnlock(&pPool->mutex);
_exit:
return code;
} }
int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
int32_t code = 0; int32_t code = 0;
bool canRecycle;
SVnode *pVnode = pPool->pVnode; SVnode *pVnode = pPool->pVnode;
vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);
...@@ -317,15 +315,11 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { ...@@ -317,15 +315,11 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
SQueryNode *pNode = pPool->qList.pNext; SQueryNode *pNode = pPool->qList.pNext;
while (pNode != &pPool->qList) { while (pNode != &pPool->qList) {
SQueryNode *pTNode = pNode->pNext;
int32_t rc = pNode->reseek(pNode->pQHandle); int32_t rc = pNode->reseek(pNode->pQHandle);
if (rc == 0) { if (rc == 0 || rc == TSDB_CODE_VND_QUERY_BUSY) {
SQueryNode *pTNode = pNode->pNext;
pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext;
pPool->nQuery--;
pNode = pTNode; pNode = pTNode;
} else if (rc == TSDB_CODE_VND_QUERY_BUSY) {
pNode = pNode->pNext;
} else { } else {
taosThreadMutexUnlock(&pPool->mutex); taosThreadMutexUnlock(&pPool->mutex);
code = rc; code = rc;
...@@ -333,16 +327,8 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { ...@@ -333,16 +327,8 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
} }
} }
canRecycle = (pPool->nQuery == 0);
taosThreadMutexUnlock(&pPool->mutex); taosThreadMutexUnlock(&pPool->mutex);
if (canRecycle) {
ASSERT(atomic_load_32(&pPool->nRef) == 0);
pVnode->onRecycle = NULL;
vnodeBufPoolAddToFreeList(pPool);
}
_exit: _exit:
return code; return code;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册