From 3ef067ff0354fba22bf9c99b424266584469b2cd Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 16 May 2022 11:03:15 +0800 Subject: [PATCH] feat(query): add tail function --- source/libs/function/inc/builtinsimpl.h | 7 +- source/libs/function/src/builtins.c | 36 +++++++++ source/libs/function/src/builtinsimpl.c | 100 ++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 1 deletion(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 807234a1b1..c25d74911c 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx); bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t sampleFunction(SqlFunctionCtx* pCtx); -int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + +bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t tailFunction(SqlFunctionCtx* pCtx); +int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index fc93008312..282b527b31 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -386,6 +386,32 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } +static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (2 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The input parameter of TAIL function can only be column"); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; + if (!IS_INTEGER_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + uint8_t colType = pCol->resType.type; + if (IS_VAR_DATA_TYPE(colType)) { + pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType}; + } else { + pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType}; + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // todo return TSDB_CODE_SUCCESS; @@ -850,6 +876,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = sampleFunction, .finalizeFunc = NULL }, + { + .name = "tail", + .type = FUNCTION_TYPE_TAIL, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateTail, + .getEnvFunc = getTailFuncEnv, + .initFunc = tailFunctionSetup, + .processFunc = tailFunction, + .finalizeFunc = NULL + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index af86eb4e90..02382cc228 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -18,12 +18,15 @@ #include "function.h" #include "querynodes.h" #include "taggfunction.h" +#include "tcompare.h" #include "tdatablock.h" #include "tpercentile.h" #define HISTOGRAM_MAX_BINS_NUM 1000 #define MAVG_MAX_POINTS_NUM 1000 #define SAMPLE_MAX_POINTS_NUM 1000 +#define TAIL_MAX_POINTS_NUM 100 +#define TAIL_MAX_OFFSET 100 typedef struct SSumRes { union { @@ -161,6 +164,20 @@ typedef struct SSampleInfo { int64_t *timestamp; } SSampleInfo; +typedef struct STailUnit { + int64_t timestamp; + char data[]; +} STailUnit; + +typedef struct STailInfo { + int32_t numOfPoints; + int32_t numAdded; + int32_t offset; + uint8_t colType; + int16_t colBytes; + STailUnit **pRes; +} STailInfo; + #define SET_VAL(_info, numOfElem, res) \ do { \ if ((numOfElem) <= 0) { \ @@ -3141,3 +3158,86 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { // // return pResInfo->numOfRes; //} + + +bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); + SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); + int32_t numOfPoints = pVal->datum.i; + pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailUnit) + pCol->node.resType.bytes); + return true; +} + +bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + STailInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo); + pInfo->numAdded = 0; + pInfo->numOfPoints = pCtx->param[1].param.i; + pInfo->offset = pCtx->param[2].param.i; + pInfo->colType = pCtx->resDataInfo.type; + pInfo->colBytes = pCtx->resDataInfo.bytes; + if ((pInfo->numOfPoints < 1 || pInfo->numOfPoints > TAIL_MAX_POINTS_NUM) || + (pInfo->numOfPoints < 0 || pInfo->numOfPoints > TAIL_MAX_OFFSET)) { + return false; + } + + pInfo->pRes = (STailUnit **)((char *)pInfo + sizeof(STailInfo)); + char *pUnit = (char *)pInfo->pRes + pInfo->numOfPoints * POINTER_BYTES; + + size_t unitSize = sizeof(STailUnit) + pInfo->colBytes; + for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + pInfo->pRes[i] = (STailUnit *)(pUnit + i * unitSize); + } + + return true; +} + +static void tailAssignResult(STailUnit* pUnit, char *data, int32_t colBytes, TSKEY ts) { + pUnit->timestamp = ts; + memcpy(pUnit->data, data, colBytes); +} + +static int32_t tailCompFn(const void *p1, const void *p2, const void *param) { + STailUnit *d1 = *(STailUnit **)p1; + STailUnit *d2 = *(STailUnit **)p2; + return compareInt64Val(&d1->timestamp, &d2->timestamp); +} + +static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts) { + STailUnit **pList = pInfo->pRes; + if (pInfo->numAdded < pInfo->numOfPoints) { + tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts); + taosheapsort((void *)pList, sizeof(STailUnit **), pInfo->numAdded + 1, NULL, tailCompFn, 0); + } else if (pList[0]->timestamp < ts) { + tailAssignResult(pList[0], data, pInfo->colBytes, ts); + taosheapadjust((void *)pList, sizeof(STailUnit **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0); + } +} + +int32_t tailFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + STailInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t startOffset = pCtx->offset; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + //colDataAppendNULL(pOutput, i); + continue; + } + + char* data = colDataGetData(pInputCol, i); + doTailAdd(pInfo, data, tsList[i]); + } + + return pInfo->numOfPoints; +} -- GitLab