/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "qScript.h"
#include "ttype.h"
#include "tstrbuild.h"
#include "queryLog.h"
#include "ttokendef.h"

/* Process-wide pool of reusable Lua environments; NULL until scriptEnvPoolInit() succeeds. */
static ScriptEnvPool *pool = NULL;

static ScriptEnv* getScriptEnvFromPool();
static void addScriptEnvToPool(ScriptEnv *pEnv);

static lua_State* createLuaEnv();
static void destroyLuaEnv(lua_State *state);

static void destroyScriptEnv(ScriptEnv *pEnv);

static void luaValueToTaosType(lua_State *lua, char *interBuf, int32_t *numOfOutput, int16_t oType, int16_t oBytes);
static void taosValueToLuaType(lua_State *lua, int32_t type, char *val);

static bool hasBaseFuncDefinedInScript(lua_State *lua, const char *funcPrefix, int32_t len);

/*
 * Return the value on top of the Lua stack as a C string that is always safe
 * to hand to a "%s" format. lua_tostring() yields NULL when the value is
 * neither a string nor a number (e.g. a script called error() with a table).
 */
static const char *luaErrorMessage(lua_State *lua) {
  const char *msg = lua_tostring(lua, -1);
  return (msg != NULL) ? msg : "(error object is not a string)";
}

/* Sample C function exposed to scripts: returns the product of its two numeric arguments. */
static int userlib_exampleFunc(lua_State *lua) {
  double op1 = luaL_checknumber(lua,1);
  double op2 = luaL_checknumber(lua,2);
  lua_pushnumber(lua, op1 * op2);
  return 1;
}

/* Register the C functions of the user library as Lua globals. */
void luaRegisterLibFunc(lua_State *lua) {
  lua_register(lua, "exampleFunc", userlib_exampleFunc);
}

/* Open one Lua library by calling its luaopen_* function with the library name as argument. */
void luaLoadLib(lua_State *lua, const char *libname, lua_CFunction luafunc) {
  lua_pushcfunction(lua, luafunc);
  lua_pushstring(lua, libname);
  lua_call(lua, 1, 0);
}

LUALIB_API int (luaopen_cjson) (lua_State *L);
LUALIB_API int (luaopen_struct) (lua_State *L);
LUALIB_API int (luaopen_cmsgpack) (lua_State *L);
LUALIB_API int (luaopen_bit) (lua_State *L);

/* Load the base, table, string, math and debug libraries into the state. */
static void luaLoadLibraries(lua_State *lua) {
  luaLoadLib(lua, "", luaopen_base);
  luaLoadLib(lua, LUA_TABLIBNAME, luaopen_table);
  luaLoadLib(lua, LUA_STRLIBNAME, luaopen_string);
  luaLoadLib(lua, LUA_MATHLIBNAME, luaopen_math);
  luaLoadLib(lua, LUA_DBLIBNAME, luaopen_debug);
}

/* Remove the file-loading globals ("loadfile", "dofile") by setting them to nil. */
static void luaRemoveUnsupportedFunctions(lua_State *lua) {
  lua_pushnil(lua);
  lua_setglobal(lua,"loadfile");
  lua_pushnil(lua);
  lua_setglobal(lua,"dofile");
}

/*
 * Push one column value onto the Lua stack.
 *
 * Exactly one value is always pushed: callers follow this call with
 * lua_rawseti(), which pops one value, so pushing nothing would make it
 * operate on the wrong stack slot. Types that are not converted yet
 * (NCHAR and anything unrecognised) are pushed as nil.
 */
static void taosValueToLuaType(lua_State *lua, int32_t type, char *val) {
  //TODO(dengyihao): handle more data type
  if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
    int64_t v;
    GET_TYPED_DATA(v, int64_t, type, val);
    lua_pushnumber(lua, (lua_Number)v);
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    uint64_t v;
    GET_TYPED_DATA(v, uint64_t, type, val);
    lua_pushnumber(lua,(lua_Number)v);
  } else if (IS_FLOAT_TYPE(type)) {
    double v;
    GET_TYPED_DATA(v, double, type, val);
    lua_pushnumber(lua,v);
  } else if (type == TSDB_DATA_TYPE_BINARY) {
    // NOTE(review): the string starts at `val` itself while the length comes from
    // varDataLen(val); if `val` begins with a length header the payload pointer
    // may need adjusting - confirm against the var-data layout.
    lua_pushlstring(lua, val, varDataLen(val));
  } else {
    // NCHAR and unknown types are not supported yet; keep the stack balanced
    lua_pushnil(lua);
  }
}

