提交 8155c9fb 编写于 作者: Y yihaoDeng

[TD-3980] support lua script

上级 148a73fe
...@@ -60,8 +60,9 @@ typedef struct ScriptCtx { ...@@ -60,8 +60,9 @@ typedef struct ScriptCtx {
int taosLoadScriptInit(void *pInit); int taosLoadScriptInit(void *pInit);
void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows, void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows,
int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes); int64_t *ptsList, int64_t key, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes);
void taosLoadScriptFinalize(void *pInit, char *pOutput, int32_t *output); void taosLoadScriptFinalize(void *pInit, int64_t key, char *pOutput, int32_t *output);
void taosLoadScriptMerge(void *pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
void taosLoadScriptDestroy(void *pInit); void taosLoadScriptDestroy(void *pInit);
typedef struct { typedef struct {
......
...@@ -57,8 +57,8 @@ typedef struct SUdfInfo { ...@@ -57,8 +57,8 @@ typedef struct SUdfInfo {
typedef int32_t (*scriptInitFunc)(void *pCtx); typedef int32_t (*scriptInitFunc)(void *pCtx);
typedef void (*scriptNormalFunc)(void *pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows, typedef void (*scriptNormalFunc)(void *pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows,
int64_t* ts, char* dataOutput, char* tsOutput, int32_t* numOfOutput, int16_t oType, int16_t oBytes); int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput, int16_t oType, int16_t oBytes);
typedef void (*scriptFinalizeFunc)(void *pCtx, char* dataOutput, int32_t* numOfOutput); typedef void (*scriptFinalizeFunc)(void *pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput);
typedef void (*scriptMergeFunc)(void *pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput); typedef void (*scriptMergeFunc)(void *pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
typedef void (*scriptDestroyFunc)(void* pCtx); typedef void (*scriptDestroyFunc)(void* pCtx);
......
...@@ -715,7 +715,7 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t ...@@ -715,7 +715,7 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
case TSDB_UDF_FUNC_NORMAL: case TSDB_UDF_FUNC_NORMAL:
if (pUdfInfo->isScript) { if (pUdfInfo->isScript) {
(*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx, (*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx,
(char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput, (char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->startTs, pCtx->pOutput,
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes); (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes);
} else { } else {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
...@@ -739,7 +739,11 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t ...@@ -739,7 +739,11 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
break; break;
case TSDB_UDF_FUNC_MERGE: case TSDB_UDF_FUNC_MERGE:
if (pUdfInfo->isScript) {
(*(scriptMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pUdfInfo->pScriptCtx, pCtx->pInput, pCtx->size, pCtx->pOutput, &output);
} else {
(*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init); (*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init);
}
// set the output value exist // set the output value exist
pCtx->resultInfo->numOfRes = output; pCtx->resultInfo->numOfRes = output;
...@@ -753,7 +757,7 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t ...@@ -753,7 +757,7 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
if (pUdfInfo->isScript) { if (pUdfInfo->isScript) {
(*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->pOutput, &output); (*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, &output);
} else { } else {
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init); (*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init);
} }
...@@ -3279,6 +3283,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult ...@@ -3279,6 +3283,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset); setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset);
for (int32_t j = 0; j < numOfOutput; ++j) { for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].startTs = buf->win.skey;
if (pCtx[j].functionId < 0) { if (pCtx[j].functionId < 0) {
doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else { } else {
...@@ -6676,6 +6681,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { ...@@ -6676,6 +6681,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadScriptFinalize; pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadScriptFinalize;
pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadScriptMerge;
} }
pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadScriptDestroy; pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadScriptDestroy;
......
...@@ -103,7 +103,7 @@ int taosLoadScriptInit(void* pInit) { ...@@ -103,7 +103,7 @@ int taosLoadScriptInit(void* pInit) {
return 0; return 0;
} }
void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows, void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows,
int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes) { int64_t *ptsList, int64_t key, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes) {
ScriptCtx* pCtx = pInit; ScriptCtx* pCtx = pInit;
char funcName[MAX_FUNC_NAME] = {0}; char funcName[MAX_FUNC_NAME] = {0};
sprintf(funcName, "%s_add", pCtx->funcName); sprintf(funcName, "%s_add", pCtx->funcName);
...@@ -124,8 +124,9 @@ void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iByt ...@@ -124,8 +124,9 @@ void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iByt
if (lua_istable(lua, -1)) { if (lua_istable(lua, -1)) {
isGlobalState = true; isGlobalState = true;
} }
lua_pushnumber(lua, key);
// do call lua script // do call lua script
if (lua_pcall(lua, 2, 1, 0) != 0) { if (lua_pcall(lua, 3, 1, 0) != 0) {
qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
lua_pop(lua, -1); lua_pop(lua, -1);
return; return;
...@@ -139,22 +140,51 @@ void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iByt ...@@ -139,22 +140,51 @@ void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iByt
*numOfOutput = tNumOfOutput; *numOfOutput = tNumOfOutput;
} }
void taosLoadScriptMerge(void *pInit, char* data, int32_t numOfRows, char* pOutput, int32_t* numOfOutput) {
ScriptCtx *pCtx = pInit;
char funcName[MAX_FUNC_NAME] = {0};
sprintf(funcName, "%s_merge", pCtx->funcName);
lua_State* lua = pCtx->pEnv->lua_state;
lua_getglobal(lua, funcName);
if (!lua_isfunction(lua, -1)) {
qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
return;
}
lua_getglobal(lua, "global");
if (lua_pcall(lua, 1, 1, 0) != 0) {
qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
lua_pop(lua, -1);
return;
}
int tNumOfOutput = 0;
luaValueToTaosType(lua, pOutput, &tNumOfOutput, pCtx->resType, pCtx->resBytes);
*numOfOutput = tNumOfOutput;
}
//do not support agg now //do not support agg now
void taosLoadScriptFinalize(void *pInit, char *pOutput, int32_t* numOfOutput) { void taosLoadScriptFinalize(void *pInit,int64_t key, char *pOutput, int32_t* numOfOutput) {
ScriptCtx *pCtx = pInit; ScriptCtx *pCtx = pInit;
char funcName[MAX_FUNC_NAME] = {0}; char funcName[MAX_FUNC_NAME] = {0};
sprintf(funcName, "%s_finalize", pCtx->funcName); sprintf(funcName, "%s_finalize", pCtx->funcName);
lua_State* lua = pCtx->pEnv->lua_state; lua_State* lua = pCtx->pEnv->lua_state;
lua_getglobal(lua, funcName); lua_getglobal(lua, funcName);
if (!lua_isfunction(lua, -1)) {
qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
return;
}
lua_getglobal(lua, "global"); lua_getglobal(lua, "global");
if (lua_pcall(lua, 1, 1, 0) != 0) { lua_pushnumber(lua, key);
if (lua_pcall(lua, 2, 2, 0) != 0) {
qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
lua_pop(lua, -1); lua_pop(lua, -1);
return; return;
} }
lua_setglobal(lua, "global");
int tNumOfOutput = 0; int tNumOfOutput = 0;
luaValueToTaosType(lua, pOutput, &tNumOfOutput, pCtx->resType, pCtx->resBytes); luaValueToTaosType(lua, pOutput, &tNumOfOutput, pCtx->resType, pCtx->resBytes);
*numOfOutput = tNumOfOutput; *numOfOutput = tNumOfOutput;
...@@ -256,6 +286,8 @@ void luaValueToTaosType(lua_State *lua, char *interBuf, int32_t *numOfOutput, in ...@@ -256,6 +286,8 @@ void luaValueToTaosType(lua_State *lua, char *interBuf, int32_t *numOfOutput, in
} }
break; break;
default: default:
setNull(interBuf, oType, oBytes);
sz = 1;
break; break;
} }
lua_pop(lua,1); // pop ret value from script lua_pop(lua,1); // pop ret value from script
......
funcName = "test" funcName = "test"
global = {} global = {}
global["sum"] = 0.0
global["num"] = 0
function test_init() function test_init()
return global return global
end end
function test_add(rows, ans) function test_add(rows, ans, key)
t = {}
t["sum"] = 0.0
t["num"] = 0
for i=1, #rows do for i=1, #rows do
ans["sum"] = ans["sum"] + rows[i] * rows[i] t["sum"] = t["sum"] + rows[i] * rows[i]
end end
ans["num"] = ans["num"] + #rows t["num"] = #rows
if (ans[key] ~= nil)
then
ans[key]["sum"] = ans[key]["sum"] + t["sum"]
ans[key]["num"] = ans[key]["num"] + t["num"]
else
ans[key] = t
end
return ans; return ans;
end end
function test_finalize(ans) function test_finalize(ans, key)
return ans["sum"]/ans["num"]; local ret = 0.0
if (ans[key] ~= nil)
then
ret = ans[key]["sum"]/ans[key]["num"]
ans[key]["sum"] = 0.0
ans[key]["num"] = 0
else
ret = inf
end
return ret, ans
end end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册