diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 048c64032ca6ca2c018339ab2c1284f7a48524ca..4af8d6e28fbe09cc6769c97a847cd7744d31c2a5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1279,7 +1279,6 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc bool overlapWithlastBlock = false; if (hasDataInLastBlock(pLastBlockReader)) { SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); -// int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); } @@ -1358,8 +1357,71 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* return pReader->pMemSchema; } +// todo handle desc +static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, + SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { + SRowMerger merge = {0}; + STSRow* pTSRow = NULL; + SBlockData* pBlockData = &pReader->status.fileBlockData; + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); + + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + + int64_t minKey = INT64_MAX; + if (minKey > tsLast) { + minKey = tsLast; + } + + if (minKey > k.ts) { + minKey = k.ts; + } + + if (minKey > key && pBlockData->nRow > 0) { + minKey = key; + } + + // file block ---> last block -----> imem -----> mem + bool init = false; + if (minKey == key) { + init = true; + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } + + if (minKey == tsLast) { + if (!init) { + init = true; + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + } + + doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + } + + if (minKey == k.ts) { + if (!init) { + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + tRowMergerInit(&merge, pRow, pSchema); + } + + doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; +} + static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - SIterInfo* pIter, int64_t key) { + SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; SBlockData* pBlockData = &pReader->status.fileBlockData; @@ -1431,6 +1493,85 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } +static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { + SRowMerger merge = {0}; + STSRow* pTSRow = NULL; + + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SArray* pDelList = pBlockScanInfo->delSkyline; + + TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader); + TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader); + ASSERT(pRow != NULL && piRow != NULL); + + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); + + int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBKEY ik = TSDBROW_KEY(piRow); + + int64_t minKey = INT64_MAX; + if (minKey > k.ts) { + minKey = k.ts; + } + + if (minKey > ik.ts) { + minKey = ik.ts; + } + + if (minKey > key) { + minKey = key; + } + + if (minKey > tsLast) { + minKey = tsLast; + } + + // file block ---> last block -----> imem -----> mem + bool init = false; + if (minKey == key) { + init = true; + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } + + if (minKey == tsLast) { + if (!init) { + init = true; + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + } + doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); + } + + if (minKey == ik.ts) { + if (!init) { + init = true; + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); + tRowMergerInit(&merge, piRow, pSchema); + } + doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + } + + if (minKey == k.ts) { + if (!init) { + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + tRowMergerInit(&merge, pRow, pSchema); + } + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; +} + static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1605,8 +1746,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin, SVersionRange* pVerRange, - int16_t startPos) { +static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin, + SVersionRange* pVerRange, int16_t startPos) { pLastBlockReader->uid = uid; pLastBlockReader->window = *pWin; pLastBlockReader->verRange = *pVerRange; @@ -1636,10 +1777,12 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { // no data any more if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) { + pLastBlockReader->rowIndex = pBlockData->nRow; return false; } if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) { + pLastBlockReader->rowIndex = pBlockData->nRow; return false; } @@ -1676,25 +1819,21 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t key = INT64_MIN; - if (pBlockData->nRow > 0) { - key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - } - + int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { - return doMergeThreeLevelRows(pReader, pBlockScanInfo, pBlockData); + return doMergeThreeLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); } else { - // imem + file + // imem + file + last block if (pBlockScanInfo->iiter.hasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key); + return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); } // mem + file if (pBlockScanInfo->iter.hasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key); + return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader); } if (pBlockData->nRow > 0) { @@ -1704,7 +1843,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); if (ts < key) { // save rows in last block SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); STSRow* pTSRow = NULL; SRowMerger merge = {0}; @@ -1712,7 +1850,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); tRowMergerInit(&merge, &fRow1, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge); + doMergeRowsInLastBlock(pLastBlockReader, ts, &merge); tRowMergerGetRow(&merge, &pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); @@ -1753,6 +1891,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } } } else { // only last block exists + // only last block exits SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); @@ -1820,10 +1959,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); - SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); // currently loaded file data block is consumed - if (pBlockData->nRow > 0 && (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0)) { + if (pBlockData->nRow > 0 && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { + SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); setBlockAllDumped(pDumpInfo, pBlock, pReader->order); break; }