/*
 * Call the script's "<funcName>_init" function and store its result in the
 * Lua global "global".
 *
 * When several values are returned the last one is stored and the rest are
 * discarded; when the call fails or returns nothing, "global" is set to nil.
 * Always returns 0.
 */
int taosLoadScriptInit(void* pInit) {
  ScriptCtx *pCtx = pInit;
  char funcName[MAX_FUNC_NAME] = {0};
  snprintf(funcName, sizeof(funcName), "%s_init", pCtx->funcName);

  lua_State* lua = pCtx->pEnv->lua_state;
  int base = lua_gettop(lua);

  lua_getglobal(lua, funcName);
  if (lua_pcall(lua, 0, LUA_MULTRET, 0) != 0) {
    qError("SCRIPT ERROR: %s", luaErrorMessage(lua));
    lua_settop(lua, 0);  // drop the error message and anything else on the stack
    base = 0;
  }

  if (lua_gettop(lua) <= base) {
    // nothing was returned: lua_setglobal() must still have a value to pop
    lua_pushnil(lua);
  }
  lua_setglobal(lua, "global");
  lua_settop(lua, base);  // discard any additional return values
  return 0;
}

/*
 * Call the script's "<funcName>_add" function with three arguments: a table
 * holding the numOfRows input values (1-based), the current value of the Lua
 * global "global", and `key`.
 *
 * If "global" was a table before the call, the single result is stored back
 * into "global" and *numOfOutput is 0; otherwise the result is converted into
 * pOutput and *numOfOutput receives the number of values written. On a script
 * error the message is logged and *numOfOutput is set to 0.
 *
 * ptsList and ptsOutput are not used by this function.
 */
void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows,
    int64_t *ptsList, int64_t key, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes) {
  ScriptCtx* pCtx = pInit;
  char funcName[MAX_FUNC_NAME] = {0};
  snprintf(funcName, sizeof(funcName), "%s_add", pCtx->funcName);

  lua_State* lua = pCtx->pEnv->lua_state;
  lua_getglobal(lua, funcName);

  // first param of script: table of input rows
  lua_newtable(lua);
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfRows; i++) {
    taosValueToLuaType(lua, iType, pInput + offset);
    lua_rawseti(lua, -2, i+1);
    offset += iBytes;
  }

  // second param: accumulated state, if the script keeps one
  lua_getglobal(lua, "global");
  bool isGlobalState = lua_istable(lua, -1);

  // third param
  lua_pushnumber(lua, (lua_Number)key);

  // do call lua script
  if (lua_pcall(lua, 3, 1, 0) != 0) {
    qError("SCRIPT ERROR: %s", luaErrorMessage(lua));
    lua_settop(lua, 0);  // the original lua_pop(lua, -1) expands to exactly this
    *numOfOutput = 0;
    return;
  }

  int tNumOfOutput = 0;
  if (!isGlobalState) {
    luaValueToTaosType(lua, pOutput, &tNumOfOutput, oType, oBytes);
  } else {
    lua_setglobal(lua, "global");
  }
  *numOfOutput = tNumOfOutput;
}

/*
 * Call the script's "<funcName>_merge" function with the Lua global "global"
 * as its only argument and convert the single result into pOutput using the
 * context's result type and width. *numOfOutput receives the number of values
 * written, or 0 when the function is missing or the call fails.
 *
 * data and numOfRows are not used by this function.
 */
