From c6daa68e5420f893218503c0000cb971ea1d9438 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Apr 2023 00:17:37 +0800 Subject: [PATCH] enh(query): opt merge life cycle. --- source/dnode/vnode/src/inc/tsdb.h | 8 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 189 +++++++++++++------------ source/dnode/vnode/src/tsdb/tsdbUtil.c | 64 +++------ 3 files changed, 127 insertions(+), 134 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c3a0db466b..b2b04abff1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -126,11 +126,13 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter); // SRowMerger int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); - -// int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowMergerClear(SRowMerger *pMerger); -// int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow); int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); + +int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema); +void tsdbRowMergerClear_rv(SRowMerger* pMerger); +void tsdbRowMergerCleanup_rv(SRowMerger* pMerger); + // TABLEID int32_t tTABLEIDCmprFn(const void *p1, const void *p2); // TSDBKEY diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 042b31ee09..dd6913039c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -157,6 +157,7 @@ typedef struct SReaderStatus { SFilesetIter fileIter; SDataBlockIter blockIter; SLDataIter* pLDataIter; + SRowMerger merger; } SReaderStatus; typedef struct SBlockInfoBuf { @@ -166,6 +167,15 @@ typedef struct SBlockInfoBuf { int32_t numOfTables; } SBlockInfoBuf; +typedef struct STsdbReaderAttr { + STSchema* pSchema; + EReadMode readMode; + uint64_t rowsNum; + STimeWindow window; + bool freeBlock; + SVersionRange verRange; +} STsdbReaderAttr; + struct STsdbReader { STsdb* pTsdb; SVersionRange verRange; @@ -185,28 +195,26 @@ struct STsdbReader { SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; SIOCostSummary cost; - STSchema* pSchema; // the newest version schema - SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema - SDataFReader* pFileReader; // the file reader - SDelFReader* pDelFReader; // the del file reader - SArray* pDelIdx; // del file block index; - SBlockInfoBuf blockInfoBuf; - int32_t step; - STsdbReader* innerReader[2]; + STSchema* pSchema; // the newest version schema + SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema + SDataFReader* pFileReader; // the file reader + SDelFReader* pDelFReader; // the del file reader + SArray* pDelIdx; // del file block index; + SBlockInfoBuf blockInfoBuf; + int32_t step; + STsdbReader* innerReader[2]; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); -static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, - SRowMerger* pMerger); +static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); -static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, - STsdbReader* pReader); +static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, - STableBlockScanInfo* pInfo); + STableBlockScanInfo* pScanInfo); static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); @@ -214,7 +222,7 @@ static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* SVersionRange* pVerRange); static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, - TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow); + TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow); static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, @@ -235,7 +243,7 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id); -static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid); +static STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -1889,7 +1897,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas return code; } -static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) { +STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) { if (pReader->pSchema != NULL) { return pReader->pSchema; } @@ -1912,6 +1920,12 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* terrno = code; return NULL; } + + code = tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema); + if (code != 0) { + terrno = code; + return NULL; + } } if (pReader->pSchema && sversion == pReader->pSchema->version) { @@ -1989,11 +2003,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { init = true; // todo check if pReader->pSchema is null or not - int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } if (minKey == tsLast) { @@ -2002,7 +2016,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, &fRow1, NULL); } else { init = true; - int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2019,12 +2033,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, pRow, pSchema); } else { init = true; - int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema); + int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2037,12 +2051,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return terrno; } - int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema); + int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { return code; } @@ -2054,7 +2068,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, &fRow1, NULL); } else { init = true; - int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2067,12 +2081,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, &fRow, NULL); } else { init = true; - int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } } @@ -2112,7 +2126,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, pBlockScanInfo->lastKey = tsLastBlock; return TSDB_CODE_SUCCESS; } else { - code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2136,7 +2150,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } } } else { // not merge block data - code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2145,7 +2159,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, // merge with block data if ts == key if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } code = tsdbRowMergerGetRow(&merge, &pTSRow); @@ -2185,22 +2199,22 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key == ts) { - SRow* pTSRow = NULL; - SRowMerger merge = {0}; + SRow* pTSRow = NULL; + SRowMerger* pMerger = &pReader->status.merger; - int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMergerAdd(&merge, &fRow1, NULL); + tsdbRowMergerAdd(pMerger, &fRow1, NULL); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->verRange, pReader->idStr); - code = tsdbRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2208,7 +2222,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear(&merge); + tsdbRowMergerClear_rv(pMerger); return code; } else { return TSDB_CODE_SUCCESS; @@ -2296,12 +2310,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { init = true; TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } if (minKey == tsLast) { @@ -2310,7 +2324,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, &fRow1, NULL); } else { init = true; - code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2324,14 +2338,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, piRow, piSchema); } else { init = true; - code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, - pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2346,13 +2359,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, pRow, pSchema); } else { // STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, - pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2360,13 +2372,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } else { if (minKey == k.ts) { init = true; - code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, - pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2377,14 +2388,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, piRow, piSchema); } else { init = true; - // STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, - pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2396,7 +2405,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, &fRow1, NULL); } else { init = true; - code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2407,7 +2416,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); if (!init) { - code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2417,7 +2426,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } tsdbRowMergerAdd(&merge, &fRow, NULL); } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } } @@ -2598,15 +2607,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); SRow* pTSRow = NULL; - SRowMerger merge = {0}; - - code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - code = tsdbRowMergerGetRow(&merge, &pTSRow); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + code = tsdbRowMergerGetRow(&pReader->status.merger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2614,7 +2621,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear(&merge); + tsdbRowMergerClear_rv(&pReader->status.merger); return code; } } @@ -3685,8 +3692,9 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p } } -int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, - STsdbReader* pReader) { +int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader) { + SRowMerger* pMerger = &pReader->status.merger; + while (1) { pIter->hasVal = tsdbTbDataIterNext(pIter->iter); if (!pIter->hasVal) { @@ -3765,10 +3773,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn return code; } -int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, - SRowMerger* pMerger) { +int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SRowMerger* pMerger = &pReader->status.merger; bool asc = ASCENDING_TRAVERSE(pReader->order); int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int32_t step = asc ? 1 : -1; @@ -3847,7 +3855,6 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, } } - SRowMerger merge = {0}; terrno = 0; int32_t code = 0; @@ -3859,8 +3866,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return terrno; } - STSchema* ps = (pReader->pSchema != NULL)? pReader->pSchema:pTSchema; - code = tsdbRowMergerInit(&merge, ps, ¤t, pTSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pTSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3870,28 +3876,28 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return terrno; } - tsdbRowMergerAdd(&merge, pNextRow, pTSchema1); + tsdbRowMergerAdd(&pReader->status.merger,pNextRow, pTSchema1); } else { // let's merge rows in file block - code = tsdbRowMergerInit(&merge, NULL, ¤t, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - tsdbRowMergerAdd(&merge, pNextRow, NULL); + tsdbRowMergerAdd(&pReader->status.merger,pNextRow, NULL); } - code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, &merge, pReader); + code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } - code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow); + code = tsdbRowMergerGetRow(&pReader->status.merger, &pResRow->pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } pResRow->type = TSDBROW_ROW_FMT; - tsdbRowMergerClear(&merge); + tsdbRowMergerClear_rv(&pReader->status.merger); *freeTSRow = true; return TSDB_CODE_SUCCESS; @@ -3899,7 +3905,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow) { - SRowMerger merge = {0}; + SRowMerger* pMerger = &pReader->status.merger; TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); @@ -3914,46 +3920,43 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p } if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem - int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema); + int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, - pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } - tsdbRowMergerAdd(&merge, pRow, pSchema); + tsdbRowMergerAdd(&pReader->status.merger,pRow, pSchema); code = - doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } else { - int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema); - if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { + int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); + if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) { return code; } - code = - doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } - tsdbRowMergerAdd(&merge, piRow, piSchema); - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, - pReader); + tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } - int32_t code = tsdbRowMergerGetRow(&merge, pTSRow); - tsdbRowMergerClear(&merge); + int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow); + tsdbRowMergerClear_rv(pMerger); return code; } @@ -4334,6 +4337,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } } + if (pReader->pSchema != NULL) { + tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema); + } + pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); if (pReader->pSchemaMap == NULL) { tsdbError("failed init schema hash for reader %s", pReader->idStr); @@ -4483,6 +4490,8 @@ void tsdbReaderClose(STsdbReader* pReader) { pCost->initDelSkylineIterTime, pReader->idStr); taosMemoryFree(pReader->idStr); + + tsdbRowMergerCleanup_rv(&pReader->status.merger); taosMemoryFree(pReader->pSchema); tSimpleHashCleanup(pReader->pSchemaMap); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 8e778da877..909d354c38 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -778,58 +778,40 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) pMerger->version = key.version; return code; } -/* -int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { - int32_t code = 0; - TSDBKEY key = TSDBROW_KEY(pRow); - SColVal *pColVal = &(SColVal){0}; - STColumn *pTColumn; - - pMerger->pTSchema = pTSchema; - pMerger->version = key.version; - pMerger->pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); +int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) { + pMerger->pTSchema = pSchema; + pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal)); if (pMerger->pArray == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + return TSDB_CODE_OUT_OF_MEMORY; + } else { + return TSDB_CODE_SUCCESS; } +} - // ts - pTColumn = &pTSchema->columns[0]; - - ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); - - *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts}); - if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; +void tsdbRowMergerClear_rv(SRowMerger* pMerger) { + for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { + SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); + if (IS_VAR_DATA_TYPE(pTColVal->type)) { + tFree(pTColVal->value.pData); + } } - // other - for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { - tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - - pColVal->value.pData = NULL; - code = tRealloc(&pColVal->value.pData, pColVal->value.nData); - if (code) goto _exit; - - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } + taosArrayClear(pMerger->pArray); +} - if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; +void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) { + int32_t numOfCols = taosArrayGetSize(pMerger->pArray); + for (int32_t iCol = 1; iCol < numOfCols; iCol++) { + SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); + if (IS_VAR_DATA_TYPE(pTColVal->type)) { + tFree(pTColVal->value.pData); } } -_exit: - return code; + taosArrayDestroy(pMerger->pArray); } -*/ + void tsdbRowMergerClear(SRowMerger *pMerger) { for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); -- GitLab