From 78b1bf27b6af2b7e99840270ea5163229ddce2d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Apr 2022 10:09:14 +0800 Subject: [PATCH] [td-14393] refactor. --- include/libs/function/function.h | 3 +- source/libs/executor/src/executorimpl.c | 29 +++++---- source/libs/function/src/builtins.c | 4 +- source/libs/function/src/builtinsimpl.c | 81 +++++++++++++++---------- source/libs/function/src/taggfunction.c | 14 ++--- 5 files changed, 76 insertions(+), 55 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 5b4108ef97..d4307362b6 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -194,7 +194,8 @@ typedef struct SqlFunctionCtx { int32_t numOfParams; SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param int64_t *ptsList; // corresponding timestamp array list - void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ + SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ + int32_t offset; SVariant tag; struct SResultRowEntryInfo *resultInfo; SSubsidiaryResInfo subsidiaryRes; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 12255189b3..b690f88fe2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1264,17 +1264,21 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData taosArrayDestroy(pBlockList); } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { - ASSERT(!fmIsAggFunc(pCtx->functionId)); + ASSERT(!fmIsAggFunc(pCtx[k].functionId)); - if (fmIsNonstandardSQLFunc(pCtx->functionId)) { + if (fmIsPseudoColumnFunc(pCtx[k].functionId)) { + // TODO: set the correct _rowts column output buffer, there may be multiple _rowts columns + } else if (fmIsNonstandardSQLFunc(pCtx[k].functionId)) { SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); + pCtx[k].ptsList = 0; SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]); pCtx[k].fpSet.init(&pCtx[k], pResInfo); - pCtx[k].pOutput = pColInfoData->pData; + pCtx[k].pOutput = (char*)pColInfoData; +// pCtx[k].pTsOutput = int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]); - pResult->info.rows = numOfRows; + pResult->info.rows += numOfRows; } else { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); @@ -1284,7 +1288,6 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); pResult->info.rows = dest.numOfRows; - taosArrayDestroy(pBlockList); } } else { @@ -1926,12 +1929,12 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); - pCtx->ptsOutputBuf = NULL; + pCtx->pTsOutput = NULL; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; - pCtx->resDataInfo.type = pFunct->resSchema.type; - pCtx->order = TSDB_ORDER_ASC; + pCtx->resDataInfo.type = pFunct->resSchema.type; + pCtx->order = TSDB_ORDER_ASC; pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; + pCtx->end.key = INT64_MIN; #if 0 for (int32_t j = 0; j < pCtx->numOfParams; ++j) { // int16_t type = pFunct->param[j].nType; @@ -2983,7 +2986,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t // set the timestamp output buffer for top/bottom/diff query // int32_t fid = pCtx[i].functionId; // if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) { - // if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; + // if (i > 0) pCtx[i].pTsOutput = pCtx[i-1].pOutput; // } } @@ -3020,7 +3023,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOf if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) { - if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i - 1].pOutput; +// if (i > 0) pBInfo->pCtx[i].pTsOutput = pBInfo->pCtx[i - 1].pOutput; } } } @@ -3058,7 +3061,7 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput) void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]); - if (isRowEntryInitialized(pResInfo) || pCtx[j].functionId == -1) { + if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 || fmIsScalarFunc(pCtx[j].functionId)) { continue; } @@ -3224,7 +3227,7 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes } if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { - if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; +// if (i > 0) pCtx[i].pTsOutput = pCtx[i - 1].pOutput; } // if (!pResInfo->initialized) { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 573eb8ecf5..0c86bd22fb 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -387,7 +387,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_ROWTS, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, .sprocessFunc = NULL, .finalizeFunc = NULL @@ -504,6 +504,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { break; } + case FUNCTION_TYPE_ROWTS: case FUNCTION_TYPE_QSTARTTS: case FUNCTION_TYPE_QENDTS: case FUNCTION_TYPE_WSTARTTS: @@ -589,7 +590,6 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { break; } - case FUNCTION_TYPE_ROWTS: case FUNCTION_TYPE_TBNAME: { // todo break; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 10f23d5866..55f4fb3878 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -847,8 +847,10 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { } typedef struct SDiffInfo { - bool valueAssigned; + bool hasPrev; + bool includeNull; bool ignoreNegative; + bool firstOutput; union { int64_t i64; double d64;} prev; } SDiffInfo; @@ -863,9 +865,11 @@ bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { } SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); - pDiffInfo->valueAssigned = false; - pDiffInfo->prev.i64 = 0; + pDiffInfo->hasPrev = false; + pDiffInfo->prev.i64 = 0; pDiffInfo->ignoreNegative = false; // TODO set correct param + pDiffInfo->includeNull = false; + pDiffInfo->firstOutput = false; return true; } @@ -876,65 +880,79 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - bool isFirstBlock = (pDiffInfo->valueAssigned == false); + bool isFirstBlock = (pDiffInfo->hasPrev == false); int32_t numOfElems = 0; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); // int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; - TSKEY* pTimestamp = pCtx->ptsOutputBuf; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; TSKEY* tsList = GET_TS_LIST(pCtx); + int32_t startOffset = 0; switch (pInputCol->info.type) { case TSDB_DATA_TYPE_INT: { - int32_t *pOutput = (int32_t *)pCtx->pOutput; + SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + + int32_t pos = startOffset + (isFirstBlock? (numOfElems-1):numOfElems); if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (pDiffInfo->includeNull) { + colDataSetNull_f(pOutput->nullbitmap, pos); + if (tsList != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } + + numOfElems += 1; + } continue; } int32_t v = *(int32_t*) colDataGetData(pInputCol, i); - if (pDiffInfo->valueAssigned) { - int64_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null - if (pDiffInfo->ignoreNegative) { - continue; + if (pDiffInfo->hasPrev) { + int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt32(pOutput, pos, &delta); } + } - *(pOutput++) = delta; -// *pTimestamp = (tsList != NULL)? tsList[i]:0; - pTimestamp += 1; + if (tsList != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); } pDiffInfo->prev.i64 = v; - pDiffInfo->valueAssigned = true; + pDiffInfo->hasPrev = true; numOfElems++; } break; } + case TSDB_DATA_TYPE_BIGINT: { - int64_t *pOutput = (int64_t *)pCtx->pOutput; + SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } int32_t v = 0; - if (pDiffInfo->valueAssigned) { + if (pDiffInfo->hasPrev) { v = *(int64_t*) colDataGetData(pInputCol, i); int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null if (pDiffInfo->ignoreNegative) { continue; } - *(pOutput++) = delta; - *pTimestamp = (tsList != NULL)? tsList[i]:0; - - pOutput += 1; - pTimestamp += 1; +// *(pOutput++) = delta; +// *pTimestamp = (tsList != NULL)? tsList[i]:0; +// +// pOutput += 1; +// pTimestamp += 1; } pDiffInfo->prev.i64 = v; - pDiffInfo->valueAssigned = true; + pDiffInfo->hasPrev = true; numOfElems++; } break; @@ -952,7 +970,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { continue; } - if (pDiffInfo->valueAssigned) { // initial value is not set yet + if (pDiffInfo->hasPrev) { // initial value is not set yet SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; @@ -960,7 +978,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } pDiffInfo->d64Prev = pData[i]; - pDiffInfo->valueAssigned = true; + pDiffInfo->hasPrev = true; numOfElems++; } break; @@ -977,7 +995,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { continue; } - if (pDiffInfo->valueAssigned) { // initial value is not set yet + if (pDiffInfo->hasPrev) { // initial value is not set yet *pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; @@ -985,7 +1003,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } pDiffInfo->d64Prev = pData[i]; - pDiffInfo->valueAssigned = true; + pDiffInfo->hasPrev = true; numOfElems++; } break; @@ -1002,7 +1020,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { continue; } - if (pDiffInfo->valueAssigned) { // initial value is not set yet + if (pDiffInfo->hasPrev) { // initial value is not set yet *pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; @@ -1010,7 +1028,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } pDiffInfo->i64Prev = pData[i]; - pDiffInfo->valueAssigned = true; + pDiffInfo->hasPrev = true; numOfElems++; } break; @@ -1028,7 +1046,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { continue; } - if (pDiffInfo->valueAssigned) { // initial value is not set yet + if (pDiffInfo->hasPrev) { // initial value is not set yet *pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; @@ -1036,7 +1054,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } pDiffInfo->i64Prev = pData[i]; - pDiffInfo->valueAssigned = true; + pDiffInfo->hasPrev = true; numOfElems++; } break; @@ -1048,7 +1066,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } // initial value is not set yet - if (!pDiffInfo->valueAssigned || numOfElems <= 0) { + if (!pDiffInfo->hasPrev || numOfElems <= 0) { /* * 1. current block and blocks before are full of null * 2. current block may be null value @@ -1064,7 +1082,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems; return forwardStep; -// pResInfo->numOfRes += forwardStep; } } diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 9174420ff4..789bfb61ee 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -1902,10 +1902,10 @@ static void copyTopBotRes(SqlFunctionCtx *pCtx, int32_t type) { } // set the output timestamp of each record. - TSKEY *output = pCtx->ptsOutputBuf; - for (int32_t i = 0; i < len; ++i, output += step) { - *output = tvp[i]->timestamp; - } +// TSKEY *output = pCtx->pTsOutput; +// for (int32_t i = 0; i < len; ++i, output += step) { +// *output = tvp[i]->timestamp; +// } // set the corresponding tag data for each record // todo check malloc failure @@ -2687,7 +2687,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) { int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; - TSKEY *pTimestamp = pCtx->ptsOutputBuf; + TSKEY *pTimestamp = NULL;//pCtx->pTsOutput; TSKEY *tsList = GET_TS_LIST(pCtx); double *pOutput = (double *)pCtx->pOutput; @@ -2867,7 +2867,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) { } else { \ *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \ *(type *)(&(ctx)->param[1].i) = *(type *)(d); \ - *(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ + *(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \ } \ } while (0); @@ -2881,7 +2881,7 @@ static void diff_function(SqlFunctionCtx *pCtx) { int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; - TSKEY* pTimestamp = pCtx->ptsOutputBuf; + TSKEY* pTimestamp = NULL;//pCtx->pTsOutput; TSKEY* tsList = GET_TS_LIST(pCtx); switch (pCtx->inputType) { -- GitLab