From 76d5298ba1e73075f3d5dcc077644caf3e60ecef Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 17 Feb 2022 16:08:59 +0800 Subject: [PATCH] feature/qnode --- include/libs/function/function.h | 4 +- include/libs/function/functionMgt.h | 2 +- include/libs/nodes/querynodes.h | 2 - source/libs/function/inc/tbinoperator.h | 2 +- source/libs/function/inc/tscalar.h | 8 +- source/libs/function/inc/tscalarfunction.h | 6 +- source/libs/function/inc/tunaryoperator.h | 2 +- source/libs/function/src/taggfunction.c | 2 +- source/libs/function/src/tbinoperator.c | 68 +-- source/libs/function/src/tscalar.c | 532 +++++++++++++++------ source/libs/function/src/tscalarfunction.c | 20 +- 11 files changed, 449 insertions(+), 199 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index aef5f7fec4..dfcac7ca3f 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -226,13 +226,13 @@ typedef struct SAggFunctionInfo { int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId); } SAggFunctionInfo; -struct SScalarFuncParam; +struct SScalarParam; typedef struct SScalarFunctionInfo { char name[FUNCTIONS_NAME_MAX_LENGTH]; int8_t type; // scalar function or aggregation function uint32_t functionId; // index of scalar function - void (*process)(struct SScalarFuncParam* pOutput, size_t numOfInput, const struct SScalarFuncParam *pInput); + void (*process)(struct SScalarParam* pOutput, size_t numOfInput, const struct SScalarParam *pInput); } SScalarFunctionInfo; typedef struct SMultiFunctionsDesc { diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 5078b00c8c..84b1f8d5bf 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -117,7 +117,7 @@ typedef struct SFuncExecFuncs { FExecFinalize finalize; } SFuncExecFuncs; -typedef int32_t (*FScalarExecProcess)(SScalarFuncParam *pInput, int32_t inputNum, SScalarFuncParam *pOutput); +typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef struct SScalarFuncExecFuncs { FScalarExecProcess process; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index b6439acdbe..cea0aa32a8 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -104,8 +104,6 @@ typedef enum EOperatorType { OP_TYPE_NMATCH, OP_TYPE_IS_NULL, OP_TYPE_IS_NOT_NULL, - OP_TYPE_BIT_AND, - OP_TYPE_BIT_OR, // json operator OP_TYPE_JSON_GET_VALUE, diff --git a/source/libs/function/inc/tbinoperator.h b/source/libs/function/inc/tbinoperator.h index c1b6b0bc31..678d6169ab 100644 --- a/source/libs/function/inc/tbinoperator.h +++ b/source/libs/function/inc/tbinoperator.h @@ -22,7 +22,7 @@ extern "C" { #include "tscalarfunction.h" -typedef void (*_bin_scalar_fn_t)(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *output, int32_t order); +typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, void *output, int32_t order); _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator); bool isBinaryStringOp(int32_t op); diff --git a/source/libs/function/inc/tscalar.h b/source/libs/function/inc/tscalar.h index da98069c86..f7f30e40c6 100644 --- a/source/libs/function/inc/tscalar.h +++ b/source/libs/function/inc/tscalar.h @@ -19,9 +19,13 @@ extern "C" { #endif -typedef struct SScalarCalcContext { +typedef struct SScalarCtx { + int32_t code; + SSDataBlock *pSrc; + SHashObj *pRes; /* element is SScalarParam */ +} SScalarCtx; -} SScalarCalcContext; +#define SCL_DEFAULT_OP_NUM 10 #define sclFatal(...) qFatal(__VA_ARGS__) #define sclError(...) qError(__VA_ARGS__) diff --git a/source/libs/function/inc/tscalarfunction.h b/source/libs/function/inc/tscalarfunction.h index 6d23775610..f36f5d4fcf 100644 --- a/source/libs/function/inc/tscalarfunction.h +++ b/source/libs/function/inc/tscalarfunction.h @@ -21,12 +21,12 @@ extern "C" { #include "function.h" -typedef struct SScalarFuncParam { +typedef struct SScalarParam { void* data; int32_t num; int32_t type; int32_t bytes; -} SScalarFuncParam; +} SScalarParam; typedef struct SScalarFunctionSupport { struct SExprInfo *pExprInfo; @@ -39,7 +39,7 @@ typedef struct SScalarFunctionSupport { extern struct SScalarFunctionInfo scalarFunc[8]; -int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarFuncParam* pOutput, +int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarParam* pOutput, void* param, char* (*getSourceDataBlock)(void*, const char*, int32_t)); diff --git a/source/libs/function/inc/tunaryoperator.h b/source/libs/function/inc/tunaryoperator.h index 27784e8487..08cc6f69c6 100644 --- a/source/libs/function/inc/tunaryoperator.h +++ b/source/libs/function/inc/tunaryoperator.h @@ -22,7 +22,7 @@ extern "C" { #include "tscalarfunction.h" -typedef void (*_unary_scalar_fn_t)(SScalarFuncParam *pLeft, SScalarFuncParam* pOutput); +typedef void (*_unary_scalar_fn_t)(SScalarParam *pLeft, SScalarParam* pOutput); _unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t binOperator); #ifdef __cplusplus diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 881845f42b..441b6fe3e7 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -3240,7 +3240,7 @@ static void arithmetic_function(SqlFunctionCtx *pCtx) { GET_RES_INFO(pCtx)->numOfRes += pCtx->size; SScalarFunctionSupport *pSup = (SScalarFunctionSupport *)pCtx->param[1].pz; - SScalarFuncParam output = {0}; + SScalarParam output = {0}; output.data = pCtx->pOutput; //evaluateExprNodeTree(pSup->pExprInfo->pExpr, pCtx->size, &output, pSup, getArithColumnData); diff --git a/source/libs/function/src/tbinoperator.c b/source/libs/function/src/tbinoperator.c index 27b5d162a6..31fc2813e2 100644 --- a/source/libs/function/src/tbinoperator.c +++ b/source/libs/function/src/tbinoperator.c @@ -245,7 +245,7 @@ _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) { } -int32_t vectorConvertImpl(SScalarFuncParam* pIn, SScalarFuncParam* pOut) { +int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) { int16_t inType = pIn->type; int16_t inBytes = pIn->bytes; char *input = pIn->data; @@ -512,13 +512,13 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = { /*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; -int32_t vectorConvert(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, SScalarFuncParam* pLeftOut, SScalarFuncParam* pRightOut) { +int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut) { if (pLeft->type == pRight->type) { return TSDB_CODE_SUCCESS; } - SScalarFuncParam *param1 = NULL, *paramOut1 = NULL; - SScalarFuncParam *param2 = NULL, *paramOut2 = NULL; + SScalarParam *param1 = NULL, *paramOut1 = NULL; + SScalarParam *param2 = NULL, *paramOut2 = NULL; int32_t code = 0; if (pLeft->type < pRight->type) { @@ -575,7 +575,7 @@ int32_t vectorConvert(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, SScalar return TSDB_CODE_SUCCESS; } -void vectorAdd(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorAdd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; @@ -614,7 +614,7 @@ void vectorAdd(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int } } -void vectorSub(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorSub(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; @@ -651,7 +651,7 @@ void vectorSub(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int } } } -void vectorMultiply(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorMultiply(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; @@ -690,7 +690,7 @@ void vectorMultiply(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out } } -void vectorDivide(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorDivide(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; @@ -736,7 +736,7 @@ void vectorDivide(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, } } -void vectorRemainder(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorRemainder(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; @@ -808,7 +808,7 @@ void vectorRemainder(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *ou } } -void vectorConcat(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorConcat(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t len = pLeft->bytes + pRight->bytes; int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; @@ -859,7 +859,7 @@ void vectorConcat(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, } -void vectorBitAnd(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorBitAnd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; @@ -898,7 +898,7 @@ void vectorBitAnd(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, } } -void vectorBitOr(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; @@ -938,7 +938,7 @@ void vectorBitOr(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, i } -void vectorCompareImpl(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord, int32_t optr) { +void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord, int32_t optr) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; int8_t funcIdx = filterGetCompFuncIdx(pLeft->type, optr); @@ -993,14 +993,14 @@ void vectorCompareImpl(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void * } } -void vectorCompare(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord, int32_t optr) { - SScalarFuncParam pLeftOut = {0}; - SScalarFuncParam pRightOut = {0}; +void vectorCompare(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord, int32_t optr) { + SScalarParam pLeftOut = {0}; + SScalarParam pRightOut = {0}; vectorConvert(pLeft, pRight, &pLeftOut, &pRightOut); - SScalarFuncParam *param1 = NULL; - SScalarFuncParam *param2 = NULL; + SScalarParam *param1 = NULL; + SScalarParam *param2 = NULL; int32_t type = 0; if (pLeftOut->type) { @@ -1018,55 +1018,55 @@ void vectorCompare(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, vectorCompareImpl(pLeftOut, pRightOut, out, _ord, TSDB_RELATION_GREATER); } -void vectorGreater(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorGreater(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_GREATER); } -void vectorGreaterEqual(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorGreaterEqual(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_GREATER_EQUAL); } -void vectorLower(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorLower(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_LESS); } -void vectorLowerEqual(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorLowerEqual(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_LESS_EQUAL); } -void vectorEqual(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorEqual(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_EQUAL); } -void vectorNotEqual(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorNotEqual(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_NOT_EQUAL); } -void vectorIn(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorIn(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_IN); } -void vectorNotIn(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorNotIn(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_NOT_IN); } -void vectorLike(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorLike(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_LIKE); } -void vectorNotLike(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorNotLike(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_NOT_LIKE); } -void vectorMatch(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorMatch(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_MATCH); } -void vectorNotMatch(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorNotMatch(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_NMATCH); } -void vectorIsNull(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorIsNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; bool res = false; @@ -1086,7 +1086,7 @@ void vectorIsNull(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, } } -void vectorNotNull(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) { +void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; bool res = false; @@ -1143,9 +1143,9 @@ _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binFunctionId) { return vectorMatch; case OP_TYPE_NMATCH: return vectorNotMatch; - case OP_TYPE_ISNULL: + case OP_TYPE_IS_NULL: return vectorIsNull; - case OP_TYPE_NOTNULL: + case OP_TYPE_IS_NOT_NULL: return vectorNotNull; case OP_TYPE_BIT_AND: return vectorBitAnd; diff --git a/source/libs/function/src/tscalar.c b/source/libs/function/src/tscalar.c index e374ce2734..df61d628b3 100644 --- a/source/libs/function/src/tscalar.c +++ b/source/libs/function/src/tscalar.c @@ -2,79 +2,326 @@ #include "tscalar.h" int32_t sclGetOperatorParamNum(EOperatorType type) { - if (OP_TYPE_ISNULL == type || OP_TYPE_NOTNULL == type) { + if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type) { return 1; } return 2; } -int32_t sclPrepareFunctionParams(SScalarFuncParam **pParams, SNodeList* pParameterList) { - *pParams = calloc(pParameterList->length, sizeof(SScalarFuncParam)); +void sclFreeRes(SHashObj *res) { + SScalarParam *p = NULL; + void *pIter = taosHashIterate(res, NULL); + while (pIter) { + p = (SScalarParam *)pIter; + + if (p) { + tfree(p->data); + } + + pIter = taosHashIterate(res, pIter); + } + + taosHashCleanup(res); +} + +void sclFreeParam(SScalarParam *param) { + tfree(param->data); +} + +int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) { + switch (nodeType(node)) { + case QUERY_NODE_VALUE: { + SValueNode *valueNode = (SValueNode *)node; + param->data = nodesGetValueFromNode(valueNode); + param->num = 1; + param->type = valueNode->node.resType.type; + param->bytes = valueNode->node.resType.bytes; + + break; + } + case QUERY_NODE_COLUMN_REF: { + if (NULL == ctx) { + sclError("invalid node type for constant calculating, type:%d, ctx:%p", nodeType(node), ctx); + SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + SColumnRef *ref = (SColumnRef *)node; + if (ref->slotId >= taosArrayGetSize(ctx->pSrc->pDataBlock)) { + sclError("column ref slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, taosArrayGetSize(ctx->pSrc->pDataBlock)); + SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(ctx->pSrc->pDataBlock, ref->slotId); + param->data = columnData->pData; + param->num = ctx->pSrc->info.rows; + param->type = columnData->info.type; + param->bytes = columnData->info.bytes; + + break; + } + case QUERY_NODE_LOGIC_CONDITION: + case QUERY_NODE_OPERATOR: { + if (NULL == ctx) { + sclError("invalid node type for constant calculating, type:%d, ctx:%p", nodeType(node), ctx); + SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES); + if (NULL == res) { + sclError("no result for node, type:%d, node:%p", nodeType(node), node); + SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + *param = *res; + + break; + } + } + + if (param->num > *rowNum) { + if (1 != param->num) && (1 < *rowNum) { + sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->num); + SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + *rowNum = param->num; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t sclParamMoveNext(SScalarParam *params, int32_t num) { + SScalarParam *param = NULL; + + for (int32_t i = 0; i < num; ++i) { + param = params + i; + + if (1 == param->num) { + continue; + } + + if (IS_VAR_DATA_TYPE(param->type)) { + param->data = (char *)(param->data) + varDataTLen(param->data); + } else { + param->data = (char *)(param->data) + tDataTypes[param->type].bytes; + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) { + int32_t code = 0; + *pParams = calloc(pParamList->length, sizeof(SScalarParam)); if (NULL == *pParams) { - sclError("calloc %d failed", pParameterList->length * sizeof(SScalarFuncParam)); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + sclError("calloc %d failed", pParamList->length * sizeof(SScalarParam)); + SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SListCell *cell = pParameterList->pHead; - for (int32_t i = 0; i < pParameterList->length; ++i) { + SListCell *cell = pParamList->pHead; + for (int32_t i = 0; i < pParamList->length; ++i) { if (NULL == cell || NULL == cell->pNode) { sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode); - tfree(*pParams); - return TSDB_CODE_QRY_INVALID_INPUT; + SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - if (QUERY_NODE_VALUE != nodeType(cell->pNode)) { - sclError("invalid node type in cell, type:%d", nodeType(cell->pNode)); - tfree(*pParams); - return TSDB_CODE_QRY_APP_ERROR; - } + SCL_ERR_JRET(sclInitParam(cell->pNode, &pParams[i], ctx, rowNum)); + + cell = cell->pNext; + } - SValueNode *valueNode = (SValueNode *)cell->pNode; - pParams[i].data = nodesGetValueFromNode(valueNode); - pParams[i].num = 1; - pParams[i].type = valueNode->node.resType.type; - pParams[i].bytes = valueNode->node.resType.bytes; + return TSDB_CODE_SUCCESS; - cell = cell->pNext; +_return: + + tfree(*pParams); + SCL_RET(code); +} + +int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) { + int32_t code = 0; + int32_t paramNum = sclGetOperatorParamNum(node->opType); + if (NULL == node->pLeft || (paramNum == 2 && NULL == node->pRight)) { + sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight); + SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + *pParams = calloc(paramNum, sizeof(SScalarParam)); + if (NULL == *pParams) { + sclError("calloc %d failed", paramNum * sizeof(SScalarParam)); + SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SCL_ERR_JRET(sclInitParam(node->pLeft, &pParams[0], ctx, rowNum)); + if (paramNum > 1) { + SCL_ERR_JRET(sclInitParam(node->pRight, &pParams[1], ctx, rowNum)); } return TSDB_CODE_SUCCESS; + +_return: + + tfree(*pParams); + SCL_RET(code); } -EDealRes sclRewriteFunction(SNode** pNode, void* pContext) { - SFunctionNode *node = (SFunctionNode *)*pNode; + +int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { if (NULL == node->pParameterList || node->pParameterList->length <= 0) { sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; - return DEAL_RES_ERROR; + SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SScalarFuncExecFuncs ffpSet = {0}; int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); if (code) { sclError( "fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); - *(int32_t *)pContext = code; - return DEAL_RES_ERROR; + SCL_ERR_RET(code); } - SScalarFuncParam *input = NULL; - if (sclPrepareFunctionParams(&input, node->pParameterList)) { - return DEAL_RES_ERROR; + SScalarParam *params = NULL; + int32_t rowNum = 0; + SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum)); + + output->type = node->node.resType.type; + output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes)); + if (NULL == output->data) { + sclError("calloc %d failed", (int32_t)rowNum * sizeof(tDataTypes[output->type].bytes)); + SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + for (int32_t i = 0; i < rowNum; ++i) { + code = (*ffpSet.process)(params, node->pParameterList->length, output); + if (code) { + sclError( "scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + SCL_ERR_JRET(code); + } + + sclParamMoveNext(output, 1); + sclParamMoveNext(params, node->pParameterList->length); + } + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(params); + SCL_RET(code); +} + + +int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) { + if (NULL == node->pParameterList || node->pParameterList->length <= 0) { + sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); + SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) { + sclError("invalid logic resType, type:%d", node->node.resType.type); + SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SScalarFuncParam output = {0}; + if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) { + sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length); + SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SScalarParam *params = NULL; + int32_t rowNum = 0; + int32_t code = 0; - code = (*ffpSet.process)(input, node->pParameterList->length, &output); - if (code) { - sclError( "scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); - *(int32_t *)pContext = code; + SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum)); + + output->type = node->node.resType.type; + output->data = calloc(rowNum, sizeof(bool)); + if (NULL == output->data) { + sclError("calloc %d failed", (int32_t)rowNum * sizeof(bool)); + SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + bool value = false; + + for (int32_t i = 0; i < rowNum; ++i) { + for (int32_t m = 0; m < node->pParameterList->length; ++m) { + GET_TYPED_DATA(value, bool, params[m].type, params[m].data); + + if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) { + break; + } else if (LOGIC_COND_TYPE_OR == node->condType && value) { + break; + } else if (LOGIC_COND_TYPE_NOT == node->condType) { + value = !value; + } + } + + *(bool *)output->data = value; + + sclParamMoveNext(output, 1); + sclParamMoveNext(params, node->pParameterList->length); + } + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(params); + CTG_RET(code); +} + +int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) { + SScalarParam *params = NULL; + int32_t rowNum = 0; + int32_t code = 0; + + SCL_ERR_RET(sclInitOperatorParams(¶ms, node, ctx, &rowNum)); + + output->type = node->node.resType.type; + output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes)); + if (NULL == output->data) { + sclError("calloc %d failed", (int32_t)rowNum * sizeof(tDataTypes[output->type].bytes)); + SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType); + + int32_t paramNum = sclGetOperatorParamNum(node->opType); + SScalarParam* pLeft = ¶ms[0]; + SScalarParam* pRight = paramNum > 1 ? ¶ms[1] : NULL; + + for (int32_t i = 0; i < rowNum; ++i) { + + OperatorFn(pLeft, pRight, output->data, TSDB_ORDER_ASC); + + sclParamMoveNext(output, 1); + sclParamMoveNext(pLeft, 1); + if (pRight) { + sclParamMoveNext(pRight, 1); + } + } + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(params); + CTG_RET(code); +} + + +EDealRes sclRewriteFunction(SNode** pNode, void* pContext) { + SFunctionNode *node = (SFunctionNode *)*pNode; + SScalarParam output = {0}; + + *(int32_t *)pContext = sclExecFuncion(node, NULL, &output); + if (*(int32_t *)pContext) { return DEAL_RES_ERROR; } SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { sclError("make value node failed"); + sclFreeParam(&output); *(int32_t *)pContext = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } @@ -86,107 +333,66 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) { nodesDestroyNode(*pNode); *pNode = (SNode*)res; - tfree(output.data); + sclFreeParam(&output); return DEAL_RES_CONTINUE; } EDealRes sclRewriteLogic(SNode** pNode, void* pContext) { SLogicConditionNode *node = (SLogicConditionNode *)*pNode; - if (NULL == node->pParameterList || node->pParameterList->length <= 0) { - sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; - return DEAL_RES_ERROR; - } + SScalarParam output = {0}; - if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) { - sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; + *(int32_t *)pContext = sclExecLogic(node, NULL, &output); + if (*(int32_t *)pContext) { return DEAL_RES_ERROR; } - bool value = false; - SListCell *cell = node->pParameterList->pHead; - for (int32_t i = 0; i < node->pParameterList->length; ++i) { - if (NULL == cell || NULL == cell->pNode) { - sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode); - return TSDB_CODE_QRY_INVALID_INPUT; - } - - if (QUERY_NODE_VALUE != nodeType(cell->pNode)) { - sclError("invalid node type in cell, type:%d", nodeType(cell->pNode)); - return TSDB_CODE_QRY_APP_ERROR; - } - - SValueNode *valueNode = (SValueNode *)cell->pNode; - GET_TYPED_DATA(value, bool, valueNode->node.resType.type, nodesGetValueFromNode(valueNode)); - if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) { - break; - } else if (LOGIC_COND_TYPE_OR == node->condType && value) { - break; - } else if (LOGIC_COND_TYPE_NOT == node->condType) { - value = !value; - } - - cell = cell->pNext; - } - SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { sclError("make value node failed"); + sclFreeParam(&output); *(int32_t *)pContext = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } res->node.resType = node->node.resType; - SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, value); + SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, output.data); nodesDestroyNode(*pNode); *pNode = (SNode*)res; + sclFreeParam(&output); + return DEAL_RES_CONTINUE; } EDealRes sclRewriteOperator(SNode** pNode, void* pContext) { - SOperatorNode *oper = (SOperatorNode *)*pNode; - int32_t paramNum = sclGetOperatorParamNum(oper->opType); - if (NULL == oper->pLeft || (paramNum == 2 && NULL == oper->pRight)) { - sclError("invalid operation node, left:%p, right:%p", oper->pLeft, oper->pRight); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; - return DEAL_RES_ERROR; - } + SOperatorNode *node = (SOperatorNode *)*pNode; + SScalarParam output = {0}; - if (QUERY_NODE_VALUE != nodeType(oper->pLeft) || (paramNum == 2 && QUERY_NODE_VALUE != nodeType(oper->pRight))) { - sclError("invalid operation node, leftType:%d, rightType:%d", nodeType(oper->pLeft), oper->pRight ? nodeType(oper->pRight) : 0); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; + *(int32_t *)pContext = sclExecOperator(node, NULL, &output); + if (*(int32_t *)pContext) { return DEAL_RES_ERROR; } SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { - sclError("make value node failed"); + sclError("make value node failed"); + sclFreeParam(&output); *(int32_t *)pContext = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } - res->node.resType = oper->node.resType; + res->node.resType = node->node.resType; - SValueNode *leftValue = (SValueNode *)oper->pLeft; - SValueNode *rightValue = (SValueNode *)oper->pRight; - - SScalarFuncParam leftParam = {0}, rightParam = {0}; - _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(oper->opType); - setScalarFuncParam(&leftParam, leftValue->node.resType.type, 0, nodesGetValueFromNode(leftValue), 1); - if (2 == paramNum) { - setScalarFuncParam(&rightParam, rightValue->node.resType.type, 0, nodesGetValueFromNode(rightValue), 1); - } - - OperatorFn(&leftParam, &rightParam, nodesGetValueFromNode(res), TSDB_ORDER_ASC); + SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, output.data); nodesDestroyNode(*pNode); *pNode = (SNode*)res; + sclFreeParam(&output); + return DEAL_RES_CONTINUE; } @@ -204,73 +410,99 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { return sclRewriteLogic(pNode, pContext); } - if (QUERY_NODE_OPERATOR != nodeType(*pNode)) { - sclError("invalid node type for calculating constants, type:%d", ); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; - return DEAL_RES_ERROR; + if (QUERY_NODE_OPERATOR == nodeType(*pNode)) { + return sclRewriteOperator(pNode, pContext); } - return sclRewriteOperator(pNode, pContext); + sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode)); + + *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; + + return DEAL_RES_ERROR; } -EDealRes sclCalculate(SNode** pNode, void* pContext) { - if (QUERY_NODE_VALUE == nodeType(*pNode)) { - return DEAL_RES_CONTINUE; + +EDealRes sclWalkFunction(SNode** pNode, void* pContext) { + SScalarCtx *ctx = (SScalarCtx *)pContext; + SFunctionNode *node = (SFunctionNode *)*pNode; + SScalarParam output = {0}; + + ctx->code = sclExecFuncion(node, ctx, &output); + if (ctx->code) { + return DEAL_RES_ERROR; } - if (QUERY_NODE_FUNCTION == nodeType(*pNode)) { - return sclCalculateFunction(pNode, pContext); + if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) { + ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return DEAL_RES_ERROR; } - if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) { - return sclCalculateLogic(pNode, pContext); + return DEAL_RES_CONTINUE; +} + + +EDealRes sclWalkLogic(SNode** pNode, void* pContext) { + SScalarCtx *ctx = (SScalarCtx *)pContext; + SLogicConditionNode *node = (SLogicConditionNode *)*pNode; + SScalarParam output = {0}; + + ctx->code = sclExecLogic(node, ctx, &output); + if (ctx->code) { + return DEAL_RES_ERROR; } - if (QUERY_NODE_OPERATOR != nodeType(*pNode)) { - sclError("invalid node type for calculating constants, type:%d", ); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; + if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) { + ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; - } + } + + return DEAL_RES_CONTINUE; +} + + +EDealRes sclWalkOperator(SNode** pNode, void* pContext) { + SScalarCtx *ctx = (SScalarCtx *)pContext; + SOperatorNode *node = (SOperatorNode *)*pNode; + SScalarParam output = {0}; - SOperatorNode *oper = (SOperatorNode *)*pNode; - int32_t paramNum = sclGetOperatorParamNum(oper->opType); - if (NULL == oper->pLeft || (paramNum == 2 && NULL == oper->pRight)) { - sclError("invalid operation node, left:%p, right:%p", oper->pLeft, oper->pRight); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; + ctx->code = sclExecOperator(node, ctx, &output); + if (ctx->code) { return DEAL_RES_ERROR; } - if (QUERY_NODE_VALUE != nodeType(oper->pLeft) || (paramNum == 2 && QUERY_NODE_VALUE != nodeType(oper->pRight))) { - sclError("invalid operation node, leftType:%d, rightType:%d", nodeType(oper->pLeft), oper->pRight ? nodeType(oper->pRight) : 0); - *(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT; + if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) { + ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } - SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE); - if (NULL == res) { - sclError("make value node failed"); - *(int32_t *)pContext = TSDB_CODE_QRY_OUT_OF_MEMORY; - return DEAL_RES_ERROR; + return DEAL_RES_CONTINUE; +} + + +EDealRes sclCalcWalker(SNode** pNode, void* pContext) { + if (QUERY_NODE_VALUE == nodeType(*pNode)) { + return DEAL_RES_CONTINUE; } - res->node.resType = oper->node.resType; + if (QUERY_NODE_FUNCTION == nodeType(*pNode)) { + return sclWalkFunction(pNode, pContext); + } - SValueNode *leftValue = (SValueNode *)oper->pLeft; - SValueNode *rightValue = (SValueNode *)oper->pRight; + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) { + return sclWalkLogic(pNode, pContext); + } - SScalarFuncParam leftParam = {0}, rightParam = {0}; - _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(oper->opType); - setScalarFuncParam(&leftParam, leftValue->node.resType.type, 0, nodesGetValueFromNode(leftValue), 1); - if (2 == paramNum) { - setScalarFuncParam(&rightParam, rightValue->node.resType.type, 0, nodesGetValueFromNode(rightValue), 1); + if (QUERY_NODE_OPERATOR == nodeType(*pNode)) { + return sclWalkOperator(pNode, pContext); } - - OperatorFn(&leftParam, &rightParam, nodesGetValueFromNode(res), TSDB_ORDER_ASC); - nodesDestroyNode(*pNode); - *pNode = (SNode*)res; + sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode)); - return DEAL_RES_CONTINUE; + SScalarCtx *ctx = (SScalarCtx *)pContext; + + ctx->code = TSDB_CODE_QRY_INVALID_INPUT; + + return DEAL_RES_ERROR; } @@ -294,23 +526,39 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { SCL_RET(code); } -int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SSDataBlock *pDst) { - if (NULL == pNode) { +int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst) { + if (NULL == pNode || NULL == pSrc || NULL == pDst) { SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } int32_t code = 0; + SScalarCtx ctx = {.code = 0, .pSrc = pSrc}; + + ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (NULL == ctx.pRes) { + sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); + SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } - nodesRewriteNodePostOrder(&pNode, sclCalculate, (void *)&code); + nodesWalkNodePostOrder(&pNode, sclCalcWalker, (void *)&ctx); - if (code) { + if (ctx.code) { nodesDestroyNode(pNode); - SCL_ERR_RET(code); + sclFreeRes(ctx.pRes); + SCL_ERR_RET(ctx.code); } - *pRes = pNode; + SScalarParam *res = taosHashGet(ctx.pRes, &pNode, POINTER_BYTES); + if (NULL == res) { + sclError("no res for calculating, node:%d, type:%d", pNode, nodeType(pNode)); + SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + *pDst = *res; - SCL_RET(code); + nodesDestroyNode(pNode); + + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/tscalarfunction.c b/source/libs/function/src/tscalarfunction.c index 50aaa07757..df8a654c96 100644 --- a/source/libs/function/src/tscalarfunction.c +++ b/source/libs/function/src/tscalarfunction.c @@ -2,13 +2,13 @@ #include "tbinoperator.h" #include "tunaryoperator.h" -static void assignBasicParaInfo(struct SScalarFuncParam* dst, const struct SScalarFuncParam* src) { +static void assignBasicParaInfo(struct SScalarParam* dst, const struct SScalarParam* src) { dst->type = src->type; dst->bytes = src->bytes; dst->num = src->num; } -static void tceil(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) { +static void tceil(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { assignBasicParaInfo(pOutput, pLeft); assert(numOfInput == 1); @@ -34,7 +34,7 @@ static void tceil(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFun } } -static void tfloor(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) { +static void tfloor(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { assignBasicParaInfo(pOutput, pLeft); assert(numOfInput == 1); @@ -62,7 +62,7 @@ static void tfloor(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFu } } -static void _tabs(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) { +static void _tabs(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { assignBasicParaInfo(pOutput, pLeft); assert(numOfInput == 1); @@ -120,7 +120,7 @@ static void _tabs(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFun } } -static void tround(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) { +static void tround(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { assignBasicParaInfo(pOutput, pLeft); assert(numOfInput == 1); @@ -146,7 +146,7 @@ static void tround(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFu } } -static void tlength(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) { +static void tlength(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { assert(numOfInput == 1); int64_t* out = (int64_t*) pOutput->data; @@ -157,7 +157,7 @@ static void tlength(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarF } } -static void tconcat(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) { +static void tconcat(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { assert(numOfInput > 0); int32_t rowLen = 0; @@ -189,11 +189,11 @@ static void tconcat(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarF } } -static void tltrim(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) { +static void tltrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { } -static void trtrim(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) { +static void trtrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) { } @@ -262,7 +262,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf } } -static void setScalarFuncParam(SScalarFuncParam* param, int32_t type, int32_t bytes, void* pInput, int32_t numOfRows) { +static void setScalarFuncParam(SScalarParam* param, int32_t type, int32_t bytes, void* pInput, int32_t numOfRows) { param->bytes = bytes; param->type = type; param->num = numOfRows; -- GitLab