From 15fd8a03a7d54da1173ef48c5c8715be2f1022e0 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 24 Mar 2021 10:02:21 +0800 Subject: [PATCH] support load dll --- src/inc/taosdef.h | 2 +- src/inc/taoserror.h | 3 +- src/mnode/inc/mnodeDef.h | 2 +- src/mnode/src/mnodeFunc.c | 11 +++- src/os/inc/os.h | 1 + src/os/inc/osLinux32.h | 1 + src/os/inc/osLinux64.h | 1 + src/os/inc/osSystem.h | 31 ++++++++++ src/os/src/detail/osSystem.c | 54 +++++++++++++++++ src/os/src/linux/CMakeLists.txt | 2 +- src/query/inc/qExecutor.h | 5 +- src/query/inc/qUdf.h | 21 +++++++ src/query/src/qExecutor.c | 104 ++++++++++++++++++++++++++++---- src/query/src/queryMain.c | 15 +++-- src/util/src/terror.c | 1 + 15 files changed, 227 insertions(+), 27 deletions(-) create mode 100644 src/os/inc/osSystem.h create mode 100644 src/os/src/detail/osSystem.c diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 139ad6f135..6e56e5059a 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -184,7 +184,7 @@ do { \ #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_DB_NAME_LEN 33 #define TSDB_FUNC_NAME_LEN 65 -#define TSDB_FUNC_CODE_LEN (4096 - 512) +#define TSDB_FUNC_CODE_LEN (65535 - 512) #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) #define TSDB_COL_NAME_LEN 65 diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 588ec73058..2576285ae0 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -178,7 +178,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_INVALID_FUNC_LEN TAOS_DEF_ERROR_CODE(0, 0x0371) //"Invalid func length") #define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x0372) //"Invalid func code") #define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0373) //"Func already exists") -#define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x0351) //"Invalid func") +#define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x0374) //"Invalid func") #define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380) //"Database not specified or available") #define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //"Database already exists") @@ -261,6 +261,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query") #define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached") #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica") +#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error") // grant diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index ba685a9bdc..2a44d372d1 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -273,7 +273,7 @@ typedef struct { void * pIter; void ** ppShow; int16_t offset[TSDB_MAX_COLUMNS]; - int16_t bytes[TSDB_MAX_COLUMNS]; + int32_t bytes[TSDB_MAX_COLUMNS]; int32_t numOfReads; int8_t maxReplica; int8_t reserved0[1]; diff --git a/src/mnode/src/mnodeFunc.c b/src/mnode/src/mnodeFunc.c index 803c6a45b2..50bc6c7ae5 100644 --- a/src/mnode/src/mnodeFunc.c +++ b/src/mnode/src/mnodeFunc.c @@ -231,7 +231,7 @@ int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code pFunc = calloc(1, sizeof(SFuncObj)); tstrncpy(pFunc->name, name, TSDB_FUNC_NAME_LEN); tstrncpy(pFunc->path, path, tListLen(pFunc->path)); - tstrncpy(pFunc->cont, codeScript, codeLen); + memcpy(pFunc->cont, codeScript, codeLen); pFunc->contLen = codeLen; pFunc->createdTime = taosGetTimestampMs(); pFunc->resType = outputType; @@ -429,7 +429,7 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) { SRetrieveFuncMsg *pInfo = pMsg->rpcMsg.pCont; pInfo->num = htonl(pInfo->num); - int32_t t = sizeof(SUdfFuncMsg) + sizeof(SFunctionInfoMsg) * pInfo->num + 16384; + int32_t t = sizeof(SUdfFuncMsg) + (sizeof(SFunctionInfoMsg) + TSDB_FUNC_CODE_LEN) * pInfo->num + 16384; SUdfFuncMsg *pFuncMsg = rpcMallocCont(t); pFuncMsg->num = htonl(pInfo->num); @@ -441,6 +441,11 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) { tstrncpy(buf, name->data, TSDB_FUNC_NAME_LEN); SFuncObj* pFuncObj = mnodeGetFunc(buf); + if (pFuncObj == NULL) { + mError("function %s does not exist", buf); + return TSDB_CODE_MND_INVALID_FUNC; + } + SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) pOutput; strcpy(pFuncInfo->name, buf); @@ -455,4 +460,4 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) { pMsg->rpcRsp.rsp = pFuncMsg; pMsg->rpcRsp.len = (pOutput - (char*)pFuncMsg); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/src/os/inc/os.h b/src/os/inc/os.h index 8312b74a50..a384ac9b06 100644 --- a/src/os/inc/os.h +++ b/src/os/inc/os.h @@ -68,6 +68,7 @@ extern "C" { #include "osSysinfo.h" #include "osTime.h" #include "osTimer.h" +#include "osSystem.h" void osInit(); diff --git a/src/os/inc/osLinux32.h b/src/os/inc/osLinux32.h index cfef05368f..b55c85db88 100644 --- a/src/os/inc/osLinux32.h +++ b/src/os/inc/osLinux32.h @@ -77,6 +77,7 @@ extern "C" { #include #include #include +#include #define TAOS_OS_FUNC_LZ4 #define BUILDIN_CLZL(val) __builtin_clzll(val) diff --git a/src/os/inc/osLinux64.h b/src/os/inc/osLinux64.h index a2febd51b7..a5296689c7 100644 --- a/src/os/inc/osLinux64.h +++ b/src/os/inc/osLinux64.h @@ -80,6 +80,7 @@ extern "C" { #endif #include #include +#include #ifdef __cplusplus } diff --git a/src/os/inc/osSystem.h b/src/os/inc/osSystem.h new file mode 100644 index 0000000000..e7a3ec13ae --- /dev/null +++ b/src/os/inc/osSystem.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_OS_SYSTEM_H +#define TDENGINE_OS_SYSTEM_H + +#ifdef __cplusplus +extern "C" { +#endif + +void* taosLoadDll(const char *filename); +void* taosLoadSym(void* handle, char* name); +void taosCloseDll(void *handle); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/os/src/detail/osSystem.c b/src/os/src/detail/osSystem.c new file mode 100644 index 0000000000..052b7a22a8 --- /dev/null +++ b/src/os/src/detail/osSystem.c @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tconfig.h" +#include "tglobal.h" +#include "tulog.h" + +void* taosLoadDll(const char *filename) { + void *handle = dlopen (filename, RTLD_LAZY); + if (!handle) { + uError("load dll:%s failed, error:%s", filename, dlerror()); + return NULL; + } + + uDebug("dll %s loaded", filename); + + return handle; +} + +void* taosLoadSym(void* handle, char* name) { + void* sym = dlsym(handle, name); + char* error = NULL; + + if ((error = dlerror()) != NULL) { + uWarn("load sym:%s failed, error:%s", name, dlerror()); + return NULL; + } + + uDebug("sym %s loaded", name) + + return sym; +} + +void taosCloseDll(void *handle) { + if (handle) { + dlclose(handle); + } +} + + diff --git a/src/os/src/linux/CMakeLists.txt b/src/os/src/linux/CMakeLists.txt index b1a7ebf54e..add64b21d9 100644 --- a/src/os/src/linux/CMakeLists.txt +++ b/src/os/src/linux/CMakeLists.txt @@ -4,4 +4,4 @@ PROJECT(TDengine) AUX_SOURCE_DIRECTORY(. SRC) ADD_LIBRARY(os ${SRC}) -TARGET_LINK_LIBRARIES(os m rt z) +TARGET_LINK_LIBRARIES(os m rt z dl) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b3dbeae8c5..f955155445 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -261,6 +261,7 @@ typedef struct SQueryRuntimeEnv { SRspResultInfo resultInfo; SHashObj *pTableRetrieveTsMap; + SUdfInfo *pUdfInfo; } SQueryRuntimeEnv; enum { @@ -432,7 +433,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId); + SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId, SUdfInfo* pUdfInfo); int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); @@ -441,6 +442,8 @@ int32_t checkForQueryBuf(size_t numOfTables); bool doBuildResCheck(SQInfo* pQInfo); void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); +void destroyUdfInfo(SUdfInfo* pUdfInfo); + bool onlyQueryTags(SQuery* pQuery); bool isValidQInfo(void *param); diff --git a/src/query/inc/qUdf.h b/src/query/inc/qUdf.h index 32ac2b0741..7592d45419 100644 --- a/src/query/inc/qUdf.h +++ b/src/query/inc/qUdf.h @@ -16,6 +16,19 @@ #ifndef TDENGINE_QUDF_H #define TDENGINE_QUDF_H +enum { TSDB_UDF_FUNC_NORMAL = 0, TSDB_UDF_FUNC_INIT, TSDB_UDF_FUNC_AGG, TSDB_UDF_FUNC_MAX_NUM }; + + + +typedef struct SUdfInit{ + int32_t maybe_null; /* 1 if function can return NULL */ + uint32_t decimals; /* for real functions */ + uint64_t length; /* For string functions */ + char *ptr; /* free pointer for function data */ + int32_t const_item; /* 0 if result is independent of arguments */ +} SUdfInit; + + typedef struct SUdfInfo { int32_t functionId; // system assigned function id int8_t funcType; // scalar function or aggregate function @@ -23,10 +36,18 @@ typedef struct SUdfInfo { int16_t resBytes; // result byte int32_t contLen; // content length char *name; // function name + void *handle; // handle loaded in mem + void *funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr + SUdfInit init; union { // file path or [in memory] binary content char *content; char *path; }; } SUdfInfo; +typedef void (*udfNormalFunc)(char* data, int8_t type, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput, + int32_t* numOfOutput, char* buf); +typedef void (*udfInitFunc)(SUdfInit* data); + + #endif // TDENGINE_QUDF_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 1baa68b841..5357c21dca 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -187,7 +187,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); -static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); +static void initCtxOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static bool isPointInterpoQuery(SQuery *pQuery); static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); @@ -753,9 +753,18 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo return num; } -static void doInvokeUdf(char* data, int8_t type, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput, +static void doInvokeUdf(SUdfInfo* pUdfInfo, char* data, int8_t type, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput, int32_t* numOfOutput, char* buf) { + if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) { + qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]); + + (*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(data, type, numOfRows, ts, dataOutput, tsOutput, numOfOutput, buf); + return; + } + + qError("empty udf function"); + return; } static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, @@ -792,8 +801,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx int32_t output = 0; char* buf = GET_ROWCELL_INTERBUF(pCtx[k].resultInfo); - doInvokeUdf(pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput, - pCtx[k].ptsOutputBuf, &output, buf); + doInvokeUdf(pRuntimeEnv->pUdfInfo, pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput, + pCtx[k].ptsOutputBuf, &output, buf); // set the output value exist pCtx[k].resultInfo->numOfRes = output; @@ -1031,8 +1040,18 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction pCtx[k].startTs = startTs;// this can be set during create the struct // aAggs[functionId].xFunction(&pCtx[k]); if (functionId < 0) { - // load the script and exec, pRuntimeEnv->pUdfInfo + int32_t output = 0; + char* buf = GET_ROWCELL_INTERBUF(pCtx[k].resultInfo); + + doInvokeUdf(pRuntimeEnv->pUdfInfo, pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput, + pCtx[k].ptsOutputBuf, &output, buf); + // set the output value exist + pCtx[k].resultInfo->numOfRes = output; + if (output > 0) { + pCtx[k].resultInfo->hasResult = DATA_SET_FLAG; + } + } else { aAggs[functionId].xFunction(&pCtx[k]); } @@ -1051,7 +1070,7 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC int32_t output = 0; char* buf = GET_ROWCELL_INTERBUF(pCtx[k].resultInfo); - doInvokeUdf(pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput, + doInvokeUdf(pRuntimeEnv->pUdfInfo, pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput, pCtx[k].ptsOutputBuf, &output, buf); // set the output value exist @@ -1486,7 +1505,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp } setResultOutputBuf(pRuntimeEnv, pResultRow, pCtx, numOfCols, rowCellInfoOffset); - initCtxOutputBuffer(pCtx, numOfCols); + initCtxOutputBuffer(pRuntimeEnv, pCtx, numOfCols); return TSDB_CODE_SUCCESS; } @@ -1873,6 +1892,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->sasArray); } + + destroyUdfInfo(pRuntimeEnv->pUdfInfo); destroyResultBuf(pRuntimeEnv->pResultBuf); doFreeQueryHandle(pRuntimeEnv); @@ -3064,7 +3085,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i } } - initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); + initCtxOutputBuffer(pRuntimeEnv, pCtx, pDataBlock->info.numOfCols); } void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { @@ -3100,7 +3121,7 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { } } -void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { +void initCtxOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); if (pResInfo->initialized) { @@ -3108,7 +3129,9 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { } if (pCtx[j].functionId < 0) { // todo udf initialization - + if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_INIT]) { + (*(udfInitFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(&pRuntimeEnv->pUdfInfo->init); + } } else { aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); } @@ -5859,7 +5882,43 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI return TSDB_CODE_SUCCESS; } -static UNUSED_FUNC int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) { +void destroyUdfInfo(SUdfInfo* pUdfInfo) { + if (pUdfInfo == NULL) { + return; + } + + tfree(pUdfInfo->name); + + if (pUdfInfo->path) { + unlink(pUdfInfo->path); + } + + tfree(pUdfInfo->path); + + taosCloseDll(pUdfInfo->handle); + + tfree(pUdfInfo); +} + +static char* getUdfFuncName(char* name, int type) { + char* funcname = calloc(1, TSDB_FUNCTIONS_NAME_MAX_LENGTH + 10); + + switch (type) { + case TSDB_UDF_FUNC_NORMAL: + strcpy(funcname, name); + break; + case TSDB_UDF_FUNC_INIT: + sprintf(funcname, "%s_init", name); + break; + default: + assert(0); + break; + } + + return funcname; +} + +static int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) { if (pUdfInfo == NULL) { return TSDB_CODE_SUCCESS; } @@ -5875,6 +5934,20 @@ static UNUSED_FUNC int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) { tfree(pUdfInfo->content); pUdfInfo->path = strdup(path); + + pUdfInfo->handle = taosLoadDll(path); + + if (NULL == pUdfInfo->handle) { + return TSDB_CODE_QRY_SYS_ERROR; + } + + pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_NORMAL)); + if (NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) { + return TSDB_CODE_QRY_SYS_ERROR; + } + + pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_INIT)); + return TSDB_CODE_SUCCESS; } @@ -5885,7 +5958,10 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu int32_t code = TSDB_CODE_SUCCESS; // save the udf script or so file -// flushUdfContentToDisk(pUdfInfo); + code = flushUdfContentToDisk(pUdfInfo); + if (code) { + return code; + } SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo)); if (pExprs == NULL) { @@ -6189,7 +6265,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, - char* sql, uint64_t *qId) { + char* sql, uint64_t *qId, SUdfInfo* pUdfInfo) { int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfOutput = pQueryMsg->numOfOutput; @@ -6203,6 +6279,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr SQuery* pQuery = &pQInfo->query; pQInfo->runtimeEnv.pQuery = pQuery; + pQInfo->runtimeEnv.pUdfInfo = pUdfInfo; + pQuery->tableGroupInfo = *pTableGroupInfo; pQuery->numOfCols = numOfCols; pQuery->numOfOutput = numOfOutput; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index d84ca0ad7a..6777372505 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -158,18 +158,19 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId); + (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId, param.pUdfInfo); + + if ((*pQInfo) == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _over; + } param.sql = NULL; param.pExprs = NULL; param.pSecExprs = NULL; param.pGroupbyExpr = NULL; param.pTagColumnInfo = NULL; - - if ((*pQInfo) == NULL) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _over; - } + param.pUdfInfo = NULL; code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, ¶m, isSTableQuery); @@ -178,6 +179,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi taosArrayDestroy(param.pGroupbyExpr->columnInfo); } + destroyUdfInfo(param.pUdfInfo); + taosArrayDestroy(param.pTableIdList); param.pTableIdList = NULL; diff --git a/src/util/src/terror.c b/src/util/src/terror.c index 42acc4be75..1a92b16719 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -272,6 +272,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, "Multiple retrieval of TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, "Too many time window in query") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit has reached") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") // grant -- GitLab