From b9bf2515f59e1cc280fc1f911e3e209fec87da0b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Apr 2022 17:55:17 +0800 Subject: [PATCH] fix(query): handle crash of taosd caused by a complex expression in GROUP BY clauses. --- include/common/tcommon.h | 2 +- source/libs/executor/inc/executorimpl.h | 18 ++- source/libs/executor/src/executorimpl.c | 156 ++++++++++------------- source/libs/executor/src/groupoperator.c | 23 +++- source/libs/executor/src/scanoperator.c | 17 +-- source/libs/function/src/tfill.c | 2 +- 6 files changed, 104 insertions(+), 114 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 1040130f76..7c308f9354 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -222,7 +222,7 @@ typedef struct SFunctParam { // the structure for sql function in select clause typedef struct SResSchame { int8_t type; - int32_t colId; + int32_t slotId; int32_t bytes; int32_t precision; int32_t scale; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9fee599a5d..37bc4b771f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -411,7 +411,7 @@ typedef struct STableScanInfo { SResultRowInfo* pResultRowInfo; int32_t* rowCellInfoOffset; SExprInfo* pExpr; - SSDataBlock block; + SSDataBlock* pResBlock; SArray* pColMatchInfo; int32_t numOfOutput; int64_t elapsedTime; @@ -569,6 +569,9 @@ typedef struct SGroupbyOperatorInfo { int32_t groupKeyLen; // total group by column width SGroupResInfo groupResInfo; SAggSupporter aggSup; + SExprInfo* pScalarExprInfo; + int32_t numOfScalarExpr;// the number of scalar expression in group operator + SqlFunctionCtx*pScalarFuncCtx; } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { @@ -674,7 +677,7 @@ void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); -void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset); +void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, @@ -685,12 +688,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI uint64_t* total, SArray* pColList); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity); -SSDataBlock* loadNextDataBlock(void* param); +SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, - int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo); + int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); @@ -704,8 +706,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, + SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo); @@ -729,6 +731,8 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOf int32_t numOfOutput); #endif +void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); + void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b7bbb605c5..0eac507822 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -234,9 +234,8 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, - SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset); - +static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, + int32_t orderType, int32_t* rowCellOffset); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow* win); @@ -1068,9 +1067,9 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, pCtx[i].size = pBlock->info.rows; pCtx[i].currentStage = MAIN_SCAN; - SExprInfo expr = pOperator->pExpr[i]; - for (int32_t j = 0; j < expr.base.numOfParams; ++j) { - SFunctParam *pFuncParam = &expr.base.pParam[j]; + SExprInfo* pOneExpr = &pOperator->pExpr[i]; + for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { + SFunctParam *pFuncParam = &pOneExpr->base.pParam[j]; if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); @@ -1145,19 +1144,21 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S } } -static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, - int32_t numOfOutput, SArray* pPseudoList) { +void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList) { setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.groupId = pSrcBlock->info.groupId; for (int32_t k = 0; k < numOfOutput; ++k) { + int32_t outputSlotId = pExpr[k].base.resSchema.slotId; + SqlFunctionCtx* pfCtx = &pCtx[k]; + if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); - colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows); + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); pResult->info.rows = pSrcBlock->info.rows; } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); } @@ -1167,40 +1168,40 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData taosArrayPush(pBlockList, &pSrcBlock); SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, k); + dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId); scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); pResult->info.rows = dest.numOfRows; taosArrayDestroy(pBlockList); } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { - ASSERT(!fmIsAggFunc(pCtx[k].functionId)); + ASSERT(!fmIsAggFunc(pfCtx->functionId)); - if (fmIsPseudoColumnFunc(pCtx[k].functionId)) { + if (fmIsPseudoColumnFunc(pfCtx->functionId)) { // do nothing - } else if (fmIsNonstandardSQLFunc(pCtx[k].functionId)) { + } else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) { // todo set the correct timestamp column - pCtx[k].input.pPTS = taosArrayGet(pSrcBlock->pDataBlock, 1); + pfCtx->input.pPTS = taosArrayGet(pSrcBlock->pDataBlock, 1); SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]); - pCtx[k].fpSet.init(&pCtx[k], pResInfo); + pfCtx->fpSet.init(&pCtx[k], pResInfo); - pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k); - pCtx[k].offset = pResult->info.rows; // set the start offset + pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); + pfCtx->offset = pResult->info.rows; // set the start offset if (taosArrayGetSize(pPseudoList) > 0) { int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); - pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; + pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; } - int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]); + int32_t numOfRows = pfCtx->fpSet.process(pfCtx); pResult->info.rows += numOfRows; } else { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, k); + dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId); scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); pResult->info.rows = dest.numOfRows; @@ -1810,7 +1811,7 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { return TSDB_CODE_SUCCESS; } -static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) { +SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) { SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); if (pFuncCtx == NULL) { return NULL; @@ -1851,12 +1852,12 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); - pCtx->pTsOutput = NULL;//taosArrayInit(4, POINTER_BYTES); + pCtx->pTsOutput = NULL; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; pCtx->resDataInfo.type = pFunct->resSchema.type; pCtx->order = TSDB_ORDER_ASC; - pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; #if 0 for (int32_t j = 0; j < pCtx->numOfParams; ++j) { // int16_t type = pFunct->param[j].nType; @@ -3330,8 +3331,7 @@ void setIntervalQueryRange(STaskRuntimeEnv* pRuntimeEnv, TSKEY key) { * @param pQInfo * @param result */ -static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, - SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) { +int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -3370,7 +3370,9 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI pGroupResInfo->index += 1; for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j); + int32_t slotId = pExprInfo[j].base.resSchema.slotId; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset); char* in = GET_ROWCELL_INTERBUF(pEntryInfo); @@ -3391,8 +3393,8 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI return 0; } -void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, - int32_t* rowCellOffset) { +void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, + int32_t* rowCellOffset) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); blockDataCleanup(pBlock); @@ -3401,7 +3403,7 @@ void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock } int32_t orderType = TSDB_ORDER_ASC; - doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset); + doCopyToSDataBlock(pBlock, rowCapacity, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset); // add condition (pBlock->info.rows >= 1) just to runtime happy blockDataUpdateTsWindow(pBlock); @@ -4438,32 +4440,6 @@ _error: return NULL; } -SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { - SSDataBlock* pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (pResBlock == NULL) { - return NULL; - } - - size_t numOfCols = taosArrayGetSize(pExprInfo); - pResBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - - SArray* pResult = pResBlock->pDataBlock; - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData colInfoData = {0}; - SExprInfo* p = taosArrayGetP(pExprInfo, i); - - SResSchema* pSchema = &p->base.resSchema; - colInfoData.info.type = pSchema->type; - colInfoData.info.colId = pSchema->colId; - colInfoData.info.bytes = pSchema->bytes; - colInfoData.info.scale = pSchema->scale; - colInfoData.info.precision = pSchema->precision; - taosArrayPush(pResult, &colInfoData); - } - - return pResBlock; -} - static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); static void cleanupAggSup(SAggSupporter* pAggSup); @@ -4777,7 +4753,7 @@ static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGr SColumn* pCol = taosArrayGet(pGroupInfo, i); for (int32_t j = 0; j < numOfCols; ++j) { SExprInfo* pe = &pExprInfo[j]; - if (pe->base.resSchema.colId == pCol->colId) { + if (pe->base.resSchema.slotId == pCol->colId) { taosArrayPush(plist, pCol); taosArrayPush(pInfo->groupInfo, &j); len += pCol->bytes; @@ -4816,7 +4792,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t goto _error; } - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) { @@ -5137,9 +5113,7 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo* pOperator, bool* newgro SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity, - pAggInfo->binfo.rowCellInfoOffset); - + toSDatablock(pInfo->pRes, pAggInfo->binfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->pResultBuf, pAggInfo->binfo.rowCellInfoOffset); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -5188,8 +5162,7 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo* pOperator, bool* newgro updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo); - toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity, - pAggInfo->binfo.rowCellInfoOffset); + toSDatablock(pInfo->pRes, pAggInfo->binfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->pResultBuf, pAggInfo->binfo.rowCellInfoOffset); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -5204,6 +5177,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); + #if 0 if (pProjectInfo->existDataBlock) { // TODO refactor SSDataBlock* pBlock = pProjectInfo->existDataBlock; @@ -5392,8 +5366,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro } blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, - pInfo->binfo.rowCellInfoOffset); + toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -5411,8 +5384,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup } if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, - pInfo->binfo.rowCellInfoOffset); + toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -5447,8 +5419,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, - pInfo->binfo.rowCellInfoOffset); + toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); ASSERT(pInfo->binfo.pRes->info.rows > 0); pOperator->status = OP_RES_TO_RETURN; @@ -5693,7 +5664,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5725,7 +5696,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5742,7 +5713,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5774,7 +5745,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5953,7 +5924,7 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { - pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); + pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; pBasicInfo->capacity = numOfRows; @@ -6382,8 +6353,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); - // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -6406,8 +6375,6 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); - // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -6685,10 +6652,10 @@ bool validateExprColumnInfo(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) { SResSchema s = {0}; - s.scale = scale; - s.type = type; - s.bytes = bytes; - s.colId = slotId; + s.scale = scale; + s.type = type; + s.bytes = bytes; + s.slotId = slotId; s.precision = precision; strncpy(s.name, name, tListLen(s.name)); @@ -6730,7 +6697,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs); } - SExprInfo* pExp = &pExprs[pTargetNode->slotId]; + SExprInfo* pExp = &pExprs[i]; pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode)); pExp->pExpr->_function.num = 1; @@ -6850,8 +6817,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); + SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, - pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); + pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); @@ -6906,9 +6875,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SExprInfo* pScalarExprInfo = NULL; + int32_t numOfScalarExpr = 0; if (pAggNode->pGroupKeys != NULL) { SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); - return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL); + if (pAggNode->pExprs != NULL) { + pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + } + + return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); } else { return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } @@ -7155,10 +7130,15 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod } *numOfOutputCols = 0; - int32_t num = LIST_LENGTH(pOutputNodeList->pSlots); for (int32_t i = 0; i < num; ++i) { SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i); + // todo: add reserve flag check + if (pNode->slotId >= numOfCols) { // it is a column reserved for the arithmetic expression calculation + (*numOfOutputCols) += 1; + continue; + } + SColMatchInfo* info = taosArrayGet(pList, pNode->slotId); if (pNode->output) { (*numOfOutputCols) += 1; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8a10170fcd..c8a6697d4e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -285,6 +285,12 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); + + // there is an scalar expression that needs to be calculated before apply the group aggregation. + if (pInfo->pScalarExprInfo != NULL) { + projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); + } + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); doHashGroupbyAgg(pOperator, pBlock); } @@ -305,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); while(1) { - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); @@ -322,16 +328,21 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou return (pRes->info.rows == 0)? NULL:pRes; } -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, - const STableGroupInfo* pTableGroupInfo) { +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, + SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pGroupCols = pGroupColList; - pInfo->pCondition = pCondition; + pInfo->pGroupCols = pGroupColList; + pInfo->pCondition = pCondition; + + pInfo->pScalarExprInfo = pScalarExprInfo; + pInfo->numOfScalarExpr = numOfScalarExpr; + pInfo->pScalarFuncCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4179999c4b..f1502df743 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -113,7 +113,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSDataBlock* pBlock = &pTableScanInfo->block; + SSDataBlock* pBlock = pTableScanInfo->pResBlock; STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo; *newgroup = false; @@ -218,7 +218,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { } SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, - int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, + int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); @@ -232,12 +232,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, return NULL; } - pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData idata = {0}; - taosArrayPush(pInfo->block.pDataBlock, &idata); - } - + pInfo->pResBlock = pResBlock; pInfo->pFilterNode = pCondition; pInfo->dataReader = pTsdbReadHandle; pInfo->times = repeatTime; @@ -312,7 +307,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) { tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist); tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); - SSDataBlock* pBlock = &pTableScanInfo->block; + SSDataBlock* pBlock = pTableScanInfo->pResBlock; pBlock->info.rows = 1; pBlock->info.numOfCols = 1; @@ -343,13 +338,13 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* } pInfo->dataReader = dataReader; - pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); +// pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_BINARY; infoData.info.bytes = 1024; infoData.info.colId = 0; - taosArrayPush(pInfo->block.pDataBlock, &infoData); +// taosArrayPush(pInfo->block.pDataBlock, &infoData); pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; diff --git a/source/libs/function/src/tfill.c b/source/libs/function/src/tfill.c index 1f7e83286b..b6b5362187 100644 --- a/source/libs/function/src/tfill.c +++ b/source/libs/function/src/tfill.c @@ -541,7 +541,7 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co pFillCol[i].col.bytes = pExprInfo->base.resSchema.bytes; pFillCol[i].col.type = (int8_t)pExprInfo->base.resSchema.type; pFillCol[i].col.offset = offset; - pFillCol[i].col.colId = pExprInfo->base.resSchema.colId; + pFillCol[i].col.colId = pExprInfo->base.resSchema.slotId; pFillCol[i].tagIndex = -2; pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query // pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId; -- GitLab