提交 ad9e8a9a 编写于 作者: H Hongze Cheng

more code

上级 f8c9ae50
......@@ -415,6 +415,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528)
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
#define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530)
#define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
......@@ -211,7 +211,7 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode);
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode);
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter
......
......@@ -108,7 +108,7 @@ void vnodeBufPoolRef(SVBufPool* pPool);
void vnodeBufPoolUnRef(SVBufPool* pPool);
int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo);
int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, void* pQHandle, _query_reseek_func_t reseekFn);
int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode);
int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode);
// meta
......
......@@ -749,13 +749,13 @@ _exit:
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode) {
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
int32_t code = 0;
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
ASSERT(nRef > 0);
vnodeBufPoolRegisterQuery(pMemTable->pPool, pQHandle, reseek);
vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);
_exit:
return code;
......
......@@ -4620,46 +4620,67 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsd
SVersionRange* pRange = &pReader->verRange;
// alloc
*ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
if (*ppSnap == NULL) {
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap));
if (pSnap == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// lock
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _exit;
}
taosThreadRwlockRdlock(&pTsdb->rwLock);
// take snapshot
if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) {
tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode);
(*ppSnap)->pMem = pTsdb->mem;
pSnap->pMem = pTsdb->mem;
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
if (pSnap->pNode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pSnap->pNode->pQHandle = pReader;
pSnap->pNode->reseek = reseek;
tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
}
if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode);
(*ppSnap)->pIMem = pTsdb->imem;
pSnap->pIMem = pTsdb->imem;
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
if (pSnap->pINode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pSnap->pINode->pQHandle = pReader;
pSnap->pINode->reseek = reseek;
tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
}
// fs
code = tsdbFSRef(pTsdb, &(*ppSnap)->fs);
code = tsdbFSRef(pTsdb, &pSnap->fs);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit;
}
// unlock
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _exit;
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
_exit:
if (code) {
*ppSnap = NULL;
if (pSnap) {
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
taosMemoryFree(pSnap);
}
} else {
*ppSnap = pSnap;
}
return code;
}
......@@ -4676,6 +4697,8 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) {
}
tsdbFSUnref(pTsdb, &pSnap->fs);
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
taosMemoryFree(pSnap);
}
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
......
......@@ -215,12 +215,12 @@ void vnodeBufPoolRef(SVBufPool *pPool) {
void vnodeBufPoolUnRef(SVBufPool *pPool) {
if (pPool == NULL) return;
if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) return;
SVnode *pVnode = pPool->pVnode;
taosThreadMutexLock(&pVnode->mutex);
if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit;
// remove from recycle list or on-recycle position
if (pVnode->onRecycle == pPool) {
pVnode->onRecycle = NULL;
......@@ -262,20 +262,14 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
pVnode->freeList = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
_exit:
taosThreadMutexUnlock(&pVnode->mutex);
return;
}
int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, void *pQHandle, _query_reseek_func_t reseekFn) {
int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
int32_t code = 0;
SQueryNode *pQNode = taosMemoryMalloc(sizeof(*pQNode));
if (pQNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pQNode->pQHandle = pQHandle;
pQNode->reseek = reseekFn;
taosThreadMutexLock(&pPool->mutex);
pQNode->pNext = pPool->qList.pNext;
......
......@@ -26,43 +26,57 @@ static int vnodeCommitImpl(SCommitInfo *pInfo);
static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
int32_t code = 0;
if (pVnode->onRecycle) {
vDebug("vgId:%d buffer pool %p of id %d is on recycling", TD_VID(pVnode), pVnode->onRecycle, pVnode->onRecycle->id);
goto _exit;
}
if (pVnode->recycleHead) {
vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
pVnode->recycleHead->id);
// pop the header buffer pool for recycling
pVnode->onRecycle = pVnode->recycleHead;
if (pVnode->recycleHead == pVnode->recycleTail) {
pVnode->recycleHead = pVnode->recycleTail = NULL;
if (pVnode->onRecycle == NULL) {
if (pVnode->recycleHead == NULL) {
vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode));
goto _exit;
} else {
pVnode->recycleHead = pVnode->recycleHead->recycleNext;
pVnode->recycleHead->recyclePrev = NULL;
vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
pVnode->recycleHead->id);
pVnode->onRecycle = pVnode->recycleHead;
if (pVnode->recycleHead == pVnode->recycleTail) {
pVnode->recycleHead = pVnode->recycleTail = NULL;
} else {
pVnode->recycleHead = pVnode->recycleHead->recycleNext;
pVnode->recycleHead->recyclePrev = NULL;
}
pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL;
}
pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL;
}
// do recycle the buffer pool
SVBufPool *pPool = pVnode->onRecycle;
// do recycle the buffer pool
SVBufPool *pPool = pVnode->onRecycle;
vDebug("vgId:%d buffer pool %p of id %d on recycle", TD_VID(pVnode), pPool, pPool->id);
taosThreadMutexLock(&pPool->mutex);
taosThreadMutexLock(&pPool->mutex);
SQueryNode *pNode = pPool->qList.pNext;
while (pNode != &pPool->qList) {
// TODO: refact/finish here
pNode->reseek(pNode->pQHandle);
SQueryNode *pNode = pPool->qList.pNext;
while (pNode != &pPool->qList) {
int32_t rc = pNode->reseek(pNode->pQHandle);
if (rc == 0) {
SQueryNode *pTNode = pNode->pNext;
pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext;
pPool->nQuery--;
pNode = pTNode;
} else if (rc == TSDB_CODE_VND_QUERY_BUSY) {
pNode = pNode->pNext;
} else {
taosThreadMutexUnlock(&pPool->mutex);
code = rc;
goto _exit;
}
taosThreadMutexUnlock(&pPool->mutex);
} else {
vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode));
}
taosThreadMutexUnlock(&pPool->mutex);
// TODO: if (pPool->nQuery == 0) add to free list
_exit:
if (code) {
vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
}
return code;
}
int vnodeBegin(SVnode *pVnode) {
......@@ -267,6 +281,12 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
tsem_wait(&pVnode->canCommit);
taosThreadMutexLock(&pVnode->mutex);
ASSERT(pVnode->onCommit == NULL);
pVnode->onCommit = pVnode->inUse;
pVnode->inUse = NULL;
taosThreadMutexUnlock(&pVnode->mutex);
pVnode->state.commitTerm = pVnode->state.applyTerm;
pInfo->info.config = pVnode->config;
......@@ -296,12 +316,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
code = smaPrepareAsyncCommit(pVnode->pSma);
if (code) goto _exit;
taosThreadMutexLock(&pVnode->mutex);
ASSERT(pVnode->onCommit == NULL);
pVnode->onCommit = pVnode->inUse;
pVnode->inUse = NULL;
taosThreadMutexUnlock(&pVnode->mutex);
_exit:
if (code) {
vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
......@@ -323,12 +337,12 @@ static int32_t vnodeCommitTask(void *arg) {
if (code) goto _exit;
// recycle buffer pool
SVBufPool *pPool = pVnode->onCommit;
taosThreadMutexLock(&pVnode->mutex);
SVBufPool *pPool = pVnode->onCommit;
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
pVnode->onCommit = NULL;
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
if (nRef == 0) {
// add to free list
vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id);
......@@ -337,7 +351,7 @@ static int32_t vnodeCommitTask(void *arg) {
pPool->freeNext = pVnode->freeList;
pVnode->freeList = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
} else {
} else if (nRef > 0) {
// add to recycle list
vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id);
......@@ -350,6 +364,8 @@ static int32_t vnodeCommitTask(void *arg) {
pVnode->recycleTail->recycleNext = pPool;
pVnode->recycleTail = pPool;
}
} else {
ASSERT(0);
}
taosThreadMutexUnlock(&pVnode->mutex);
......
......@@ -320,6 +320,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subsc
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_DUP_REQUEST, "Duplicate write request")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_QUERY_BUSY, "Query busy")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册