diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index de4e4dd1f84b6e56c896468b9d6cf6c97c592170..212a4aef54cfd25057a22b96c9b15a0d0b742f8d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -818,6 +818,7 @@ typedef struct SStateWindowOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; + SExprSupp scalarSup; SGroupResInfo groupResInfo; SWindowRowsSup winSup; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 34997a5dd83aa6612d985acb07c91f1c5655e40a..cf0bc078d7215eed3080d106428b272959382131 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -366,7 +366,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t } // set the tag value for final result - // setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); SInterval* pInterval = &pFillInfo->interval; pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision); @@ -523,14 +522,6 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { return NULL; } -void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order) { - if (pFillInfo == NULL || (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC)) { - return; - } - - pFillInfo->order = order; -} - void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) { if (pFillInfo->type == TSDB_FILL_NONE) { return; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 69e1b66ecf80988eec1e0657357beacf6474252b..16d0c178e0f531613e7d65bb88e740e37a014a21 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1218,39 +1218,17 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pBlock->info.rows, numOfOutput); } -static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; +static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) { + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; } SStateWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; - - SOptrBasicInfo* pBInfo = &pInfo->binfo; - - if (pOperator->status == OP_RES_TO_RETURN) { - while (1) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - - bool hasRemain = hasRemainResults(&pInfo->groupResInfo); - if (!hasRemain) { - doSetOperatorCompleted(pOperator); - break; - } - - if (pBInfo->pRes->info.rows > 0) { - break; - } - } - pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; - return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; - } - - int32_t order = TSDB_ORDER_ASC; - int64_t st = taosGetTimestampUs(); + SExprSupp* pSup = &pOperator->exprSupp; + int32_t order = TSDB_ORDER_ASC; + int64_t st = taosGetTimestampUs(); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -1262,13 +1240,40 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true); blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); + // there is an scalar expression that needs to be calculated right before apply the group aggregation. + if (pInfo->scalarSup.pExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, + pInfo->scalarSup.numOfExprs, NULL); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + } + doStateWindowAggImpl(pOperator, pInfo, pBlock); } pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); pOperator->status = OP_RES_TO_RETURN; - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SStateWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + doSetOperatorCompleted(pOperator); + return NULL; + } + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); @@ -1284,6 +1289,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { break; } } + pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; } @@ -1716,6 +1722,7 @@ static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); + cleanupExprSupp(&pInfo->scalarSup); taosMemoryFreeClear(param); } @@ -2747,6 +2754,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; + if (pStateNode->window.pExprs != NULL) { + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); + int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + pInfo->stateCol = extractColumnFromColumnNode(pColNode); pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.bytes = pInfo->stateCol.bytes; @@ -2769,7 +2785,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; - ; + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->tsSlotId = tsSlotId; @@ -2780,7 +2796,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, + pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1);