From af03391f3756bb1575f8e33a47a70b0578b995a5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 5 Apr 2021 11:44:09 +0800 Subject: [PATCH] support lua script --- cmake/define.inc | 4 +- src/client/src/tscSQLParser.c | 7 +- src/client/src/tscSystem.c | 7 +- src/dnode/src/dnodeMain.c | 2 + src/query/inc/qScript.h | 85 ++++++ src/query/inc/qUdf.h | 8 +- src/query/src/qExecutor.c | 73 +++-- src/query/src/qScript.c | 484 ++++++++++++++++++++++++++++++++++ 8 files changed, 637 insertions(+), 33 deletions(-) create mode 100644 src/query/inc/qScript.h create mode 100644 src/query/src/qScript.c diff --git a/cmake/define.inc b/cmake/define.inc index 6f49630d5c..1ed09e79d6 100755 --- a/cmake/define.inc +++ b/cmake/define.inc @@ -57,7 +57,7 @@ IF (TD_LINUX_64) ADD_DEFINITIONS(-D_M_X64) ADD_DEFINITIONS(-D_TD_LINUX_64) MESSAGE(STATUS "linux64 is defined") - SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-llua -std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ADD_DEFINITIONS(-DUSE_LIBICONV) ENDIF () @@ -65,7 +65,7 @@ IF (TD_LINUX_32) ADD_DEFINITIONS(-D_TD_LINUX_32) ADD_DEFINITIONS(-DUSE_LIBICONV) MESSAGE(STATUS "linux32 is defined") - SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-llua -std=gnu99 -Wall -Werror -fPIC -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ENDIF () IF (TD_ARM_64) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e33854cf7c..80a8cf3d6c 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -34,6 +34,7 @@ #include "tstoken.h" #include "tstrbuild.h" #include "ttokendef.h" +#include "qScript.h" #define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0" @@ -285,6 +286,7 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { const char *msg1 = "function name is too long"; const char *msg2 = "path is too long"; const char *msg3 = "invalid outputtype"; + const char *msg4 = "invalid script"; SSqlCmd *pCmd = &pSql->cmd; switch (pInfo->type) { @@ -317,8 +319,9 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (ret) { return ret; } - - + if (isValidScript(buf)) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); + } //TODO CHECK CODE if (len + sizeof(SCreateFuncMsg) > pSql->cmd.allocSize) { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index bd79f81846..658f63c3d1 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -28,6 +28,7 @@ #include "tconfig.h" #include "ttimezone.h" #include "tlocale.h" +#include "qScript.h" // global, not configurable #define TSC_VAR_NOT_RELEASE 1 @@ -146,6 +147,8 @@ void taos_init_imp(void) { taosInitNotes(); rpcInit(); + + scriptEnvPoolInit(); tscDebug("starting to initialize TAOS client ..."); tscDebug("Local End Point is:%s", tsLocalEp); } @@ -200,7 +203,9 @@ void taos_cleanup(void) { if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { return; } - + if (tscEmbedded == 0) { + scriptEnvPoolCleanup(); + } taosHashCleanup(tscTableMetaInfo); tscTableMetaInfo = NULL; diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index fb46709440..455092ad83 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -40,6 +40,7 @@ #include "dnodeShell.h" #include "dnodeTelemetry.h" #include "module.h" +#include "qScript.h" #if !defined(_MODULE) || !defined(_TD_LINUX) int32_t moduleStart() { return 0; } @@ -80,6 +81,7 @@ static SStep tsDnodeSteps[] = { {"dnode-shell", dnodeInitShell, dnodeCleanupShell}, {"dnode-statustmr", dnodeInitStatusTimer,dnodeCleanupStatusTimer}, {"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry}, + {"dnode-script", scriptEnvPoolInit, scriptEnvPoolCleanup}, }; static int dnodeCreateDir(const char *dir) { diff --git a/src/query/inc/qScript.h b/src/query/inc/qScript.h new file mode 100644 index 0000000000..4760b19e8d --- /dev/null +++ b/src/query/inc/qScript.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_QSCRIPT_H +#define TDENGINE_QSCRIPT_H + +#include +#include +#include +#include + +#include "tutil.h" +#include "hash.h" +#include "tlist.h" +#include "qUdf.h" + +#define MAX_FUNC_NAME 64 + +enum ScriptState { + SCRIPT_STATE_INIT, + SCRIPT_STATE_ADD, + SCRIPT_STATE_MERGE, + SCRIPT_STATE_FINALIZE +}; + + +typedef struct { + SHashObj *funcId; //func already registed in lua_env, may be no use + lua_State *lua_state; // lua env +} ScriptEnv; + +typedef struct ScriptCtx { + char funcName[MAX_FUNC_NAME]; + int8_t state; + ScriptEnv *pEnv; + int8_t isAgg; // agg function or not + //void(*callback)(struct ScriptCtx*ctx, char *input, int16_t iType, int16_t iBytes, int32_t numOfInput, int64_t* ts, char* dataOutput, + // char *tsOutput, int32_t* numOfOutput, char *interbuf, int16_t oType, int16_t oBytes); + + // init value of udf script + int8_t type; + union {int64_t i; double d;} initValue; + int32_t numOfOutput; + int32_t offset; + +} ScriptCtx; + +int taosLoadScriptInit(SUdfInit* pSUdfInit); +int taosLoadScriptNormal(char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows, + int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput, + int16_t oType, int16_t oBytes, SUdfInit *init); +int taosLoadScriptFinalize(char *pOutput, int32_t output, SUdfInit *init); +int taosLoadScriptDestroy(SUdfInit* pSUdfInit); + +typedef struct { + SList *scriptEnvs; // + int32_t mSize; // pool limit + int32_t cSize; // current available size + pthread_mutex_t mutex; +} ScriptEnvPool; + +ScriptCtx* createScriptCtx(const char *str); +void destroyScriptCtx(void *pScriptCtx); + +int32_t scriptEnvPoolInit(); +void scriptEnvPoolCleanup(); +bool isValidScript(const char *sript); + + +void execUdf(struct ScriptCtx*ctx, char *input, int16_t iType, int16_t iBytes, int32_t numOfInput, + int64_t* ts, char* dataOutput, char *tsOutput, int32_t* numOfOutput, char *interbuf, int16_t oType, int16_t oBytes); + +#endif //TDENGINE_QSCRIPT_H diff --git a/src/query/inc/qUdf.h b/src/query/inc/qUdf.h index aa8f32c16e..7a1facebb1 100644 --- a/src/query/inc/qUdf.h +++ b/src/query/inc/qUdf.h @@ -26,6 +26,10 @@ typedef struct SUdfInit{ uint64_t length; /* For string functions */ char *ptr; /* free pointer for function data */ int32_t const_item; /* 0 if result is independent of arguments */ + + // script like lua/javascript + void* script_ctx; + void (*destroyCtxFunc)(void *script_ctx); } SUdfInit; @@ -45,8 +49,8 @@ typedef struct SUdfInfo { }; } SUdfInfo; -typedef void (*udfNormalFunc)(char* data, int16_t type, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput, - int32_t* numOfOutput, SUdfInit* buf); +typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput, + int32_t* numOfOutput, int16_t oType, int16_t oByte, SUdfInit* buf); typedef int32_t (*udfInitFunc)(SUdfInit* data); typedef void (*udfFinalizeFunc)(char* dataOutput, int32_t* numOfOutput, SUdfInit* buf); typedef void (*udfDestroyFunc)(SUdfInit* buf); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9ba2978395..0e196cadeb 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -27,6 +27,7 @@ #include "tlosertree.h" #include "ttype.h" #include "tscompression.h" +#include "qScript.h" #define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) @@ -779,8 +780,8 @@ static void doInvokeUdf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) { qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]); - (*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->size, pCtx->ptsList, pCtx->pOutput, - (char *)pCtx->ptsOutputBuf, &output, &pUdfInfo->init); + (*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput, + (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init); // set the output value exist pCtx->resultInfo->numOfRes += output; @@ -3144,6 +3145,7 @@ void initCtxOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, in } if (pCtx[j].functionId < 0) { // todo udf initialization + continue; } else { aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); @@ -5982,40 +5984,59 @@ static int32_t initUdfInfo(SUdfInfo* pUdfInfo) { if (pUdfInfo == NULL) { return TSDB_CODE_SUCCESS; } + if (isValidScript(pUdfInfo->content)) { + //refactor(dengyihao) + ScriptCtx *pScriptCtx = createScriptCtx(pUdfInfo->content); + pUdfInfo->init.script_ctx = pScriptCtx; - char path[PATH_MAX] = {0}; - taosGetTmpfilePath("script", path); + pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadScriptInit; + if (pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] == NULL + || (*(udfInitFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(&pUdfInfo->init) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_QRY_SYS_ERROR; + } - FILE* file = fopen(path, "w+"); + pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadScriptNormal; - // TODO check for failure of flush to disk - /*size_t t = */ fwrite(pUdfInfo->content, pUdfInfo->contLen, 1, file); - fclose(file); + if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { + pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadScriptFinalize; + } + pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadScriptDestroy; + + } else { + char path[PATH_MAX] = {0}; + taosGetTmpfilePath("script", path); - tfree(pUdfInfo->content); - pUdfInfo->path = strdup(path); + FILE* file = fopen(path, "w+"); - pUdfInfo->handle = taosLoadDll(path); + // TODO check for failure of flush to disk + /*size_t t = */ fwrite(pUdfInfo->content, pUdfInfo->contLen, 1, file); + fclose(file); + tfree(pUdfInfo->content); + + pUdfInfo->path = strdup(path); - if (NULL == pUdfInfo->handle) { - return TSDB_CODE_QRY_SYS_ERROR; - } + pUdfInfo->handle = taosLoadDll(path); - pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_NORMAL)); - if (NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) { - return TSDB_CODE_QRY_SYS_ERROR; - } + if (NULL == pUdfInfo->handle) { + return TSDB_CODE_QRY_SYS_ERROR; + } - pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_INIT)); - - if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { - pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE)); - } + pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_NORMAL)); + if (NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) { + return TSDB_CODE_QRY_SYS_ERROR; + } - pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_DESTROY)); + pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_INIT)); + + if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { + pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE)); + } - if (pUdfInfo->funcs[TSDB_UDF_FUNC_INIT]) { - return (*(udfInitFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(&pUdfInfo->init); + pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(pUdfInfo->name, TSDB_UDF_FUNC_DESTROY)); + + if (pUdfInfo->funcs[TSDB_UDF_FUNC_INIT]) { + return (*(udfInitFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(&pUdfInfo->init); + } } return TSDB_CODE_SUCCESS; diff --git a/src/query/src/qScript.c b/src/query/src/qScript.c new file mode 100644 index 0000000000..ce2466fe2c --- /dev/null +++ b/src/query/src/qScript.c @@ -0,0 +1,484 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "qScript.h" +#include "ttype.h" +#include "tstrbuild.h" +#include "queryLog.h" + +static ScriptEnvPool *pool = NULL; + + +typedef int(*ScriptInit)(SUdfInit *init); +typedef int(*ScriptNormal)(char *pInput, int8_t iType, int32_t size, int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *output, SUdfInit *init); +typedef int(*ScriptFinalize)(char *pOutput, int32_t output, SUdfInit *init); +typedef int(*ScriptDestroy)(SUdfInit *init); + +#define USER_FUNC_NAME "funcName" +#define USER_FUNC_NAME_LIMIT 48 + +static ScriptEnv* getScriptEnvFromPool(); +static void addScriptEnvToPool(ScriptEnv *pEnv); + +static lua_State* createLuaEnv(); +static void destroyLuaEnv(lua_State *state); + +static void destroyScriptEnv(ScriptEnv *pEnv); + +static void luaValueToTaosType(lua_State *lua, int16_t iType, char *interBuf, int16_t oType, int32_t *numOfOutput, int16_t oBytes); +static void taosValueToLuaType(lua_State *lua, int32_t type, char *val); + +static bool hasBaseFuncDefinedInScript(lua_State *lua, const char *funcPrefix, int32_t len); + +static int userlib_exampleFunc(lua_State *lua) { + double op1 = luaL_checknumber(lua,1); + double op2 = luaL_checknumber(lua,2); + lua_pushnumber(lua, op1 * op2); + return 1; +} +void luaRegisterLibFunc(lua_State *lua) { + lua_register(lua, "exampleFunc", userlib_exampleFunc); +} + +void luaLoadLib(lua_State *lua, const char *libname, lua_CFunction luafunc) { + lua_pushcfunction(lua, luafunc); + lua_pushstring(lua, libname); + lua_call(lua, 1, 0); +} + +LUALIB_API int (luaopen_cjson) (lua_State *L); +LUALIB_API int (luaopen_struct) (lua_State *L); +LUALIB_API int (luaopen_cmsgpack) (lua_State *L); +LUALIB_API int (luaopen_bit) (lua_State *L); + + +static void luaLoadLibraries(lua_State *lua) { + luaLoadLib(lua, "", luaopen_base); + luaLoadLib(lua, LUA_TABLIBNAME, luaopen_table); + luaLoadLib(lua, LUA_STRLIBNAME, luaopen_string); + luaLoadLib(lua, LUA_MATHLIBNAME, luaopen_math); + luaLoadLib(lua, LUA_DBLIBNAME, luaopen_debug); + luaLoadLib(lua, "cjson", luaopen_cjson); + luaLoadLib(lua, "struct", luaopen_struct); + luaLoadLib(lua, "cmsgpack", luaopen_cmsgpack); + luaLoadLib(lua, "bit", luaopen_bit); +} +static void luaRemoveUnsupportedFunctions(lua_State *lua) { + lua_pushnil(lua); + lua_setglobal(lua,"loadfile"); + lua_pushnil(lua); + lua_setglobal(lua,"dofile"); +} +void taosValueToLuaType(lua_State *lua, int32_t type, char *val) { + //TODO(dengyihao): handle more data type + if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { + int64_t v; + GET_TYPED_DATA(v, int64_t, type, val); + lua_pushnumber(lua, (lua_Number)v); + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + uint64_t v; + GET_TYPED_DATA(v, uint64_t, type, val); + lua_pushnumber(lua,(lua_Number)v); + } else if (IS_FLOAT_TYPE(type)) { + double v; + GET_TYPED_DATA(v, double, type, val); + lua_pushnumber(lua,v); + } else if (type == TSDB_DATA_TYPE_BINARY) { + lua_pushlstring(lua, val, varDataLen(val)); + } else if (type == TSDB_DATA_TYPE_NCHAR) { + } +} +int taosLoadScriptInit(SUdfInit* pInit) { + if (pInit->script_ctx == NULL) { return -1;} + + pInit->destroyCtxFunc = destroyScriptCtx; + + ScriptCtx *pCtx = pInit->script_ctx; + char funcName[MAX_FUNC_NAME] = {0}; + sprintf(funcName, "%s_init", pCtx->funcName); + + lua_State* lua = pCtx->pEnv->lua_state; + lua_getglobal(lua, funcName); + if (lua_pcall(lua, 0, -1, 0)) { + lua_pop(lua, -1); + } + if (lua_isnumber(lua, -1)) { + pCtx->initValue.d = lua_tonumber(lua, -1); + } else if (lua_isboolean(lua, -1)){ + pCtx->initValue.i = lua_tointeger(lua, -1); + } else if (lua_istable(lua, -1)) { + // TODO(dengyihao) handle more type + } + return 0; +} +int taosLoadScriptNormal(char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows, + int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput, + int16_t oType, int16_t oBytes, SUdfInit *pInit) { + ScriptCtx* pCtx = pInit->script_ctx; + + char funcName[MAX_FUNC_NAME] = {0}; + sprintf(funcName, "%s_add", pCtx->funcName); + lua_State* lua = pCtx->pEnv->lua_state; + lua_getglobal(lua, funcName); + + // first param of script; + lua_newtable(lua); + int32_t offset = 0; + for (int32_t i = 0; i < numOfRows; i++) { + taosValueToLuaType(lua, iType, pInput + offset); + lua_rawseti(lua, -2, i+1); + offset += iBytes; + } + + lua_pushnumber(lua, pCtx->initValue.d); + // do call lua script + if (lua_pcall(lua, 2, 1, 0) != 0) { + qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); + lua_pop(lua, -1); + return -1; + } + + int tNumOfOutput = 0; + luaValueToTaosType(lua, iType, pOutput, oType, &tNumOfOutput, oBytes); + pCtx->numOfOutput += tNumOfOutput; + *numOfOutput = pCtx->numOfOutput; + return 0; +} +int taosLoadScriptFinalize(char *pOutput, int32_t output, SUdfInit *pInit) { + return 0; +} +int taosLoadScriptDestroy(SUdfInit* pInit) { + pInit->destroyCtxFunc(pInit->script_ctx); + return 0; +} + +ScriptCtx* createScriptCtx(const char *script) { + ScriptCtx *pCtx = (ScriptCtx *)calloc(1, sizeof(ScriptCtx)); + pCtx->state = SCRIPT_STATE_INIT; + pCtx->pEnv = getScriptEnvFromPool(); // + + if (pCtx->pEnv == NULL) { + destroyScriptCtx(pCtx); + return NULL; + } + + lua_State *lua = pCtx->pEnv->lua_state; + if (luaL_dostring(lua, script)) { + lua_pop(lua, 1); + qError("dynamic load script failed"); + destroyScriptCtx(pCtx); + return NULL; + } + lua_getglobal(lua, USER_FUNC_NAME); + char *name = lua_tostring(lua, -1); + if (name == NULL) { + lua_pop(lua, 1); + qError("SCRIPT ERROR: invalid script"); + destroyScriptCtx(pCtx); + return NULL; + } + memcpy(pCtx->funcName, name, strlen(name)); + lua_pop(lua, 1); + + return pCtx; +} +void destroyScriptCtx(void *pCtx) { + if (pCtx == NULL) return; + addScriptEnvToPool(((ScriptCtx *)pCtx)->pEnv); + free(pCtx); +} + +//void XXXX_init(ScriptCtx *pCtx) { +//} +// +//void XXXX_add(ScriptCtx *pCtx, char *input, int16_t iType, int16_t iBytes, int32_t numOfRows, +// char *dataOutput, int16_t oType, int32_t *numOfOutput, int16_t oBytes) { +// char funcName[MAX_FUNC_NAME] = {0}; +// sprintf(funcName, "%s_add", pCtx->funcName); +// +// lua_State* lua = pCtx->pEnv->lua_state; +// lua_getglobal(lua, funcName); +// +// // set first param of XXXX_add; +// lua_newtable(lua); +// int32_t offset = 0; +// for (int32_t i = 0; i < numOfRows; i++) { +// taosValueToLuaType(lua, iType, input + offset); +// lua_rawseti(lua, -2, i+1); +// offset += iBytes; +// } +// +// // set second param of XXXX_add; +// lua_pushnumber(lua, pCtx->initValue.d); +// // do call lua script +// if (lua_pcall(lua, 2, 1, 0) != 0) { +// qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); +// lua_pop(lua, -1); +// return; +// } +// int tNumOfOutput = 0; +// luaValueToTaosType(lua, iType, dataOutput, oType, &tNumOfOutput, oBytes); +// pCtx->numOfOutput += tNumOfOutput; +// *numOfOutput = pCtx->numOfOutput; +//} +//void XXXX_merge(ScriptCtx *pCtx) { +// char funcName[MAX_FUNC_NAME] = {0}; +// sprintf(funcName, "%s_merge", pCtx->funcName); +// +// lua_State* lua = pCtx->pEnv->lua_state; +// lua_getglobal(lua, funcName); +// +// // set first param of XXXX_merge; +// //lua_newtable(lua); +// //int32_t offset = 0; +// //for (int32_t i = 0; i < numOfRows; i++) { +// // taosValueToLuaType(lua, iType, input + offset); +// // lua_rawseti(lua, -2, i+1); +// // offset += iBytes; +// //} +// +// // set second param of XXXX_merge; +// //lua_newtable(lua); +// //offset = 0; +// //for (int32_t i = 0; i < numOfRows; i++) { +// // taosValueToLuaType(lua, iType, input + offset); +// // lua_rawseti(lua, -2, i+1); +// // offset += iBytes; +// //} +// // push two table +// //if (lua_pcall(lua, 2, 1, 0) != 0) { +// // qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); +// // lua_pop(lua, -1); +// // return; +// //} +// //int tNumOfOutput = 0; +// //luaValueToTaosType(lua, iType, dataOutput, oType, &tNumOfOutput, oBytes); +// //pCtx->numOfOutput += tNumOfOutput; +// //*numOfOutput = pCtx->numOfOutput; +//} +// +//void XXXX_finalize(ScriptCtx *pCtx) { +// char funcName[MAX_FUNC_NAME] = {0}; +// sprintf(funcName, "%s_finalize", pCtx->funcName); +// +// lua_State* lua = pCtx->pEnv->lua_state; +// lua_getglobal(lua, funcName); +// // push two paramter +// if (lua_pcall(lua, 2, 1, 0) != 0) { +// } +//} + + +void luaValueToTaosType(lua_State *lua, int16_t iType, char *interBuf, int16_t oType, int32_t *numOfOutput, int16_t oBytes) { + int t = lua_type(lua,-1); + int32_t sz = 0; + switch (t) { + case LUA_TSTRING: + //char *result = lua_tostring(lua, -1); + sz = 1; + // agg + break; + case LUA_TBOOLEAN: + //int64_t result = lua_tonumber(lua, -1); + sz = 1; + // agg + break; + case LUA_TNUMBER: + //int64_t result = lua_tonumber(lua, -1); + sz = 1; + break; + case LUA_TTABLE: + { + lua_pushnil(lua); + int32_t offset = 0; + while(lua_next(lua, -2)) { + int32_t v = lua_tonumber(lua, -1); + memcpy(interBuf + offset, (char *)&v, oBytes); + offset += oBytes; + lua_pop(lua, 1); + sz += 1; + } + } + break; + default: + break; + } + lua_pop(lua,1); // pop ret value from script + *numOfOutput = sz; +} + +//void execUdf(ScriptCtx *pCtx, char *input, int16_t iType, int16_t iBytes, int32_t numOfInput, +// int64_t* ts, char* dataOutput, char *tsOutput, int32_t* numOfOutput, char *interBuf, int16_t oType, int16_t oBytes) { +// int8_t state = pCtx->state; +// switch (state) { +// case SCRIPT_STATE_INIT: +// XXXX_init(pCtx); +// XXXX_add(pCtx, input, iType, iBytes, numOfInput, dataOutput, oType, numOfOutput, oBytes); +// break; +// case SCRIPT_STATE_ADD: +// XXXX_add(pCtx, input, iType, iBytes, numOfInput, dataOutput, oType, numOfOutput, oBytes); +// break; +// case SCRIPT_STATE_MERGE: +// XXXX_merge(pCtx); +// break; +// case SCRIPT_STATE_FINALIZE: +// XXXX_finalize(pCtx); +// break; +// default: +// return; +// } +//} + +/* +*Initialize the scripting environment. +*/ +lua_State* createLuaEnv() { + lua_State *lua = lua_open(); + luaLoadLibraries(lua); + luaRemoveUnsupportedFunctions(lua); + + // register func in external lib + luaRegisterLibFunc(lua); + + { + char *errh_func = "local dbg = debug\n" + "function __taos__err__handler(err)\n" + " local i = dbg.getinfo(2,'nSl')\n" + " if i and i.what == 'C' then\n" + " i = dbg.getinfo(3,'nSl')\n" + " end\n" + " if i then\n" + " return i.source .. ':' .. i.currentline .. ': ' .. err\n" + " else\n" + " return err\n" + " end\n" + "end\n"; + luaL_loadbuffer(lua,errh_func,strlen(errh_func),"@err_handler_def"); + lua_pcall(lua,0,0,0); + } + + return lua; +} + +void destroyLuaEnv(lua_State *lua) { + lua_close(lua); +} + +int32_t scriptEnvPoolInit() { + const int size = 10; // configure or not + pool = malloc(sizeof(ScriptEnvPool)); + pthread_mutex_init(&pool->mutex, NULL); + + pool->scriptEnvs = tdListNew(sizeof(ScriptEnv *)); + for (int i = 0; i < size; i++) { + ScriptEnv *env = malloc(sizeof(ScriptEnv)); + env->funcId = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);; + env->lua_state = createLuaEnv(); + tdListAppend(pool->scriptEnvs, (void *)(&env)); + } + pool->mSize = size; + pool->cSize = size; + return 0; +} + +void scriptEnvPoolCleanup() { + if (pool == NULL) return; + + SListNode *pNode = NULL; + while ((pNode = tdListPopHead(pool->scriptEnvs)) != NULL) { + ScriptEnv *pEnv = NULL; + tdListNodeGetData(pool->scriptEnvs, pNode, (void *)(&pEnv)); + destroyScriptEnv(pEnv); + listNodeFree(pNode); + } + tdListFree(pool->scriptEnvs); + pthread_mutex_destroy(&pool->mutex); + free(pool); +} + +void destroyScriptEnv(ScriptEnv *pEnv) { + destroyLuaEnv(pEnv->lua_state); + taosHashCleanup(pEnv->funcId); + free(pEnv); +} + +ScriptEnv* getScriptEnvFromPool() { + ScriptEnv *pEnv = NULL; + + pthread_mutex_lock(&pool->mutex); + if (pool->cSize <= 0) { + pthread_mutex_unlock(&pool->mutex); + return NULL; + } + SListNode *pNode = tdListPopHead(pool->scriptEnvs); + tdListNodeGetData(pool->scriptEnvs, pNode, (void *)(&pEnv)); + listNodeFree(pNode); + + pool->cSize--; + pthread_mutex_unlock(&pool->mutex); + return pEnv; +} + +void addScriptEnvToPool(ScriptEnv *pEnv) { + if (pEnv == NULL) { + return; + } + pthread_mutex_lock(&pool->mutex); + tdListAppend(pool->scriptEnvs, (void *)(&pEnv)); + pool->cSize++; + pthread_mutex_unlock(&pool->mutex); +} + +bool hasBaseFuncDefinedInScript(lua_State *lua, const char *funcPrefix, int32_t len) { + bool ret = true; + char funcName[MAX_FUNC_NAME]; + memcpy(funcName, funcPrefix, len); + + const char *base[] = {"_init", "_add"}; + for (int i = 0; (i < sizeof(base)/sizeof(base[0])) && (ret == true); i++) { + memcpy(funcName + len, base[i], strlen(base[i])); + funcName[len + strlen(base[i])] = 0; + + lua_getglobal(lua, funcName); + ret = lua_isfunction(lua, -1); // exsit function or not + lua_pop(lua, 1); + } + return ret; +} +bool isValidScript(const char *script) { + ScriptEnv *pEnv = getScriptEnvFromPool(); // + if (pEnv == NULL) { + return false; + } + lua_State *lua = pEnv->lua_state; + if (luaL_dostring(lua, script)) { + lua_pop(lua, 1); + addScriptEnvToPool(pEnv); + return false; + } + lua_getglobal(lua, USER_FUNC_NAME); + char *name = lua_tostring(lua, -1); + if (name == NULL || strlen(name) >= USER_FUNC_NAME_LIMIT) { + lua_pop(lua, 1); + addScriptEnvToPool(pEnv); + return false; + } + bool ret = hasBaseFuncDefinedInScript(lua, name, strlen(name)); + lua_pop(lua, 1); // pop + addScriptEnvToPool(pEnv); + return ret; +} + -- GitLab