提交 4bc94300 编写于 作者: H Haojun Liao

enh(query): support scalar expressions in the state window aggregate.

上级 a356b4eb
...@@ -815,6 +815,7 @@ typedef struct SStateWindowOperatorInfo { ...@@ -815,6 +815,7 @@ typedef struct SStateWindowOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SExprSupp scalarSup;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SWindowRowsSup winSup; SWindowRowsSup winSup;
......
...@@ -366,7 +366,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t ...@@ -366,7 +366,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
} }
// set the tag value for final result // set the tag value for final result
// setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
SInterval* pInterval = &pFillInfo->interval; SInterval* pInterval = &pFillInfo->interval;
pFillInfo->currentKey = pFillInfo->currentKey =
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision); taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
...@@ -523,14 +522,6 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { ...@@ -523,14 +522,6 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
return NULL; return NULL;
} }
void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order) {
if (pFillInfo == NULL || (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC)) {
return;
}
pFillInfo->order = order;
}
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) { void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
if (pFillInfo->type == TSDB_FILL_NONE) { if (pFillInfo->type == TSDB_FILL_NONE) {
return; return;
......
...@@ -1213,39 +1213,17 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1213,39 +1213,17 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
} }
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { if (OPTR_IS_OPENED(pOperator)) {
return NULL; return TSDB_CODE_SUCCESS;
} }
SStateWindowOperatorInfo* pInfo = pOperator->info; SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprSupp* pSup = &pOperator->exprSupp;
SExprSupp* pSup = &pOperator->exprSupp; int32_t order = TSDB_ORDER_ASC;
int64_t st = taosGetTimestampUs();
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
}
int32_t order = TSDB_ORDER_ASC;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
...@@ -1257,13 +1235,40 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1257,13 +1235,40 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true);
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
}
doStateWindowAggImpl(pOperator, pInfo, pBlock); doStateWindowAggImpl(pOperator, pInfo, pBlock);
} }
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
doSetOperatorCompleted(pOperator);
return NULL;
}
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) { while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
...@@ -1279,6 +1284,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1279,6 +1284,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
break; break;
} }
} }
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
} }
...@@ -1659,6 +1665,7 @@ static void destroyStateWindowOperatorInfo(void* param) { ...@@ -1659,6 +1665,7 @@ static void destroyStateWindowOperatorInfo(void* param) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData); taosMemoryFreeClear(pInfo->stateKey.pData);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -2690,6 +2697,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2690,6 +2697,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
pInfo->stateCol = extractColumnFromColumnNode(pColNode); pInfo->stateCol = extractColumnFromColumnNode(pColNode);
pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes; pInfo->stateKey.bytes = pInfo->stateCol.bytes;
...@@ -2712,7 +2728,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2712,7 +2728,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
pInfo->twAggSup = pInfo->twAggSup =
(STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
;
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId; pInfo->tsSlotId = tsSlotId;
...@@ -2723,7 +2739,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2723,7 +2739,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL,
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册