void taosLoadScriptMerge(void *pInit, char* data, int32_t numOfRows, char* pOutput, int32_t* numOfOutput) {
  ScriptCtx *pCtx = pInit;
  char funcName[MAX_FUNC_NAME] = {0};
  snprintf(funcName, sizeof(funcName), "%s_merge", pCtx->funcName);

  lua_State* lua = pCtx->pEnv->lua_state;
  lua_getglobal(lua, funcName);
  if (!lua_isfunction(lua, -1)) {
    qError("SCRIPT ERROR: %s is not defined as a function", funcName);
    lua_pop(lua, 1);  // remove the non-function value fetched above
    *numOfOutput = 0;
    return;
  }

  lua_getglobal(lua, "global");

  if (lua_pcall(lua, 1, 1, 0) != 0) {
    qError("SCRIPT ERROR: %s", luaErrorMessage(lua));
    lua_settop(lua, 0);
    *numOfOutput = 0;
    return;
  }

  int tNumOfOutput = 0;
  luaValueToTaosType(lua, pOutput, &tNumOfOutput, pCtx->resType, pCtx->resBytes);
  *numOfOutput = tNumOfOutput;
}

//do not support agg now
/*
 * Call the script's "<funcName>_finalize" function with the Lua global
 * "global" and `key`. Two results are requested: the second is stored back
 * into "global", the first is converted into pOutput using the context's
 * result type and width. *numOfOutput receives the number of values written,
 * or 0 when the function is missing or the call fails.
 */
void taosLoadScriptFinalize(void *pInit,int64_t key, char *pOutput, int32_t* numOfOutput) {
  ScriptCtx *pCtx = pInit;
  char funcName[MAX_FUNC_NAME] = {0};
  snprintf(funcName, sizeof(funcName), "%s_finalize", pCtx->funcName);

  lua_State* lua = pCtx->pEnv->lua_state;
  lua_getglobal(lua, funcName);
  if (!lua_isfunction(lua, -1)) {
    qError("SCRIPT ERROR: %s is not defined as a function", funcName);
    lua_pop(lua, 1);  // remove the non-function value fetched above
    *numOfOutput = 0;
    return;
  }

  lua_getglobal(lua, "global");
  lua_pushnumber(lua, (lua_Number)key);

  if (lua_pcall(lua, 2, 2, 0) != 0) {
    qError("SCRIPT ERROR: %s", luaErrorMessage(lua));
    lua_settop(lua, 0);
    *numOfOutput = 0;
    return;
  }

  lua_setglobal(lua, "global");  // pops the second result

  int tNumOfOutput = 0;
  luaValueToTaosType(lua, pOutput, &tNumOfOutput, pCtx->resType, pCtx->resBytes);
  *numOfOutput = tNumOfOutput;
}

/* Release a script context created by createScriptCtx(). */
void taosLoadScriptDestroy(void *pInit) {
  destroyScriptCtx(pInit);
}

/*
 * Create a script context: take a Lua environment from the pool, run `script`
 * in it, and copy the string held in the Lua global USER_FUNC_NAME into the
 * context as the function-name prefix.
 *
 * Returns NULL when allocation fails, no environment is available, the script
 * fails to run, or the function name is missing or does not fit in the
 * context. On failure the environment is handed back to the pool.
 */
ScriptCtx* createScriptCtx(char *script, int8_t resType, int16_t resBytes) {
  ScriptCtx *pCtx = (ScriptCtx *)calloc(1, sizeof(ScriptCtx));
  if (pCtx == NULL) {
    return NULL;
  }

  pCtx->state = SCRIPT_STATE_INIT;
  pCtx->pEnv = getScriptEnvFromPool();
  pCtx->resType = resType;
  pCtx->resBytes = resBytes;

  if (pCtx->pEnv == NULL) {
    destroyScriptCtx(pCtx);
    return NULL;
  }

  lua_State *lua = pCtx->pEnv->lua_state;
  if (luaL_dostring(lua, script)) {
    lua_pop(lua, 1);
    qError("dynamic load script failed");
    destroyScriptCtx(pCtx);
    return NULL;
  }

  lua_getglobal(lua, USER_FUNC_NAME);
  const char *name = lua_tostring(lua, -1);
  if (name == NULL) {
    lua_pop(lua, 1);
    qError("SCRIPT ERROR: invalid script");
    destroyScriptCtx(pCtx);
    return NULL;
  }

  // the context was zero-filled by calloc, so keeping at least one trailing
  // byte untouched guarantees funcName stays NUL-terminated
  size_t nameLen = strlen(name);
  if (nameLen >= sizeof(pCtx->funcName)) {
    lua_pop(lua, 1);
    qError("SCRIPT ERROR: function name too long");
    destroyScriptCtx(pCtx);
    return NULL;
  }
  memcpy(pCtx->funcName, name, nameLen);
  lua_pop(lua, 1);

  return pCtx;
}

