From c96036a7c42723b6d675a1c336f7b50821037d3f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 21 May 2022 14:08:00 +0800 Subject: [PATCH] fix(query): add null value check before update the block window info --- source/libs/executor/src/timewindowoperator.c | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8abebf184b..588c3e90e7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -621,7 +621,7 @@ static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowInd } } -static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, +static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, uint64_t tableGroupId) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; @@ -639,13 +639,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // int32_t prevIndex = pResultRowInfo->curPos; TSKEY* tsCols = NULL; - if (pSDataBlock->pDataBlock != NULL) { - SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + if (pBlock->pDataBlock != NULL) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; + + if (tsCols != NULL) { + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + } } int32_t startPos = 0; - TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan); + TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols, pBlock->info.rows, ascScan); STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->interval.precision, &pInfo->win); @@ -670,7 +674,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe int32_t forwardStep = 0; TSKEY ekey = ascScan? win.ekey:win.skey; forwardStep = - getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); ASSERT(forwardStep > 0); // prev time window not interpolation yet. @@ -686,7 +690,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } STimeWindow w = pRes->win; - ret = setTimeWindowOutputBuf(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult, tableGroupId, + ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &w, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { @@ -694,17 +698,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1, + doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); - doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } // restore current time window - ret = setTimeWindowOutputBuf(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, + ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { @@ -714,17 +718,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe #endif // window start key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, + doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, - pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; while (1) { int32_t prevEndPos = (forwardStep - 1) * step + startPos; - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order); if (startPos < 0) { break; } @@ -748,20 +752,20 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ekey = ascScan? nextWin.ekey:nextWin.skey; forwardStep = - getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); // window start(end) key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, + doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, - pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } if (pInfo->timeWindowInterpo) { - int32_t rowIndex = ascScan ? (pSDataBlock->info.rows - 1) : 0; - saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols); + int32_t rowIndex = ascScan ? (pBlock->info.rows - 1) : 0; + saveDataBlockLastRow(pInfo->pRow, pBlock->pDataBlock, rowIndex, pBlock->info.numOfCols); } return pUpdated; @@ -790,7 +794,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } getTableScanInfo(pOperator, &pInfo->order, &scanFlag); - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true); -- GitLab