From 18cb7b36fc41fcc9c61a82caf86f5390f60b5520 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Mar 2021 23:03:48 +0800 Subject: [PATCH] [td-3188]refactor. --- src/client/inc/tsclient.h | 10 ----- src/client/src/tscLocalMerge.c | 2 +- src/client/src/tscSQLParser.c | 2 - src/client/src/tscServer.c | 46 ++++++++++++++------ src/inc/taosmsg.h | 2 +- src/mnode/src/mnodeFunc.c | 3 +- src/query/inc/qAggMain.h | 6 +-- src/query/inc/qExecutor.h | 4 +- src/query/inc/qUdf.h | 32 ++++++++++++++ src/query/src/qAggMain.c | 74 ++++++++++++++++---------------- src/query/src/qExecutor.c | 77 +++++++++++++++++++++++++++------- src/query/src/queryMain.c | 2 +- 12 files changed, 174 insertions(+), 86 deletions(-) create mode 100644 src/query/inc/qUdf.h diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 13a6d888de..7a38141b53 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -280,16 +280,6 @@ typedef struct SResRec { int numOfTotal; } SResRec; -typedef struct SUdfInfo { - int32_t functionId; // system assigned function id - char *name; // function name - int16_t resType; // result type - int16_t resBytes; // result byte - int32_t funcType; // scalar function or aggregate function - int32_t contLen; // content length - char *content; // binary content -} SUdfInfo; - typedef struct { int32_t numOfRows; // num of results in current retrieval int64_t numOfRowsGroup; // num of results of current group diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index a44b0c46ba..6217beff44 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1075,7 +1075,7 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n pCtx->currentStage = MERGE_STAGE; if (needInit) { - aAggs[pCtx->functionId].init(pCtx); + aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo); } } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 96d3e9632c..7424942787 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -322,8 +322,6 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { //TODO CHECK CODE - - if (len + sizeof(SCreateFuncMsg) > pSql->cmd.allocSize) { ret = tscAllocPayload(&pSql->cmd, len + sizeof(SCreateFuncMsg)); if (ret) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 2f6d053351..c9440cfb02 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1063,12 +1063,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } // support only one udf - if (pCmd->pUdfInfo != NULL) { - assert(taosArrayGetSize(pCmd->pUdfInfo) == 1); - + if (pCmd->pUdfInfo != NULL && taosArrayGetSize(pCmd->pUdfInfo) > 0) { pQueryMsg->udfContentOffset = htonl((int32_t) (pMsg - pCmd->payload)); for(int32_t i = 0; i < taosArrayGetSize(pCmd->pUdfInfo); ++i) { SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, i); + *(int8_t*) pMsg = pUdfInfo->resType; + pMsg += sizeof(pUdfInfo->resType); + + *(int16_t*) pMsg = htons(pUdfInfo->resBytes); + pMsg += sizeof(pUdfInfo->resBytes); + STR_TO_VARSTR(pMsg, pUdfInfo->name); pMsg += varDataTLen(pMsg); @@ -2115,25 +2119,41 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SUdfFuncMsg* pFuncMsg = (SUdfFuncMsg *)pSql->res.pRsp; pFuncMsg->num = htonl(pFuncMsg->num); + assert(pFuncMsg->num == taosArrayGetSize(pCmd->pUdfInfo)); char* pMsg = pFuncMsg->content; for(int32_t i = 0; i < pFuncMsg->num; ++i) { SFunctionInfoMsg* pFunc = (SFunctionInfoMsg*) pMsg; - SUdfInfo info = {0}; - info.name = strndup(pFunc->name, TSDB_FUNC_NAME_LEN); - info.resBytes = htons(pFunc->resBytes); - info.resType = htons(pFunc->resType); - info.funcType = TSDB_UDF_TYPE_SCALAR; + for(int32_t j = 0; j < pFuncMsg->num; ++j) { + SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, j); + if (strcmp(pUdfInfo->name, pFunc->name) != 0) { + continue; + } + + pUdfInfo->resBytes = htons(pFunc->resBytes); + pUdfInfo->resType = pFunc->resType; + pUdfInfo->funcType = TSDB_UDF_TYPE_SCALAR; + pUdfInfo->contLen = htonl(pFunc->len); + + pUdfInfo->content = malloc(pUdfInfo->contLen); + memcpy(pUdfInfo->content, pFunc->content, pUdfInfo->contLen); - info.contLen = htonl(pFunc->len); - info.content = malloc(pFunc->len); - memcpy(info.content, pFunc->content, info.contLen); + pMsg += sizeof(SFunctionInfoMsg) + pUdfInfo->contLen; + } + } - taosArrayPush(pCmd->pUdfInfo, &info); - pMsg += sizeof(SFunctionInfoMsg) + info.contLen; + // master sqlObj locates in param + SSqlObj* parent = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param); + if(parent == NULL) { + return pSql->res.code; } + assert(parent->signature == parent && (int64_t)pSql->param == parent->self); + taosArrayDestroy(parent->cmd.pUdfInfo); + + parent->cmd.pUdfInfo = pCmd->pUdfInfo; // assigned to parent sql obj. + pCmd->pUdfInfo = NULL; return TSDB_CODE_SUCCESS; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index bf0acf3c3f..7e61010cd2 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -593,7 +593,7 @@ typedef struct { typedef struct { char name[TSDB_FUNC_NAME_LEN]; - int16_t resType; + int8_t resType; int16_t resBytes; int32_t len; char content[]; diff --git a/src/mnode/src/mnodeFunc.c b/src/mnode/src/mnodeFunc.c index 464ad3f110..803c6a45b2 100644 --- a/src/mnode/src/mnodeFunc.c +++ b/src/mnode/src/mnodeFunc.c @@ -447,7 +447,8 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) { pFuncInfo->len = htonl(pFuncObj->contLen); memcpy(pFuncInfo->content, pFuncObj->cont, pFuncObj->contLen); - pFuncInfo->resType = htons(pFuncObj->resType); + pFuncInfo->resType = pFuncObj->resType; + pFuncInfo->resBytes = htons(pFuncObj->resBytes); pOutput += sizeof(SFunctionInfoMsg) + pFuncObj->contLen; } diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index c59067c4b3..ad9ec50db8 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -205,7 +205,7 @@ typedef struct SAggFunctionInfo { int8_t stableFuncId; // transfer function for super table query uint16_t status; - bool (*init)(SQLFunctionCtx *pCtx); // setup the execute environment + bool (*init)(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultCellInfo); // setup the execute environment void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function @@ -269,9 +269,9 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, int32_t bufLen) { pResInfo->initialized = true; // the this struct has been initialized flag - pResInfo->complete = false; + pResInfo->complete = false; pResInfo->hasResult = false; - pResInfo->numOfRes = 0; + pResInfo->numOfRes = 0; memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5ff574ec67..b3dbeae8c5 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -28,6 +28,7 @@ #include "tarray.h" #include "tlockfree.h" #include "tsdb.h" +#include "qUdf.h" struct SColumnFilterElem; typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type); @@ -339,6 +340,7 @@ typedef struct SQueryParam { SColIndex *pGroupColIndex; SColumnInfo *pTagColumnInfo; SSqlGroupbyExpr *pGroupbyExpr; + SUdfInfo *pUdfInfo; } SQueryParam; typedef struct STableScanInfo { @@ -424,7 +426,7 @@ typedef struct SSWindowOperatorInfo { void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, - SColumnInfo* pTagCols); + SColumnInfo* pTagCols, SUdfInfo* pUdfInfo); int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr); diff --git a/src/query/inc/qUdf.h b/src/query/inc/qUdf.h new file mode 100644 index 0000000000..32ac2b0741 --- /dev/null +++ b/src/query/inc/qUdf.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_QUDF_H +#define TDENGINE_QUDF_H + +typedef struct SUdfInfo { + int32_t functionId; // system assigned function id + int8_t funcType; // scalar function or aggregate function + int8_t resType; // result type + int16_t resBytes; // result byte + int32_t contLen; // content length + char *name; // function name + union { // file path or [in memory] binary content + char *content; + char *path; + }; +} SUdfInfo; + +#endif // TDENGINE_QUDF_H diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index f18d093b89..e32ef2479f 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -168,6 +168,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI qError("Illegal data type %d or data type length %d", dataType, dataBytes); return TSDB_CODE_TSC_INVALID_SQL; } + + if (functionId < 0) { + return TSDB_CODE_SUCCESS; + } if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || @@ -381,14 +385,14 @@ static void no_next_step(SQLFunctionCtx *pCtx) { pResInfo->complete = true; } -static bool function_setup(SQLFunctionCtx *pCtx) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->initialized) { +static bool function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { +// SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + if (pResultInfo->initialized) { return false; } memset(pCtx->pOutput, 0, (size_t)pCtx->outputBytes); - initResultInfo(pResInfo, pCtx->interBufBytes); + initResultInfo(pResultInfo, pCtx->interBufBytes); return true; } @@ -1088,8 +1092,8 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, } } -static bool min_func_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { + if (!function_setup(pCtx, pResultInfo)) { return false; // not initialized since it has been initialized } @@ -1133,8 +1137,8 @@ static bool min_func_setup(SQLFunctionCtx *pCtx) { return true; } -static bool max_func_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { + if (!function_setup(pCtx, pResultInfo)) { return false; // not initialized since it has been initialized } @@ -1809,8 +1813,8 @@ static void stddev_dst_finalizer(SQLFunctionCtx *pCtx) { } ////////////////////////////////////////////////////////////////////////////////////// -static bool first_last_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { return false; } @@ -2555,14 +2559,13 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) { } } -static bool top_bottom_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { return false; } STopBotInfo *pInfo = getTopBotOutputInfo(pCtx); buildTopBotStruct(pInfo, pCtx); - return true; } @@ -2749,14 +2752,13 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { } /////////////////////////////////////////////////////////////////////////////////////////////// -static bool percentile_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool percentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { + if (!function_setup(pCtx, pResultInfo)) { return false; } // in the first round, get the min-max value of all involved data - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo); SET_DOUBLE_VAL(&pInfo->minval, DBL_MAX); SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX); pInfo->numOfElems = 0; @@ -2945,8 +2947,8 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { return pInfo; } -static bool apercentile_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { + if (!function_setup(pCtx, pResultInfo)) { return false; } @@ -3073,12 +3075,11 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) { } ///////////////////////////////////////////////////////////////////////////////// -static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { return false; } - SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // 2*3 matrix @@ -3378,8 +3379,8 @@ enum { INITIAL_VALUE_NOT_ASSIGNED = 0, }; -static bool diff_function_setup(SQLFunctionCtx *pCtx) { - if (function_setup(pCtx)) { +static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (function_setup(pCtx, pResInfo)) { return false; } @@ -3727,12 +3728,12 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) { } ///////////////////////////////////////////////////////////////////////////////// -static bool spread_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { return false; } - SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // this is the server-side setup function in client-side, the secondary merge do not need this procedure if (pCtx->currentStage == MERGE_STAGE) { @@ -3929,12 +3930,10 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { * param[2]: end time * @param pCtx */ -static bool twa_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool twa_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { return false; } - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); pInfo->p.key = INT64_MIN; @@ -4326,14 +4325,12 @@ static void interp_function(SQLFunctionCtx *pCtx) { } } -static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool ts_comp_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { return false; // not initialized since it has been initialized } - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STSCompInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - pInfo->pTSBuf = tsBufCreate(false, pCtx->order); pInfo->pTSBuf->tsOrder = pCtx->order; return true; @@ -4435,13 +4432,12 @@ static double do_calc_rate(const SRateInfo* pRateInfo) { return resultVal; } -static bool rate_function_setup(SQLFunctionCtx *pCtx) { - if (!function_setup(pCtx)) { +static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { return false; } - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); //->pOutput + pCtx->outputBytes; - SRateInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SRateInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); pInfo->CorrectionValue = 0; pInfo->firstKey = INT64_MIN; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b57a578f2a..9f8fe0bcb2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -35,13 +35,6 @@ #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) -#define CHECK_IF_QUERY_KILLED(_q) \ - do { \ - if (isQueryKilled((_q)->qinfo)) { \ - longjmp((_q)->env, TSDB_CODE_TSC_QUERY_CANCELLED); \ - } \ - } while (0) - #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define TIME_WINDOW_COPY(_dst, _src) do {\ @@ -116,6 +109,7 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) { key /= 1000; } + if (pQuery->interval.intervalUnit == 'y') { interval *= 12; } @@ -3066,7 +3060,11 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { continue; } - aAggs[pCtx[j].functionId].init(&pCtx[j]); + if (pCtx[j].functionId < 0) { // udf initialize + + } else { + aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); + } } } @@ -3205,7 +3203,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe } if (!pResInfo->initialized) { - aAggs[functionId].init(&pCtx[i]); + aAggs[functionId].init(&pCtx[i], pResInfo); } } } @@ -5482,6 +5480,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen); pQueryMsg->sw.gap = htobe64(pQueryMsg->sw.gap); pQueryMsg->sw.primaryColId = htonl(pQueryMsg->sw.primaryColId); + pQueryMsg->udfContentOffset = htonl(pQueryMsg->udfContentOffset); + pQueryMsg->udfContentLen = htonl(pQueryMsg->udfContentLen); + pQueryMsg->udfNum = htonl(pQueryMsg->udfNum); // query msg safety check if (!validateQueryMsg(pQueryMsg)) { @@ -5728,6 +5729,27 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pMsg = (char *)pQueryMsg + pQueryMsg->tsOffset + pQueryMsg->tsLen; } + if (pQueryMsg->udfContentLen > 0) { + param->pUdfInfo = calloc(1, sizeof(SUdfInfo)); + param->pUdfInfo->contLen = pQueryMsg->udfContentLen; + + pMsg = (char*)pQueryMsg + pQueryMsg->udfContentOffset; + param->pUdfInfo->resType = *(int8_t*) pMsg; + pMsg += sizeof(int8_t); + + param->pUdfInfo->resBytes = htons(*(int16_t*)pMsg); + pMsg += sizeof(int16_t); + + tstr* name = (tstr*)(pMsg); + param->pUdfInfo->name = strndup(name->data, name->len); + + pMsg += varDataTLen(name); + param->pUdfInfo->content = malloc(pQueryMsg->udfContentLen); + memcpy(param->pUdfInfo->content, pMsg, pQueryMsg->udfContentLen); + + pMsg += pQueryMsg->udfContentLen; + } + param->sql = strndup(pMsg, pQueryMsg->sqlstrLen); if (!validateQuerySourceCols(pQueryMsg, param->pExprMsg, param->pTagColumnInfo)) { @@ -5790,12 +5812,34 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI return TSDB_CODE_SUCCESS; } +static UNUSED_FUNC int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) { + if (pUdfInfo == NULL) { + return TSDB_CODE_SUCCESS; + } + + char path[PATH_MAX] = {0}; + taosGetTmpfilePath("script", path); + + FILE* file = fopen(path, "w+"); + + // TODO check for failure of flush to disk + /*size_t t = */ fwrite(pUdfInfo->content, pUdfInfo->contLen, 1, file); + fclose(file); + + tfree(pUdfInfo->content); + pUdfInfo->path = strdup(path); + return TSDB_CODE_SUCCESS; +} + // TODO tag length should be passed from client int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, - SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) { + SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols, SUdfInfo* pUdfInfo) { *pExprInfo = NULL; int32_t code = TSDB_CODE_SUCCESS; + // save the udf script or so file +// flushUdfContentToDisk(pUdfInfo); + SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo)); if (pExprs == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -5871,10 +5915,15 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu return TSDB_CODE_QRY_INVALID_MSG; } - if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes, - &pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { - tfree(pExprs); - return TSDB_CODE_QRY_INVALID_MSG; + if (pExprs[i].base.functionId < 0) { + pExprs[i].type = pUdfInfo->resType; + pExprs[i].bytes = pUdfInfo->resBytes; + } else { + if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes, + &pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { + tfree(pExprs); + return TSDB_CODE_QRY_INVALID_MSG; + } } if (pExprs[i].base.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].base.functionId == TSDB_FUNC_TS_DUMMY) { diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 7eb5cc2600..d84ca0ad7a 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -91,7 +91,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) { + if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExprMsg, param.pTagColumnInfo, param.pUdfInfo)) != TSDB_CODE_SUCCESS) { goto _over; } -- GitLab