From a4c7e78d5f55877c787182955e33b41098308554 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Apr 2022 11:01:09 +0800 Subject: [PATCH] [td-14393] refactor. --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 47 ++++++++++++++++++++----- source/libs/function/src/builtinsimpl.c | 10 +++--- 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b16d75502e..3cbcbc004b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -505,7 +505,7 @@ typedef struct SProjectOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; SSDataBlock *existDataBlock; - int32_t threshold; + SArray *pPseudoColInfo; SLimit limit; int64_t curOffset; int64_t curOutput; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b690f88fe2..3e47356223 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1244,8 +1244,21 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction } } +static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { + size_t num = 0; + if (pPseudoList != NULL) { + num = taosArrayGetSize(pPseudoList); + } + + for (int32_t i = 0; i < num; ++i) { + pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i); + } +} + static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, - int32_t numOfOutput) { + int32_t numOfOutput, SArray* pPseudoList) { + setPseudoOutputColInfo(pResult, pCtx, pPseudoList); + for (int32_t k = 0; k < numOfOutput; ++k) { if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); @@ -1267,16 +1280,20 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData ASSERT(!fmIsAggFunc(pCtx[k].functionId)); if (fmIsPseudoColumnFunc(pCtx[k].functionId)) { - // TODO: set the correct _rowts column output buffer, there may be multiple _rowts columns + // do nothing } else if (fmIsNonstandardSQLFunc(pCtx[k].functionId)) { - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); + // todo set the correct timestamp column + pCtx[k].input.pPTS = taosArrayGet(pSrcBlock->pDataBlock, 1); - pCtx[k].ptsList = 0; SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]); pCtx[k].fpSet.init(&pCtx[k], pResInfo); - pCtx[k].pOutput = (char*)pColInfoData; -// pCtx[k].pTsOutput = + pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k); + pCtx[k].offset = pResult->info.rows; // set the start offset + + int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); + pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; + int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]); pResult->info.rows += numOfRows; } else { @@ -1929,7 +1946,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); - pCtx->pTsOutput = NULL; + pCtx->pTsOutput = NULL;//taosArrayInit(4, POINTER_BYTES); pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; pCtx->resDataInfo.type = pFunct->resSchema.type; pCtx->order = TSDB_ORDER_ASC; @@ -5382,12 +5399,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput); + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo); + // todo extract method if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) { blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset); pProjectInfo->curOffset = 0; - break; } else if (pProjectInfo->curOffset >= pInfo->pRes->info.rows) { pProjectInfo->curOffset -= pInfo->pRes->info.rows; blockDataCleanup(pInfo->pRes); @@ -6247,6 +6264,17 @@ _error: return NULL; } +static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) { + SArray* pList = taosArrayInit(4, sizeof(int32_t)); + for(int32_t i = 0; i < numOfCols; ++i) { + if (fmIsPseudoColumnFunc(pCtx[i].functionId)) { + taosArrayPush(pList, &i); + } + } + + return pList; +} + SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); @@ -6263,6 +6291,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p int32_t numOfRows = 4096; initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 55f4fb3878..6f7fcd37be 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -887,9 +887,9 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { // int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; SColumnInfoData* pTsOutput = pCtx->pTsOutput; - TSKEY* tsList = GET_TS_LIST(pCtx); + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; - int32_t startOffset = 0; + int32_t startOffset = pCtx->offset; switch (pInputCol->info.type) { case TSDB_DATA_TYPE_INT: { SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput; @@ -916,10 +916,10 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } else { colDataAppendInt32(pOutput, pos, &delta); } - } - if (tsList != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); + if (tsList != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } } pDiffInfo->prev.i64 = v; -- GitLab