/* Return the context's Lua environment to the pool and free the context. NULL is ignored. */
void destroyScriptCtx(void *pCtx) {
  if (pCtx == NULL) return;
  addScriptEnvToPool(((ScriptCtx *)pCtx)->pEnv);
  free(pCtx);
}

/*
 * Convert the value on top of the Lua stack into interBuf and pop it.
 *
 * *numOfOutput receives the number of values written: one per table entry for
 * a table, 1 for a string, boolean, or any other non-number value (the latter
 * written as NULL via setNull), and for a number 1 only when oType is FLOAT,
 * DOUBLE, BIGINT or <= INT (0 otherwise, with nothing written).
 */
static void luaValueToTaosType(lua_State *lua, char *interBuf, int32_t *numOfOutput, int16_t oType, int16_t oBytes) {
  int t = lua_type(lua,-1);
  int32_t sz = 0;
  switch (t) {
    case LUA_TSTRING:
      //TODO(yihaodeng): handle str type
      {
        const char *v = lua_tostring(lua, -1);
        // never write more than the oBytes every other branch assumes is available
        size_t n = strlen(v);
        size_t cap = (oBytes > 0) ? (size_t)oBytes : 0;
        if (n > cap) {
          n = cap;
        }
        memcpy(interBuf, v, n);
        sz = 1;
      }
      break;
    case LUA_TBOOLEAN:
      {
        // NOTE(review): lua_tonumber() on a boolean does not yield 1 for true;
        // lua_toboolean() is probably intended here - confirm the expected encoding.
        double v = lua_tonumber(lua, -1);
        memcpy(interBuf, (char *)&v, oBytes);
        sz = 1;
      }
      break;
    case LUA_TNUMBER:
      {
        if (oType == TSDB_DATA_TYPE_FLOAT) {
          float v = (float)lua_tonumber(lua, -1);
          memcpy(interBuf, (char *)&v, oBytes);
          sz = 1;
        } else if (oType == TSDB_DATA_TYPE_DOUBLE) {
          double v = lua_tonumber(lua, -1);
          memcpy(interBuf, (char *)&v, oBytes);
          sz = 1;
        } else if (oType == TSDB_DATA_TYPE_BIGINT) {
          int64_t v = (int64_t)lua_tonumber(lua, -1);
          memcpy(interBuf, (char *)&v, oBytes);
          sz = 1;
        } else if (oType <= TSDB_DATA_TYPE_INT) {
          int32_t v = (int32_t)lua_tonumber(lua, -1);
          memcpy(interBuf, (char *)&v, oBytes);
          sz = 1;
        }
      }
      break;
    case LUA_TTABLE:
      {
        // NOTE(review): every element is converted to int32_t regardless of oType,
        // and oBytes bytes are copied from it - verify for oBytes > sizeof(int32_t).
        lua_pushnil(lua);
        int32_t offset = 0;
        while(lua_next(lua, -2)) {
          int32_t v = (int32_t)lua_tonumber(lua, -1);
          memcpy(interBuf + offset, (char *)&v, oBytes);
          offset += oBytes;
          lua_pop(lua, 1);
          sz += 1;
        }
      }
      break;
    default:
      setNull(interBuf, oType, oBytes);
      sz = 1;
      break;
  }
  lua_pop(lua,1); // pop ret value from script
  *numOfOutput = sz;
}

/*
 * Initialize the scripting environment: open a Lua state, load the supported
 * libraries, drop the file-loading functions, register the user library and
 * define the __taos__err__handler helper.
 *
 * Returns NULL when the Lua state cannot be created.
 */
