diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index bd9cd4a250581aac1682252dfe048125eab242a9..231d52bae4eae56909d27fc3a6113e1c8ad134ef 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -139,11 +139,11 @@ bool isSimpleAggregate(SQueryInfo* pQueryInfo); bool isBlockDistQuery(SQueryInfo* pQueryInfo); int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo); -bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, int32_t tableIndex); -bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex); -bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex); +bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); +bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); +bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); -bool tscIsProjectionQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); +bool tscIsProjectionQuery(SQueryInfo* pQueryInfo); bool tscIsTwoStageSTableQuery(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscQueryTags(SQueryInfo* pQueryInfo); @@ -246,7 +246,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); -int32_t tscGetUdfFromNode(SSqlObj *pSql); +int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo); void tscResetForNextRetrieve(SSqlRes* pRes); void tscDoQuery(SSqlObj* pSql); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 0ffafe6c01f1e505e1caae824dcdcf3b5ef34e33..4d21337497d954e1fc44537ac8fc6d0358c01871 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -240,6 +240,7 @@ typedef struct SQueryInfo { int32_t havingFieldNum; bool globalMerge; // need global merge bool arithmCalOnAgg; // arithmetic calculation on aggregate result. + SArray *pUdfInfo; // user defined function information SArray } SQueryInfo; typedef struct { @@ -284,7 +285,6 @@ typedef struct { SHashObj *pTableBlockHashList; // data block for each table SArray *pDataBlocks; // SArray. Merged submit block for each vgroup - SArray *pUdfInfo; // user defined function information SArray } SSqlCmd; typedef struct SResRec { diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index a4bfa4f38f864a34339cd301c08445b46b3b8904..3450a3173fe76ae545251d861d8d34f5c0ed4e13 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -306,7 +306,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pMerger->pDesc->pColumnModel->capacity = 1; // restore the limitation value at the last stage - if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) { + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pQueryInfo->limit.limit = pQueryInfo->clauseLimit; pQueryInfo->limit.offset = pQueryInfo->prjOffset; } @@ -461,7 +461,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } // primary timestamp column is involved in final result - if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) { + if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { numOfGroupByCols++; } @@ -617,7 +617,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr type = pModel->pFields[i].field.type; bytes = pModel->pFields[i].field.bytes; } else if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false, pUdfInfo); assert(ret == TSDB_CODE_SUCCESS); @@ -801,6 +801,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); + + continue; + } + aAggs[functionId].mergeFunc(&pCtx[j]); } } else { @@ -809,6 +818,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); + + continue; + } + aAggs[functionId].xFinalize(&pCtx[j]); } @@ -825,6 +843,10 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S } for(int32_t j = 0; j < numOfExpr; ++j) { + if (pCtx[j].functionId < 0) { + continue; + } + aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); } @@ -837,6 +859,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); + + continue; + } + aAggs[functionId].mergeFunc(&pCtx[j]); } } @@ -850,6 +881,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); + + continue; + } + aAggs[functionId].mergeFunc(&pCtx[j]); } } @@ -1103,6 +1143,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { { // reset output buffer for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { SQLFunctionCtx* pCtx = &pAggInfo->binfo.pCtx[j]; + if (pCtx->functionId < 0) { + clearOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity); + continue; + } + aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo); } } @@ -1154,6 +1199,14 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { continue; } + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1); + + doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); + + continue; + } + aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 51596c134748f9e9a87236cd68007bc3dd395784..691f21bab46de8a864607ddd42b5d064f92b7b44 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -83,7 +83,7 @@ static int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SAr static bool validateIpAddress(const char* ip, size_t size); static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); -static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery); +static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery); static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd); @@ -1774,13 +1774,12 @@ void genUdfList(SArray* pUdfInfo, tSqlExpr *pNode) { } } -static int32_t checkForUdf(SSqlObj* pSql, SArray* pSelection) { - SSqlCmd* pCmd = &pSql->cmd; - if (pCmd->pUdfInfo != NULL) { +static int32_t checkForUdf(SSqlObj* pSql, SQueryInfo* pQueryInfo, SArray* pSelection) { + if (pQueryInfo->pUdfInfo != NULL) { return TSDB_CODE_SUCCESS; } - pCmd->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo)); + pQueryInfo->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo)); size_t nExpr = taosArrayGetSize(pSelection); @@ -1789,12 +1788,12 @@ static int32_t checkForUdf(SSqlObj* pSql, SArray* pSelection) { int32_t type = pItem->pNode->type; if (type == SQL_NODE_EXPR || type == SQL_NODE_SQLFUNCTION) { - genUdfList(pCmd->pUdfInfo, pItem->pNode); + genUdfList(pQueryInfo->pUdfInfo, pItem->pNode); } } - if (taosArrayGetSize(pCmd->pUdfInfo) > 0) { - return tscGetUdfFromNode(pSql); + if (taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) { + return tscGetUdfFromNode(pSql, pQueryInfo); } else { return TSDB_CODE_SUCCESS; } @@ -1845,7 +1844,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS if (type == SQL_NODE_SQLFUNCTION) { pItem->pNode->functionId = isValidFunction(pItem->pNode->operand.z, pItem->pNode->operand.n); if (pItem->pNode->functionId < 0) { - SUdfInfo* pUdfInfo = isValidUdf(pCmd->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n); + SUdfInfo* pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n); if (pUdfInfo == NULL) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } @@ -1890,7 +1889,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS addPrimaryTsColIntoResult(pQueryInfo); } - if (!functionCompatibleCheck(pCmd, pQueryInfo, joinQuery, timeWindowQuery)) { + if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -2700,7 +2699,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } default: { - SUdfInfo* pUdfInfo = isValidUdf(pCmd->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n); + SUdfInfo* pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n); if (pUdfInfo == NULL) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9); } @@ -3180,7 +3179,7 @@ static bool groupbyTagsOrNull(SQueryInfo* pQueryInfo) { return true; } -static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) { +static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) { int32_t startIdx = 0; int32_t aggUdf = 0; int32_t scalarUdf = 0; @@ -3201,7 +3200,7 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool int16_t functionId = pExpr1->base.functionId; if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE ? ++aggUdf : ++scalarUdf; continue; @@ -5257,7 +5256,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; // orderby ts query on super table - if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) { + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { addPrimaryTsColIntoResult(pQueryInfo); } } @@ -5635,7 +5634,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, k); if (pExpr->base.functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * pExpr->base.functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * pExpr->base.functionId - 1); if (pUdfInfo->funcType == TSDB_UDF_TYPE_SCALAR) { isProjectionFunction = true; break; @@ -5885,13 +5884,13 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI // todo refactor if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query - if (tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) { + if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } // for projection query on super table, all queries are subqueries - if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY)) { pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; } @@ -5927,7 +5926,7 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI pQueryInfo->prjOffset = pQueryInfo->limit.offset; pQueryInfo->vgroupLimit = -1; - if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) { + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { /* * the offset value should be removed during retrieve data from virtual node, since the * global order are done in client side, so the offset is applied at the client side @@ -6280,7 +6279,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd) } if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { ++numOfAggregation; } @@ -6526,7 +6525,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { } // projection query on super table does not compatible with "group by" syntax - if (tscIsProjectionQuery(pCmd, pQueryInfo)) { + if (tscIsProjectionQuery(pQueryInfo)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -6708,7 +6707,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) { char *name = NULL; if (pExpr->base.functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pSql->cmd.pUdfInfo, -1 * pExpr->base.functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * pExpr->base.functionId - 1); name = pUdfInfo->name; } else { name = aAggs[pExpr->base.functionId].name; @@ -7034,7 +7033,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return code; } - code = checkForUdf(pSql, pSqlNode->pSelNodeList); + code = checkForUdf(pSql, pQueryInfo, pSqlNode->pSelNodeList); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -7059,7 +7058,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - if (!tscIsProjectionQuery(pCmd, pQueryInfo) && pQueryInfo->interval.interval == 0) { + if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7); } @@ -7399,7 +7398,7 @@ int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* p } //REDO function check - if (!functionCompatibleCheck(pCmd, pQueryInfo, joinQuery, timeWindowQuery)) { + if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -7589,18 +7588,18 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY); } + code = checkForUdf(pSql, pQueryInfo, pSqlNode->pSelNodeList); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + // parse the group by clause in the first place if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - code = checkForUdf(pSql, pSqlNode->pSelNodeList); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // set where info - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + // set where info + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); if (pSqlNode->pWhere != NULL) { if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0920542af2b18426adf5fa757f09037e3e27dbc7..efbf87aa5340487eb50b683591831e8b6230d7e5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -521,7 +521,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) { type = pQueryInfo->type; // while numOfTables equals to 0, it must be Heartbeat - assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0); + assert((pQueryInfo->numOfTables == 0 && (pQueryInfo->command == TSDB_SQL_HB || pSql->cmd.command == TSDB_SQL_RETRIEVE_FUNC)) || pQueryInfo->numOfTables > 0); } tscDebug("0x%"PRIx64" SQL cmd:%s will be processed, name:%s, type:%d", pSql->self, sqlCmd[pCmd->command], name, type); @@ -1026,10 +1026,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } // support only one udf - if (pCmd->pUdfInfo != NULL && taosArrayGetSize(pCmd->pUdfInfo) > 0) { + if (pQueryInfo->pUdfInfo != NULL && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) { pQueryMsg->udfContentOffset = htonl((int32_t) (pMsg - pCmd->payload)); - for(int32_t i = 0; i < taosArrayGetSize(pCmd->pUdfInfo); ++i) { - SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, i); + for(int32_t i = 0; i < taosArrayGetSize(pQueryInfo->pUdfInfo); ++i) { + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, i); *(int8_t*) pMsg = pUdfInfo->resType; pMsg += sizeof(pUdfInfo->resType); @@ -1843,14 +1843,15 @@ int tscBuildRetrieveFuncMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; char *pMsg = pCmd->payload; - int32_t numOfFuncs = (int32_t)taosArrayGetSize(pCmd->pUdfInfo); + SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd); + int32_t numOfFuncs = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo); SRetrieveFuncMsg *pRetrieveFuncMsg = (SRetrieveFuncMsg *)pMsg; pRetrieveFuncMsg->num = htonl(numOfFuncs); pMsg += sizeof(SRetrieveFuncMsg); for(int32_t i = 0; i < numOfFuncs; ++i) { - SUdfInfo* pUdf = taosArrayGet(pCmd->pUdfInfo, i); + SUdfInfo* pUdf = taosArrayGet(pQueryInfo->pUdfInfo, i); STR_TO_NET_VARSTR(pMsg, pUdf->name); pMsg += varDataNetTLen(pMsg); } @@ -2125,15 +2126,17 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { int tscProcessRetrieveFuncRsp(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SUdfFuncMsg* pFuncMsg = (SUdfFuncMsg *)pSql->res.pRsp; + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); + pFuncMsg->num = htonl(pFuncMsg->num); - assert(pFuncMsg->num == taosArrayGetSize(pCmd->pUdfInfo)); + assert(pFuncMsg->num == taosArrayGetSize(pQueryInfo->pUdfInfo)); char* pMsg = pFuncMsg->content; for(int32_t i = 0; i < pFuncMsg->num; ++i) { SFunctionInfoMsg* pFunc = (SFunctionInfoMsg*) pMsg; for(int32_t j = 0; j < pFuncMsg->num; ++j) { - SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, j); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, j); if (strcmp(pUdfInfo->name, pFunc->name) != 0) { continue; } @@ -2161,11 +2164,13 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) { return pSql->res.code; } + SQueryInfo* parQueryInfo = tscGetQueryInfo(&parent->cmd, parent->cmd.clauseIndex); + assert(parent->signature == parent && (int64_t)pSql->param == parent->self); - taosArrayDestroy(parent->cmd.pUdfInfo); + taosArrayDestroy(parQueryInfo->pUdfInfo); - parent->cmd.pUdfInfo = pCmd->pUdfInfo; // assigned to parent sql obj. - pCmd->pUdfInfo = NULL; + parQueryInfo->pUdfInfo = pQueryInfo->pUdfInfo; // assigned to parent sql obj. + pQueryInfo->pUdfInfo = NULL; return TSDB_CODE_SUCCESS; } @@ -2479,7 +2484,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { tscSetResRawPtr(pRes, pQueryInfo); } else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) { tscSetResRawPtr(pRes, pQueryInfo); - } else if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { + } else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { tscSetResRawPtr(pRes, pQueryInfo); } @@ -2607,7 +2612,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create return tscGetTableMeta(pSql, pTableMetaInfo); } -int32_t tscGetUdfFromNode(SSqlObj *pSql) { +int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); if (NULL == pNew) { tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql); @@ -2618,14 +2623,24 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql) { pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_RETRIEVE_FUNC; - pNew->cmd.pUdfInfo = taosArrayInit(4, sizeof(SUdfInfo)); - for(int32_t i = 0; i < taosArrayGetSize(pSql->cmd.pUdfInfo); ++i) { + if (tscAddQueryInfo(&pNew->cmd) != TSDB_CODE_SUCCESS) { + tscError("%p malloc failed for new queryinfo", pSql); + tscFreeSqlObj(pNew); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd, 0); + + pNewQueryInfo->pUdfInfo = taosArrayInit(4, sizeof(SUdfInfo)); + for(int32_t i = 0; i < taosArrayGetSize(pQueryInfo->pUdfInfo); ++i) { SUdfInfo info = {0}; - SUdfInfo* p1 = taosArrayGet(pSql->cmd.pUdfInfo, i); + SUdfInfo* p1 = taosArrayGet(pQueryInfo->pUdfInfo, i); info = *p1; info.name = strdup(p1->name); - taosArrayPush(pNew->cmd.pUdfInfo, &info); + taosArrayPush(pNewQueryInfo->pUdfInfo, &info); } + + pNew->cmd.active = pNewQueryInfo; if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) { tscError("%p malloc failed for payload to get table meta", pSql); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d0c59525aa31e9987dd4948df7fe7bbe6c115822..4db9cf7bc22c69ccf326e18e4f6c5405786f9c13 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -615,7 +615,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { if ((pExpr->base.colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) || (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) { - int16_t functionId = tscIsProjectionQuery(&pNew->cmd, pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS; + int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS; tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL); tscPrintSelNodeList(pNew, 0); @@ -636,7 +636,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { assert(pTableMetaInfo->pVgroupTables != NULL); - if (tscNonOrderedProjectionQueryOnSTable(&pNew->cmd, pQueryInfo, 0)) { + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables); tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); pTableMetaInfo->pVgroupTables = p; @@ -1429,7 +1429,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } SSubqueryState* pState = &pParentSql->subState; - if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && numOfRows == 0) { + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -1561,7 +1561,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } SQueryInfo* p = tscGetQueryInfo(&pSub->cmd, 0); - orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(&pSub->cmd, p, 0); + orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0); if (orderedPrjQuery) { break; } @@ -1586,7 +1586,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd, 0); - if (tscNonOrderedProjectionQueryOnSTable(&pSub->cmd, pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows && + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -1788,7 +1788,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { } // In case of consequence query from other vnode, do not wait for other query response here. - if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(&pSql->cmd, pQueryInfo, 0))) { + if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { return; } @@ -1800,7 +1800,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of * data instead of returning to its invoker */ - if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(&pSql->cmd, pQueryInfo, 0)) { + if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; @@ -2773,6 +2773,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // set the command flag must be after the semaphore been correctly set. if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) { pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + + SQueryInfo *pQueryInfo2 = tscGetQueryInfo(&pParentSql->cmd, pParentSql->cmd.clauseIndex); size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t j = 0; j < size; ++j) { @@ -2781,7 +2783,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p int32_t functionId = pCtx->functionId; if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pParentSql->cmd.pUdfInfo, -1 * functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo2->pUdfInfo, -1 * functionId - 1); code = initUdfInfo(pUdfInfo); if (code != TSDB_CODE_SUCCESS) { pParentSql->res.code = code; @@ -2862,8 +2864,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR tscDebug("0x%"PRIx64" sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql->self, pSql, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx); - SSqlCmd* pCmd = &pSql->cmd; - if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) { + if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, pParentSql, pSql, tsMaxNumOfOrderedResults, num); tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7e0aad3b4402cd3205b9330032872a057deffa13..06a8b91177a12a17fc3127e77537d69a750c2bb1 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -127,7 +127,7 @@ bool tscIsTwoStageSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t tab } // for ordered projection query, iterate all qualified vnodes sequentially - if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) { + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) { return false; } @@ -138,7 +138,7 @@ bool tscIsTwoStageSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t tab return false; } -bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) { +bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); /* @@ -156,7 +156,7 @@ bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t int32_t functionId = tscSqlExprGet(pQueryInfo, i)->base.functionId; if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { return false; } @@ -179,8 +179,8 @@ bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t } // not order by timestamp projection query on super table -bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) { - if (!tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) { +bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { + if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) { return false; } @@ -188,8 +188,8 @@ bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, return pQueryInfo->order.orderColId < 0; } -bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) { - if (!tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) { +bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { + if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) { return false; } @@ -197,14 +197,14 @@ bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, in return pQueryInfo->order.orderColId >= 0; } -bool tscIsProjectionQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { +bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) { size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->base.functionId; if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { return false; } @@ -249,7 +249,7 @@ bool tscIsSecondStageQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { } } - if (tscIsProjectionQuery(pCmd, pQueryInfo)) { + if (tscIsProjectionQuery(pQueryInfo)) { return false; } @@ -422,6 +422,15 @@ bool isSimpleAggregate(SQueryInfo* pQueryInfo) { } int32_t functionId = pExpr->base.functionId; + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); + if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { + return true; + } + + continue; + } + if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) { continue; } @@ -799,6 +808,12 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { tfree(pUp); } + + if (pCmd->subCmd) { + pQueryInfo->pUdfInfo = taosArrayDestroy(pQueryInfo->pUdfInfo); + } else { + pQueryInfo->pUdfInfo = tscDestroyUdfArrayList(pQueryInfo->pUdfInfo); + } freeQueryInfoImpl(pQueryInfo); clearAllTableMetaInfo(pQueryInfo, removeMeta); @@ -842,11 +857,6 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - if (pCmd->subCmd) { - pCmd->pUdfInfo = taosArrayDestroy(pCmd->pUdfInfo); - } else { - pCmd->pUdfInfo = tscDestroyUdfArrayList(pCmd->pUdfInfo); - } tscFreeQueryInfo(pCmd, removeMeta); } @@ -2683,12 +2693,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t goto _error; } - if (pCmd->pUdfInfo) { - pnCmd->pUdfInfo = taosArrayDup(pCmd->pUdfInfo); - } SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd, 0); + if (pQueryInfo->pUdfInfo) { + pNewQueryInfo->pUdfInfo = taosArrayDup(pQueryInfo->pUdfInfo); + } + pNewQueryInfo->command = pQueryInfo->command; pnCmd->active = pNewQueryInfo; @@ -3096,7 +3107,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); } - return tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && + return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1); } @@ -3114,7 +3125,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { * no result returned from the current virtual node anymore, try the next vnode if exists * if case of: multi-vnode super table projection query */ - assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); + assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; @@ -3489,9 +3500,15 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf functionId = TSDB_FUNC_STDDEV; } + SUdfInfo* pUdfInfo = NULL; + + if (functionId < 0) { + pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); + } + int32_t inter = 0; getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, - &pse->resBytes, &inter, 0, false, NULL); + &pse->resBytes, &inter, 0, false, pUdfInfo); pse->colType = pse->resType; pse->colBytes = pse->resBytes; @@ -3558,8 +3575,14 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu functionId = TSDB_FUNC_STDDEV; } + SUdfInfo* pUdfInfo = NULL; + + if (functionId < 0) { + pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1); + } + getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, &pse->resBytes, &inter, - 0, false, NULL); + 0, false, pUdfInfo); } } @@ -3633,7 +3656,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo); pQueryAttr->havingNum = pQueryInfo->havingFieldNum; - + pQueryAttr->pUdfInfo = pQueryInfo->pUdfInfo; + if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor pQueryAttr->window = pQueryInfo->window; } else { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 821305c607c6a9a4e1048910c9781a1b31e2b5f4..59ad2adea101c23a909c4691298fddd98048c953 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -236,6 +236,7 @@ typedef struct SQueryAttr { SMemRef memRef; STableGroupInfo tableGroupInfo; // table list SArray int32_t vgId; + SArray *pUdfInfo; // no need to free } SQueryAttr; typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); @@ -495,6 +496,7 @@ typedef struct SMultiwayMergeInfo { bool hasPrev; bool groupMix; + SArray *udfInfo; } SMultiwayMergeInfo; SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); @@ -515,7 +517,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows, void* merger, bool groupMix); -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param); +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -530,6 +532,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlo int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); +void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); @@ -571,4 +574,6 @@ void freeQueryAttr(SQueryAttr *pQuery); int32_t getMaximumIdleDurationSec(); +void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); + #endif // TDENGINE_QEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3092a29e96f51b19671361c7c95e971bfe1c9564..24a0a2394c999d90838b8677f89f1d8b09d8ec81 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -701,39 +701,72 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc return num; } -static void doInvokeUdf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t idx) { +void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type) { int32_t output = 0; - SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; - if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) { - qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]); - if (pUdfInfo->isScript) { - (*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx, - (char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput, - (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes); - } else { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) { + qError("empty udf function, type:%d", type); + return; + } - void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); + qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[type]); - (*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, - pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init); - } + switch (type) { + case TSDB_UDF_FUNC_NORMAL: + if (pUdfInfo->isScript) { + (*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx, + (char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput, + (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes); + } else { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { + void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); + + (*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, + pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init); + } + + if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { + pCtx->resultInfo->numOfRes = output; + } else { + pCtx->resultInfo->numOfRes += output; + } + + if (pCtx->resultInfo->numOfRes > 0) { + pCtx->resultInfo->hasResult = DATA_SET_FLAG; + } + + break; + + case TSDB_UDF_FUNC_MERGE: + (*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init); + + // set the output value exist pCtx->resultInfo->numOfRes = output; - } else { - pCtx->resultInfo->numOfRes += output; - } + if (output > 0) { + pCtx->resultInfo->hasResult = DATA_SET_FLAG; + } - if (pCtx->resultInfo->numOfRes > 0) { - pCtx->resultInfo->hasResult = DATA_SET_FLAG; - } + break; - return; + case TSDB_UDF_FUNC_FINALIZE: { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); + if (pUdfInfo->isScript) { + (*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->pOutput, &output); + } else { + (*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init); + } + // set the output value exist + pCtx->resultInfo->numOfRes = output; + if (output > 0) { + pCtx->resultInfo->hasResult = DATA_SET_FLAG; + } + + break; + } } - qError("empty udf function"); return; } @@ -768,7 +801,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { // aAggs[functionId].xFunction(&pCtx[k]); if (functionId < 0) { // load the script and exec, pRuntimeEnv->pUdfInfo - doInvokeUdf(pRuntimeEnv, &pCtx[k], 0); + SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; + doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL); } else { aAggs[functionId].xFunction(&pCtx[k]); } @@ -982,6 +1016,13 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, pCtx[i].pInput = p->pData; assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type); + if (pCtx[i].functionId < 0) { + SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); + pCtx[i].ptsList = (int64_t*) tsInfo->pData; + + continue; + } + uint32_t status = aAggs[pCtx[i].functionId].status; if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); @@ -1011,7 +1052,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction pCtx[k].startTs = startTs;// this can be set during create the struct // aAggs[functionId].xFunction(&pCtx[k]); if (functionId < 0) { - doInvokeUdf(pRuntimeEnv, &pCtx[k], 0); + SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; + doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL); } else { aAggs[functionId].xFunction(&pCtx[k]); } @@ -1033,8 +1075,9 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC pCtx[k].startTs = pQueryAttr->window.skey; if (pCtx[k].functionId < 0) { - // load the script and exec - doInvokeUdf(pRuntimeEnv, &pCtx[k], 0); + // load the script and exec + SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; + doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL); } else { aAggs[pCtx[k].functionId].xFunction(&pCtx[k]); } @@ -1359,7 +1402,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn pInfo->binfo.pCtx[k].size = 1; int32_t functionId = pInfo->binfo.pCtx[k].functionId; if (functionId < 0) { - doInvokeUdf(pRuntimeEnv, &pInfo->binfo.pCtx[k], j); + SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; + doInvokeUdf(pUdfInfo, &pInfo->binfo.pCtx[k], j, TSDB_UDF_FUNC_NORMAL); } else if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) { aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j); } @@ -1848,7 +1892,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_GlobalAggregate: { pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger); + pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo); break; } @@ -3154,6 +3198,21 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf } } +void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) { + SSDataBlock* pDataBlock = pBInfo->pRes; + + for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); + + int32_t functionId = pBInfo->pCtx[i].functionId; + if (functionId < 0) { + memset(pBInfo->pCtx[i].pOutput, 0, pColInfo->info.bytes * (*bufCapacity)); + } + } +} + + + void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); @@ -3221,23 +3280,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult for (int32_t j = 0; j < numOfOutput; ++j) { if (pCtx[j].functionId < 0) { - int32_t output = 0; - SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]); - void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); - - if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) { - if (pRuntimeEnv->pUdfInfo->isScript) { - (*(scriptFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pRuntimeEnv->pUdfInfo->pScriptCtx, pCtx[j].pOutput, &output); - } else { - (*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, interBuf, &output, &pRuntimeEnv->pUdfInfo->init); - } - } - - // set the output value exist - pCtx[j].resultInfo->numOfRes = output; - if (output > 0) { - pCtx[j].resultInfo->hasResult = DATA_SET_FLAG; - } + doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); } else { aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); } @@ -3254,23 +3297,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult } else { for (int32_t j = 0; j < numOfOutput; ++j) { if (pCtx[j].functionId < 0) { - int32_t output = 0; - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); - - if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) { - if (pRuntimeEnv->pUdfInfo->isScript) { - (*(scriptFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pRuntimeEnv->pUdfInfo->pScriptCtx, pCtx[j].pOutput, &output); - } else { - (*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, interBuf, &output, &pRuntimeEnv->pUdfInfo->init); - } - } - - // set the output value exist - pCtx[j].resultInfo->numOfRes = output; - if (output > 0) { - pCtx[j].resultInfo->hasResult = DATA_SET_FLAG; - } + doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); } else { aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); } @@ -4642,7 +4669,7 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, - SExprInfo* pExpr, int32_t numOfOutput, void* param) { + SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); pInfo->resultRowFactor = @@ -4653,6 +4680,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, pInfo->pMerge = param; pInfo->bufCapacity = 4096; + pInfo->udfInfo = pUdfInfo; pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); @@ -4919,7 +4947,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { } // Return result of the previous group in the firstly. - if (newgroup && pRes->info.rows > 0) { + if (*newgroup && pRes->info.rows > 0) { pArithInfo->existDataBlock = pBlock; clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); return pInfo->pRes; diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 0554a887ec68ca3353b5e45c1067ea734ea98cfd..dccdc5e1d38e2e3929d901393201c80116752b0c 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -138,6 +138,12 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { } else { // diff/add/multiply/subtract/division op = OP_Arithmetic; taosArrayPush(plan, &op); + + //arithmetic on scalar function + if (pQueryAttr->pExpr2 != NULL) { + op = OP_Arithmetic; + taosArrayPush(plan, &op); + } } if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { diff --git a/tests/script/general/parser/udf_dll_stable.sim b/tests/script/general/parser/udf_dll_stable.sim index 3dd43202aa62d7823356a6dd2bb7b086220b7637..b8da57467e912ff27f4fbda7226c75e089f04808 100644 --- a/tests/script/general/parser/udf_dll_stable.sim +++ b/tests/script/general/parser/udf_dll_stable.sim @@ -678,7 +678,6 @@ if $data61 != 8 then return -1 endi - sql select add_one(f1)+1 from stb1; if $rows != 17 then return -1