From 1cfd3e74f94a57838755e80c7f51d50ddf631274 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Sep 2022 11:56:56 +0800 Subject: [PATCH] enh(query): support merge multiple last files. --- source/dnode/vnode/src/inc/tsdb.h | 25 ++ source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 246 +++++++++++++++++--- source/dnode/vnode/src/tsdb/tsdbRead.c | 137 +++++------ 3 files changed, 297 insertions(+), 111 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 968cc7f297..bca180b64b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -64,6 +64,7 @@ typedef struct STsdbReadSnap STsdbReadSnap; typedef struct SBlockInfo SBlockInfo; typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; +typedef struct SVersionRange SVersionRange; #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_MAX_SUBBLOCKS 8 @@ -306,6 +307,12 @@ size_t tsdbCacheGetCapacity(SVnode *pVnode); int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); +struct SLDataIter; +int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iLast, int8_t backward, uint64_t uid, + STimeWindow *pTimeWindow, SVersionRange *pRange); +void tLDataIterClose(struct SLDataIter *pIter); +bool tLDataIterNextRow(struct SLDataIter *pIter); + // structs ======================= struct STsdbFS { SDelFile *pDelFile; @@ -329,6 +336,11 @@ struct TSDBKEY { TSKEY ts; }; +struct SVersionRange { + uint64_t minVer; + uint64_t maxVer; +}; + typedef struct SMemSkipListNode SMemSkipListNode; struct SMemSkipListNode { int8_t level; @@ -626,6 +638,19 @@ typedef struct { TSDBROW row; } SRowInfo; +typedef struct SMergeTree { + int8_t backward; + SRBTreeNode *pNode; + SRBTree rbt; + struct SLDataIter *pIter; + SDataFReader* pLFileReader; +} SMergeTree; + +void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange); +void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter); +bool tMergeTreeNext(SMergeTree* pMTree); +TSDBROW tMergeTreeGetRow(SMergeTree* pMTree); + // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index af92908e11..bf0e41f800 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -15,12 +15,12 @@ #include "tsdb.h" -// SLDataIter ================================================= -typedef struct { - SRBTreeNode node; - SBlockL *pBlockL; - SRowInfo *pRowInfo; +#define INITIAL_IROW_INDEX (-1) +// SLDataIter ================================================= +typedef struct SLDataIter { + SRBTreeNode node; + SBlockL *pBlockL; SDataFReader *pReader; int32_t iLast; int8_t backward; @@ -29,31 +29,54 @@ typedef struct { SBlockData bData; int32_t iRow; SRowInfo rInfo; + uint64_t uid; + STimeWindow timeWindow; + SVersionRange verRange; } SLDataIter; -int32_t tLDataIterOpen(SLDataIter *pIter, SDataFReader *pReader, int32_t iLast, int8_t backward) { +int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iLast, int8_t backward, uint64_t uid, + STimeWindow *pTimeWindow, SVersionRange *pRange) { int32_t code = 0; + *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); - pIter->pReader = pReader; - pIter->iLast = iLast; - pIter->backward = backward; - - pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pIter->aBlockL == NULL) { + (*pIter)->uid = uid; + (*pIter)->timeWindow = *pTimeWindow; + (*pIter)->verRange = *pRange; + (*pIter)->pReader = pReader; + (*pIter)->iLast = iLast; + (*pIter)->backward = backward; + (*pIter)->aBlockL = taosArrayInit(0, sizeof(SBlockL)); + if ((*pIter)->aBlockL == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - code = tBlockDataCreate(&pIter->bData); - if (code) goto _exit; + code = tBlockDataCreate(&(*pIter)->bData); + if (code) { + goto _exit; + } - code = tsdbReadBlockL(pReader, iLast, pIter->aBlockL); - if (code) goto _exit; + code = tsdbReadBlockL(pReader, iLast, (*pIter)->aBlockL); + if (code) { + goto _exit; + } - if (backward) { - pIter->iBlockL = taosArrayGetSize(pIter->aBlockL) - 1; - } else { - pIter->iBlockL = 0; + size_t size = taosArrayGetSize((*pIter)->aBlockL); + + // find the start block + // todo handle the desc + int32_t index = -1; + for(int32_t i = 0; i < size; ++i) { + SBlockL *p = taosArrayGet((*pIter)->aBlockL, i); + if (p->minUid <= uid && p->maxUid >= uid) { + index = i; + break; + } + } + + (*pIter)->iBlockL = index; + if (index != -1) { + (*pIter)->pBlockL = taosArrayGet((*pIter)->aBlockL, (*pIter)->iBlockL); } _exit: @@ -74,15 +97,93 @@ void tLDataIterNextBlock(SLDataIter *pIter) { pIter->iBlockL++; } - if (pIter->iBlockL >= 0 && pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { - pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); - } else { + // todo handle desc order check. + int32_t index = -1; + size_t size = taosArrayGetSize(pIter->aBlockL); + for(int32_t i = pIter->iBlockL; i < size; ++i) { + SBlockL *p = taosArrayGet(pIter->aBlockL, i); + if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) { + index = i; + break; + } + + if (p->minUid > pIter->uid) { + break; + } + } + + if (index == -1) { pIter->pBlockL = NULL; + } else { + pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); + } +} + +static void findNextValidRow(SLDataIter* pIter) { + int32_t step = pIter->backward? -1:1; + + bool hasVal = false; + int32_t i = pIter->iRow; + for (; i < pIter->bData.nRow && i >= 0; i += step) { + if (pIter->bData.aUid != NULL) { + if (!pIter->backward) { + if (pIter->bData.aUid[i] < pIter->uid) { + continue; + } else if (pIter->bData.aUid[i] > pIter->uid) { + break; + } + } else { + if (pIter->bData.aUid[i] > pIter->uid) { + continue; + } else if (pIter->bData.aUid[i] < pIter->uid) { + break; + } + } + } + + int64_t ts = pIter->bData.aTSKEY[i]; + if (ts < pIter->timeWindow.skey) { + continue; + } + + int64_t ver = pIter->bData.aVersion[i]; + if (ver < pIter->verRange.minVer) { + continue; + } + + // no data any more, todo opt handle desc case + if (ts > pIter->timeWindow.ekey) { + continue; + } + + // todo opt handle desc case + if (ver > pIter->verRange.maxVer) { + continue; + } + + // todo handle delete soon +#if 0 + TSDBKEY k = {.ts = ts, .version = ver}; + if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { + continue; + } +#endif + + hasVal = true; + break; } + + pIter->iRow = (hasVal)? i:-1; } -int32_t tLDataIterNextRow(SLDataIter *pIter) { +bool tLDataIterNextRow(SLDataIter *pIter) { int32_t code = 0; + int32_t step = pIter->backward? -1:1; + + // no qualified last file block in current file, no need to fetch row + if (pIter->pBlockL == NULL) { + return false; + } int32_t iBlockL = pIter->iBlockL; if (pIter->backward) { @@ -91,18 +192,38 @@ int32_t tLDataIterNextRow(SLDataIter *pIter) { tLDataIterNextBlock(pIter); } } else { - pIter->iRow++; - if (pIter->iRow >= pIter->bData.nRow) { - pIter->iBlockL++; + if (pIter->bData.nRow == 0 && pIter->pBlockL != NULL) { // current block not loaded yet + code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + + pIter->iRow = (pIter->backward)? pIter->bData.nRow:-1; + } + + pIter->iRow += step; + findNextValidRow(pIter); + + if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) { tLDataIterNextBlock(pIter); + if (pIter->pBlockL == NULL) { // no more data + goto _exit; + } } } if (iBlockL != pIter->iBlockL) { if (pIter->pBlockL) { code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData); - if (code) goto _exit; - pIter->iRow = 0; + if (code) { + goto _exit; + } + + pIter->iRow = pIter->backward? (pIter->bData.nRow-1):0; + findNextValidRow(pIter); + if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) { + // todo try next block + } } else { // no more data goto _exit; @@ -114,7 +235,11 @@ int32_t tLDataIterNextRow(SLDataIter *pIter) { pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); _exit: - return code; + if (code != TSDB_CODE_SUCCESS) { + return false; + } else { + return pIter->pBlockL != NULL; + } } SRowInfo *tLDataIterGet(SLDataIter *pIter) { @@ -123,12 +248,6 @@ SRowInfo *tLDataIterGet(SLDataIter *pIter) { } // SMergeTree ================================================= -typedef struct { - int8_t backward; - SRBTreeNode *pNode; - SRBTree rbt; -} SMergeTree; - static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { SLDataIter *pIter1 = (SLDataIter *)p1; SLDataIter *pIter2 = (SLDataIter *)p2; @@ -139,10 +258,61 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { return 0; } -void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward) { +void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange) { pMTree->backward = backward; pMTree->pNode = NULL; + pMTree->pIter = NULL; tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); + + struct SLDataIter* pIterList[TSDB_DEFAULT_LAST_FILE] = {0}; + for(int32_t i = 0; i < pFReader->pSet->nLastF; ++i) { // open all last file + /*int32_t code = */tLDataIterOpen(&pIterList[i], pFReader, i, 0, uid, pTimeWindow, pVerRange); + bool hasVal = tLDataIterNextRow(pIterList[i]); + if (hasVal) { + tMergeTreeAddIter(pMTree, pIterList[i]); + } + } } -void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); } \ No newline at end of file +void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); } + +bool tMergeTreeNext(SMergeTree* pMTree) { + int32_t code = TSDB_CODE_SUCCESS; + if (pMTree->pIter) { + SLDataIter *pIter = pMTree->pIter; + + bool hasVal = tLDataIterNextRow(pIter); + if (!hasVal) { + pMTree->pIter = NULL; + } + + // compare with min in RB Tree + pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt); + if (pMTree->pIter && pIter) { + int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->rInfo.row, &pIter->rInfo.row); + if (c > 0) { + tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); + pMTree->pIter = NULL; + } else { + ASSERT(c); + } + } + } + + if (pMTree->pIter == NULL) { + pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt); + if (pMTree->pIter) { + tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); + } + } + + return pMTree->pIter != NULL; +} + +TSDBROW tMergeTreeGetRow(SMergeTree* pMTree) { + return pMTree->pIter->rInfo.row; +} + +void tMergeTreeClose(SMergeTree* pMTree) { + +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5ff8fd9674..b8c664cddb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -83,11 +83,6 @@ typedef struct SBlockLoadSuppInfo { char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. } SBlockLoadSuppInfo; -typedef struct SVersionRange { - uint64_t minVer; - uint64_t maxVer; -} SVersionRange; - typedef struct SLastBlockReader { SArray* pBlockL; int32_t currentBlockIndex; @@ -96,7 +91,7 @@ typedef struct SLastBlockReader { SVersionRange verRange; int32_t order; uint64_t uid; - int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL + SMergeTree mergeTree; } SLastBlockReader; typedef struct SFilesetIter { @@ -352,7 +347,6 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, pLReader->order = pReader->order; pLReader->window = pReader->window; pLReader->verRange = pReader->verRange; - pLReader->currentBlockIndex = -1; int32_t code = tBlockDataCreate(&pLReader->lastBlockData); if (code != TSDB_CODE_SUCCESS) { @@ -1346,7 +1340,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == tsLast) { - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { tRowMerge(&merge, &fRow1); } else { @@ -1375,7 +1369,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == tsLast) { - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { tRowMerge(&merge, &fRow1); } else { @@ -1407,14 +1401,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { - SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); STSRow* pTSRow = NULL; SRowMerger merge = {0}; - TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - + TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); @@ -1548,7 +1540,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == tsLast) { - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { tRowMerge(&merge, &fRow1); } else { @@ -1598,7 +1590,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == tsLast) { - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { tRowMerge(&merge, &fRow1); } else { @@ -1803,10 +1795,13 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos) { - pLastBlockReader->uid = uid; - pLastBlockReader->rowIndex = startPos; +static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos, SDataFReader* pFReader) { + // the last block reader has been initialized for this table. + if (pLastBlockReader->uid == uid) { + return true; + } + pLastBlockReader->uid = uid; if (*startPos == -1) { if (ASCENDING_TRAVERSE(pLastBlockReader->order)) { // do nothing @@ -1814,19 +1809,24 @@ static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid *startPos = pLastBlockReader->lastBlockData.nRow; } } + + /*int32_t code = */ tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), + pFReader, uid, &pLastBlockReader->window, &pLastBlockReader->verRange); + bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); + return hasVal; } static void setAllRowsChecked(SLastBlockReader* pLastBlockReader) { - *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX; +// *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX; } static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { - bool asc = ASCENDING_TRAVERSE(pLastBlockReader->order); - int32_t step = (asc) ? 1 : -1; - if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { - return false; - } +// if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { +// return false; +// } + return tMergeTreeNext(&pLastBlockReader->mergeTree); +#if 0 *(pLastBlockReader->rowIndex) += step; SBlockData* pBlockData = &pLastBlockReader->lastBlockData; @@ -1879,20 +1879,17 @@ static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc // set all data is consumed in last block setAllRowsChecked(pLastBlockReader); return false; +#endif } static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { - SBlockData* pBlockData = &pLastBlockReader->lastBlockData; - return pBlockData->aTSKEY[*pLastBlockReader->rowIndex]; + TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBKEY key = TSDBROW_KEY(&row); + return key.ts; } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { - if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { - return false; - } - - ASSERT(pLastBlockReader->lastBlockData.nRow > 0); - return true; + return pLastBlockReader->mergeTree.pIter != NULL; } int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, @@ -1985,6 +1982,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } } + bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); // no data in last block and block, no need to proceed. @@ -2248,6 +2246,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { return TSDB_CODE_SUCCESS; } +#if 0 static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { SArray* pBlocks = pLastBlockReader->pBlockL; @@ -2308,6 +2307,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable return TSDB_CODE_SUCCESS; } +#endif static int32_t uidComparFunc(const void* p1, const void* p2) { uint64_t pu1 = *(uint64_t*)p1; @@ -2401,26 +2401,14 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { while (1) { // load the last data block of current table STableBlockScanInfo* pScanInfo = pStatus->pTableIter; - code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (pLastBlockReader->currentBlockIndex != -1) { - initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); - int32_t index = pScanInfo->indexInBlockL; - - if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { - bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); - if (!hasData) { // current table does not have rows in last block, try next table - bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); - if (!hasNexTable) { - return TSDB_CODE_SUCCESS; - } - continue; - } - } - } else { // no data in last block, try next table + // code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); + // if (code != TSDB_CODE_SUCCESS) { + // return code; + // } + + bool hasVal = + initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL, pReader->pFileReader); + if (!hasVal) { bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -2428,6 +2416,26 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { continue; } + // int32_t index = pScanInfo->indexInBlockL; + + // if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { + // bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); + // if (!hasData) { // current table does not have rows in last block, try next table + // bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); + // if (!hasNexTable) { + // return TSDB_CODE_SUCCESS; + // } + // continue; + // } + // } + // } else { // no data in last block, try next table + // bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); + // if (!hasNexTable) { + // return TSDB_CODE_SUCCESS; + // } + // continue; + // } + code = doBuildDataBlock(pReader); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2446,7 +2454,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } static int32_t doBuildDataBlock(STsdbReader* pReader) { - TSDBKEY key = {0}; int32_t code = TSDB_CODE_SUCCESS; SBlock* pBlock = NULL; @@ -2466,22 +2473,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pBlock = getCurrentBlock(pBlockIter); } - { - key = getCurrentKeyInBuf(pScanInfo, pReader); - - // load the last data block of current table - code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // note: the lastblock may be null here - initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); - if (pScanInfo->indexInBlockL == INITIAL_ROW_INDEX_VAL || - pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { - bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); - } - } + TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader); if (pBlockInfo == NULL) { // build data block from last data file ASSERT(pBlockIter->numOfBlocks == 0); @@ -2594,7 +2586,6 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl } SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader; - pLReader->currentBlockIndex = -1; // set the correct start position according to the query time window initBlockDumpInfo(pReader, pBlockIter); @@ -2660,8 +2651,8 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { bool hasNext = blockIteratorNext(&pReader->status.blockIter); if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); - } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > - 0) { // data blocks in current file are exhausted, let's try the next file now + } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { + // data blocks in current file are exhausted, let's try the next file now tBlockDataReset(&pReader->status.fileBlockData); resetDataBlockIterator(pBlockIter, pReader->order); goto _begin; @@ -3024,7 +3015,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc while (nextRowInLastBlock(pLastBlockReader, pScanInfo)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { - TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex); + TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tRowMerge(pMerger, &fRow1); } else { break; -- GitLab