From 89f7ad2920121ce4f2af8a17f486e69c6d10ae49 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 25 Oct 2022 10:36:30 +0800 Subject: [PATCH] fix: optimize tsdb cache loading speed --- source/dnode/vnode/inc/vnode.h | 3 +- source/dnode/vnode/src/inc/tsdb.h | 61 ++++-- source/dnode/vnode/src/tsdb/tsdbCache.c | 211 ++++++++++--------- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 46 ++-- source/libs/executor/src/cachescanoperator.c | 14 +- 5 files changed, 188 insertions(+), 147 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index be0d6fdc4d..8352910d51 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -163,7 +163,8 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); -int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); +int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid, + void **pReader); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index dfbbd8fbd0..89be94a35d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -298,29 +298,6 @@ int32_t tsdbMerge(STsdb *pTsdb); #define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) #define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) -// tsdbCache ============================================================================================== -typedef struct { - TSKEY ts; - SColVal colVal; -} SLastCol; - -int32_t tsdbOpenCache(STsdb *pTsdb); -void tsdbCloseCache(STsdb *pTsdb); -int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb); -int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup); -int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); -int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); -int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); - -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); -int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); -int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); - -void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); -size_t tsdbCacheGetCapacity(SVnode *pVnode); - -int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); - // tsdbDiskData ============================================================================================== int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder); @@ -729,6 +706,44 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); +// tsdbCache ============================================================================================== +typedef struct SCacheRowsReader { + SVnode *pVnode; + STSchema *pSchema; + uint64_t uid; + uint64_t suid; + char **transferBuf; // todo remove it soon + int32_t numOfCols; + int32_t type; + int32_t tableIndex; // currently returned result tables + SArray *pTableList; // table id list + SSttBlockLoadInfo *pLoadInfo; + STsdbReadSnap *pReadSnap; + SDataFReader *pDataFReader; +} SCacheRowsReader; + +typedef struct { + TSKEY ts; + SColVal colVal; +} SLastCol; + +int32_t tsdbOpenCache(STsdb *pTsdb); +void tsdbCloseCache(STsdb *pTsdb); +int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb); +int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup); +int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); +int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); +int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); + +int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); +int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); +int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); + +void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); +size_t tsdbCacheGetCapacity(SVnode *pVnode); + +int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); + // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index bb394e8acc..f1d079f118 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -26,7 +26,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } - taosLRUCacheSetStrictCapacity(pCache, true); + taosLRUCacheSetStrictCapacity(pCache, false); taosThreadMutexInit(&pTsdb->lruMutex, NULL); @@ -488,11 +488,12 @@ typedef struct { int32_t nFileSet; int32_t iFileSet; SArray *aDFileSet; - SDataFReader *pDataFReader; + SDataFReader **pDataFReader; TSDBROW row; - SMergeTree mergeTree; - SMergeTree *pMergeTree; + SMergeTree mergeTree; + SMergeTree *pMergeTree; + SSttBlockLoadInfo *pLoadInfo; } SFSLastNextRowIter; static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { @@ -519,18 +520,20 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { return code; } - if (state->pDataFReader != NULL) { - tsdbDataFReaderClose(&state->pDataFReader); - state->pDataFReader = NULL; - } + if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) { + if (*state->pDataFReader != NULL) { + tsdbDataFReaderClose(state->pDataFReader); - code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); - if (code) goto _err; + resetLastBlockLoadInfo(state->pLoadInfo); + } - SSttBlockLoadInfo *pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0); - tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, + code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet); + if (code) goto _err; + } + + tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo, true, NULL); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL); state->pMergeTree = &state->mergeTree; bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { @@ -554,10 +557,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { } _err: - if (state->pDataFReader) { + /*if (state->pDataFReader) { tsdbDataFReaderClose(&state->pDataFReader); state->pDataFReader = NULL; - } + }*/ if (state->pMergeTree != NULL) { tMergeTreeClose(state->pMergeTree); state->pMergeTree = NULL; @@ -575,12 +578,12 @@ int32_t clearNextRowFromFSLast(void *iter) { if (!state) { return code; } - + /* if (state->pDataFReader) { tsdbDataFReaderClose(&state->pDataFReader); state->pDataFReader = NULL; } - + */ if (state->pMergeTree != NULL) { tMergeTreeClose(state->pMergeTree); state->pMergeTree = NULL; @@ -597,27 +600,28 @@ typedef enum SFSNEXTROWSTATES { } SFSNEXTROWSTATES; typedef struct SFSNextRowIter { - SFSNEXTROWSTATES state; // [input] - STsdb *pTsdb; // [input] - SBlockIdx *pBlockIdxExp; // [input] - STSchema *pTSchema; // [input] - tb_uid_t suid; - tb_uid_t uid; - int32_t nFileSet; - int32_t iFileSet; - SArray *aDFileSet; - SDataFReader *pDataFReader; - SArray *aBlockIdx; - SBlockIdx *pBlockIdx; - SMapData blockMap; - int32_t nBlock; - int32_t iBlock; - SDataBlk block; - SBlockData blockData; - SBlockData *pBlockData; - int32_t nRow; - int32_t iRow; - TSDBROW row; + SFSNEXTROWSTATES state; // [input] + STsdb *pTsdb; // [input] + SBlockIdx *pBlockIdxExp; // [input] + STSchema *pTSchema; // [input] + tb_uid_t suid; + tb_uid_t uid; + int32_t nFileSet; + int32_t iFileSet; + SArray *aDFileSet; + SDataFReader **pDataFReader; + SArray *aBlockIdx; + SBlockIdx *pBlockIdx; + SMapData blockMap; + int32_t nBlock; + int32_t iBlock; + SDataBlk block; + SBlockData blockData; + SBlockData *pBlockData; + int32_t nRow; + int32_t iRow; + TSDBROW row; + SSttBlockLoadInfo *pLoadInfo; } SFSNextRowIter; static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { @@ -648,8 +652,16 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { return code; } - code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); - if (code) goto _err; + if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) { + if (*state->pDataFReader != NULL) { + tsdbDataFReaderClose(state->pDataFReader); + + resetLastBlockLoadInfo(state->pLoadInfo); + } + + code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet); + if (code) goto _err; + } // tMapDataReset(&state->blockIdxMap); if (!state->aBlockIdx) { @@ -657,7 +669,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { } else { taosArrayClear(state->aBlockIdx); } - code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx); + code = tsdbReadBlockIdx(*state->pDataFReader, state->aBlockIdx); if (code) goto _err; /* if (state->pBlockIdx) { */ @@ -666,17 +678,20 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { * &state->blockIdx); */ state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ); - if (!state->pBlockIdx) { - tsdbDataFReaderClose(&state->pDataFReader); - state->pDataFReader = NULL; + if (!state->pBlockIdx) { /* + tsdbDataFReaderClose(state->pDataFReader); + *state->pDataFReader = NULL; + resetLastBlockLoadInfo(state->pLoadInfo);*/ goto _next_fileset; } + tMapDataReset(&state->blockMap); + /* if (state->blockMap.pData != NULL) { tMapDataClear(&state->blockMap); } - - code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap); + */ + code = tsdbReadDataBlk(*state->pDataFReader, state->pBlockIdx, &state->blockMap); if (code) goto _err; state->nBlock = state->blockMap.nItem; @@ -703,7 +718,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0); if (code) goto _err; - code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData); + code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData); if (code) goto _err; state->nRow = state->blockData.nRow; @@ -719,8 +734,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (--state->iRow < 0) { state->state = SFSNEXTROW_BLOCKDATA; if (--state->iBlock < 0) { - tsdbDataFReaderClose(&state->pDataFReader); - state->pDataFReader = NULL; + tsdbDataFReaderClose(state->pDataFReader); + *state->pDataFReader = NULL; + resetLastBlockLoadInfo(state->pLoadInfo); if (state->aBlockIdx) { taosArrayDestroy(state->aBlockIdx); @@ -739,16 +755,17 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { } _err: - if (state->pDataFReader) { - tsdbDataFReaderClose(&state->pDataFReader); - state->pDataFReader = NULL; - } + /* + if (*state->pDataFReader) { + tsdbDataFReaderClose(state->pDataFReader); + *state->pDataFReader = NULL; + resetLastBlockLoadInfo(state->pLoadInfo); + }*/ if (state->aBlockIdx) { taosArrayDestroy(state->aBlockIdx); state->aBlockIdx = NULL; } if (state->pBlockData) { - // tBlockDataDestroy(&state->blockData, 1); tBlockDataDestroy(state->pBlockData, 1); state->pBlockData = NULL; } @@ -765,11 +782,11 @@ int32_t clearNextRowFromFS(void *iter) { if (!state) { return code; } - + /* if (state->pDataFReader) { tsdbDataFReaderClose(&state->pDataFReader); state->pDataFReader = NULL; - } + }*/ if (state->aBlockIdx) { taosArrayDestroy(state->aBlockIdx); state->aBlockIdx = NULL; @@ -930,25 +947,21 @@ typedef struct { TSDBROW memRow, imemRow, fsLastRow, fsRow; TsdbNextRowState input[4]; - STsdbReadSnap *pReadSnap; STsdb *pTsdb; } CacheNextRowIter; -static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema) { +static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, + SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader) { int code = 0; - tb_uid_t suid = getTableSuidByUid(uid, pTsdb); - - tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap, NULL); - STbData *pMem = NULL; - if (pIter->pReadSnap->pMem) { - pMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid); + if (pReadSnap->pMem) { + pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid); } STbData *pIMem = NULL; - if (pIter->pReadSnap->pIMem) { - pIMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid); + if (pReadSnap->pIMem) { + pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid); } pIter->pTsdb = pTsdb; @@ -957,7 +970,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs SDelIdx delIdx; - SDelFile *pDelFile = pIter->pReadSnap->fs.pDelFile; + SDelFile *pDelFile = pReadSnap->fs.pDelFile; if (pDelFile) { SDelFReader *pDelFReader; @@ -988,18 +1001,22 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.pTsdb = pTsdb; - pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; + pIter->fsLastState.aDFileSet = pReadSnap->fs.aDFileSet; pIter->fsLastState.pTSchema = pTSchema; pIter->fsLastState.suid = suid; pIter->fsLastState.uid = uid; + pIter->fsLastState.pLoadInfo = pLoadInfo; + pIter->fsLastState.pDataFReader = pDataFReader; pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.pTsdb = pTsdb; - pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; + pIter->fsState.aDFileSet = pReadSnap->fs.aDFileSet; pIter->fsState.pBlockIdxExp = &pIter->idx; pIter->fsState.pTSchema = pTSchema; pIter->fsState.suid = suid; pIter->fsState.uid = uid; + pIter->fsState.pLoadInfo = pLoadInfo; + pIter->fsState.pDataFReader = pDataFReader; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL}; @@ -1040,8 +1057,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { taosArrayDestroy(pIter->pSkyline); } - tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap, NULL); - _err: return code; } @@ -1119,10 +1134,10 @@ _err: return code; } -static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) { +static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) { int32_t code = 0; - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); + STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); int16_t nCol = pTSchema->numOfCols; int16_t iCol = 0; int16_t noneCol = 0; @@ -1133,7 +1148,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo TSKEY lastRowTs = TSKEY_MAX; CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema); + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader); do { TSDBROW *pRow = NULL; @@ -1233,20 +1248,20 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo } nextRowIterClose(&iter); - taosMemoryFreeClear(pTSchema); + // taosMemoryFreeClear(pTSchema); return code; _err: nextRowIterClose(&iter); taosArrayDestroy(pColArray); - taosMemoryFreeClear(pTSchema); + // taosMemoryFreeClear(pTSchema); return code; } -static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { +static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) { int32_t code = 0; - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); + STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); int16_t nCol = pTSchema->numOfCols; int16_t iCol = 0; int16_t noneCol = 0; @@ -1257,7 +1272,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { TSKEY lastRowTs = TSKEY_MAX; CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema); + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader); do { TSDBROW *pRow = NULL; @@ -1350,18 +1365,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { } nextRowIterClose(&iter); - taosMemoryFreeClear(pTSchema); + // taosMemoryFreeClear(pTSchema); return code; _err: nextRowIterClose(&iter); - taosMemoryFreeClear(pTSchema); + // taosMemoryFreeClear(pTSchema); *ppLastArray = NULL; taosArrayDestroy(pColArray); return code; } -int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { +int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { int32_t code = 0; char key[32] = {0}; int keyLen = 0; @@ -1370,13 +1385,14 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH getTableCacheKey(uid, 0, key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (!h) { + STsdb *pTsdb = pr->pVnode->pTsdb; taosThreadMutexLock(&pTsdb->lruMutex); h = taosLRUCacheLookup(pCache, key, keyLen); if (!h) { SArray *pArray = NULL; bool dup = false; // which is always false for now - code = mergeLastRow(uid, pTsdb, &dup, &pArray); + code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr); // if table's empty or error, return code of -1 if (code < 0 || pArray == NULL) { if (!dup && pArray) { @@ -1392,17 +1408,17 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray); _taos_lru_deleter_t deleter = deleteTableCacheLast; - LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW); + LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW); if (status != TAOS_LRU_STATUS_OK) { code = -1; } - taosThreadMutexUnlock(&pTsdb->lruMutex); + // taosThreadMutexUnlock(&pTsdb->lruMutex); - h = taosLRUCacheLookup(pCache, key, keyLen); - } else { - taosThreadMutexUnlock(&pTsdb->lruMutex); - } + // h = taosLRUCacheLookup(pCache, key, keyLen); + } // else { + taosThreadMutexUnlock(&pTsdb->lruMutex); + //} } *handle = h; @@ -1434,7 +1450,7 @@ _err: return code; } -int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { +int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { int32_t code = 0; char key[32] = {0}; int keyLen = 0; @@ -1443,12 +1459,13 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand getTableCacheKey(uid, 1, key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (!h) { + STsdb *pTsdb = pr->pVnode->pTsdb; taosThreadMutexLock(&pTsdb->lruMutex); h = taosLRUCacheLookup(pCache, key, keyLen); if (!h) { SArray *pLastArray = NULL; - code = mergeLast(uid, pTsdb, &pLastArray); + code = mergeLast(uid, pTsdb, &pLastArray, pr); // if table's empty or error, return code of -1 if (code < 0 || pLastArray == NULL) { taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -1460,17 +1477,17 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray); _taos_lru_deleter_t deleter = deleteTableCacheLast; LRUStatus status = - taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW); + taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW); if (status != TAOS_LRU_STATUS_OK) { code = -1; } - taosThreadMutexUnlock(&pTsdb->lruMutex); + // taosThreadMutexUnlock(&pTsdb->lruMutex); - h = taosLRUCacheLookup(pCache, key, keyLen); - } else { - taosThreadMutexUnlock(&pTsdb->lruMutex); - } + // h = taosLRUCacheLookup(pCache, key, keyLen); + } // else { + taosThreadMutexUnlock(&pTsdb->lruMutex); + //} } *handle = h; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 83dcbc60c7..7784e6ba0a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -18,18 +18,7 @@ #include "tcommon.h" #include "tsdb.h" -typedef struct SCacheRowsReader { - SVnode* pVnode; - STSchema* pSchema; - uint64_t uid; - char** transferBuf; // todo remove it soon - int32_t numOfCols; - int32_t type; - int32_t tableIndex; // currently returned result tables - SArray* pTableList; // table id list -} SCacheRowsReader; - -#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) +#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, void** pRes) { @@ -56,7 +45,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { varDataSetLen(p->buf, pColVal->colVal.value.nData); memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); - p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size + p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size } else { memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); p->bytes = pReader->pSchema->columns[slotId].bytes; @@ -101,7 +90,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea pBlock->info.rows += 1; } -int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) { +int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid, + void** pReader) { *pReader = NULL; SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); @@ -112,6 +102,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList p->type = type; p->pVnode = pVnode; p->numOfCols = numOfCols; + p->suid = suid; if (taosArrayGetSize(pTableIdList) == 0) { *pReader = p; @@ -138,6 +129,12 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList } } + p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0); + if (p->pLoadInfo == NULL) { + tsdbCacherowsReaderClose(p); + return TSDB_CODE_OUT_OF_MEMORY; + } + *pReader = p; return TSDB_CODE_SUCCESS; } @@ -154,6 +151,8 @@ void* tsdbCacherowsReaderClose(void* pReader) { taosMemoryFree(p->pSchema); } + destroyLastBlockLoadInfo(p->pLoadInfo); + taosMemoryFree(pReader); return NULL; } @@ -164,9 +163,9 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint *pRow = NULL; if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) { - code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h); + code = tsdbCacheGetLastrowH(lruCache, uid, pr, h); } else { - code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); + code = tsdbCacheGetLastH(lruCache, uid, pr, h); } if (code != TSDB_CODE_SUCCESS) { @@ -182,7 +181,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint } static void freeItem(void* pItem) { - SLastCol* pCol = (SLastCol*) pItem; + SLastCol* pCol = (SLastCol*)pItem; if (IS_VAR_DATA_TYPE(pCol->colVal.type)) { taosMemoryFree(pCol->colVal.value.pData); } @@ -223,7 +222,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { struct STColumn* pCol = &pr->pSchema->columns[i]; - SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type}; + SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type}; if (IS_VAR_DATA_TYPE(pCol->type)) { p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char)); @@ -231,6 +230,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayPush(pLastCols, &p); } + tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l"); + pr->pDataFReader = NULL; + // retrieve the only one last row of all tables in the uid list. if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { for (int32_t i = 0; i < numOfTables; ++i) { @@ -299,7 +301,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i); + STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { return code; @@ -324,7 +326,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 code = TSDB_CODE_INVALID_PARA; } - _end: +_end: + tsdbDataFReaderClose(&pr->pDataFReader); + + tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l"); + for (int32_t j = 0; j < pr->numOfCols; ++j) { taosMemoryFree(pRes[j]); } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 8c76a3f69b..636e98e380 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -58,17 +58,19 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe // partition by tbname if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { - pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW); + pInfo->retrieveType = + CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, - taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->pColMatchInfo), pTableList->suid, &pInfo->pLastrowReader); if (code != TSDB_CODE_SUCCESS) { goto _error; } pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity); - } else { // by tags - pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW); + } else { // by tags + pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | + (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); } if (pScanNode->scan.pScanPseudoCols != NULL) { @@ -184,7 +186,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, - taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->pColMatchInfo), pTableList->suid, &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); @@ -265,4 +267,4 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf } return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} -- GitLab