From 832b68e17d4aefe42347743292300d6a811d99d0 Mon Sep 17 00:00:00 2001 From: Minglei Jin <mljin@taosdata.com> Date: Mon, 17 Jul 2023 15:48:08 +0800 Subject: [PATCH] tsdb/cache: load mem del data --- source/dnode/vnode/src/tsdb/tsdbCache.c | 79 +++++++--------------- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 22 +++--- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 2 +- 4 files changed, 39 insertions(+), 66 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index feaff677da..1fef428826 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -14,6 +14,7 @@ */ #include "tsdb.h" #include "tsdbDataFileRW.h" +#include "tsdbReadUtil.h" #include "vnd.h" #define ROCKS_BATCH_SIZE (4096) @@ -1720,19 +1721,22 @@ typedef struct { SMergeTree *pMergeTree; } SFSLastIter; -static int32_t loadSttTombData(STsdbReader *pReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) { +static int32_t loadSttTombData(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) { int32_t code = 0; - /* + + SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader; + if (pLoadInfo->pTombBlockArray == NULL) { pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES); } const TTombBlkArray *pBlkArray = NULL; - int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); + code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); if (code != TSDB_CODE_SUCCESS) { return code; } + /* return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false); */ return code; @@ -2482,45 +2486,6 @@ _err: return code; } -/* static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) { */ -/* int32_t code = 0; */ - -/* SColVal *pColVal = &(SColVal){0}; */ - -/* if (pRow->type == 0) { */ -/* *ppRow = tdRowDup(pRow->pTSRow); */ -/* } else { */ -/* SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); */ -/* if (pArray == NULL) { */ -/* code = TSDB_CODE_OUT_OF_MEMORY; */ -/* goto _exit; */ -/* } */ - -/* TSDBKEY key = TSDBROW_KEY(pRow); */ -/* STColumn *pTColumn = &pTSchema->columns[0]; */ -/* *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); */ - -/* if (taosArrayPush(pArray, pColVal) == NULL) { */ -/* code = TSDB_CODE_OUT_OF_MEMORY; */ -/* goto _exit; */ -/* } */ - -/* for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { */ -/* tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); */ -/* if (taosArrayPush(pArray, pColVal) == NULL) { */ -/* code = TSDB_CODE_OUT_OF_MEMORY; */ -/* goto _exit; */ -/* } */ -/* } */ - -/* code = tdSTSRowNew(pArray, pTSchema, ppRow); */ -/* if (code) goto _exit; */ -/* } */ - -/* _exit: */ -/* return code; */ -/* } */ - static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { bool deleted = false; while (*iSkyline > 0) { @@ -2567,22 +2532,21 @@ typedef struct { } TsdbNextRowState; typedef struct CacheNextRowIter { - SArray *pSkyline; - int64_t iSkyline; - - SBlockIdx idx; - SMemNextRowIter memState; - SMemNextRowIter imemState; - SFSNextRowIter fsState; - TSDBROW memRow, imemRow, fsLastRow, fsRow; - + SArray *pMemDelData; + SArray *pSkyline; + int64_t iSkyline; + SBlockIdx idx; + SMemNextRowIter memState; + SMemNextRowIter imemState; + SFSNextRowIter fsState; + TSDBROW memRow, imemRow, fsLastRow, fsRow; TsdbNextRowState input[4]; STsdb *pTsdb; } CacheNextRowIter; static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader, - SDataFReader **pDataFReaderLast, int64_t lastTs) { + SDataFReader **pDataFReaderLast, int64_t lastTs, SCacheRowsReader *pr) { int code = 0; STbData *pMem = NULL; @@ -2597,6 +2561,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->pTsdb = pTsdb; + pIter->pMemDelData = NULL; + loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); + pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); #if 0 SDelFile *pDelFile = pReadSnap->fs.pDelFile; @@ -2700,6 +2667,10 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { taosArrayDestroy(pIter->pSkyline); } + if (pIter->pMemDelData) { + taosArrayDestroy(pIter->pMemDelData); + } + _err: return code; } @@ -2865,7 +2836,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC CacheNextRowIter iter = {0}; nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLDataIterArray, pr->pReadSnap, &pr->pDataFReader, - &pr->pDataFReaderLast, pr->lastTs); + &pr->pDataFReaderLast, pr->lastTs, pr); do { TSDBROW *pRow = NULL; @@ -3035,7 +3006,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, CacheNextRowIter iter = {0}; nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLDataIterArray, pr->pReadSnap, &pr->pDataFReader, - &pr->pDataFReaderLast, pr->lastTs); + &pr->pDataFReaderLast, pr->lastTs, pr); do { TSDBROW *pRow = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 113d939bb8..a00fafcbf9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2037,7 +2037,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea return code; } - loadMemTombData(pBlockScanInfo, d, di, pReader->info.verRange.maxVer); + loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer); pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index a2f32c86a3..495ad9185e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -13,6 +13,7 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "tsdbReadUtil.h" #include "osDef.h" #include "tsdb.h" #include "tsdbDataFileRW.h" @@ -20,7 +21,6 @@ #include "tsdbMerge.h" #include "tsdbUtil2.h" #include "tsimplehash.h" -#include "tsdbReadUtil.h" static int32_t uidComparFunc(const void* p1, const void* p2) { uint64_t pu1 = *(uint64_t*)p1; @@ -167,7 +167,7 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, - pScanInfo->lastKey, pTsdbReader->idStr); + pScanInfo->lastKey, pTsdbReader->idStr); } taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); @@ -518,8 +518,8 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ } // load tomb data API -static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader, - void* pFileReader, bool isFile) { +static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader, void* pFileReader, + bool isFile) { int32_t code = 0; STableUidList* pList = &pReader->status.uidList; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); @@ -595,7 +595,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead } const TTombBlkArray* pBlkArray = NULL; - int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); + int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -603,17 +603,19 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false); } -void loadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { - if (pScanInfo->pMemDelData == NULL) { - pScanInfo->pMemDelData = taosArrayInit(4, sizeof(SDelData)); +void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { + if (*ppMemDelData == NULL) { + *ppMemDelData = taosArrayInit(4, sizeof(SDelData)); } + SArray* pMemDelData = *ppMemDelData; + SDelData* p = NULL; if (pMemTbData != NULL) { p = pMemTbData->pHead; while (p) { if (p->version <= ver) { - taosArrayPush(pScanInfo->pMemDelData, p); + taosArrayPush(pMemDelData, p); } p = p->pNext; @@ -624,7 +626,7 @@ void loadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbDat p = piMemTbData->pHead; while (p) { if (p->version <= ver) { - taosArrayPush(pScanInfo->pMemDelData, p); + taosArrayPush(pMemDelData, p); } p = p->pNext; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index c343eafb54..5c4737440d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -237,7 +237,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr); // load tomb data API (stt/mem only for one table each, tomb data from data files are load for all tables at one time) -void loadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); +void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); -- GitLab