static lua_State* createLuaEnv() {
  lua_State *lua = lua_open();
  if (lua == NULL) {
    return NULL;
  }

  luaLoadLibraries(lua);
  luaRemoveUnsupportedFunctions(lua);

  // register func in external lib
  luaRegisterLibFunc(lua);

  {
    char *errh_func = "local dbg = debug\n"
                      "function __taos__err__handler(err)\n"
                      "  local i = dbg.getinfo(2,'nSl')\n"
                      "  if i and i.what == 'C' then\n"
                      "    i = dbg.getinfo(3,'nSl')\n"
                      "  end\n"
                      "  if i then\n"
                      "    return i.source .. ':' .. i.currentline .. ': ' .. err\n"
                      "  else\n"
                      "    return err\n"
                      "  end\n"
                      "end\n";
    // both calls leave an error message on the stack when they fail; log and
    // pop it so the new state starts with an empty stack
    if (luaL_loadbuffer(lua,errh_func,strlen(errh_func),"@err_handler_def") != 0 ||
        lua_pcall(lua,0,0,0) != 0) {
      qError("SCRIPT ERROR: %s", luaErrorMessage(lua));
      lua_pop(lua, 1);
    }
  }

  return lua;
}

/* Close a Lua state created by createLuaEnv(). NULL is ignored. */
static void destroyLuaEnv(lua_State *lua) {
  if (lua != NULL) {
    lua_close(lua);
  }
}

/* Destroy one pooled environment: its Lua state, its hash table and the struct itself. NULL is ignored. */
static void destroyScriptEnv(ScriptEnv *pEnv) {
  if (pEnv == NULL) {
    return;
  }
  destroyLuaEnv(pEnv->lua_state);
  if (pEnv->funcId != NULL) {
    taosHashCleanup(pEnv->funcId);
  }
  free(pEnv);
}

/*
 * Destroy every environment still held by the pool, then the pool itself.
 * Does nothing when the pool was never created; the pool pointer is reset so
 * a repeated call is harmless.
 */
void scriptEnvPoolCleanup() {
  if (pool == NULL) return;

  SListNode *pNode = NULL;
  while ((pNode = tdListPopHead(pool->scriptEnvs)) != NULL) {
    ScriptEnv *pEnv = NULL;
    tdListNodeGetData(pool->scriptEnvs, pNode, (void *)(&pEnv));
    destroyScriptEnv(pEnv);
    listNodeFree(pNode);
  }
  tdListFree(pool->scriptEnvs);
  pthread_mutex_destroy(&pool->mutex);
  free(pool);
  pool = NULL;  // do not leave a dangling pointer behind
}

/*
 * Create the environment pool and fill it with a fixed number of Lua
 * environments.
 *
 * Returns 0 on success. Returns -1 when any allocation or Lua state creation
 * fails; in that case everything created so far is released and the pool is
 * left uninitialized.
 */
int32_t scriptEnvPoolInit() {
  const int size = 10; // configure or not

  ScriptEnvPool *newPool = malloc(sizeof(ScriptEnvPool));
  if (newPool == NULL) {
    return -1;
  }

  newPool->scriptEnvs = tdListNew(sizeof(ScriptEnv *));
  if (newPool->scriptEnvs == NULL) {
    free(newPool);
    return -1;
  }

  pthread_mutex_init(&newPool->mutex, NULL);
  newPool->mSize = 0;
  newPool->cSize = 0;
  pool = newPool;

  for (int i = 0; i < size; i++) {
    ScriptEnv *env = malloc(sizeof(ScriptEnv));
    if (env == NULL) {
      scriptEnvPoolCleanup();
      return -1;
    }

    env->funcId = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    env->lua_state = createLuaEnv();
    if (env->lua_state == NULL) {
      destroyScriptEnv(env);
      scriptEnvPoolCleanup();
      return -1;
    }

    // the list stores the pointer value itself, hence the address of `env`
    tdListAppend(pool->scriptEnvs, (void *)(&env));
  }

  pool->mSize = size;
  pool->cSize = size;
  return 0;
}

/*
 * Take one environment out of the pool under the pool mutex.
 * Returns NULL when the pool is not initialized or currently empty.
 */
