提交 76d5298b 编写于 作者: D dapan1121

feature/qnode

上级 bffdb7a8
......@@ -226,13 +226,13 @@ typedef struct SAggFunctionInfo {
int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo;
struct SScalarFuncParam;
struct SScalarParam;
typedef struct SScalarFunctionInfo {
char name[FUNCTIONS_NAME_MAX_LENGTH];
int8_t type; // scalar function or aggregation function
uint32_t functionId; // index of scalar function
void (*process)(struct SScalarFuncParam* pOutput, size_t numOfInput, const struct SScalarFuncParam *pInput);
void (*process)(struct SScalarParam* pOutput, size_t numOfInput, const struct SScalarParam *pInput);
} SScalarFunctionInfo;
typedef struct SMultiFunctionsDesc {
......
......@@ -117,7 +117,7 @@ typedef struct SFuncExecFuncs {
FExecFinalize finalize;
} SFuncExecFuncs;
typedef int32_t (*FScalarExecProcess)(SScalarFuncParam *pInput, int32_t inputNum, SScalarFuncParam *pOutput);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef struct SScalarFuncExecFuncs {
FScalarExecProcess process;
......
......@@ -104,8 +104,6 @@ typedef enum EOperatorType {
OP_TYPE_NMATCH,
OP_TYPE_IS_NULL,
OP_TYPE_IS_NOT_NULL,
OP_TYPE_BIT_AND,
OP_TYPE_BIT_OR,
// json operator
OP_TYPE_JSON_GET_VALUE,
......
......@@ -22,7 +22,7 @@ extern "C" {
#include "tscalarfunction.h"
typedef void (*_bin_scalar_fn_t)(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *output, int32_t order);
typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, void *output, int32_t order);
_bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator);
bool isBinaryStringOp(int32_t op);
......
......@@ -19,9 +19,13 @@
extern "C" {
#endif
typedef struct SScalarCalcContext {
typedef struct SScalarCtx {
int32_t code;
SSDataBlock *pSrc;
SHashObj *pRes; /* element is SScalarParam */
} SScalarCtx;
} SScalarCalcContext;
#define SCL_DEFAULT_OP_NUM 10
#define sclFatal(...) qFatal(__VA_ARGS__)
#define sclError(...) qError(__VA_ARGS__)
......
......@@ -21,12 +21,12 @@ extern "C" {
#include "function.h"
typedef struct SScalarFuncParam {
typedef struct SScalarParam {
void* data;
int32_t num;
int32_t type;
int32_t bytes;
} SScalarFuncParam;
} SScalarParam;
typedef struct SScalarFunctionSupport {
struct SExprInfo *pExprInfo;
......@@ -39,7 +39,7 @@ typedef struct SScalarFunctionSupport {
extern struct SScalarFunctionInfo scalarFunc[8];
int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarFuncParam* pOutput,
int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarParam* pOutput,
void* param, char* (*getSourceDataBlock)(void*, const char*, int32_t));
......
......@@ -22,7 +22,7 @@ extern "C" {
#include "tscalarfunction.h"
typedef void (*_unary_scalar_fn_t)(SScalarFuncParam *pLeft, SScalarFuncParam* pOutput);
typedef void (*_unary_scalar_fn_t)(SScalarParam *pLeft, SScalarParam* pOutput);
_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t binOperator);
#ifdef __cplusplus
......
......@@ -3240,7 +3240,7 @@ static void arithmetic_function(SqlFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
SScalarFunctionSupport *pSup = (SScalarFunctionSupport *)pCtx->param[1].pz;
SScalarFuncParam output = {0};
SScalarParam output = {0};
output.data = pCtx->pOutput;
//evaluateExprNodeTree(pSup->pExprInfo->pExpr, pCtx->size, &output, pSup, getArithColumnData);
......
......@@ -245,7 +245,7 @@ _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) {
}
int32_t vectorConvertImpl(SScalarFuncParam* pIn, SScalarFuncParam* pOut) {
int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
int16_t inType = pIn->type;
int16_t inBytes = pIn->bytes;
char *input = pIn->data;
......@@ -512,13 +512,13 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
};
int32_t vectorConvert(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, SScalarFuncParam* pLeftOut, SScalarFuncParam* pRightOut) {
int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut) {
if (pLeft->type == pRight->type) {
return TSDB_CODE_SUCCESS;
}
SScalarFuncParam *param1 = NULL, *paramOut1 = NULL;
SScalarFuncParam *param2 = NULL, *paramOut2 = NULL;
SScalarParam *param1 = NULL, *paramOut1 = NULL;
SScalarParam *param2 = NULL, *paramOut2 = NULL;
int32_t code = 0;
if (pLeft->type < pRight->type) {
......@@ -575,7 +575,7 @@ int32_t vectorConvert(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, SScalar
return TSDB_CODE_SUCCESS;
}
void vectorAdd(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorAdd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
......@@ -614,7 +614,7 @@ void vectorAdd(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int
}
}
void vectorSub(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorSub(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
......@@ -651,7 +651,7 @@ void vectorSub(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int
}
}
}
void vectorMultiply(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorMultiply(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
......@@ -690,7 +690,7 @@ void vectorMultiply(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out
}
}
void vectorDivide(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorDivide(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
......@@ -736,7 +736,7 @@ void vectorDivide(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out,
}
}
void vectorRemainder(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorRemainder(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
......@@ -808,7 +808,7 @@ void vectorRemainder(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *ou
}
}
void vectorConcat(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorConcat(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t len = pLeft->bytes + pRight->bytes;
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
......@@ -859,7 +859,7 @@ void vectorConcat(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out,
}
void vectorBitAnd(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorBitAnd(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
......@@ -898,7 +898,7 @@ void vectorBitAnd(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out,
}
}
void vectorBitOr(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
......@@ -938,7 +938,7 @@ void vectorBitOr(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, i
}
void vectorCompareImpl(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord, int32_t optr) {
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord, int32_t optr) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
int8_t funcIdx = filterGetCompFuncIdx(pLeft->type, optr);
......@@ -993,14 +993,14 @@ void vectorCompareImpl(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *
}
}
void vectorCompare(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord, int32_t optr) {
SScalarFuncParam pLeftOut = {0};
SScalarFuncParam pRightOut = {0};
void vectorCompare(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord, int32_t optr) {
SScalarParam pLeftOut = {0};
SScalarParam pRightOut = {0};
vectorConvert(pLeft, pRight, &pLeftOut, &pRightOut);
SScalarFuncParam *param1 = NULL;
SScalarFuncParam *param2 = NULL;
SScalarParam *param1 = NULL;
SScalarParam *param2 = NULL;
int32_t type = 0;
if (pLeftOut->type) {
......@@ -1018,55 +1018,55 @@ void vectorCompare(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out,
vectorCompareImpl(pLeftOut, pRightOut, out, _ord, TSDB_RELATION_GREATER);
}
void vectorGreater(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorGreater(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_GREATER);
}
void vectorGreaterEqual(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorGreaterEqual(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_GREATER_EQUAL);
}
void vectorLower(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorLower(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_LESS);
}
void vectorLowerEqual(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorLowerEqual(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_LESS_EQUAL);
}
void vectorEqual(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorEqual(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_EQUAL);
}
void vectorNotEqual(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorNotEqual(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_NOT_EQUAL);
}
void vectorIn(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorIn(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_IN);
}
void vectorNotIn(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorNotIn(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_NOT_IN);
}
void vectorLike(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorLike(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_LIKE);
}
void vectorNotLike(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorNotLike(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_NOT_LIKE);
}
void vectorMatch(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorMatch(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_MATCH);
}
void vectorNotMatch(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorNotMatch(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
vectorCompare(pLeft, pRight, out, _ord, TSDB_RELATION_NMATCH);
}
void vectorIsNull(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorIsNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
bool res = false;
......@@ -1086,7 +1086,7 @@ void vectorIsNull(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out,
}
}
void vectorNotNull(SScalarFuncParam* pLeft, SScalarFuncParam* pRight, void *out, int32_t _ord) {
void vectorNotNull(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->num, pRight->num) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
bool res = false;
......@@ -1143,9 +1143,9 @@ _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binFunctionId) {
return vectorMatch;
case OP_TYPE_NMATCH:
return vectorNotMatch;
case OP_TYPE_ISNULL:
case OP_TYPE_IS_NULL:
return vectorIsNull;
case OP_TYPE_NOTNULL:
case OP_TYPE_IS_NOT_NULL:
return vectorNotNull;
case OP_TYPE_BIT_AND:
return vectorBitAnd;
......
......@@ -2,79 +2,326 @@
#include "tscalar.h"
int32_t sclGetOperatorParamNum(EOperatorType type) {
if (OP_TYPE_ISNULL == type || OP_TYPE_NOTNULL == type) {
if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type) {
return 1;
}
return 2;
}
int32_t sclPrepareFunctionParams(SScalarFuncParam **pParams, SNodeList* pParameterList) {
*pParams = calloc(pParameterList->length, sizeof(SScalarFuncParam));
void sclFreeRes(SHashObj *res) {
SScalarParam *p = NULL;
void *pIter = taosHashIterate(res, NULL);
while (pIter) {
p = (SScalarParam *)pIter;
if (p) {
tfree(p->data);
}
pIter = taosHashIterate(res, pIter);
}
taosHashCleanup(res);
}
void sclFreeParam(SScalarParam *param) {
tfree(param->data);
}
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
switch (nodeType(node)) {
case QUERY_NODE_VALUE: {
SValueNode *valueNode = (SValueNode *)node;
param->data = nodesGetValueFromNode(valueNode);
param->num = 1;
param->type = valueNode->node.resType.type;
param->bytes = valueNode->node.resType.bytes;
break;
}
case QUERY_NODE_COLUMN_REF: {
if (NULL == ctx) {
sclError("invalid node type for constant calculating, type:%d, ctx:%p", nodeType(node), ctx);
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SColumnRef *ref = (SColumnRef *)node;
if (ref->slotId >= taosArrayGetSize(ctx->pSrc->pDataBlock)) {
sclError("column ref slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, taosArrayGetSize(ctx->pSrc->pDataBlock));
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(ctx->pSrc->pDataBlock, ref->slotId);
param->data = columnData->pData;
param->num = ctx->pSrc->info.rows;
param->type = columnData->info.type;
param->bytes = columnData->info.bytes;
break;
}
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_OPERATOR: {
if (NULL == ctx) {
sclError("invalid node type for constant calculating, type:%d, ctx:%p", nodeType(node), ctx);
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
if (NULL == res) {
sclError("no result for node, type:%d, node:%p", nodeType(node), node);
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*param = *res;
break;
}
}
if (param->num > *rowNum) {
if (1 != param->num) && (1 < *rowNum) {
sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->num);
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
*rowNum = param->num;
}
return TSDB_CODE_SUCCESS;
}
int32_t sclParamMoveNext(SScalarParam *params, int32_t num) {
SScalarParam *param = NULL;
for (int32_t i = 0; i < num; ++i) {
param = params + i;
if (1 == param->num) {
continue;
}
if (IS_VAR_DATA_TYPE(param->type)) {
param->data = (char *)(param->data) + varDataTLen(param->data);
} else {
param->data = (char *)(param->data) + tDataTypes[param->type].bytes;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) {
int32_t code = 0;
*pParams = calloc(pParamList->length, sizeof(SScalarParam));
if (NULL == *pParams) {
sclError("calloc %d failed", pParameterList->length * sizeof(SScalarFuncParam));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
sclError("calloc %d failed", pParamList->length * sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SListCell *cell = pParameterList->pHead;
for (int32_t i = 0; i < pParameterList->length; ++i) {
SListCell *cell = pParamList->pHead;
for (int32_t i = 0; i < pParamList->length; ++i) {
if (NULL == cell || NULL == cell->pNode) {
sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode);
tfree(*pParams);
return TSDB_CODE_QRY_INVALID_INPUT;
SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (QUERY_NODE_VALUE != nodeType(cell->pNode)) {
sclError("invalid node type in cell, type:%d", nodeType(cell->pNode));
tfree(*pParams);
return TSDB_CODE_QRY_APP_ERROR;
}
SCL_ERR_JRET(sclInitParam(cell->pNode, &pParams[i], ctx, rowNum));
cell = cell->pNext;
}
SValueNode *valueNode = (SValueNode *)cell->pNode;
pParams[i].data = nodesGetValueFromNode(valueNode);
pParams[i].num = 1;
pParams[i].type = valueNode->node.resType.type;
pParams[i].bytes = valueNode->node.resType.bytes;
return TSDB_CODE_SUCCESS;
cell = cell->pNext;
_return:
tfree(*pParams);
SCL_RET(code);
}
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
int32_t code = 0;
int32_t paramNum = sclGetOperatorParamNum(node->opType);
if (NULL == node->pLeft || (paramNum == 2 && NULL == node->pRight)) {
sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
*pParams = calloc(paramNum, sizeof(SScalarParam));
if (NULL == *pParams) {
sclError("calloc %d failed", paramNum * sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCL_ERR_JRET(sclInitParam(node->pLeft, &pParams[0], ctx, rowNum));
if (paramNum > 1) {
SCL_ERR_JRET(sclInitParam(node->pRight, &pParams[1], ctx, rowNum));
}
return TSDB_CODE_SUCCESS;
_return:
tfree(*pParams);
SCL_RET(code);
}
EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
SFunctionNode *node = (SFunctionNode *)*pNode;
int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SScalarFuncExecFuncs ffpSet = {0};
int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
if (code) {
sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
*(int32_t *)pContext = code;
return DEAL_RES_ERROR;
SCL_ERR_RET(code);
}
SScalarFuncParam *input = NULL;
if (sclPrepareFunctionParams(&input, node->pParameterList)) {
return DEAL_RES_ERROR;
SScalarParam *params = NULL;
int32_t rowNum = 0;
SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &rowNum));
output->type = node->node.resType.type;
output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes));
if (NULL == output->data) {
sclError("calloc %d failed", (int32_t)rowNum * sizeof(tDataTypes[output->type].bytes));
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < rowNum; ++i) {
code = (*ffpSet.process)(params, node->pParameterList->length, output);
if (code) {
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
SCL_ERR_JRET(code);
}
sclParamMoveNext(output, 1);
sclParamMoveNext(params, node->pParameterList->length);
}
return TSDB_CODE_SUCCESS;
_return:
tfree(params);
SCL_RET(code);
}
int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
sclError("invalid logic resType, type:%d", node->node.resType.type);
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SScalarFuncParam output = {0};
if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SScalarParam *params = NULL;
int32_t rowNum = 0;
int32_t code = 0;
code = (*ffpSet.process)(input, node->pParameterList->length, &output);
if (code) {
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
*(int32_t *)pContext = code;
SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &rowNum));
output->type = node->node.resType.type;
output->data = calloc(rowNum, sizeof(bool));
if (NULL == output->data) {
sclError("calloc %d failed", (int32_t)rowNum * sizeof(bool));
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
bool value = false;
for (int32_t i = 0; i < rowNum; ++i) {
for (int32_t m = 0; m < node->pParameterList->length; ++m) {
GET_TYPED_DATA(value, bool, params[m].type, params[m].data);
if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
break;
} else if (LOGIC_COND_TYPE_OR == node->condType && value) {
break;
} else if (LOGIC_COND_TYPE_NOT == node->condType) {
value = !value;
}
}
*(bool *)output->data = value;
sclParamMoveNext(output, 1);
sclParamMoveNext(params, node->pParameterList->length);
}
return TSDB_CODE_SUCCESS;
_return:
tfree(params);
CTG_RET(code);
}
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
SScalarParam *params = NULL;
int32_t rowNum = 0;
int32_t code = 0;
SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
output->type = node->node.resType.type;
output->data = calloc(rowNum, sizeof(tDataTypes[output->type].bytes));
if (NULL == output->data) {
sclError("calloc %d failed", (int32_t)rowNum * sizeof(tDataTypes[output->type].bytes));
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);
int32_t paramNum = sclGetOperatorParamNum(node->opType);
SScalarParam* pLeft = &params[0];
SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;
for (int32_t i = 0; i < rowNum; ++i) {
OperatorFn(pLeft, pRight, output->data, TSDB_ORDER_ASC);
sclParamMoveNext(output, 1);
sclParamMoveNext(pLeft, 1);
if (pRight) {
sclParamMoveNext(pRight, 1);
}
}
return TSDB_CODE_SUCCESS;
_return:
tfree(params);
CTG_RET(code);
}
EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
SFunctionNode *node = (SFunctionNode *)*pNode;
SScalarParam output = {0};
*(int32_t *)pContext = sclExecFuncion(node, NULL, &output);
if (*(int32_t *)pContext) {
return DEAL_RES_ERROR;
}
SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
sclError("make value node failed");
sclFreeParam(&output);
*(int32_t *)pContext = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
......@@ -86,107 +333,66 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
tfree(output.data);
sclFreeParam(&output);
return DEAL_RES_CONTINUE;
}
EDealRes sclRewriteLogic(SNode** pNode, void* pContext) {
SLogicConditionNode *node = (SLogicConditionNode *)*pNode;
if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
}
SScalarParam output = {0};
if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
*(int32_t *)pContext = sclExecLogic(node, NULL, &output);
if (*(int32_t *)pContext) {
return DEAL_RES_ERROR;
}
bool value = false;
SListCell *cell = node->pParameterList->pHead;
for (int32_t i = 0; i < node->pParameterList->length; ++i) {
if (NULL == cell || NULL == cell->pNode) {
sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (QUERY_NODE_VALUE != nodeType(cell->pNode)) {
sclError("invalid node type in cell, type:%d", nodeType(cell->pNode));
return TSDB_CODE_QRY_APP_ERROR;
}
SValueNode *valueNode = (SValueNode *)cell->pNode;
GET_TYPED_DATA(value, bool, valueNode->node.resType.type, nodesGetValueFromNode(valueNode));
if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
break;
} else if (LOGIC_COND_TYPE_OR == node->condType && value) {
break;
} else if (LOGIC_COND_TYPE_NOT == node->condType) {
value = !value;
}
cell = cell->pNext;
}
SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
sclError("make value node failed");
sclFreeParam(&output);
*(int32_t *)pContext = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
res->node.resType = node->node.resType;
SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, value);
SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, output.data);
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
sclFreeParam(&output);
return DEAL_RES_CONTINUE;
}
EDealRes sclRewriteOperator(SNode** pNode, void* pContext) {
SOperatorNode *oper = (SOperatorNode *)*pNode;
int32_t paramNum = sclGetOperatorParamNum(oper->opType);
if (NULL == oper->pLeft || (paramNum == 2 && NULL == oper->pRight)) {
sclError("invalid operation node, left:%p, right:%p", oper->pLeft, oper->pRight);
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
}
SOperatorNode *node = (SOperatorNode *)*pNode;
SScalarParam output = {0};
if (QUERY_NODE_VALUE != nodeType(oper->pLeft) || (paramNum == 2 && QUERY_NODE_VALUE != nodeType(oper->pRight))) {
sclError("invalid operation node, leftType:%d, rightType:%d", nodeType(oper->pLeft), oper->pRight ? nodeType(oper->pRight) : 0);
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
*(int32_t *)pContext = sclExecOperator(node, NULL, &output);
if (*(int32_t *)pContext) {
return DEAL_RES_ERROR;
}
SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
sclError("make value node failed");
sclError("make value node failed");
sclFreeParam(&output);
*(int32_t *)pContext = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
res->node.resType = oper->node.resType;
res->node.resType = node->node.resType;
SValueNode *leftValue = (SValueNode *)oper->pLeft;
SValueNode *rightValue = (SValueNode *)oper->pRight;
SScalarFuncParam leftParam = {0}, rightParam = {0};
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(oper->opType);
setScalarFuncParam(&leftParam, leftValue->node.resType.type, 0, nodesGetValueFromNode(leftValue), 1);
if (2 == paramNum) {
setScalarFuncParam(&rightParam, rightValue->node.resType.type, 0, nodesGetValueFromNode(rightValue), 1);
}
OperatorFn(&leftParam, &rightParam, nodesGetValueFromNode(res), TSDB_ORDER_ASC);
SET_TYPED_DATA(nodesGetValueFromNode(res), res->node.resType.type, output.data);
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
sclFreeParam(&output);
return DEAL_RES_CONTINUE;
}
......@@ -204,73 +410,99 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
return sclRewriteLogic(pNode, pContext);
}
if (QUERY_NODE_OPERATOR != nodeType(*pNode)) {
sclError("invalid node type for calculating constants, type:%d", );
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
return sclRewriteOperator(pNode, pContext);
}
return sclRewriteOperator(pNode, pContext);
sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode));
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
}
EDealRes sclCalculate(SNode** pNode, void* pContext) {
if (QUERY_NODE_VALUE == nodeType(*pNode)) {
return DEAL_RES_CONTINUE;
EDealRes sclWalkFunction(SNode** pNode, void* pContext) {
SScalarCtx *ctx = (SScalarCtx *)pContext;
SFunctionNode *node = (SFunctionNode *)*pNode;
SScalarParam output = {0};
ctx->code = sclExecFuncion(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
return sclCalculateFunction(pNode, pContext);
if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
return sclCalculateLogic(pNode, pContext);
return DEAL_RES_CONTINUE;
}
EDealRes sclWalkLogic(SNode** pNode, void* pContext) {
SScalarCtx *ctx = (SScalarCtx *)pContext;
SLogicConditionNode *node = (SLogicConditionNode *)*pNode;
SScalarParam output = {0};
ctx->code = sclExecLogic(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (QUERY_NODE_OPERATOR != nodeType(*pNode)) {
sclError("invalid node type for calculating constants, type:%d", );
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
}
return DEAL_RES_CONTINUE;
}
EDealRes sclWalkOperator(SNode** pNode, void* pContext) {
SScalarCtx *ctx = (SScalarCtx *)pContext;
SOperatorNode *node = (SOperatorNode *)*pNode;
SScalarParam output = {0};
SOperatorNode *oper = (SOperatorNode *)*pNode;
int32_t paramNum = sclGetOperatorParamNum(oper->opType);
if (NULL == oper->pLeft || (paramNum == 2 && NULL == oper->pRight)) {
sclError("invalid operation node, left:%p, right:%p", oper->pLeft, oper->pRight);
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
ctx->code = sclExecOperator(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (QUERY_NODE_VALUE != nodeType(oper->pLeft) || (paramNum == 2 && QUERY_NODE_VALUE != nodeType(oper->pRight))) {
sclError("invalid operation node, leftType:%d, rightType:%d", nodeType(oper->pLeft), oper->pRight ? nodeType(oper->pRight) : 0);
*(int32_t *)pContext = TSDB_CODE_QRY_INVALID_INPUT;
if (taosHashPut(ctx->pRes, pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
SValueNode *res = nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
sclError("make value node failed");
*(int32_t *)pContext = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
return DEAL_RES_CONTINUE;
}
EDealRes sclCalcWalker(SNode** pNode, void* pContext) {
if (QUERY_NODE_VALUE == nodeType(*pNode)) {
return DEAL_RES_CONTINUE;
}
res->node.resType = oper->node.resType;
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
return sclWalkFunction(pNode, pContext);
}
SValueNode *leftValue = (SValueNode *)oper->pLeft;
SValueNode *rightValue = (SValueNode *)oper->pRight;
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
return sclWalkLogic(pNode, pContext);
}
SScalarFuncParam leftParam = {0}, rightParam = {0};
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(oper->opType);
setScalarFuncParam(&leftParam, leftValue->node.resType.type, 0, nodesGetValueFromNode(leftValue), 1);
if (2 == paramNum) {
setScalarFuncParam(&rightParam, rightValue->node.resType.type, 0, nodesGetValueFromNode(rightValue), 1);
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
return sclWalkOperator(pNode, pContext);
}
OperatorFn(&leftParam, &rightParam, nodesGetValueFromNode(res), TSDB_ORDER_ASC);
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode));
return DEAL_RES_CONTINUE;
SScalarCtx *ctx = (SScalarCtx *)pContext;
ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
}
......@@ -294,23 +526,39 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
SCL_RET(code);
}
int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SSDataBlock *pDst) {
if (NULL == pNode) {
int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst) {
if (NULL == pNode || NULL == pSrc || NULL == pDst) {
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
SScalarCtx ctx = {.code = 0, .pSrc = pSrc};
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
nodesRewriteNodePostOrder(&pNode, sclCalculate, (void *)&code);
nodesWalkNodePostOrder(&pNode, sclCalcWalker, (void *)&ctx);
if (code) {
if (ctx.code) {
nodesDestroyNode(pNode);
SCL_ERR_RET(code);
sclFreeRes(ctx.pRes);
SCL_ERR_RET(ctx.code);
}
*pRes = pNode;
SScalarParam *res = taosHashGet(ctx.pRes, &pNode, POINTER_BYTES);
if (NULL == res) {
sclError("no res for calculating, node:%d, type:%d", pNode, nodeType(pNode));
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*pDst = *res;
SCL_RET(code);
nodesDestroyNode(pNode);
return TSDB_CODE_SUCCESS;
}
......
......@@ -2,13 +2,13 @@
#include "tbinoperator.h"
#include "tunaryoperator.h"
static void assignBasicParaInfo(struct SScalarFuncParam* dst, const struct SScalarFuncParam* src) {
static void assignBasicParaInfo(struct SScalarParam* dst, const struct SScalarParam* src) {
dst->type = src->type;
dst->bytes = src->bytes;
dst->num = src->num;
}
static void tceil(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
static void tceil(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
assignBasicParaInfo(pOutput, pLeft);
assert(numOfInput == 1);
......@@ -34,7 +34,7 @@ static void tceil(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFun
}
}
static void tfloor(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
static void tfloor(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
assignBasicParaInfo(pOutput, pLeft);
assert(numOfInput == 1);
......@@ -62,7 +62,7 @@ static void tfloor(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFu
}
}
static void _tabs(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
static void _tabs(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
assignBasicParaInfo(pOutput, pLeft);
assert(numOfInput == 1);
......@@ -120,7 +120,7 @@ static void _tabs(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFun
}
}
static void tround(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
static void tround(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
assignBasicParaInfo(pOutput, pLeft);
assert(numOfInput == 1);
......@@ -146,7 +146,7 @@ static void tround(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFu
}
}
static void tlength(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
static void tlength(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
assert(numOfInput == 1);
int64_t* out = (int64_t*) pOutput->data;
......@@ -157,7 +157,7 @@ static void tlength(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarF
}
}
static void tconcat(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
static void tconcat(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
assert(numOfInput > 0);
int32_t rowLen = 0;
......@@ -189,11 +189,11 @@ static void tconcat(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarF
}
}
static void tltrim(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
static void tltrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
}
static void trtrim(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
static void trtrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
}
......@@ -262,7 +262,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
}
}
static void setScalarFuncParam(SScalarFuncParam* param, int32_t type, int32_t bytes, void* pInput, int32_t numOfRows) {
static void setScalarFuncParam(SScalarParam* param, int32_t type, int32_t bytes, void* pInput, int32_t numOfRows) {
param->bytes = bytes;
param->type = type;
param->num = numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册