提交 db6c660b 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0

...@@ -431,6 +431,12 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -431,6 +431,12 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
code = tBlockDataInit(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _end;
}
pReader->pResBlock = createResBlock(pCond, pReader->capacity); pReader->pResBlock = createResBlock(pCond, pReader->capacity);
if (pReader->pResBlock == NULL) { if (pReader->pResBlock == NULL) {
code = terrno; code = terrno;
...@@ -1200,8 +1206,9 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1200,8 +1206,9 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
} }
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
STSRow* pTSRow, SIterInfo* pIter, int64_t key) { SIterInfo* pIter, int64_t key) {
SRowMerger merge = {0}; SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
...@@ -1250,6 +1257,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1250,6 +1257,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMergerClear(&merge); tRowMergerClear(&merge);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1411,8 +1420,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1411,8 +1420,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
STSRow* pTSRow = NULL;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
...@@ -1422,23 +1429,27 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1422,23 +1429,27 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
} else { } else {
// imem + file // imem + file
if (pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, &pBlockScanInfo->iiter, key); return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key);
} }
// mem + file // mem + file
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, &pBlockScanInfo->iter, key); return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key);
} }
// imem & mem are all empty, only file exist // imem & mem are all empty, only file exist
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema); tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -1716,7 +1727,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -1716,7 +1727,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
tBlockDataInit(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClearData(&pStatus->fileBlockData);
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2160,6 +2172,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -2160,6 +2172,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block // 3. load the neighbor block, and set it to be the currently accessed file data block
tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClearData(&pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2563,6 +2577,7 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -2563,6 +2577,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
} }
} }
taosMemoryFree(pSupInfo->buildBuf); taosMemoryFree(pSupInfo->buildBuf);
tBlockDataClear(&pReader->status.fileBlockData, true);
cleanupDataBlockIterator(&pReader->status.blockIter); cleanupDataBlockIterator(&pReader->status.blockIter);
...@@ -2760,13 +2775,9 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { ...@@ -2760,13 +2775,9 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
int32_t code = tBlockDataInit(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { tBlockDataClearData(&pStatus->fileBlockData);
terrno = code; int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
return NULL;
}
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tBlockDataClear(&pStatus->fileBlockData, 1); tBlockDataClear(&pStatus->fileBlockData, 1);
...@@ -2775,7 +2786,6 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { ...@@ -2775,7 +2786,6 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
} }
copyBlockDataToSDataBlock(pReader, pBlockScanInfo); copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
tBlockDataClear(&pStatus->fileBlockData, 1);
return pReader->pResBlock->pDataBlock; return pReader->pResBlock->pDataBlock;
} }
...@@ -2872,9 +2882,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa ...@@ -2872,9 +2882,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
while (true) { while (true) {
if (hasNext) { if (hasNext) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SBlock* pBlock = getCurrentBlock(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
int32_t numOfRows = pBlock->nRow; int32_t numOfRows = pBlock->nRow;
pTableBlockInfo->totalRows += numOfRows; pTableBlockInfo->totalRows += numOfRows;
...@@ -2895,7 +2903,6 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa ...@@ -2895,7 +2903,6 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->blockRowsHisto[bucketIndex]++; pTableBlockInfo->blockRowsHisto[bucketIndex]++;
hasNext = blockIteratorNext(&pStatus->blockIter); hasNext = blockIteratorNext(&pStatus->blockIter);
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
......
...@@ -1022,6 +1022,10 @@ void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) { ...@@ -1022,6 +1022,10 @@ void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aTSKEY); tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aIdx); taosArrayDestroy(pBlockData->aIdx);
taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL); taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL);
pBlockData->aColData = NULL;
pBlockData->aIdx = NULL;
pBlockData->aTSKEY = NULL;
pBlockData->aVersion = NULL;
} }
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) {
......
...@@ -3209,9 +3209,8 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf ...@@ -3209,9 +3209,8 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows; int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows); taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
if (pInfo->pRes->info.rows > pResultInfo->threshold) { pInfo->pRes->info.groupId = pInfo->curGroupId;
return; return;
}
} }
// handle the cached new group data block // handle the cached new group data block
...@@ -3230,7 +3229,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -3230,7 +3229,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock); blockDataCleanup(pResBlock);
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
pResBlock->info.groupId = pInfo->curGroupId; pResBlock->info.groupId = pInfo->curGroupId;
return pResBlock; return pResBlock;
} }
......
...@@ -292,7 +292,9 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -292,7 +292,9 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// do apply filter // do apply filter
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL); doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL);
if (pFinalRes->info.rows > 0 || pRes->info.rows == 0) {
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
break; break;
} }
} else { } else {
......
...@@ -131,7 +131,7 @@ if $rows != 172798 then ...@@ -131,7 +131,7 @@ if $rows != 172798 then
endi endi
sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by tbname, t1 interval(1s) fill(NULL) slimit 1 soffset 0 limit 250000 offset 1 sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by tbname, t1 interval(1s) fill(NULL) slimit 1 soffset 0 limit 250000 offset 1
if $rows != 85648 then if $rows != 86399 then
return -1 return -1
endi endi
...@@ -146,7 +146,7 @@ if $rows != 4 then ...@@ -146,7 +146,7 @@ if $rows != 4 then
endi endi
sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by tbname, t1 interval(1s) fill(NULL) slimit 1 soffset 1 limit 250000 offset 1 sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' partition by tbname, t1 interval(1s) fill(NULL) slimit 1 soffset 1 limit 250000 offset 1
if $rows != 87150 then if $rows != 86399 then
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册