From 6e0f1cb1e012961236778747e86e68769851765b Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Mon, 3 Apr 2023 11:57:43 +0800
Subject: [PATCH] refactor: do some internal refactoring.

---
 source/libs/executor/src/aggregateoperator.c | 427 +++++++++++++++++++
 1 file changed, 427 insertions(+)
 create mode 100644 source/libs/executor/src/aggregateoperator.c

diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c
new file mode 100644
index 0000000000..565a1ccf25
--- /dev/null
+++ b/source/libs/executor/src/aggregateoperator.c
@@ -0,0 +1,427 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "filter.h"
+#include "function.h"
+#include "functionMgt.h"
+#include "os.h"
+#include "querynodes.h"
+#include "tfill.h"
+#include "tname.h"
+
+#include "tdatablock.h"
+#include "tglobal.h"
+#include "tmsg.h"
+#include "ttime.h"
+
+#include "executorimpl.h"
+#include "index.h"
+#include "query.h"
+#include "tcompare.h"
+#include "thash.h"
+#include "ttypes.h"
+#include "vnode.h"
+
+static void destroyAggOperatorInfo(void* param);
+static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
+
+static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
+static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
+
+static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator);
+static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
+static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator);
+
+static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
+
+static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
+
+SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
+                                           SExecTaskInfo* pTaskInfo) {
+  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
+  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
+  if (pInfo == NULL || pOperator == NULL) {
+    goto _error;
+  }
+
+  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
+  initBasicInfo(&pInfo->binfo, pResBlock);
+
+  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
+  initResultSizeInfo(&pOperator->resultInfo, 4096);
+
+  int32_t num = 0;
+  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
+  int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
+                            pTaskInfo->streamInfo.pState);
+  if (code != TSDB_CODE_SUCCESS) {
+    goto _error;
+  }
+
+  int32_t numOfScalarExpr = 0;
+  SExprInfo* pScalarExprInfo = NULL;
+  if (pAggNode->pExprs != NULL) {
+    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
+  }
+
+  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
+  if (code != TSDB_CODE_SUCCESS) {
+    goto _error;
+  }
+
+  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
+  if (code != TSDB_CODE_SUCCESS) {
+    goto _error;
+  }
+
+  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
+  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
+  pInfo->groupId = UINT64_MAX;
+
+  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
+                  pTaskInfo);
+  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
+                                         optrDefaultBufFn, NULL);
+
+  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
+    STableScanInfo* pTableScanInfo = downstream->info;
+    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
+    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
+  }
+
+  code = appendDownstream(pOperator, &downstream, 1);
+  if (code != TSDB_CODE_SUCCESS) {
+    goto _error;
+  }
+
+  return pOperator;
+
+_error:
+  if (pInfo != NULL) {
+    destroyAggOperatorInfo(pInfo);
+  }
+
+  if (pOperator != NULL) {
+    cleanupExprSupp(&pOperator->exprSupp);
+  }
+
+  taosMemoryFreeClear(pOperator);
+  pTaskInfo->code = code;
+  return NULL;
+}
+
+void destroyAggOperatorInfo(void* param) {
+  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
+  cleanupBasicInfo(&pInfo->binfo);
+
+  cleanupAggSup(&pInfo->aggSup);
+  cleanupExprSupp(&pInfo->scalarExprSup);
+  cleanupGroupResInfo(&pInfo->groupResInfo);
+  taosMemoryFreeClear(param);
+}
+
+// this is a blocking operator
+int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
+  if (OPTR_IS_OPENED(pOperator)) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+  SAggOperatorInfo* pAggInfo = pOperator->info;
+
+  SExprSupp* pSup = &pOperator->exprSupp;
+  SOperatorInfo* downstream = pOperator->pDownstream[0];
+
+  int64_t st = taosGetTimestampUs();
+
+  int32_t order = TSDB_ORDER_ASC;
+  int32_t scanFlag = MAIN_SCAN;
+
+  bool hasValidBlock = false;
+  bool blockAllocated = false;
+
+  while (1) {
+    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
+    if (pBlock == NULL) {
+      if (!hasValidBlock) {
+        createDataBlockForEmptyInput(pOperator, &pBlock);
+        if (pBlock == NULL) {
+          break;
+        }
+        blockAllocated = true;
+      } else {
+        break;
+      }
+    }
+    hasValidBlock = true;
+
+    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
+    if (code != TSDB_CODE_SUCCESS) {
+      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
+      T_LONG_JMP(pTaskInfo->env, code);
+    }
+
+    // there is a scalar expression that needs to be calculated before applying the group aggregation.
+    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
+      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
+      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
+      if (code != TSDB_CODE_SUCCESS) {
+        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
+        T_LONG_JMP(pTaskInfo->env, code);
+      }
+    }
+
+    // the pDataBlock is always the same one, so there is no need to call this again
+    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
+    setInputDataBlock(pSup, pBlock, order, scanFlag, true);
+    code = doAggregateImpl(pOperator, pSup->pCtx);
+    if (code != 0) {
+      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
+      T_LONG_JMP(pTaskInfo->env, code);
+    }
+
+    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
+  }
+
+  // the downstream operator may return an error code, so check it before generating the results.
+  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
+    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
+  }
+
+  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
+  OPTR_SET_OPENED(pOperator);
+
+  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
+  return pTaskInfo->code;
+}
+
+SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
+  SAggOperatorInfo* pAggInfo = pOperator->info;
+  SOptrBasicInfo* pInfo = &pAggInfo->binfo;
+
+  if (pOperator->status == OP_EXEC_DONE) {
+    return NULL;
+  }
+
+  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
+  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
+    setOperatorCompleted(pOperator);
+    return NULL;
+  }
+
+  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
+  while (1) {
+    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
+    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
+
+    if (!hasRemainResults(&pAggInfo->groupResInfo)) {
+      setOperatorCompleted(pOperator);
+      break;
+    }
+
+    if (pInfo->pRes->info.rows > 0) {
+      break;
+    }
+  }
+
+  size_t rows = blockDataGetNumOfRows(pInfo->pRes);
+  pOperator->resultInfo.totalRows += rows;
+
+  return (rows == 0) ? NULL : pInfo->pRes;
+}
+
+int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
+  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
+    if (functionNeedToExecute(&pCtx[k])) {
+      // todo: add a dummy function to avoid the process check
+      if (pCtx[k].fpSet.process == NULL) {
+        continue;
+      }
+
+      int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
+      if (code != TSDB_CODE_SUCCESS) {
+        qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
+        return code;
+      }
+    }
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
+  if (!tsCountAlwaysReturnValue) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SAggOperatorInfo* pAggInfo = pOperator->info;
+  if (pAggInfo->groupKeyOptimized) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SOperatorInfo* downstream = pOperator->pDownstream[0];
+  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
+      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
+       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
+  bool hasCountFunc = false;
+
+  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
+    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
+    if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
+        (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
+      hasCountFunc = true;
+      break;
+    }
+  }
+
+  if (!hasCountFunc) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SSDataBlock* pBlock = createDataBlock();
+  pBlock->info.rows = 1;
+  pBlock->info.capacity = 0;
+
+  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
+    SColumnInfoData colInfo = {0};
+    colInfo.hasNull = true;
+    colInfo.info.type = TSDB_DATA_TYPE_NULL;
+    colInfo.info.bytes = 1;
+
+    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
+    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
+      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
+      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
+        int32_t slotId = pFuncParam->pCol->slotId;
+        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
+        if (slotId >= numOfCols) {
+          taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
+          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
+            taosArrayPush(pBlock->pDataBlock, &colInfo);
+          }
+        }
+      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
+        // do nothing
+      }
+    }
+  }
+
+  blockDataEnsureCapacity(pBlock, pBlock->info.rows);
+  *ppBlock = pBlock;
+
+  return TSDB_CODE_SUCCESS;
+}
+
+void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
+  if (!blockAllocated) {
+    return;
+  }
+
+  blockDataDestroy(*ppBlock);
+  *ppBlock = NULL;
+}
+
+void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
+  SAggOperatorInfo* pAggInfo = pOperator->info;
+  if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
+    return;
+  }
+
+  doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
+
+  // record the current active group id
+  pAggInfo->groupId = groupId;
+}
+
+void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
+  // for a simple group-by query without interval, all the tables belong to one group result.
+  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+  SAggOperatorInfo* pAggInfo = pOperator->info;
+
+  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
+  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
+  int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
+
+  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
+                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
+  /*
+   * the result buffer has not been assigned yet, so add a new result buffer.
+   * all groups belong to one result set, and each group result has a different group id, so set the id to be one
+   */
+  if (pResultRow->pageId == -1) {
+    int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
+    if (ret != TSDB_CODE_SUCCESS) {
+      T_LONG_JMP(pTaskInfo->env, terrno);
+    }
+  }
+
+  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
+}
+
+// a new buffer page for each table; this design needs to be optimized
+int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
+  if (pWindowRes->pageId != -1) {
+    return 0;
+  }
+
+  SFilePage* pData = NULL;
+
+  // in the first scan, new space is needed for the results
+  int32_t pageId = -1;
+  SArray* list = getDataBufPagesIdList(pResultBuf);
+
+  if (taosArrayGetSize(list) == 0) {
+    pData = getNewBufPage(pResultBuf, &pageId);
+    pData->num = sizeof(SFilePage);
+  } else {
+    SPageInfo* pi = getLastPageInfo(list);
+    pData = getBufPage(pResultBuf, getPageId(pi));
+    if (pData == NULL) {
+      qError("failed to get buffer, code:%s", tstrerror(terrno));
+      return terrno;
+    }
+
+    pageId = getPageId(pi);
+
+    if (pData->num + size > getBufPageSize(pResultBuf)) {
+      // release the current page first, and prepare the next one
+      releaseBufPageInfo(pResultBuf, pi);
+
+      pData = getNewBufPage(pResultBuf, &pageId);
+      if (pData != NULL) {
+        pData->num = sizeof(SFilePage);
+      }
+    }
+  }
+
+  if (pData == NULL) {
+    return -1;
+  }
+
+  // set the number of rows in the current disk page
+  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate a new buffer
+    pWindowRes->pageId = pageId;
+    pWindowRes->offset = (int32_t)pData->num;
+
+    pData->num += size;
+  }
+
+  return 0;
+}
-- 
GitLab