diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f802575c4ada92db2af51a2c6c3b226dd23d1ad8..3d4cad4bd1353cd73191d1e8f8d4d2c8ccac05c7 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -65,6 +65,7 @@ typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; +typedef struct SQueryNode SQueryNode; #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_MAX_SUBBLOCKS 8 @@ -206,8 +207,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader); -int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader); +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **ppNode); +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); @@ -365,6 +366,12 @@ struct STbData { STbData *next; }; +struct SQueryNode { + SQueryNode *pNext; + SQueryNode **ppNext; + void *pQueryHandle; +}; + struct SMemTable { SRWLatch latch; STsdb *pTsdb; @@ -381,7 +388,7 @@ struct SMemTable { int32_t nBucket; STbData **aBucket; }; - STsdbReader *pReaderList; + SQueryNode *qList; }; struct TSDBROW { @@ -592,9 +599,11 @@ struct SDelFWriter { }; struct STsdbReadSnap { - SMemTable *pMem; - SMemTable *pIMem; - STsdbFS fs; + SMemTable *pMem; + SQueryNode *pNode; + SMemTable *pIMem; + SQueryNode *pINode; + STsdbFS fs; }; struct SDataFWriter { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 7348b284ab75c01183a0c3d51933d601874f921d..a78021d69737d98073f3e8c2f24265bfa00d693b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -629,24 +629,35 @@ _err: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQueryHandle, SQueryNode **ppNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - // register handle (todo) - if (pReader) { + // register handle + *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); + if (*ppNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } + (*ppNode)->pQueryHandle = pQueryHandle; + (*ppNode)->pNext = pMemTable->qList; + (*ppNode)->ppNext = &pMemTable->qList; + pMemTable->qList = *ppNode; +_exit: return code; } -int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t code = 0; - // unregister handle (todo) - if (pReader) { + // unregister handle + if (pNode) { + pNode->pNext->ppNext = pNode->ppNext; + *pNode->ppNext = pNode->pNext; + taosMemoryFree(pNode); } int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bd8d665e94faac8f5baa530925e7bb3f02da9c41..c0e4b4cd5016a8976e0b1de9f2b07aeb26ed9327 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3879,12 +3879,12 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { // take snapshot if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { - tsdbRefMemTable(pTsdb->mem, pReader); + tsdbRefMemTable(pTsdb->mem, pReader, &(*ppSnap)->pNode); (*ppSnap)->pMem = pTsdb->mem; } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { - tsdbRefMemTable(pTsdb->imem, pReader); + tsdbRefMemTable(pTsdb->imem, pReader, &(*ppSnap)->pINode); (*ppSnap)->pIMem = pTsdb->imem; } @@ -3912,11 +3912,11 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem, pReader); + tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem, pReader); + tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode); } tsdbFSUnref(pTsdb, &pSnap->fs);