From a73957aabadb33762ca72c9917336ffe2fb494f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Apr 2023 09:24:15 +0800 Subject: [PATCH] fix(query): fix error in add row into merger. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 85 +++++++++++--------------- 1 file changed, 37 insertions(+), 48 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index dd6913039c..0bba83622c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1954,7 +1954,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { - SRowMerger merge = {0}; + SRowMerger* pMerger = &pReader->status.merger; SRow* pTSRow = NULL; SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2013,7 +2013,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMergerAdd(&merge, &fRow1, NULL); + tsdbRowMergerAdd(&pReader->status.merger, &fRow1, NULL); } else { init = true; int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); @@ -2021,7 +2021,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &pReader->status.merger, &pReader->verRange, pReader->idStr); } if (minKey == k.ts) { @@ -2030,7 +2030,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return terrno; } if (init) { - tsdbRowMergerAdd(&merge, pRow, pSchema); + tsdbRowMergerAdd(pMerger, pRow, pSchema); } else { init = true; int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); @@ -2057,7 +2057,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); - if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { + if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) { return code; } } @@ -2065,7 +2065,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMergerAdd(&merge, &fRow1, NULL); + tsdbRowMergerAdd(pMerger, &fRow1, NULL); } else { init = true; int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); @@ -2073,12 +2073,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr); } if (minKey == key) { if (init) { - tsdbRowMergerAdd(&merge, &fRow, NULL); + tsdbRowMergerAdd(pMerger, &fRow, NULL); } else { init = true; int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); @@ -2090,7 +2090,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow); + int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2098,7 +2098,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear(&merge); + tsdbRowMergerClear_rv(pMerger); return code; } @@ -2106,12 +2106,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { + SRowMerger* pMerger = &pReader->status.merger; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); bool copied = false; int32_t code = TSDB_CODE_SUCCESS; SRow* pTSRow = NULL; - SRowMerger merge = {0}; TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr); @@ -2132,10 +2132,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMergerAdd(&merge, &fRow1, NULL); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr); + tsdbRowMergerAdd(pMerger, &fRow1, NULL); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr); - code = tsdbRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2143,7 +2143,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear(&merge); + tsdbRowMergerClear_rv(pMerger); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2155,14 +2155,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr); // merge with block data if ts == key if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } - code = tsdbRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2170,7 +2170,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear(&merge); + tsdbRowMergerClear_rv(pMerger); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2237,7 +2237,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { - SRowMerger merge = {0}; + SRowMerger* pMerger = &pReader->status.merger; SRow* pTSRow = NULL; int32_t code = TSDB_CODE_SUCCESS; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2310,7 +2310,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { init = true; TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2321,24 +2321,24 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMergerAdd(&merge, &fRow1, NULL); + tsdbRowMergerAdd(pMerger, &fRow1, NULL); } else { init = true; - code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr); } if (minKey == ik.ts) { if (init) { - tsdbRowMergerAdd(&merge, piRow, piSchema); + tsdbRowMergerAdd(pMerger, piRow, piSchema); } else { init = true; - code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); + code = tsdbRowMergerAdd(pMerger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2352,14 +2352,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == k.ts) { if (init) { - if (merge.pTSchema == NULL) { - return code; - } - - tsdbRowMergerAdd(&merge, pRow, pSchema); + tsdbRowMergerAdd(pMerger, pRow, pSchema); } else { // STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); + code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2372,7 +2368,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } else { if (minKey == k.ts) { init = true; - code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); + code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2385,10 +2381,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == ik.ts) { if (init) { - tsdbRowMergerAdd(&merge, piRow, piSchema); + tsdbRowMergerAdd(pMerger, piRow, piSchema); } else { init = true; - code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); + code = tsdbRowMergerAdd(pMerger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2402,39 +2398,32 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMergerAdd(&merge, &fRow1, NULL); + tsdbRowMergerAdd(pMerger, &fRow1, NULL); } else { init = true; - code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr); } if (minKey == key) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); if (!init) { - code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } else { - if (merge.pTSchema == NULL) { - return code; - } - tsdbRowMergerAdd(&merge, &fRow, NULL); + tsdbRowMergerAdd(pMerger, &fRow, NULL); } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } } - if (merge.pTSchema == NULL) { - return code; - } - - code = tsdbRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2442,7 +2431,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tsdbRowMergerClear(&merge); + tsdbRowMergerClear_rv(pMerger); return code; } -- GitLab