From 8c47b350dcc53d735987b1c4008373daf7d8e7c8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 10 May 2022 18:21:54 +0800 Subject: [PATCH] fix(query): Pseudo time window result expands to multiple rows in case of multiple rows aggregates function existing. --- source/libs/executor/src/executorimpl.c | 26 ++++++++++++++----------- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 5 ++++- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a2aaac1f50..338c2c4207 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -619,7 +619,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* continue; } - if (functionNeedToExecute(&pCtx[k])) { + if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { pCtx[k].fpSet.process(&pCtx[k]); } @@ -802,9 +802,12 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { - pCtx[k].startTs = startTs; // this can be set during create the struct - if (pCtx[k].fpSet.process != NULL) + pCtx[k].startTs = startTs; + // this can be set during create the struct + // todo add a dummy funtion to avoid process check + if (pCtx[k].fpSet.process != NULL) { pCtx[k].fpSet.process(&pCtx[k]); + } } } } @@ -1087,7 +1090,7 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) { pValCtx[num++] = &pCtx[i]; - } else { + } else if (fmIsAggFunc(pCtx[i].functionId)) { p = &pCtx[i]; } // if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { @@ -2179,17 +2182,15 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased int32_t numOfResult = pBlock->info.rows; // there are already exists result rows int32_t start = 0; - int32_t step = -1; + int32_t step = 1; // qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv)); assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC); if (orderType == TSDB_ORDER_ASC) { start = pGroupResInfo->index; - step = 1; } else { // desc order copy all data start = numOfRows - pGroupResInfo->index - 1; - step = -1; } for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { @@ -2219,10 +2220,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor } else { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - - char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - colDataAppend(pColInfoData, pBlock->info.rows, in, pCtx[j].resultInfo->isNullRes); + // expand the result into multiple rows. E.g., _wstartts, top(k, 20) + // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for(int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } } } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c62a7f00cc..a45fdab030 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -25,6 +25,7 @@ extern "C" { bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)); int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult); EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index f88d84f50a..6a3cf33a8e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1064,7 +1064,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateSelectValue, .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. .initFunc = functionSetup, - .sprocessFunc = NULL, + .processFunc = NULL, .finalizeFunc = NULL } }; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d1b16aef71..effc4888a8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -184,7 +184,6 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; - /*cleanupResultRowEntry(pResInfo);*/ char* in = GET_ROWCELL_INTERBUF(pResInfo); colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); @@ -192,6 +191,10 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) { + return 0; +} + int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) { int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); -- GitLab