From 55338753262bbc14e85a7bed00bed3eb1ba57588 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 22 Mar 2022 16:03:42 +0800 Subject: [PATCH] [td-13039] support group by. --- include/common/tcommon.h | 6 +- source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executorimpl.c | 236 +++++++++++++----------- source/libs/function/src/builtinsimpl.c | 4 +- 4 files changed, 141 insertions(+), 111 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index fb787a32df..8faccf7815 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -160,10 +160,8 @@ typedef struct SColumn { int64_t dataBlockId; }; - union { - int16_t colId; - int16_t slotId; - }; + int16_t colId; + int16_t slotId; char name[TSDB_COL_NAME_LEN]; int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 66d5f8b5a7..f170d5c529 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -543,9 +543,10 @@ typedef struct SFillOperatorInfo { typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; - int32_t colIndex; + SArray* pGroupCols; char* prevData; // previous group by value SGroupResInfo groupResInfo; + SAggSupporter aggSup; } SGroupbyOperatorInfo; typedef struct SSessionAggOperatorInfo { @@ -643,6 +644,8 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfD SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, + SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, @@ -650,7 +653,6 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a1b11c8161..d1aa7cd99a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include "os.h" @@ -242,8 +243,8 @@ static void operatorDummyCloseFn(void* param, int32_t numOfCols) {} static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); -static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); - +static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupId, + SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); @@ -1012,8 +1013,8 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of pCtx[k].startTs = pWin->skey; // keep it temporarialy - int32_t startOffset = pCtx[k].input.startRowIndex; bool hasAgg = pCtx[k].input.colDataAggIsSet; + int32_t startOffset = pCtx[k].input.startRowIndex; int32_t numOfRows = pCtx[k].input.numOfRows; int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); @@ -1622,35 +1623,30 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); } +static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pBlock) { + SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + // TODO multiple group by columns + SColumn* pCol = taosArrayGet(pInfo->pGroupCols, 0); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); -static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - STableQueryInfo* item = pRuntimeEnv->current; - - SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); - - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; - + int16_t bytes = pColInfoData->info.bytes; + int16_t type = pColInfoData->info.type; if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { //qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv)); return; } - SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0); - int64_t* tsList = (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (int64_t*) pFirstColData->pData:NULL; - STimeWindow w = TSWINDOW_INITIALIZER; int32_t num = 0; - for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { - char* val = ((char*)pColInfoData->pData) + bytes * j; - if (isNull(val, type)) { + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + if (colDataIsNull(pColInfoData, pBlock->info.rows, j, NULL)) { // TODO continue; } + char* val = colDataGetData(pColInfoData, j); + // Compare with the previous row of this column, and do not set the output buffer again if they are identical. if (pInfo->prevData == NULL) { pInfo->prevData = malloc(bytes); @@ -1672,35 +1668,28 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn } } - if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes); - } - - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex); + int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, 0, + pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } -// doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); + doApplyFunctions(pInfo->binfo.pCtx, &w, j - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); num = 1; memcpy(pInfo->prevData, val, bytes); } if (num > 0) { - char* val = ((char*)pColInfoData->pData) + bytes * (pSDataBlock->info.rows - num); + char* val = ((char*)pColInfoData->pData) + bytes * (pBlock->info.rows - num); memcpy(pInfo->prevData, val, bytes); - - if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes); - } - - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, 0, + pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } -// doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); + doApplyFunctions(pInfo->binfo.pCtx, &w, pBlock->info.rows - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); } tfree(pInfo->prevData); @@ -1792,9 +1781,8 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { } } -static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { - SDiskbasedBuf *pResultBuf = pRuntimeEnv->pResultBuf; - +static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupId, + SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) { int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset; SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo; SqlFunctionCtx *pCtx = binfo->pCtx; @@ -1808,19 +1796,12 @@ static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicI } int64_t tid = 0; - SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex); + SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char *)pData, TSDB_KEYSIZE, true, groupId, pTaskInfo, true, pAggSup); assert (pResultRow != NULL); setResultRowKey(pResultRow, pData, type); - if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQueryAttr->resultRowSize); - if (ret != 0) { - return -1; - } - } - setResultOutputBuf(pRuntimeEnv, pResultRow, pCtx, numOfCols, rowCellInfoOffset); - initCtxOutputBuffer(pCtx, numOfCols); + setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset); return TSDB_CODE_SUCCESS; } @@ -1851,6 +1832,10 @@ static bool functionNeedToExecute(SqlFunctionCtx *pCtx) { // in case of timestamp column, always generated results. int32_t functionId = pCtx->functionId; + if (functionId == -1) { + return false; + } + if (functionId == FUNCTION_TS) { return true; } @@ -2063,9 +2048,13 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num if (pExpr->pExpr->_function.pFunctNode != NULL) { SFuncExecEnv env = {0}; - fmGetFuncExecFuncs(pExpr->pExpr->_function.pFunctNode->funcId, &pCtx->fpSet); + pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; + + fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); pCtx->resDataInfo.interBufSize = env.calcMemSize; + } else { + pCtx->functionId = -1; } pCtx->input.numOfInputCols = pFunct->numOfParams; @@ -3318,8 +3307,6 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t SResultRow* pRow = doSetResultOutBufByKey_rv(pSup->pResultBuf, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { - SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); - struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset); cleanupResultRowEntry(pEntry); @@ -3417,7 +3404,7 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) { void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]); - if (isRowEntryInitialized(pResInfo)) { + if (isRowEntryInitialized(pResInfo) || pCtx[j].functionId == -1) { continue; } @@ -3458,6 +3445,10 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SqlFunctionCt void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t j = 0; j < numOfOutput; ++j) { + if (pCtx[j].functionId == -1) { + continue; + } + pCtx[j].fpSet.finalize(&pCtx[j]); } } @@ -3480,7 +3471,9 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD continue; } - pCtx[j].fpSet.finalize(&pCtx[j]); + if (pCtx[j].fpSet.process) { // TODO set the dummy function. + pCtx[j].fpSet.finalize(&pCtx[j]); + } if (pRow->numOfRows < pResInfo->numOfRes) { pRow->numOfRows = pResInfo->numOfRes; @@ -3586,20 +3579,13 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pRes void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group - SFilePage* bufPage = getBufPage(pBuf, pResult->pageId); - -// int32_t offset = 0; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { -// offset += pCtx[i].resDataInfo.bytes; continue; } - -// offset += pCtx[i].resDataInfo.bytes; - // int32_t functionId = pCtx[i].functionId; // if (functionId < 0) { // continue; @@ -3608,7 +3594,7 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult, // if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; // } - if (!pResInfo->initialized) { + if (!pResInfo->initialized && pCtx[i].functionId != -1) { pCtx[i].fpSet.init(&pCtx[i], pResInfo); } } @@ -6941,15 +6927,15 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou SGroupbyOperatorInfo *pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - if (pInfo->binfo.pRes->info.rows == 0/*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); + if (pInfo->binfo.pRes->info.rows == 0|| !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } return pInfo->binfo.pRes; } - SOperatorInfo* downstream = pOperator->pDownstream[0]; int32_t order = TSDB_ORDER_ASC; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -6962,29 +6948,23 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); - if (pInfo->colIndex == -1) { - pInfo->colIndex = getGroupbyColumnIndex(NULL/*pInfo->pGroupbyExpr*/, pBlock); - } - doHashGroupbyAgg(pOperator, pInfo, pBlock); } pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); // if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows - finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); +// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); // } else { // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); // } + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); -// if (!pRuntimeEnv->pQueryAttr->stableQuery) { -// sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); -// } - -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - if (pInfo->binfo.pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -7663,25 +7643,25 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun return pOperator; } -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, + SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); - pInfo->colIndex = -1; // group by column index - - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } -// pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize * -// (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery))); + pInfo->pGroupCols = pGroupColList; + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); - pInfo->binfo.pRes = pResBlock; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashGroupbyAggregate; @@ -7689,6 +7669,11 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + tfree(pInfo); + tfree(pOperator); + return NULL; } SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { @@ -8152,17 +8137,29 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i return s; } -SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) { +SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { int32_t numOfFuncs = LIST_LENGTH(pNodeList); + int32_t numOfGroupKeys = 0; + if (pGroupKeys != NULL) { + numOfGroupKeys = LIST_LENGTH(pGroupKeys); + } - *numOfExprs = numOfFuncs; - SExprInfo* pExprs = calloc(numOfFuncs, sizeof(SExprInfo)); + *numOfExprs = numOfFuncs + numOfGroupKeys; + SExprInfo* pExprs = calloc(*numOfExprs, sizeof(SExprInfo)); - for(int32_t i = 0; i < numOfFuncs; ++i) { - SExprInfo* pExp = &pExprs[i]; + for(int32_t i = 0; i < (*numOfExprs); ++i) { + STargetNode* pTargetNode = NULL; + if (i < numOfFuncs) { + pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i); + } else { + pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs); + } + + SExprInfo* pExp = &pExprs[pTargetNode->slotId]; pExp->pExpr = calloc(1, sizeof(tExprNode)); pExp->pExpr->_function.num = 1; + pExp->pExpr->_function.functionId = -1; pExp->base.pParam = calloc(1, sizeof(SFunctParam)); pExp->base.numOfParams = 1; @@ -8170,10 +8167,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) { pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn)); SColumn* pCol = pExp->base.pParam[0].pCol; - STargetNode* pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i); - ASSERT(pTargetNode->slotId == i); - - // it is a project query + // it is a project query, or group by column if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) { SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr; @@ -8190,6 +8184,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) { SDataType* pType = &pFuncNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pFuncNode->node.aliasName); + pExp->pExpr->_function.functionId = pFuncNode->funcId; pExp->pExpr->_function.pFunctNode = pFuncNode; strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); @@ -8199,11 +8194,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) { SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); SColumnNode* pcn = (SColumnNode*)p1; - pCol->slotId = pcn->slotId; - pCol->bytes = pcn->node.resType.bytes; - pCol->type = pcn->node.resType.type; - pCol->scale = pcn->node.resType.scale; - pCol->precision = pcn->node.resType.precision; + pCol->slotId = pcn->slotId; + pCol->bytes = pcn->node.resType.bytes; + pCol->type = pcn->node.resType.type; + pCol->scale = pcn->node.resType.scale; + pCol->precision = pcn->node.resType.precision; pCol->dataBlockId = pcn->dataBlockId; } } @@ -8231,6 +8226,7 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractScanColumnId(SNodeList* pNodeList); +static SArray* extractColumnInfo(SNodeList* pNodeList); SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { // if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node @@ -8289,7 +8285,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, &num); + SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo); } @@ -8302,9 +8298,17 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(((SAggPhysiNode*)pPhyNode)->pAggFuncs, &num); + + SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); + + if (pAggNode->pGroupKeys != NULL) { + SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); + return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); + } else { + return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); + } } } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) { size_t size = LIST_LENGTH(pPhyNode->pChildren); @@ -8317,7 +8321,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, &num); + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = 'a', .slidingUnit = 'a'}; @@ -8385,6 +8389,32 @@ SArray* extractScanColumnId(SNodeList* pNodeList) { return pList; } +SArray* extractColumnInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for(int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i); + SColumnNode* pColNode = (SColumnNode*) pNode->pExpr; + + SColumn c = {0}; + c.slotId = pColNode->slotId; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; + c.precision = pColNode->node.resType.precision; + c.scale = pColNode->node.resType.scale; + + taosArrayPush(pList, &c); + } + + return pList; +} + int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) { int32_t code = 0; if (tableType == TSDB_SUPER_TABLE) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fa83068755..14f63bead8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -456,7 +456,7 @@ void firstFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = 0; - struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); char* buf = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; @@ -500,7 +500,7 @@ void lastFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = 0; - struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); char* buf = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; -- GitLab