From 1a078d5d56a6762a7bdc50714f376b48b3651552 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 8 Jun 2022 18:36:46 +0800 Subject: [PATCH] fix in issue --- include/common/tmsg.h | 1 + include/libs/function/function.h | 1 + source/dnode/mnode/impl/src/mndSma.c | 9 +++++ source/libs/scalar/inc/sclInt.h | 9 ++++- source/libs/scalar/src/scalar.c | 51 +++++++++++++++++++++++++- source/libs/scalar/src/sclvector.c | 53 +++++++++++++--------------- 6 files changed, 94 insertions(+), 30 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4a3c4b0c3f..5453c8d0ea 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2500,6 +2500,7 @@ typedef struct { int64_t sliding; int64_t dstTbUid; int32_t dstVgId; // for stream + SEpSet epSet; char* expr; } STableIndexInfo; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e8cb363e08..810991f770 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -197,6 +197,7 @@ typedef struct SAggFunctionInfo { struct SScalarParam { SColumnInfoData *columnData; SHashObj *pHashFilter; + int32_t hashValueType; void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value int32_t numOfRows; }; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 1d47f8fc7a..cde36eac58 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -894,6 +894,15 @@ static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIn info.sliding = pSma->sliding; info.dstTbUid = pSma->dstTbUid; info.dstVgId = pSma->dstVgId; + + SVgObj* pVg = mndAcquireVgroup(pMnode, pSma->dstVgId); + if (pVg == NULL) { + code = -1; + sdbRelease(pSdb, pSma); + return code; + } + info.epSet = mndGetVgroupEpset(pMnode, pVg); + info.expr = taosMemoryMalloc(pSma->exprLen + 1); if (info.expr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index 1c2e4a358a..4dd16098ff 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -22,11 +22,18 @@ extern "C" { #include "thash.h" #include "query.h" +typedef struct SOperatorValueType { + int32_t opResType; + int32_t selfType; + int32_t peerType; +} SOperatorValueType; + typedef struct SScalarCtx { int32_t code; SArray *pBlockList; /* element is SSDataBlock* */ SHashObj *pRes; /* element is SScalarParam */ void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values + SOperatorValueType type; } SScalarCtx; @@ -53,7 +60,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out); SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows); int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode); -#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type) +#define GET_PARAM_TYPE(_c) ((_c)->columnData ? (_c)->columnData->info.type : (_c)->hashValueType) #define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes) #define GET_PARAM_PRECISON(_c) ((_c)->columnData->info.precision) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index d2436b9948..66e4af5ded 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -208,7 +208,13 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t SCL_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SCL_ERR_RET(scalarGenerateSetFromList((void **)¶m->pHashFilter, node, nodeList->dataType.type)); + int32_t type = vectorGetConvertType(ctx->type.selfType, ctx->type.peerType); + if (type == 0) { + type = nodeList->dataType.type; + } + + SCL_ERR_RET(scalarGenerateSetFromList((void **)¶m->pHashFilter, node, type)); + param->hashValueType = type; if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) { taosHashCleanup(param->pHashFilter); sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param)); @@ -334,6 +340,46 @@ _return: SCL_RET(code); } +int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) { + if (NULL == pNode) { + return -1; + } + + switch (nodeType(pNode)) { + case QUERY_NODE_VALUE: { + SValueNode *valueNode = (SValueNode *)pNode; + return valueNode->node.resType.type; + } + case QUERY_NODE_NODE_LIST: { + SNodeListNode *nodeList = (SNodeListNode *)pNode; + return nodeList->dataType.type; + } + case QUERY_NODE_COLUMN: { + SColumnNode *colNode = (SColumnNode *)pNode; + return colNode->node.resType.type; + } + case QUERY_NODE_FUNCTION: + case QUERY_NODE_OPERATOR: + case QUERY_NODE_LOGIC_CONDITION: { + SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &pNode, POINTER_BYTES); + if (NULL == res) { + sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode); + return -1; + } + return res->columnData->info.type; + } + } + + return -1; +} + + +void sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) { + ctx->type.opResType = node->node.resType.type; + ctx->type.selfType = sclGetNodeType(node->pLeft, ctx); + ctx->type.peerType = sclGetNodeType(node->pRight, ctx); +} + int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) { int32_t code = 0; int32_t paramNum = scalarGetOperatorParamNum(node->opType); @@ -348,8 +394,11 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + sclSetOperatorValueType(node, ctx); + SCL_ERR_JRET(sclInitParam(node->pLeft, ¶mList[0], ctx, rowNum)); if (paramNum > 1) { + TSWAP(ctx->type.selfType, ctx->type.peerType); SCL_ERR_JRET(sclInitParam(node->pRight, ¶mList[1], ctx, rowNum)); } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 59208de3c4..1252e27554 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -627,6 +627,11 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) { SColumnInfoData* pInputCol = pIn->columnData; SColumnInfoData* pOutputCol = pOut->columnData; + if (NULL == pInputCol) { + sclError("input column is NULL, hashFilter %p", pIn->pHashFilter); + return TSDB_CODE_APP_ERROR; + } + int16_t inType = pInputCol->info.type; int16_t outType = pOutputCol->info.type; @@ -826,11 +831,26 @@ int32_t vectorGetConvertType(int32_t type1, int32_t type2) { return gConvertTypes[type2][type1]; } -int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut) { - if (pLeft->pHashFilter != NULL || pRight->pHashFilter != NULL) { - return TSDB_CODE_SUCCESS; +int32_t vectorConvertScalarParam(SScalarParam *input, SScalarParam *output, int32_t type) { + int32_t code = 0; + SDataType t = {.type = type, .bytes = tDataTypes[type].bytes}; + output->numOfRows = input->numOfRows; + + output->columnData = createColumnInfoData(&t, input->numOfRows); + if (output->columnData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } + code = vectorConvertImpl(input, output); + if (code) { +// taosMemoryFreeClear(paramOut1->data); + return code; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut) { int32_t leftType = GET_PARAM_TYPE(pLeft); int32_t rightType = GET_PARAM_TYPE(pRight); if (leftType == rightType) { @@ -859,34 +879,11 @@ int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* p } if (type != GET_PARAM_TYPE(param1)) { - SDataType t = {.type = type, .bytes = tDataTypes[type].bytes}; - paramOut1->numOfRows = param1->numOfRows; - - paramOut1->columnData = createColumnInfoData(&t, param1->numOfRows); - if (paramOut1->columnData == NULL) { - return terrno; - } - - code = vectorConvertImpl(param1, paramOut1); - if (code) { -// taosMemoryFreeClear(paramOut1->data); - return code; - } + return vectorConvertScalarParam(param1, paramOut1, type); } if (type != GET_PARAM_TYPE(param2)) { - SDataType t = {.type = type, .bytes = tDataTypes[type].bytes}; - paramOut2->numOfRows = param2->numOfRows; - - paramOut2->columnData = createColumnInfoData(&t, param2->numOfRows); - if (paramOut2->columnData == NULL) { - return terrno; - } - - code = vectorConvertImpl(param2, paramOut2); - if (code) { - return code; - } + return vectorConvertScalarParam(param2, paramOut2, type); } return TSDB_CODE_SUCCESS; -- GitLab