From 2e443c395eed0a198a92004ed0303086456a4f2d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Jan 2023 15:51:30 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 15 +--- source/dnode/vnode/src/inc/vnd.h | 5 ++ source/dnode/vnode/src/inc/vnodeInt.h | 11 +++ source/dnode/vnode/src/tsdb/tsdbMemTable.c | 83 ++++++++++------------ source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/dnode/vnode/src/vnd/vnodeBufPool.c | 47 ++++++++++++ 6 files changed, 105 insertions(+), 58 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a58301adf2..70dbadeeea 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -65,7 +65,6 @@ typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; -typedef struct SQueryNode SQueryNode; typedef struct SDiskCol SDiskCol; typedef struct SDiskData SDiskData; typedef struct SDiskDataBuilder SDiskDataBuilder; @@ -209,12 +208,10 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable -typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle); - int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode); +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter @@ -293,7 +290,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap); +int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); @@ -369,13 +366,6 @@ struct STbData { STbData *next; }; -struct SQueryNode { - SQueryNode *pNext; - SQueryNode **ppNext; - void *pQHandle; - _tsdb_reseek_func_t reseek; -}; - struct SMemTable { SRWLatch latch; STsdb *pTsdb; @@ -392,7 +382,6 @@ struct SMemTable { int32_t nBucket; STbData **aBucket; }; - SQueryNode qList; }; struct TSDBROW { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 32dfd5a41c..b900e02bfd 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -65,6 +65,11 @@ struct SVBufPool { SVBufPool* recycleNext; SVBufPool* recyclePrev; + // query handle list + TdThreadMutex mutex; + int32_t nQuery; + SQueryNode qList; + SVnode* pVnode; int32_t id; volatile int32_t nRef; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3b751d506f..ba46d84667 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader; typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SCommitInfo SCommitInfo; +typedef struct SQueryNode SQueryNode; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" @@ -92,6 +93,13 @@ typedef struct SCommitInfo SCommitInfo; #define VND_INFO_FNAME "vnode.json" // vnd.h +typedef int32_t (*_query_reseek_func_t)(void* pQHandle); +struct SQueryNode { + SQueryNode* pNext; + SQueryNode** ppNext; + void* pQHandle; + _query_reseek_func_t reseek; +}; void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size); @@ -100,6 +108,9 @@ void vnodeBufPoolRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); +int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, void* pQHandle, _query_reseek_func_t reseekFn); +int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool); + // meta typedef struct SMCtbCursor SMCtbCursor; typedef struct SMStbCursor SMStbCursor; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 6bbaa31ed3..0b762cca1e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -64,8 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { taosMemoryFree(pMemTable); goto _err; } - pMemTable->qList.pNext = &pMemTable->qList; - pMemTable->qList.ppNext = &pMemTable->qList.pNext; + // pMemTable->qList.pNext = &pMemTable->qList; + // pMemTable->qList.ppNext = &pMemTable->qList.pNext; vnodeBufPoolRef(pMemTable->pPool); *ppMemTable = pMemTable; @@ -749,39 +749,34 @@ _exit: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - /* - // register handle (todo: take concurrency in consideration) - *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); - if (*ppNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - (*ppNode)->pQHandle = pQHandle; - (*ppNode)->reseek = reseek; - (*ppNode)->pNext = pMemTable->qList.pNext; - (*ppNode)->ppNext = &pMemTable->qList.pNext; - pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; - pMemTable->qList.pNext = *ppNode; - */ + + vnodeBufPoolRegisterQuery(pMemTable->pPool, pQHandle, reseek); + _exit: return code; } int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t code = 0; - /* + + if (pNode) { + vnodeBufPoolDeregisterQuery(pMemTable->pPool); + } + +#if 0 // unregister handle (todo: take concurrency in consideration) if (pNode) { pNode->pNext->ppNext = pNode->ppNext; *pNode->ppNext = pNode->pNext; taosMemoryFree(pNode); } - */ +#endif + int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (nRef == 0) { tsdbMemTableDestroy(pMemTable); @@ -828,28 +823,28 @@ _exit: return aTbDataP; } -int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { - int32_t code = 0; - - SQueryNode *pNode = pMemTable->qList.pNext; - while (1) { - ASSERT(pNode != &pMemTable->qList); - SQueryNode *pNextNode = pNode->pNext; - - if (pNextNode == &pMemTable->qList) { - code = (*pNode->reseek)(pNode->pQHandle); - if (code) goto _exit; - break; - } else { - code = (*pNode->reseek)(pNode->pQHandle); - if (code) goto _exit; - pNode = pMemTable->qList.pNext; - ASSERT(pNode == pNextNode); - } - } - - // NOTE: Take care here, pMemTable is destroyed - -_exit: - return code; -} +// int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { +// int32_t code = 0; + +// SQueryNode *pNode = pMemTable->qList.pNext; +// while (1) { +// ASSERT(pNode != &pMemTable->qList); +// SQueryNode *pNextNode = pNode->pNext; + +// if (pNextNode == &pMemTable->qList) { +// code = (*pNode->reseek)(pNode->pQHandle); +// if (code) goto _exit; +// break; +// } else { +// code = (*pNode->reseek)(pNode->pQHandle); +// if (code) goto _exit; +// pNode = pMemTable->qList.pNext; +// ASSERT(pNode == pNextNode); +// } +// } + +// // NOTE: Take care here, pMemTable is destroyed + +// _exit: +// return code; +// } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 87b5e59bc0..b52c75f158 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4614,7 +4614,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) { +int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->verRange; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 3a02a4a0bf..e519c69d44 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -25,6 +25,13 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPoo return -1; } memset(pPool, 0, sizeof(SVBufPool)); + + // query handle list + taosThreadMutexInit(&pPool->mutex, NULL); + pPool->nQuery = 0; + pPool->qList.pNext = &pPool->qList; + pPool->qList.ppNext = &pPool->qList.pNext; + pPool->pVnode = pVnode; pPool->id = id; pPool->ptr = pPool->node.data; @@ -60,6 +67,7 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { taosThreadSpinDestroy(pPool->lock); taosMemoryFree((void *)pPool->lock); } + taosThreadMutexDestroy(&pPool->mutex); taosMemoryFree(pPool); return 0; } @@ -97,6 +105,7 @@ int vnodeCloseBufPool(SVnode *pVnode) { } void vnodeBufPoolReset(SVBufPool *pPool) { + ASSERT(pPool->nQuery == 0); for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) { ASSERT(pNode->pnext == &pPool->pTail); pNode->prev->pnext = &pPool->pTail; @@ -255,3 +264,41 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { taosThreadMutexUnlock(&pVnode->mutex); } + +int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, void *pQHandle, _query_reseek_func_t reseekFn) { + int32_t code = 0; + + SQueryNode *pQNode = taosMemoryMalloc(sizeof(*pQNode)); + if (pQNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pQNode->pQHandle = pQHandle; + pQNode->reseek = reseekFn; + + taosThreadMutexLock(&pPool->mutex); + + pQNode->pNext = pPool->qList.pNext; + pQNode->ppNext = &pPool->qList.pNext; + pPool->qList.pNext->ppNext = &pQNode->pNext; + pPool->qList.pNext = pQNode; + pPool->nQuery++; + + taosThreadMutexUnlock(&pPool->mutex); + +_exit: + return code; +} + +int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool) { + int32_t code = 0; + + taosThreadMutexLock(&pPool->mutex); + + ASSERT(0); + + taosThreadMutexUnlock(&pPool->mutex); + +_exit: + return code; +} \ No newline at end of file -- GitLab