diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ce0d588df73bcb923ece6e1e7859baf387657b6f..4c513371f53e2a98f94a19fdf7a3436f5aa2b43a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -207,10 +207,12 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable +typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle); + int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNode); +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter @@ -290,7 +292,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdbReader *pReader, STsdbReadSnap **ppSnap); +int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); @@ -362,9 +364,10 @@ struct STbData { }; struct SQueryNode { - SQueryNode *pNext; - SQueryNode **ppNext; - void *pQHandle; + SQueryNode *pNext; + SQueryNode **ppNext; + void *pQHandle; + _tsdb_reseek_func_t reseek; }; struct SMemTable { diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 41c03f1c0dcb2abf0b1bf8bd7032f0f1950f3fe6..86cc00568e274b39cd83dab1492a475286f63eff 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -194,6 +194,12 @@ static void freeItem(void* pItem) { } } +static int32_t tsdbCacheQueryReseek(void* pQHandle) { + int32_t code = 0; + + return code; +} + int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; @@ -237,7 +243,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayPush(pLastCols, &p); } - tsdbTakeReadSnap(NULL, &pr->pReadSnap); + tsdbTakeReadSnap(NULL, tsdbCacheQueryReseek, &pr->pReadSnap); pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index f1857b1047456dda376f02182d5ed9cadd4a6ecb..04939959fa83439c866fa95bd9a354601d54ab8b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -636,7 +636,7 @@ _err: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNode) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); @@ -649,6 +649,7 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, SQueryNode **ppNod goto _exit; } (*ppNode)->pQHandle = pQHandle; + (*ppNode)->reseek = reseek; (*ppNode)->pNext = pMemTable->qList.pNext; (*ppNode)->ppNext = &pMemTable->qList.pNext; pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; @@ -714,8 +715,6 @@ _exit: return aTbDataP; } -extern int32_t tsdbSetQueryReseek(void *pQHandle); - int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { int32_t code = 0; @@ -725,11 +724,11 @@ int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { SQueryNode *pNextNode = pNode->pNext; if (pNextNode == &pMemTable->qList) { - code = tsdbSetQueryReseek(pNode->pQHandle); + code = (*pNode->reseek)(pNode->pQHandle); if (code) goto _exit; break; } else { - code = tsdbSetQueryReseek(pNode->pQHandle); + code = (*pNode->reseek)(pNode->pQHandle); if (code) goto _exit; pNode = pMemTable->qList.pNext; ASSERT(pNode == pNextNode); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 3f276e5b33765bab3f870d20651671b889a43ce7..4808956693904d6cbf4b9f8e0e7b3813dee2eba7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3489,6 +3489,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } // ====================================== EXPOSED APIs ====================================== +static int32_t tsdbSetQueryReseek(void* pQHandle); + int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, const char* idstr) { STimeWindow window = pCond->twindows; @@ -3570,7 +3572,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } if (numOfTables > 0) { - code = tsdbTakeReadSnap(pReader, &pReader->pReadSnap); + code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -4106,7 +4108,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { +int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->verRange; @@ -4127,12 +4129,12 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { // take snapshot if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { - tsdbRefMemTable(pTsdb->mem, pReader, &(*ppSnap)->pNode); + tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode); (*ppSnap)->pMem = pTsdb->mem; } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { - tsdbRefMemTable(pTsdb->imem, pReader, &(*ppSnap)->pINode); + tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode); (*ppSnap)->pIMem = pTsdb->imem; } @@ -4173,7 +4175,7 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } -int32_t tsdbSetQueryReseek(void* pQHandle) { +static int32_t tsdbSetQueryReseek(void* pQHandle) { int32_t code = 0; // lock handle