diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f51d0d2fd68460738e4e6e2e4e147a77d378e134..3a54bd61273e299ee8f9c2783818c55300114545 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -707,7 +707,8 @@ typedef struct SSttBlockLoadInfo { SArray *aSttBlk; int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t currentLoadBlockIndex; - LRUHandle *blockDataHandle; + SLRUCache* pBlockDataCache; + LRUHandle *blockDataHandle[2]; int32_t loadBlocks; double elapsedTime; STSchema *pSchema; @@ -800,6 +801,8 @@ bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt); +void setLastBlockLoadInfoCache(SSttBlockLoadInfo* pLoadInfo, SLRUCache* pBlockDataCache); +void releaseLastBlockLoadInfoCacheHandle(SSttBlockLoadInfo *pLoadInfo); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index f0f34fa9cd98fc95227314ba59f056d57e0420ef..e9b2b4c357a066516c1ee010330e5ee1a66e35a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1214,13 +1214,20 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { taosLRUCacheSetStrictCapacity(pTsdb->sttBlockCache, false); pTsdb->sttBlkCache = taosLRUCacheInit(4 * 1024 * 1024, -1, 0.0); + taosLRUCacheSetStrictCapacity(pTsdb->sttBlkCache, false); + _err: pTsdb->lruCache = pCache; return code; } void tsdbCloseCache(STsdb *pTsdb) { + taosLRUCacheEraseUnrefEntries(pTsdb->sttBlockCache); taosLRUCacheCleanup(pTsdb->sttBlockCache); + + taosLRUCacheEraseUnrefEntries(pTsdb->sttBlkCache); + taosLRUCacheCleanup(pTsdb->sttBlkCache); + SLRUCache *pCache = pTsdb->lruCache; if (pCache) { taosLRUCacheEraseUnrefEntries(pCache); diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index d9535dc2725914aa723635c21771bd99fae61c06..2e9d2d50480283149deb720a3a2a1ccad1223e60 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -45,11 +45,33 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, pLoadInfo[i].pSchema = pSchema; pLoadInfo[i].colIds = colList; pLoadInfo[i].numOfCols = numOfCols; + pLoadInfo[i].blockDataHandle[0] = NULL; + pLoadInfo[i].blockDataHandle[1] = NULL; } return pLoadInfo; } +void setLastBlockLoadInfoCache(SSttBlockLoadInfo *pLoadInfo, SLRUCache *pBlockDataCache) { + for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) { + pLoadInfo[i].pBlockDataCache = pBlockDataCache; + } +} + +void releaseLastBlockLoadInfoCacheHandle(SSttBlockLoadInfo *pLoadInfo) { + if (pLoadInfo == NULL) return; + for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) { + if (pLoadInfo[i].pBlockDataCache && pLoadInfo[i].blockDataHandle[0]) { + taosLRUCacheRelease(pLoadInfo[i].pBlockDataCache, pLoadInfo[i].blockDataHandle[0], false); + pLoadInfo[i].blockDataHandle[0] = NULL; + } + if (pLoadInfo[i].pBlockDataCache && pLoadInfo[i].blockDataHandle[1]) { + taosLRUCacheRelease(pLoadInfo[i].pBlockDataCache, pLoadInfo[i].blockDataHandle[1], false); + pLoadInfo[i].blockDataHandle[1] = NULL; + } + } +} + void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) { pLoadInfo[i].currentLoadBlockIndex = 1; @@ -61,6 +83,8 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { pLoadInfo[i].elapsedTime = 0; pLoadInfo[i].loadBlocks = 0; pLoadInfo[i].sttBlockLoaded = false; + pLoadInfo[i].blockDataHandle[0] = NULL; + pLoadInfo[i].blockDataHandle[1] = NULL; } } @@ -80,7 +104,14 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { pLoadInfo[i].currentLoadBlockIndex = 1; pLoadInfo[i].blockIndex[0] = -1; pLoadInfo[i].blockIndex[1] = -1; - + if (pLoadInfo[i].pBlockDataCache && pLoadInfo[i].blockDataHandle[0]) { + taosLRUCacheRelease(pLoadInfo[i].pBlockDataCache, pLoadInfo[i].blockDataHandle[0], false); + pLoadInfo[i].blockDataHandle[0] = NULL; + } + if (pLoadInfo[i].pBlockDataCache && pLoadInfo[i].blockDataHandle[1]) { + taosLRUCacheRelease(pLoadInfo[i].pBlockDataCache, pLoadInfo[i].blockDataHandle[1], false); + pLoadInfo[i].blockDataHandle[1] = NULL; + } tBlockDataDestroy(&pLoadInfo[i].blockData[0]); tBlockDataDestroy(&pLoadInfo[i].blockData[1]); @@ -91,70 +122,23 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { return NULL; } -typedef struct SSttBlockDataCacheKey{ - int32_t fid; - int32_t iStt; - int64_t cid; - int64_t offset; +typedef struct SSttBlockDataCacheKey { + int32_t fid; + int32_t iStt; + int64_t cid; + int64_t offset; } SSttBlockDataCacheKey; static void deleteSttBlockDataCache(const void *key, size_t keyLen, void *value, void *ud) { - SBlockData* pBlockData = value; + SBlockData *pBlockData = value; tBlockDataDestroy(pBlockData); + taosMemoryFree(pBlockData); } static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { + int32_t code = 0; - // (pReader->pSet->fid, iStt, pReader->pSet->pHeadF->commitID) -> aSttBlk global cache - - // (pIter->pReader->pSet->fid, pIter->iStt, pIter->pReader->pSet->pHeadF->commitID, pIter->pSttBlk->bInfo.offset) -> SBlockData SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo; - if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) { - pInfo->blockDataHandle = NULL; - return NULL; - } - SSttBlockDataCacheKey key = {.fid = pIter->pReader->pSet->fid, .iStt = pIter->iStt, .cid = pIter->pReader->pSet->pHeadF->commitID, .offset = pIter->pSttBlk->bInfo.offset}; - int32_t code = 0; - - LRUHandle* h = taosLRUCacheLookup(pIter->pReader->pTsdb->sttBlockCache, &key, sizeof(struct SSttBlockDataCacheKey)); - bool bFound = false; - if (!h) { - int64_t st = taosGetTimestampUs(); - - SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo; - - SBlockData* pBlockData = taosMemoryMalloc(sizeof(SBlockData)); - - TABLEID id = {0}; - if (pIter->pSttBlk->suid != 0) { - id.suid = pIter->pSttBlk->suid; - } else { - id.uid = pIter->uid; - } - - tBlockDataCreate(pBlockData); - tBlockDataInit(pBlockData, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); - tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); - double el = (taosGetTimestampUs() - st) / 1000.0; - - tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64 - ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s", - pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlockData->nRow, - pBlockData, el, idStr); - - int charge = 1 * 1024 * 1024; //TODO - taosLRUCacheInsert(pIter->pReader->pTsdb->sttBlockCache, &key, sizeof(key), pBlockData, charge, deleteSttBlockDataCache, &h, TAOS_LRU_PRIORITY_LOW, NULL); - pInfo->elapsedTime += el; - pInfo->loadBlocks += 1; - } else { - tsdbDebug("use global cached last block, block index:%d, file index:%d, block data offset: %"PRId64 " due to uid:%" PRIu64 ", load data, %s", - pIter->iSttBlk, pIter->iStt, pIter->pSttBlk->bInfo.offset, pIter->uid, idStr); - bFound = true; - } - - SBlockData* pBlockData = taosLRUCacheValue(pIter->pReader->pTsdb->sttBlockCache, h); - pInfo->blockDataHandle = h; - return pBlockData; if (pInfo->blockIndex[0] == pIter->iSttBlk) { if (pInfo->currentLoadBlockIndex != 0) { @@ -162,7 +146,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { pIter->iSttBlk, pIter->iStt, pIter->uid, idStr); pInfo->currentLoadBlockIndex = 0; } - return &pInfo->blockData[0]; + SBlockData *pBlockData = taosLRUCacheValue(pInfo->pBlockDataCache, pInfo->blockDataHandle[0]); + return pBlockData; } if (pInfo->blockIndex[1] == pIter->iSttBlk) { @@ -171,7 +156,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { pIter->iSttBlk, pIter->iStt, pIter->uid, idStr); pInfo->currentLoadBlockIndex = 1; } - return &pInfo->blockData[1]; + SBlockData *pBlockData = taosLRUCacheValue(pInfo->pBlockDataCache, pInfo->blockDataHandle[1]); + return pBlockData; } if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) { @@ -182,40 +168,57 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { pInfo->currentLoadBlockIndex ^= 1; int64_t st = taosGetTimestampUs(); - SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex]; + SSttBlockDataCacheKey key = {.fid = pIter->pReader->pSet->fid, + .iStt = pIter->iStt, + .cid = pIter->pReader->pSet->pHeadF->commitID, + .offset = pIter->pSttBlk->bInfo.offset}; - TABLEID id = {0}; - if (pIter->pSttBlk->suid != 0) { - id.suid = pIter->pSttBlk->suid; - } else { - id.uid = pIter->uid; - } + LRUHandle *h = taosLRUCacheLookup(pInfo->pBlockDataCache, &key, sizeof(struct SSttBlockDataCacheKey)); + if (!h) { + int64_t st = taosGetTimestampUs(); - code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); - if (code != TSDB_CODE_SUCCESS) { - goto _exit; - } - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); - if (code != TSDB_CODE_SUCCESS) { - goto _exit; - } + SBlockData *pBlockData = taosMemoryMalloc(sizeof(SBlockData)); + + TABLEID id = {0}; + if (pIter->pSttBlk->suid != 0) { + id.suid = pIter->pSttBlk->suid; + } else { + id.uid = pIter->uid; + } + + tBlockDataCreate(pBlockData); + tBlockDataInit(pBlockData, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); + tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); + double el = (taosGetTimestampUs() - st) / 1000.0; - double el = (taosGetTimestampUs() - st) / 1000.0; - pInfo->elapsedTime += el; - pInfo->loadBlocks += 1; + tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64 + ", last file index:%d, block data offset: %" PRId64 + ", last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s", + pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->pSttBlk->bInfo.offset, pIter->iSttBlk, + pInfo->currentLoadBlockIndex, pBlockData->nRow, pBlockData, el, idStr); + + int charge = 1 * 1024 * 1024; // TODO + taosLRUCacheInsert(pIter->pReader->pTsdb->sttBlockCache, &key, sizeof(key), pBlockData, charge, + deleteSttBlockDataCache, &h, TAOS_LRU_PRIORITY_LOW, NULL); + pInfo->elapsedTime += el; + pInfo->loadBlocks += 1; + } else { + tsdbDebug("use global cached last block, block index:%d, file index:%d, block data offset: %" PRId64 + " due to uid:%" PRIu64 ", load data, %s", + pIter->iSttBlk, pIter->iStt, pIter->pSttBlk->bInfo.offset, pIter->uid, idStr); + } - tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64 - ", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s", - pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow, - pBlock, el, idStr); + SBlockData *pBlockData = taosLRUCacheValue(pIter->pReader->pTsdb->sttBlockCache, h); + taosLRUCacheRelease(pInfo->pBlockDataCache, pInfo->blockDataHandle[pInfo->currentLoadBlockIndex], false); + pInfo->blockDataHandle[pInfo->currentLoadBlockIndex] = h; pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow, idStr); - return &pInfo->blockData[pInfo->currentLoadBlockIndex]; + return pBlockData; _exit: if (code != TSDB_CODE_SUCCESS) { @@ -404,7 +407,8 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t return code; } -void tLDataIterClose(SLDataIter *pIter) { /*taosMemoryFree(pIter); */} +void tLDataIterClose(SLDataIter *pIter) { /*taosMemoryFree(pIter); */ +} void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) { int32_t step = pIter->backward ? -1 : 1; @@ -468,7 +472,6 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { bool hasVal = false; int32_t i = pIter->iRow; - SBlockData *pBlockData = loadLastBlock(pIter, idStr); // mostly we only need to find the start position for a given table @@ -573,7 +576,6 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { } if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) { - taosLRUCacheRelease(pIter->pReader->pTsdb->sttBlockCache, pIter->pBlockLoadInfo->blockDataHandle, false); tLDataIterNextBlock(pIter, idStr); if (pIter->pSttBlk == NULL) { // no more data goto _exit; @@ -632,7 +634,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter) { + bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter) { int32_t code = TSDB_CODE_SUCCESS; pMTree->backward = backward; @@ -641,7 +643,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead if (!pMTree->backward) { // asc tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); - } else { // desc + } else { // desc tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 29e59daeb9fa4ae6510f0117fa6af82cab39d152..5fa8700de219aa9829d780a009cd73e0efda3ed7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -561,6 +561,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger; pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt); + setLastBlockLoadInfoCache(pLReader->pInfo, pReader->pTsdb->sttBlockCache); if (pLReader->pInfo == NULL) { tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); return terrno;