diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1b162bf4888b9da95884aca01a014157c723bad1..03985654f88bfda5caac87980bea7c53e4f404d7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -431,6 +431,12 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; + code = tBlockDataInit(&pReader->status.fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + goto _end; + } + pReader->pResBlock = createResBlock(pCond, pReader->capacity); if (pReader->pResBlock == NULL) { code = terrno; @@ -1200,8 +1206,9 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* } static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - STSRow* pTSRow, SIterInfo* pIter, int64_t key) { + SIterInfo* pIter, int64_t key) { SRowMerger merge = {0}; + STSRow* pTSRow = NULL; SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -1250,6 +1257,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tRowMergerClear(&merge); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + + taosMemoryFree(pTSRow); return TSDB_CODE_SUCCESS; } @@ -1411,8 +1420,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; - STSRow* pTSRow = NULL; - int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); @@ -1422,23 +1429,27 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } else { // imem + file if (pBlockScanInfo->iiter.hasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, &pBlockScanInfo->iiter, key); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key); } // mem + file if (pBlockScanInfo->iter.hasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, &pBlockScanInfo->iter, key); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key); } // imem & mem are all empty, only file exist TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + STSRow* pTSRow = NULL; SRowMerger merge = {0}; + tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); return TSDB_CODE_SUCCESS; } } @@ -1716,7 +1727,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { - tBlockDataInit(&pStatus->fileBlockData); + tBlockDataReset(&pStatus->fileBlockData); + tBlockDataClearData(&pStatus->fileBlockData); code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2160,6 +2172,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); // 3. load the neighbor block, and set it to be the currently accessed file data block + tBlockDataReset(&pStatus->fileBlockData); + tBlockDataClearData(&pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2563,6 +2577,7 @@ void tsdbReaderClose(STsdbReader* pReader) { } } taosMemoryFree(pSupInfo->buildBuf); + tBlockDataClear(&pReader->status.fileBlockData, true); cleanupDataBlockIterator(&pReader->status.blockIter); @@ -2760,13 +2775,9 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - int32_t code = tBlockDataInit(&pStatus->fileBlockData); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } - - code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); + tBlockDataReset(&pStatus->fileBlockData); + tBlockDataClearData(&pStatus->fileBlockData); + int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { tBlockDataClear(&pStatus->fileBlockData, 1); @@ -2775,7 +2786,6 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { } copyBlockDataToSDataBlock(pReader, pBlockScanInfo); - tBlockDataClear(&pStatus->fileBlockData, 1); return pReader->pResBlock->pDataBlock; } @@ -2872,9 +2882,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa while (true) { if (hasNext) { - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + SBlock* pBlock = getCurrentBlock(pBlockIter); int32_t numOfRows = pBlock->nRow; pTableBlockInfo->totalRows += numOfRows; @@ -2895,7 +2903,6 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa pTableBlockInfo->blockRowsHisto[bucketIndex]++; hasNext = blockIteratorNext(&pStatus->blockIter); - } else { code = initForFirstBlockInFile(pReader, pBlockIter); if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 805e49a705ebaadc4dd8447f2beb31174cad6608..3e05b75dd010ec2336115efa8354e2892c07afa7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1022,6 +1022,10 @@ void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) { tFree((uint8_t *)pBlockData->aTSKEY); taosArrayDestroy(pBlockData->aIdx); taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL); + pBlockData->aColData = NULL; + pBlockData->aIdx = NULL; + pBlockData->aTSKEY = NULL; + pBlockData->aVersion = NULL; } int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 962f37a8f7ac339de08809d46d0ae32510f24273..525d7bf33658b906c37159047da31d6b947448c1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3209,9 +3209,8 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf if (taosFillHasMoreResults(pInfo->pFillInfo)) { int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows); - if (pInfo->pRes->info.rows > pResultInfo->threshold) { - return; - } + pInfo->pRes->info.groupId = pInfo->curGroupId; + return; } // handle the cached new group data block @@ -3230,7 +3229,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { blockDataCleanup(pResBlock); doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); - if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) { + if (pResBlock->info.rows > 0) { pResBlock->info.groupId = pInfo->curGroupId; return pResBlock; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 1edc08655159749faf63c2bf957019576d4f622c..53e25c7e9731b7bbef92b85647c36fadf13da257 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -292,7 +292,9 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // do apply filter doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL); - if (pFinalRes->info.rows > 0 || pRes->info.rows == 0) { + + // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. + if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { break; } } else { diff --git a/tests/script/tsim/parser/lastrow_query.sim b/tests/script/tsim/parser/lastrow_query.sim index 282761d8208367d6df8cf80ba921a17daa5b6591..be8f089a790b3c13e7eee95931b2b6986a62fe07 100644 --- a/tests/script/tsim/parser/lastrow_query.sim +++ b/tests/script/tsim/parser/lastrow_query.sim @@ -131,7 +131,7 @@ if $rows != 172798 then endi sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by tbname, t1 interval(1s) fill(NULL) slimit 1 soffset 0 limit 250000 offset 1 -if $rows != 85648 then +if $rows != 86399 then return -1 endi @@ -146,7 +146,7 @@ if $rows != 4 then endi sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by tbname, t1 interval(1s) fill(NULL) slimit 1 soffset 1 limit 250000 offset 1 -if $rows != 87150 then +if $rows != 86399 then return -1 endi