提交 c96036a7 编写于 作者: H Haojun Liao

fix(query): add null value check before update the block window info

上级 00821527
......@@ -621,7 +621,7 @@ static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowInd
}
}
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
uint64_t tableGroupId) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
......@@ -639,13 +639,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// int32_t prevIndex = pResultRowInfo->curPos;
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
if (pBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
if (tsCols != NULL) {
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
}
}
int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan);
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols, pBlock->info.rows, ascScan);
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win);
......@@ -670,7 +674,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
int32_t forwardStep = 0;
TSKEY ekey = ascScan? win.ekey:win.skey;
forwardStep =
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
ASSERT(forwardStep > 0);
// prev time window not interpolation yet.
......@@ -686,7 +690,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
}
STimeWindow w = pRes->win;
ret = setTimeWindowOutputBuf(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult, tableGroupId,
ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &w, masterScan, &pResult, tableGroupId,
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup,
pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -694,17 +698,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
}
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
// restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId,
ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, tableGroupId,
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup,
pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -714,17 +718,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
#endif
// window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep,
doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep,
pInfo->order, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) {
break;
}
......@@ -748,20 +752,20 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
ekey = ascScan? nextWin.ekey:nextWin.skey;
forwardStep =
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
pInfo->order, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
if (pInfo->timeWindowInterpo) {
int32_t rowIndex = ascScan ? (pSDataBlock->info.rows - 1) : 0;
saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols);
int32_t rowIndex = ascScan ? (pBlock->info.rows - 1) : 0;
saveDataBlockLastRow(pInfo->pRow, pBlock->pDataBlock, rowIndex, pBlock->info.numOfCols);
}
return pUpdated;
......@@ -790,7 +794,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
getTableScanInfo(pOperator, &pInfo->order, &scanFlag);
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册