提交 15fd8a03 编写于 作者: D dapan1121

support load dll

上级 6998f365
...@@ -184,7 +184,7 @@ do { \ ...@@ -184,7 +184,7 @@ do { \
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33 #define TSDB_DB_NAME_LEN 33
#define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_CODE_LEN (4096 - 512) #define TSDB_FUNC_CODE_LEN (65535 - 512)
#define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) #define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
......
...@@ -178,7 +178,7 @@ int32_t* taosGetErrno(); ...@@ -178,7 +178,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_FUNC_LEN TAOS_DEF_ERROR_CODE(0, 0x0371) //"Invalid func length") #define TSDB_CODE_MND_INVALID_FUNC_LEN TAOS_DEF_ERROR_CODE(0, 0x0371) //"Invalid func length")
#define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x0372) //"Invalid func code") #define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x0372) //"Invalid func code")
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0373) //"Func already exists") #define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0373) //"Func already exists")
#define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x0351) //"Invalid func") #define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x0374) //"Invalid func")
#define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380) //"Database not specified or available") #define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380) //"Database not specified or available")
#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //"Database already exists") #define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //"Database already exists")
...@@ -261,6 +261,7 @@ int32_t* taosGetErrno(); ...@@ -261,6 +261,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query") #define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query")
#define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached") #define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached")
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica") #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
// grant // grant
......
...@@ -273,7 +273,7 @@ typedef struct { ...@@ -273,7 +273,7 @@ typedef struct {
void * pIter; void * pIter;
void ** ppShow; void ** ppShow;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS];
int32_t numOfReads; int32_t numOfReads;
int8_t maxReplica; int8_t maxReplica;
int8_t reserved0[1]; int8_t reserved0[1];
......
...@@ -231,7 +231,7 @@ int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code ...@@ -231,7 +231,7 @@ int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code
pFunc = calloc(1, sizeof(SFuncObj)); pFunc = calloc(1, sizeof(SFuncObj));
tstrncpy(pFunc->name, name, TSDB_FUNC_NAME_LEN); tstrncpy(pFunc->name, name, TSDB_FUNC_NAME_LEN);
tstrncpy(pFunc->path, path, tListLen(pFunc->path)); tstrncpy(pFunc->path, path, tListLen(pFunc->path));
tstrncpy(pFunc->cont, codeScript, codeLen); memcpy(pFunc->cont, codeScript, codeLen);
pFunc->contLen = codeLen; pFunc->contLen = codeLen;
pFunc->createdTime = taosGetTimestampMs(); pFunc->createdTime = taosGetTimestampMs();
pFunc->resType = outputType; pFunc->resType = outputType;
...@@ -429,7 +429,7 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) { ...@@ -429,7 +429,7 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) {
SRetrieveFuncMsg *pInfo = pMsg->rpcMsg.pCont; SRetrieveFuncMsg *pInfo = pMsg->rpcMsg.pCont;
pInfo->num = htonl(pInfo->num); pInfo->num = htonl(pInfo->num);
int32_t t = sizeof(SUdfFuncMsg) + sizeof(SFunctionInfoMsg) * pInfo->num + 16384; int32_t t = sizeof(SUdfFuncMsg) + (sizeof(SFunctionInfoMsg) + TSDB_FUNC_CODE_LEN) * pInfo->num + 16384;
SUdfFuncMsg *pFuncMsg = rpcMallocCont(t); SUdfFuncMsg *pFuncMsg = rpcMallocCont(t);
pFuncMsg->num = htonl(pInfo->num); pFuncMsg->num = htonl(pInfo->num);
...@@ -441,6 +441,11 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) { ...@@ -441,6 +441,11 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) {
tstrncpy(buf, name->data, TSDB_FUNC_NAME_LEN); tstrncpy(buf, name->data, TSDB_FUNC_NAME_LEN);
SFuncObj* pFuncObj = mnodeGetFunc(buf); SFuncObj* pFuncObj = mnodeGetFunc(buf);
if (pFuncObj == NULL) {
mError("function %s does not exist", buf);
return TSDB_CODE_MND_INVALID_FUNC;
}
SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) pOutput; SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) pOutput;
strcpy(pFuncInfo->name, buf); strcpy(pFuncInfo->name, buf);
...@@ -455,4 +460,4 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) { ...@@ -455,4 +460,4 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pFuncMsg; pMsg->rpcRsp.rsp = pFuncMsg;
pMsg->rpcRsp.len = (pOutput - (char*)pFuncMsg); pMsg->rpcRsp.len = (pOutput - (char*)pFuncMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -68,6 +68,7 @@ extern "C" { ...@@ -68,6 +68,7 @@ extern "C" {
#include "osSysinfo.h" #include "osSysinfo.h"
#include "osTime.h" #include "osTime.h"
#include "osTimer.h" #include "osTimer.h"
#include "osSystem.h"
void osInit(); void osInit();
......
...@@ -77,6 +77,7 @@ extern "C" { ...@@ -77,6 +77,7 @@ extern "C" {
#include <sys/resource.h> #include <sys/resource.h>
#include <error.h> #include <error.h>
#include <math.h> #include <math.h>
#include <dlfcn.h>
#define TAOS_OS_FUNC_LZ4 #define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val) #define BUILDIN_CLZL(val) __builtin_clzll(val)
......
...@@ -80,6 +80,7 @@ extern "C" { ...@@ -80,6 +80,7 @@ extern "C" {
#endif #endif
#include <linux/sysctl.h> #include <linux/sysctl.h>
#include <math.h> #include <math.h>
#include <dlfcn.h>
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_SYSTEM_H
#define TDENGINE_OS_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
void* taosLoadDll(const char *filename);
void* taosLoadSym(void* handle, char* name);
void taosCloseDll(void *handle);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tulog.h"
void* taosLoadDll(const char *filename) {
void *handle = dlopen (filename, RTLD_LAZY);
if (!handle) {
uError("load dll:%s failed, error:%s", filename, dlerror());
return NULL;
}
uDebug("dll %s loaded", filename);
return handle;
}
void* taosLoadSym(void* handle, char* name) {
void* sym = dlsym(handle, name);
char* error = NULL;
if ((error = dlerror()) != NULL) {
uWarn("load sym:%s failed, error:%s", name, dlerror());
return NULL;
}
uDebug("sym %s loaded", name)
return sym;
}
void taosCloseDll(void *handle) {
if (handle) {
dlclose(handle);
}
}
...@@ -4,4 +4,4 @@ PROJECT(TDengine) ...@@ -4,4 +4,4 @@ PROJECT(TDengine)
AUX_SOURCE_DIRECTORY(. SRC) AUX_SOURCE_DIRECTORY(. SRC)
ADD_LIBRARY(os ${SRC}) ADD_LIBRARY(os ${SRC})
TARGET_LINK_LIBRARIES(os m rt z) TARGET_LINK_LIBRARIES(os m rt z dl)
...@@ -261,6 +261,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -261,6 +261,7 @@ typedef struct SQueryRuntimeEnv {
SRspResultInfo resultInfo; SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap; SHashObj *pTableRetrieveTsMap;
SUdfInfo *pUdfInfo;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
enum { enum {
...@@ -432,7 +433,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu ...@@ -432,7 +433,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId); SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId, SUdfInfo* pUdfInfo);
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable); int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
...@@ -441,6 +442,8 @@ int32_t checkForQueryBuf(size_t numOfTables); ...@@ -441,6 +442,8 @@ int32_t checkForQueryBuf(size_t numOfTables);
bool doBuildResCheck(SQInfo* pQInfo); bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);
void destroyUdfInfo(SUdfInfo* pUdfInfo);
bool onlyQueryTags(SQuery* pQuery); bool onlyQueryTags(SQuery* pQuery);
bool isValidQInfo(void *param); bool isValidQInfo(void *param);
......
...@@ -16,6 +16,19 @@ ...@@ -16,6 +16,19 @@
#ifndef TDENGINE_QUDF_H #ifndef TDENGINE_QUDF_H
#define TDENGINE_QUDF_H #define TDENGINE_QUDF_H
enum { TSDB_UDF_FUNC_NORMAL = 0, TSDB_UDF_FUNC_INIT, TSDB_UDF_FUNC_AGG, TSDB_UDF_FUNC_MAX_NUM };
typedef struct SUdfInit{
int32_t maybe_null; /* 1 if function can return NULL */
uint32_t decimals; /* for real functions */
uint64_t length; /* For string functions */
char *ptr; /* free pointer for function data */
int32_t const_item; /* 0 if result is independent of arguments */
} SUdfInit;
typedef struct SUdfInfo { typedef struct SUdfInfo {
int32_t functionId; // system assigned function id int32_t functionId; // system assigned function id
int8_t funcType; // scalar function or aggregate function int8_t funcType; // scalar function or aggregate function
...@@ -23,10 +36,18 @@ typedef struct SUdfInfo { ...@@ -23,10 +36,18 @@ typedef struct SUdfInfo {
int16_t resBytes; // result byte int16_t resBytes; // result byte
int32_t contLen; // content length int32_t contLen; // content length
char *name; // function name char *name; // function name
void *handle; // handle loaded in mem
void *funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr
SUdfInit init;
union { // file path or [in memory] binary content union { // file path or [in memory] binary content
char *content; char *content;
char *path; char *path;
}; };
} SUdfInfo; } SUdfInfo;
typedef void (*udfNormalFunc)(char* data, int8_t type, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput,
int32_t* numOfOutput, char* buf);
typedef void (*udfInitFunc)(SUdfInit* data);
#endif // TDENGINE_QUDF_H #endif // TDENGINE_QUDF_H
...@@ -187,7 +187,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* ...@@ -187,7 +187,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static bool isPointInterpoQuery(SQuery *pQuery); static bool isPointInterpoQuery(SQuery *pQuery);
static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
...@@ -753,9 +753,18 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo ...@@ -753,9 +753,18 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return num; return num;
} }
static void doInvokeUdf(char* data, int8_t type, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput, static void doInvokeUdf(SUdfInfo* pUdfInfo, char* data, int8_t type, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput,
int32_t* numOfOutput, char* buf) { int32_t* numOfOutput, char* buf) {
if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) {
qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]);
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(data, type, numOfRows, ts, dataOutput, tsOutput, numOfOutput, buf);
return;
}
qError("empty udf function");
return;
} }
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
...@@ -792,8 +801,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx ...@@ -792,8 +801,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
int32_t output = 0; int32_t output = 0;
char* buf = GET_ROWCELL_INTERBUF(pCtx[k].resultInfo); char* buf = GET_ROWCELL_INTERBUF(pCtx[k].resultInfo);
doInvokeUdf(pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput, doInvokeUdf(pRuntimeEnv->pUdfInfo, pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput,
pCtx[k].ptsOutputBuf, &output, buf); pCtx[k].ptsOutputBuf, &output, buf);
// set the output value exist // set the output value exist
pCtx[k].resultInfo->numOfRes = output; pCtx[k].resultInfo->numOfRes = output;
...@@ -1031,8 +1040,18 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction ...@@ -1031,8 +1040,18 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
pCtx[k].startTs = startTs;// this can be set during create the struct pCtx[k].startTs = startTs;// this can be set during create the struct
// aAggs[functionId].xFunction(&pCtx[k]); // aAggs[functionId].xFunction(&pCtx[k]);
if (functionId < 0) { if (functionId < 0) {
// load the script and exec, pRuntimeEnv->pUdfInfo int32_t output = 0;
char* buf = GET_ROWCELL_INTERBUF(pCtx[k].resultInfo);
doInvokeUdf(pRuntimeEnv->pUdfInfo, pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput,
pCtx[k].ptsOutputBuf, &output, buf);
// set the output value exist
pCtx[k].resultInfo->numOfRes = output;
if (output > 0) {
pCtx[k].resultInfo->hasResult = DATA_SET_FLAG;
}
} else { } else {
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
} }
...@@ -1051,7 +1070,7 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC ...@@ -1051,7 +1070,7 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
int32_t output = 0; int32_t output = 0;
char* buf = GET_ROWCELL_INTERBUF(pCtx[k].resultInfo); char* buf = GET_ROWCELL_INTERBUF(pCtx[k].resultInfo);
doInvokeUdf(pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput, doInvokeUdf(pRuntimeEnv->pUdfInfo, pCtx[k].pInput, pCtx[k].inputType, pCtx[k].size, pCtx[k].ptsList, pCtx[k].pOutput,
pCtx[k].ptsOutputBuf, &output, buf); pCtx[k].ptsOutputBuf, &output, buf);
// set the output value exist // set the output value exist
...@@ -1486,7 +1505,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp ...@@ -1486,7 +1505,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp
} }
setResultOutputBuf(pRuntimeEnv, pResultRow, pCtx, numOfCols, rowCellInfoOffset); setResultOutputBuf(pRuntimeEnv, pResultRow, pCtx, numOfCols, rowCellInfoOffset);
initCtxOutputBuffer(pCtx, numOfCols); initCtxOutputBuffer(pRuntimeEnv, pCtx, numOfCols);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1873,6 +1892,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1873,6 +1892,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->sasArray); tfree(pRuntimeEnv->sasArray);
} }
destroyUdfInfo(pRuntimeEnv->pUdfInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyResultBuf(pRuntimeEnv->pResultBuf);
doFreeQueryHandle(pRuntimeEnv); doFreeQueryHandle(pRuntimeEnv);
...@@ -3064,7 +3085,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i ...@@ -3064,7 +3085,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
} }
} }
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); initCtxOutputBuffer(pRuntimeEnv, pCtx, pDataBlock->info.numOfCols);
} }
void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) {
...@@ -3100,7 +3121,7 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { ...@@ -3100,7 +3121,7 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) {
} }
} }
void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { void initCtxOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (pResInfo->initialized) { if (pResInfo->initialized) {
...@@ -3108,7 +3129,9 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { ...@@ -3108,7 +3129,9 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
} }
if (pCtx[j].functionId < 0) { // todo udf initialization if (pCtx[j].functionId < 0) { // todo udf initialization
if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_INIT]) {
(*(udfInitFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(&pRuntimeEnv->pUdfInfo->init);
}
} else { } else {
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
} }
...@@ -5859,7 +5882,43 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI ...@@ -5859,7 +5882,43 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static UNUSED_FUNC int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) { void destroyUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo == NULL) {
return;
}
tfree(pUdfInfo->name);
if (pUdfInfo->path) {
unlink(pUdfInfo->path);
}
tfree(pUdfInfo->path);
taosCloseDll(pUdfInfo->handle);
tfree(pUdfInfo);
}
static char* getUdfFuncName(char* name, int type) {
char* funcname = calloc(1, TSDB_FUNCTIONS_NAME_MAX_LENGTH + 10);
switch (type) {
case TSDB_UDF_FUNC_NORMAL:
strcpy(funcname, name);
break;
case TSDB_UDF_FUNC_INIT:
sprintf(funcname, "%s_init", name);
break;
default:
assert(0);
break;
}
return funcname;
}
static int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) {
if (pUdfInfo == NULL) { if (pUdfInfo == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5875,6 +5934,20 @@ static UNUSED_FUNC int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) { ...@@ -5875,6 +5934,20 @@ static UNUSED_FUNC int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) {
tfree(pUdfInfo->content); tfree(pUdfInfo->content);
pUdfInfo->path = strdup(path); pUdfInfo->path = strdup(path);
pUdfInfo->handle = taosLoadDll(path);
if (NULL == pUdfInfo->handle) {
return TSDB_CODE_QRY_SYS_ERROR;
}
pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_NORMAL));
if (NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) {
return TSDB_CODE_QRY_SYS_ERROR;
}
pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_INIT));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5885,7 +5958,10 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu ...@@ -5885,7 +5958,10 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// save the udf script or so file // save the udf script or so file
// flushUdfContentToDisk(pUdfInfo); code = flushUdfContentToDisk(pUdfInfo);
if (code) {
return code;
}
SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo)); SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo));
if (pExprs == NULL) { if (pExprs == NULL) {
...@@ -6189,7 +6265,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { ...@@ -6189,7 +6265,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery,
char* sql, uint64_t *qId) { char* sql, uint64_t *qId, SUdfInfo* pUdfInfo) {
int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput; int16_t numOfOutput = pQueryMsg->numOfOutput;
...@@ -6203,6 +6279,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6203,6 +6279,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
SQuery* pQuery = &pQInfo->query; SQuery* pQuery = &pQInfo->query;
pQInfo->runtimeEnv.pQuery = pQuery; pQInfo->runtimeEnv.pQuery = pQuery;
pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
pQuery->tableGroupInfo = *pTableGroupInfo; pQuery->tableGroupInfo = *pTableGroupInfo;
pQuery->numOfCols = numOfCols; pQuery->numOfCols = numOfCols;
pQuery->numOfOutput = numOfOutput; pQuery->numOfOutput = numOfOutput;
......
...@@ -158,18 +158,19 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -158,18 +158,19 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over; goto _over;
} }
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId); (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId, param.pUdfInfo);
if ((*pQInfo) == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _over;
}
param.sql = NULL; param.sql = NULL;
param.pExprs = NULL; param.pExprs = NULL;
param.pSecExprs = NULL; param.pSecExprs = NULL;
param.pGroupbyExpr = NULL; param.pGroupbyExpr = NULL;
param.pTagColumnInfo = NULL; param.pTagColumnInfo = NULL;
param.pUdfInfo = NULL;
if ((*pQInfo) == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _over;
}
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, &param, isSTableQuery); code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, &param, isSTableQuery);
...@@ -178,6 +179,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -178,6 +179,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
taosArrayDestroy(param.pGroupbyExpr->columnInfo); taosArrayDestroy(param.pGroupbyExpr->columnInfo);
} }
destroyUdfInfo(param.pUdfInfo);
taosArrayDestroy(param.pTableIdList); taosArrayDestroy(param.pTableIdList);
param.pTableIdList = NULL; param.pTableIdList = NULL;
......
...@@ -272,6 +272,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, "Multiple retrieval of ...@@ -272,6 +272,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, "Multiple retrieval of
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, "Too many time window in query") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, "Too many time window in query")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit has reached") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit has reached")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error")
// grant // grant
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册