static ScriptEnv* getScriptEnvFromPool() {
  if (pool == NULL) {
    return NULL;
  }

  ScriptEnv *pEnv = NULL;

  pthread_mutex_lock(&pool->mutex);
  if (pool->cSize <= 0) {
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
  }

  SListNode *pNode = tdListPopHead(pool->scriptEnvs);
  if (pNode == NULL) {
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
  }
  tdListNodeGetData(pool->scriptEnvs, pNode, (void *)(&pEnv));
  listNodeFree(pNode);

  pool->cSize--;
  pthread_mutex_unlock(&pool->mutex);
  return pEnv;
}

/*
 * Hand an environment back to the pool under the pool mutex, clearing its Lua
 * stack first. NULL is ignored. When the pool no longer exists the
 * environment is destroyed instead.
 */
static void addScriptEnvToPool(ScriptEnv *pEnv) {
  if (pEnv == NULL) {
    return;
  }
  if (pool == NULL) {
    destroyScriptEnv(pEnv);
    return;
  }

  pthread_mutex_lock(&pool->mutex);
  lua_settop(pEnv->lua_state, 0);
  tdListAppend(pool->scriptEnvs, (void *)(&pEnv));
  pool->cSize++;
  pthread_mutex_unlock(&pool->mutex);
}

/*
 * Check that the mandatory "<prefix>_init" and "<prefix>_add" globals are
 * functions, where <prefix> is the first `len` bytes of funcPrefix.
 * Returns false as well when the combined name does not fit in MAX_FUNC_NAME.
 */
static bool hasBaseFuncDefinedInScript(lua_State *lua, const char *funcPrefix, int32_t len) {
  if (funcPrefix == NULL || len < 0) {
    return false;
  }

  const char *base[] = {"_init", "_add"};
  char funcName[MAX_FUNC_NAME];

  for (size_t i = 0; i < sizeof(base)/sizeof(base[0]); i++) {
    int n = snprintf(funcName, sizeof(funcName), "%.*s%s", (int)len, funcPrefix, base[i]);
    if (n < 0 || (size_t)n >= sizeof(funcName)) {
      return false;  // name would have been truncated
    }

    lua_getglobal(lua, funcName);
    bool defined = lua_isfunction(lua, -1); // function exists or not
    lua_pop(lua, 1);
    if (!defined) {
      return false;
    }
  }

  return true;
}

/*
 * Validate a script by running it in a pooled environment.
 *
 * The script is truncated in place to `len` bytes when it is longer. It is
 * valid when it runs without error, the Lua global USER_FUNC_NAME holds a
 * string shorter than USER_FUNC_NAME_LIMIT, and the "_init" and "_add"
 * functions for that name exist. The environment is always returned to the
 * pool.
 */
bool isValidScript(char *script, int32_t len) {
  if (script == NULL) {
    return false;
  }

  ScriptEnv *pEnv = getScriptEnvFromPool();
  if (pEnv == NULL) {
    return false;
  }
  lua_State *lua = pEnv->lua_state;

  if (len >= 0 && (size_t)len < strlen(script)) {
    script[len] = 0;
  }

  if (luaL_dostring(lua, script)) {
    lua_pop(lua, 1);
    addScriptEnvToPool(pEnv);
    qError("error at %s and %d", script, (int)(strlen(script)));
    return false;
  }

  lua_getglobal(lua, USER_FUNC_NAME);
  const char *name = lua_tostring(lua, -1);
  size_t nameLen = (name != NULL) ? strlen(name) : 0;
  if (name == NULL || nameLen >= USER_FUNC_NAME_LIMIT) {
    // log before popping: `name` points into the Lua value being removed
    qError("error at %s name: %s, len = %d", script, (name != NULL) ? name : "(null)", (int)nameLen);
    lua_pop(lua, 1);
    addScriptEnvToPool(pEnv);
    return false;
  }

  bool ret = hasBaseFuncDefinedInScript(lua, name, (int32_t)nameLen);
  lua_pop(lua, 1); // pop the function name

  addScriptEnvToPool(pEnv);
  return ret;
}