From 79020ea4a4000177580f8e390633e342fa4f0f52 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 26 Jul 2023 15:01:20 +0800 Subject: [PATCH] fix: stt and data block merge --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 102 +++++++++++++----------- 1 file changed, 54 insertions(+), 48 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e96406567a..5e6226d478 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1729,40 +1729,44 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader // row in last file block TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); - + int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist + if (key < tsLast) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key == ts) { - SRow* pTSRow = NULL; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); + } else if (key > tsLast) { + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + } + } else { + if (key > tsLast) { + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } else if (key < tsLast) { + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + } + } + if (key == tsLast) { + SRow* pTSRow = NULL; + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - code = tsdbRowMergerGetRow(pMerger, &pTSRow); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + tsdbRowMergerAdd(pMerger, pRow1, NULL); - code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); - taosMemoryFree(pTSRow); - tsdbRowMergerClear(pMerger); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { return code; - } else { // key > ts - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); } - } else { // desc order - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true); + + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + + taosMemoryFree(pTSRow); + tsdbRowMergerClear(pMerger); + return code; } } else { // only last block exists return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); @@ -2190,7 +2194,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBROW *pRow = NULL, *piRow = NULL; - int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; + int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : + (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN); if (pBlockScanInfo->iter.hasVal) { pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); } @@ -2667,16 +2672,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { - // only return the rows in last block - int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); - ASSERT(tsLast >= pBlockInfo->record.lastKey); + bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); + int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN; + if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) || + (!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) { + // whole block is required, return it directly + SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; + pInfo->rows = pBlockInfo->record.numRow; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 0; + pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; + setComposedBlockFlag(pReader, false); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); + // update the last key for the corresponding table + pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; + tsdbDebug("%p uid:%" PRIu64 + " clean file block retrieved from file, global index:%d, " + "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", + pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, + pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); + } else { SBlockData* pBData = &pReader->status.fileBlockData; tBlockDataReset(pBData); SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; - tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr); + tsdbDebug("load data in last block firstly %s", pReader->idStr); int64_t st = taosGetTimestampUs(); @@ -2707,23 +2728,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } - } else { // whole block is required, return it directly - SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; - pInfo->rows = pBlockInfo->record.numRow; - pInfo->id.uid = pScanInfo->uid; - pInfo->dataLoad = 0; - pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; - setComposedBlockFlag(pReader, false); - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); - - // update the last key for the corresponding table - pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 - " clean file block retrieved from file, global index:%d, " - "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", - pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, - pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); } + } return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; -- GitLab