From 47417e003e4257c002939bc3d1e0801993861a1f Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 9 Mar 2023 18:54:15 +0800 Subject: [PATCH] enh: optimize last/last_row cost when the cache was first loaded --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbCache.c | 14 ++++++++------ source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 15 +++++++++++++++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 15087e90f0..b2e1e8ab34 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -780,6 +780,7 @@ typedef struct SCacheRowsReader { SDataFReader *pDataFReader; SDataFReader *pDataFReaderLast; const char *idstr; + int64_t lastTs; } SCacheRowsReader; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index dcf7286c00..244530314a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -722,6 +722,7 @@ typedef struct SFSNextRowIter { int32_t iRow; TSDBROW row; SSttBlockLoadInfo *pLoadInfo; + int64_t lastTs; } SFSNextRowIter; static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { @@ -816,12 +817,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { SDataBlk block = {0}; tDataBlkReset(&block); - // tBlockDataReset(&state->blockData); tBlockDataReset(state->pBlockData); tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); - /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); - */ + if (block.maxKey.ts <= state->lastTs) { + goto _next_fileset; + } tBlockDataReset(state->pBlockData); TABLEID tid = {.suid = state->suid, .uid = state->uid}; code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0); @@ -1070,7 +1071,7 @@ typedef struct { static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader, - SDataFReader **pDataFReaderLast) { + SDataFReader **pDataFReaderLast, int64_t lastTs) { int code = 0; STbData *pMem = NULL; @@ -1141,6 +1142,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsState.uid = uid; pIter->fsState.pLoadInfo = pLoadInfo; pIter->fsState.pDataFReader = pDataFReader; + pIter->fsState.lastTs = lastTs; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL}; @@ -1315,7 +1317,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo CacheNextRowIter iter = {0}; nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader, - &pr->pDataFReaderLast); + &pr->pDataFReaderLast, pr->lastTs); do { TSDBROW *pRow = NULL; @@ -1453,7 +1455,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach CacheNextRowIter iter = {0}; nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader, - &pr->pDataFReaderLast); + &pr->pDataFReaderLast, pr->lastTs); do { TSDBROW *pRow = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 5528b6313c..d925950703 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -199,6 +199,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, p->idstr = taosStrdup(idstr); taosThreadMutexInit(&p->readerMutex, NULL); + p->lastTs = INT64_MIN; + *pReader = p; return TSDB_CODE_SUCCESS; } @@ -347,6 +349,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } { + bool hasNotNullRow = true; + int64_t minTs = INT64_MAX; for (int32_t k = 0; k < pr->numOfCols; ++k) { int32_t slotId = slotIds[k]; @@ -357,6 +361,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 hasRes = true; p->ts = pCol->ts; p->colVal = pCol->colVal; + minTs = pCol->ts; // only set value for last row query if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) { @@ -373,11 +378,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 if (pColVal->ts > p->ts) { if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { + if (!COL_VAL_IS_VALUE(&p->colVal)) { + hasNotNullRow = false; + } continue; } hasRes = true; p->ts = pColVal->ts; + if (pColVal->ts < minTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { + minTs = pColVal->ts; + } if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) { p->colVal = pColVal->colVal; @@ -394,6 +405,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } } + + if (hasNotNullRow) { + pr->lastTs = minTs; + } } tsdbCacheRelease(lruCache, h); -- GitLab