diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 48d02d275ea342cbd7ac2f782acc0f22a4e640fa..f802575c4ada92db2af51a2c6c3b226dd23d1ad8 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -206,8 +206,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -void tsdbRefMemTable(SMemTable *pMemTable); -void tsdbUnrefMemTable(SMemTable *pMemTable); +int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader); +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); @@ -285,8 +285,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap); -void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap); +int32_t tsdbTakeReadSnap(STsdbReader *pReader, STsdbReadSnap **ppSnap); +void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 8da783a5bd82ccd23fe051e76396489cc6516058..ab4d81568922c01f482196760a26861b71bf7db4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -847,7 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs tb_uid_t suid = getTableSuidByUid(uid, pTsdb); - tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap); + tsdbTakeReadSnap(NULL /*pTsdb (todo)*/, &pIter->pReadSnap); STbData *pMem = NULL; if (pIter->pReadSnap->pMem) { @@ -941,7 +941,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { taosArrayDestroy(pIter->pSkyline); } - tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap); + tsdbUntakeReadSnap(NULL /*pIter->pTsdb (todo)*/, pIter->pReadSnap); _err: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a619b9f2e4f827d72f2aad5fd752ae002ac2fc74..05c103deec99794fe11539c902c17e9c47152011 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -158,7 +158,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { pTsdb->mem = NULL; taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbUnrefMemTable(pMemTable); + tsdbUnrefMemTable(pMemTable, NULL); goto _exit; } @@ -983,7 +983,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbUnrefMemTable(pMemTable); + tsdbUnrefMemTable(pMemTable, NULL); tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 0ed143aab92580170ed19bb25c9b048d99459721..7348b284ab75c01183a0c3d51933d601874f921d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -629,16 +629,32 @@ _err: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -void tsdbRefMemTable(SMemTable *pMemTable) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { + int32_t code = 0; + int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); + + // register handle (todo) + if (pReader) { + } + + return code; } -void tsdbUnrefMemTable(SMemTable *pMemTable) { +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, STsdbReader *pReader) { + int32_t code = 0; + + // unregister handle (todo) + if (pReader) { + } + int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (nRef == 0) { tsdbMemTableDestroy(pMemTable); } + + return code; } static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 495d2b3070c58824b2231091f0892acc2818a3a0..bd8d665e94faac8f5baa530925e7bb3f02da9c41 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3354,7 +3354,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap); + code = tsdbTakeReadSnap(pReader, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -3378,7 +3378,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl STsdbReader* pPrevReader = pReader->innerReader[0]; SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; - code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap); + code = tsdbTakeReadSnap(pPrevReader, &pPrevReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -3435,7 +3435,7 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); } - tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); + tsdbUntakeReadSnap(pReader, pReader->pReadSnap); taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); SIOCostSummary* pCost = &pReader->cost; @@ -3858,8 +3858,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { - int32_t code = 0; +int32_t tsdbTakeReadSnap(STsdbReader* pReader, STsdbReadSnap** ppSnap) { + int32_t code = 0; + STsdb* pTsdb = pReader->pTsdb; + SVersionRange* pRange = &pReader->verRange; // alloc *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); @@ -3876,15 +3878,14 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { } // take snapshot - (*ppSnap)->pMem = pTsdb->mem; - (*ppSnap)->pIMem = pTsdb->imem; - - if ((*ppSnap)->pMem) { - tsdbRefMemTable((*ppSnap)->pMem); + if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { + tsdbRefMemTable(pTsdb->mem, pReader); + (*ppSnap)->pMem = pTsdb->mem; } - if ((*ppSnap)->pIMem) { - tsdbRefMemTable((*ppSnap)->pIMem); + if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { + tsdbRefMemTable(pTsdb->imem, pReader); + (*ppSnap)->pIMem = pTsdb->imem; } // fs @@ -3906,14 +3907,16 @@ _exit: return code; } -void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) { +void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { + STsdb* pTsdb = pReader->pTsdb; + if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem); + tsdbUnrefMemTable(pSnap->pMem, pReader); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem); + tsdbUnrefMemTable(pSnap->pIMem, pReader); } tsdbFSUnref(pTsdb, &pSnap->fs);