diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ec4b438b2b2a7951528d0b972863efd27960a9ff..5dd5ced72d1eeda1948ea1b4794a3a3da462d145 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -147,7 +147,7 @@ struct STsdbReader { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader); -static int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); +static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); @@ -829,7 +829,7 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) { if (IS_VAR_DATA_TYPE(pColVal->type)) { - if (pColVal->isNull) { + if (pColVal->isNull || pColVal->isNone) { colDataAppendNULL(pColInfoData, rowIndex); } else { varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData); @@ -2096,58 +2096,83 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* return code; } -static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - STSRow* pTSRow, STbDataIter* pIter, bool* hasVal, int64_t key, - SFileDataBlockInfo* pFBlock, SBlock* pBlock) { +static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, + STSRow* pTSRow, STbDataIter* pIter, bool* hasVal, int64_t key) { SRowMerger merge = {0}; SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBKEY k = TSDBROW_KEY(pRow); - if (key <= k.ts) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - tRowMergerInit(&merge, &fRow, pReader->pSchema); + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge); + // ascending order traverse + if (ASCENDING_TRAVERSE(pReader->order)) { + if (key < k.ts) { + tRowMergerInit(&merge, &fRow, pReader->pSchema); + + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + } else if (k.ts < key) { // k.ts < key + doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader); + } else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - if (k.ts == key) { tRowMerge(&merge, pRow); doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader); + + tRowMergerGetRow(&merge, &pTSRow); } + } else { // descending order scan + if (key < k.ts) { + doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader); + } else if (k.ts < key) { + tRowMergerInit(&merge, &fRow, pReader->pSchema); - tRowMergerGetRow(&merge, &pTSRow); - } else { // k.ts < key - doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + } else { // descending order: mem rows -----> imem rows ------> file block + updateSchema(pRow, pBlockScanInfo->uid, pReader); + + tRowMergerInit(&merge, pRow, pReader->pSchema); + doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader); + + tRowMerge(&merge, &fRow); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + + tRowMergerGetRow(&merge, &pTSRow); + } } + tRowMergerClear(&merge); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } -static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { +static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { + SRowMerger merge = {0}; + STSRow* pTSRow = NULL; + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); - SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); - - SRowMerger merge = {0}; - STSRow* pTSRow = NULL; + TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); + TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); - TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); - if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) { - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBKEY ik = TSDBROW_KEY(piRow); + uint64_t uid = pBlockScanInfo->uid; + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBKEY ik = TSDBROW_KEY(piRow); + + if (ASCENDING_TRAVERSE(pReader->order)) { // [1&2] key <= [k.ts && ik.ts] if (key <= k.ts && key <= ik.ts) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); tRowMergerInit(&merge, &fRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); if (ik.ts == key) { tRowMerge(&merge, piRow); @@ -2161,12 +2186,14 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI tRowMergerGetRow(&merge, &pTSRow); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; } else { // key > ik.ts || key > k.ts + ASSERT(key != ik.ts); + // [3] ik.ts < key <= k.ts // [4] ik.ts < k.ts <= key if (ik.ts < k.ts) { - doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, - pReader); + doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -2174,39 +2201,116 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI // [5] k.ts < key <= ik.ts // [6] k.ts < ik.ts <= key if (k.ts < ik.ts) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, &pTSRow, - pReader); + doMergeMultiRows(pRow, uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, &pTSRow, pReader); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } // [7] k.ts == ik.ts < key if (k.ts == ik.ts) { + ASSERT(key > ik.ts && key > k.ts); + doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } } + } else { // descending order scan + // [1/2] k.ts >= ik.ts && k.ts >= key + if (k.ts >= ik.ts && k.ts >= key) { + updateSchema(pRow, uid, pReader); + + tRowMergerInit(&merge, pRow, pReader->pSchema); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); + + if (ik.ts == k.ts) { + tRowMerge(&merge, piRow); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); + } + + if (k.ts == key) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMerge(&merge, &fRow); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } else { + ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch + + // [3] ik.ts > k.ts >= Key + // [4] ik.ts > key >= k.ts + if (ik.ts > key) { + doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + + // [5] key > ik.ts > k.ts + // [6] key > k.ts > ik.ts + if (key > ik.ts) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + + //[7] key = ik.ts > k.ts + if (key == ik.ts) { + doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader); + + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMerge(&merge, &fRow); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + } + } + + ASSERT(0); +} + +static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SBlockData* pBlockData = &pReader->status.fileBlockData; + + SRowMerger merge = {0}; + STSRow* pTSRow = NULL; + + int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); + TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); + + if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) { + return doMergeThreeLevelRows(pReader, pBlockScanInfo); } else { + // imem + file if (pBlockScanInfo->imemHasVal) { - return doMergeBufFileRows(pReader, pBlockScanInfo, piRow, pTSRow, pBlockScanInfo->iiter, - &pBlockScanInfo->imemHasVal, key, pFBlock, pBlock); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, pBlockScanInfo->iiter, + &pBlockScanInfo->imemHasVal, key); } - if (pBlockScanInfo->memHasVal) { // pBlockScanInfo->memHasVal != NULL - return doMergeBufFileRows(pReader, pBlockScanInfo, pRow, pTSRow, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, - key, pFBlock, pBlock); + // mem + file + if (pBlockScanInfo->memHasVal) { + return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, + key); } - // imem & mem are all empty + // imem & mem are all empty, only file exist TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); tRowMergerInit(&merge, &fRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; } - - return TSDB_CODE_SUCCESS; } static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { @@ -2671,39 +2775,31 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn return TSDB_CODE_SUCCESS; } -int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) { +int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; bool asc = ASCENDING_TRAVERSE(pReader->order); int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int32_t step = asc ? 1 : -1; - if (asc) { - pDumpInfo->rowIndex += step; - if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) { - pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); - } + pDumpInfo->rowIndex += step; + if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) { + pDumpInfo->rowIndex = + doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); + } - // all rows are consumed, let's try next file block - if (pDumpInfo->rowIndex >= pBlockData->nRow && asc) { - while (1) { - CHECK_FILEBLOCK_STATE st; + // all rows are consumed, let's try next file block + if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) { + while (1) { + CHECK_FILEBLOCK_STATE st; - SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); - SBlock* pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx); - checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st); - if (st == CHECK_FILEBLOCK_QUIT) { - break; - } + SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + SBlock* pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx); + checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st); + if (st == CHECK_FILEBLOCK_QUIT) { + break; } } - } else { // last row of current block, check if current block is overlapped with previous neighbor block - pDumpInfo->rowIndex += step; - // bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo); - // if (overlap) { // load next block - // ASSERT(0); - // } - // } } return TSDB_CODE_SUCCESS; @@ -2736,15 +2832,24 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - ASSERT(k.ts == ik.ts); - updateSchema(piRow, pBlockScanInfo->uid, pReader); + if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem + updateSchema(piRow, pBlockScanInfo->uid, pReader); + + tRowMergerInit(&merge, piRow, pReader->pSchema); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - tRowMergerInit(&merge, piRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + tRowMerge(&merge, pRow); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + } else { + updateSchema(pRow, pBlockScanInfo->uid, pReader); - tRowMerge(&merge, pRow); - doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + tRowMergerInit(&merge, pRow, pReader->pSchema); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->memHasVal, ik.ts, &merge, pReader); + + tRowMerge(&merge, piRow); + doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->imemHasVal, k.ts, &merge, pReader); + } tRowMergerGetRow(&merge, pTSRow); } @@ -2753,20 +2858,19 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); - TSDBKEY k = {.ts = TSKEY_INITIAL_VAL}; - TSDBKEY ik = {.ts = TSKEY_INITIAL_VAL}; - if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) { - k = TSDBROW_KEY(pRow); - ik = TSDBROW_KEY(piRow); + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBKEY ik = TSDBROW_KEY(piRow); - if (ik.ts <= k.ts) { - doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); - return TSDB_CODE_SUCCESS; - } else { // k.ts < ik.ts + if (ik.ts < k.ts) { // ik.ts < k.ts + doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader); + } else if (k.ts < ik.ts) { doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); - return TSDB_CODE_SUCCESS; + } else { // ik.ts == k.ts + doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); } + + return TSDB_CODE_SUCCESS; } if (pBlockScanInfo->memHasVal) {