diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bbef24907967df9221c09515038786d8acbcbde1..cb288cecfb2d895f031034c088f0fac9041c0e42 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -182,6 +182,8 @@ static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIt STsdbReader* pReader, bool* freeTSRow); static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); +static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); + static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData); static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, @@ -1510,82 +1512,83 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf return TSDB_CODE_SUCCESS; } -#if 0 -static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { - SRowMerger merge = {0}; - STSRow* pTSRow = NULL; - SBlockData* pBlockData = &pReader->status.fileBlockData; +static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key, + STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - SArray* pDelList = pBlockScanInfo->delSkyline; - bool freeTSRow = false; - uint64_t uid = pBlockScanInfo->uid; + if (pBlockData->nRow > 0) { + // no last block available, only data block exists + if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) { + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } + + // row in last file block + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); + ASSERT(ts >= key); + + if (ASCENDING_TRAVERSE(pReader->order)) { + if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } else if (key == ts) { + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; - // ascending order traverse - if (ASCENDING_TRAVERSE(pReader->order)) { - if (key < k.ts) { - // imem & mem are all empty, only file exist - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { - return TSDB_CODE_SUCCESS; - } else { tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); - freeTSRow = true; - } - } else if (k.ts < key) { // k.ts < key - doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); - } else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - tRowMerge(&merge, pRow); - doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - tRowMergerGetRow(&merge, &pTSRow); - freeTSRow = true; - } - } else { // descending order scan - if (key < k.ts) { - doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); - } else if (k.ts < key) { - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); return TSDB_CODE_SUCCESS; } else { - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); - freeTSRow = true; + ASSERT(0); + return TSDB_CODE_SUCCESS; } - } else { // descending order: mem rows -----> imem rows ------> file block - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + } else { // desc order + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - tRowMergerInit(&merge, pRow, pSchema); - doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - tRowMerge(&merge, &fRow); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + if (ts == key) { + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } tRowMergerGetRow(&merge, &pTSRow); - freeTSRow = true; + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; } - } + } else { // only last block exists + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); - tRowMergerClear(&merge); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid); + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + tRowMergerGetRow(&merge, &pTSRow); + + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - if (freeTSRow) { taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; } - - return TSDB_CODE_SUCCESS; } -#endif - static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1987,10 +1990,35 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { return false; } + + ASSERT(pLastBlockReader->lastBlockData.nRow > 0); return true; } -// todo refactor +int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) { + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + + if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { + return TSDB_CODE_SUCCESS; + } else { + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2007,112 +2035,13 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); } - // mem + file + // mem + file + last block if (pBlockScanInfo->iter.hasVal) { return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader); } - if (pBlockData->nRow > 0) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - - // no last block available, only data block exists - if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) { - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { - return TSDB_CODE_SUCCESS; - } else { - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } - } - - // row in last file block - int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); - ASSERT(ts >= key); - - if (ASCENDING_TRAVERSE(pReader->order)) { - if (key < ts) { - // imem & mem are all empty, only file exist - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { - return TSDB_CODE_SUCCESS; - } else { - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } - } else if (key == ts) { - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } else { - ASSERT(0); - return TSDB_CODE_SUCCESS; - } - } else { // desc order - SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - tRowMergerInit(&merge, &fRow1, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - - if (ts == key) { - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - } - - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } - } else { // only last block exists - SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); - - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); - tRowMergerGetRow(&merge, &pTSRow); - - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; - } + // files data blocks + last block + return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData); } } @@ -2137,9 +2066,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { while (1) { // todo check the validate of row in file block + bool hasBlockData = false; { - bool hasBlockData = false; - while (pBlockData->nRow > 0) { // find the first qualified row in data block if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { hasBlockData = true; @@ -2154,13 +2082,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { break; } } + } + + bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); - bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); - - // no data in last block and block, no need to proceed. - if ((hasBlockData == false) && (hasBlockLData == false)) { - break; - } + // no data in last block and block, no need to proceed. + if ((hasBlockData == false) && (hasBlockLData == false)) { + break; } buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); @@ -3224,10 +3152,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - if (ik.ts < k.ts) { // ik.ts < k.ts - doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); - } else if (k.ts < ik.ts) { - doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + if (ik.ts != k.ts) { + if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts + doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { + doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + } } else { // ik.ts == k.ts doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); *freeTSRow = true; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 52c73f85f55703607b1da579860574af60dca1ef..f4c42023c8bf191ad1be25fa443d57c9bbeced30 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -80,11 +80,9 @@ struct SqlFunctionCtx; size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); -void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow); -bool isResultRowClosed(SResultRow* pResultRow); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index bf969bf2e4855a12a9abfbe5e67301c5df5f3702..734b63b94dc22d70651f263ab3ce25049f434458 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -31,20 +31,6 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) { pResultRowInfo->cur.pageId = -1; } -void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) { - if (pResultRowInfo == NULL) { - return; - } - - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { - // if (pResultRowInfo->pResult[i]) { - // taosMemoryFreeClear(pResultRowInfo->pResult[i]->key); - // } - } -} - -bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); } - void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } // TODO refactor: use macro diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9e56d63673ab6b1c9c90b4893d99898abe18e5bb..c3f1c8fbf694f5ade00d9304401e9dda1ee2560a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3635,7 +3635,6 @@ _error: void cleanupBasicInfo(SOptrBasicInfo* pInfo) { assert(pInfo != NULL); - cleanupResultRowInfo(&pInfo->resultRowInfo); pInfo->pRes = blockDataDestroy(pInfo->pRes); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9eaab6963346b788de4a886812f57db6063f511e..2648e368b240ddbe3ec2cc8a74a92a809926343a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2965,7 +2965,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { taosHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); - cleanupResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo); } @@ -4253,8 +4252,6 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) { } } clearDiskbasedBuf(pInfo->streamAggSup.pResultBuf); - cleanupResultRowInfo(&pInfo->binfo.resultRowInfo); - initResultRowInfo(&pInfo->binfo.resultRowInfo); } static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {