diff --git a/src/query/inc/qScript.h b/src/query/inc/qScript.h index d43413e2e528fdd6567dee14d607efcc4eb93814..574bb51368afeaeddef5fbd5c5bd8469fbe0cdef 100644 --- a/src/query/inc/qScript.h +++ b/src/query/inc/qScript.h @@ -60,8 +60,9 @@ typedef struct ScriptCtx { int taosLoadScriptInit(void *pInit); void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows, - int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes); -void taosLoadScriptFinalize(void *pInit, char *pOutput, int32_t *output); + int64_t *ptsList, int64_t key, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes); +void taosLoadScriptFinalize(void *pInit, int64_t key, char *pOutput, int32_t *output); +void taosLoadScriptMerge(void *pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput); void taosLoadScriptDestroy(void *pInit); typedef struct { diff --git a/src/query/inc/qUdf.h b/src/query/inc/qUdf.h index 26573cb743f1897da21928ddbd80b0e151b1b122..1083b1e698f7591aae4586c7722e5343cd9c4d86 100644 --- a/src/query/inc/qUdf.h +++ b/src/query/inc/qUdf.h @@ -57,8 +57,8 @@ typedef struct SUdfInfo { typedef int32_t (*scriptInitFunc)(void *pCtx); typedef void (*scriptNormalFunc)(void *pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows, - int64_t* ts, char* dataOutput, char* tsOutput, int32_t* numOfOutput, int16_t oType, int16_t oBytes); -typedef void (*scriptFinalizeFunc)(void *pCtx, char* dataOutput, int32_t* numOfOutput); + int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput, int16_t oType, int16_t oBytes); +typedef void (*scriptFinalizeFunc)(void *pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput); typedef void (*scriptMergeFunc)(void *pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput); typedef void (*scriptDestroyFunc)(void* pCtx); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3e405c6c924a062a8cd0c5571aabaa4ee4befca1..64f55a1d9989ca51801387ac746f66494b9af879 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -715,7 +715,7 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t case TSDB_UDF_FUNC_NORMAL: if (pUdfInfo->isScript) { (*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx, - (char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput, + (char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->startTs, pCtx->pOutput, (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes); } else { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); @@ -739,7 +739,11 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t break; case TSDB_UDF_FUNC_MERGE: - (*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init); + if (pUdfInfo->isScript) { + (*(scriptMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pUdfInfo->pScriptCtx, pCtx->pInput, pCtx->size, pCtx->pOutput, &output); + } else { + (*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init); + } // set the output value exist pCtx->resultInfo->numOfRes = output; @@ -753,7 +757,7 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); if (pUdfInfo->isScript) { - (*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->pOutput, &output); + (*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, &output); } else { (*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init); } @@ -3279,6 +3283,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset); for (int32_t j = 0; j < numOfOutput; ++j) { + pCtx[j].startTs = buf->win.skey; if (pCtx[j].functionId < 0) { doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); } else { @@ -6676,6 +6681,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadScriptFinalize; + pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadScriptMerge; } pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadScriptDestroy; diff --git a/src/query/src/qScript.c b/src/query/src/qScript.c index 3319428930b9bd5c252892e40a7562688b2bf2c0..d106c5f444c5cdd8e3a63b0c7855cb64f8d38150 100644 --- a/src/query/src/qScript.c +++ b/src/query/src/qScript.c @@ -103,7 +103,7 @@ int taosLoadScriptInit(void* pInit) { return 0; } void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows, - int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes) { + int64_t *ptsList, int64_t key, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes) { ScriptCtx* pCtx = pInit; char funcName[MAX_FUNC_NAME] = {0}; sprintf(funcName, "%s_add", pCtx->funcName); @@ -124,8 +124,9 @@ void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iByt if (lua_istable(lua, -1)) { isGlobalState = true; } + lua_pushnumber(lua, key); // do call lua script - if (lua_pcall(lua, 2, 1, 0) != 0) { + if (lua_pcall(lua, 3, 1, 0) != 0) { qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); lua_pop(lua, -1); return; @@ -139,22 +140,51 @@ void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iByt *numOfOutput = tNumOfOutput; } +void taosLoadScriptMerge(void *pInit, char* data, int32_t numOfRows, char* pOutput, int32_t* numOfOutput) { + ScriptCtx *pCtx = pInit; + char funcName[MAX_FUNC_NAME] = {0}; + sprintf(funcName, "%s_merge", pCtx->funcName); + + lua_State* lua = pCtx->pEnv->lua_state; + lua_getglobal(lua, funcName); + if (!lua_isfunction(lua, -1)) { + qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); + return; + } + + lua_getglobal(lua, "global"); + if (lua_pcall(lua, 1, 1, 0) != 0) { + qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); + lua_pop(lua, -1); + return; + } + int tNumOfOutput = 0; + luaValueToTaosType(lua, pOutput, &tNumOfOutput, pCtx->resType, pCtx->resBytes); + *numOfOutput = tNumOfOutput; +} + //do not support agg now -void taosLoadScriptFinalize(void *pInit, char *pOutput, int32_t* numOfOutput) { +void taosLoadScriptFinalize(void *pInit,int64_t key, char *pOutput, int32_t* numOfOutput) { ScriptCtx *pCtx = pInit; char funcName[MAX_FUNC_NAME] = {0}; sprintf(funcName, "%s_finalize", pCtx->funcName); lua_State* lua = pCtx->pEnv->lua_state; lua_getglobal(lua, funcName); + if (!lua_isfunction(lua, -1)) { + qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); + return; + } lua_getglobal(lua, "global"); - if (lua_pcall(lua, 1, 1, 0) != 0) { + lua_pushnumber(lua, key); + if (lua_pcall(lua, 2, 2, 0) != 0) { qError("SCRIPT ERROR: %s", lua_tostring(lua, -1)); lua_pop(lua, -1); return; } + lua_setglobal(lua, "global"); int tNumOfOutput = 0; luaValueToTaosType(lua, pOutput, &tNumOfOutput, pCtx->resType, pCtx->resBytes); *numOfOutput = tNumOfOutput; @@ -256,6 +286,8 @@ void luaValueToTaosType(lua_State *lua, char *interBuf, int32_t *numOfOutput, in } break; default: + setNull(interBuf, oType, oBytes); + sz = 1; break; } lua_pop(lua,1); // pop ret value from script diff --git a/tests/script/sh/demo.lua b/tests/script/sh/demo.lua index a79015bb87712b71b00eb48e02f4b7e25e07ae32..2bb660b6b3d9b256921aa757c8565dde4526a9dd 100644 --- a/tests/script/sh/demo.lua +++ b/tests/script/sh/demo.lua @@ -1,21 +1,43 @@ funcName = "test" global = {} -global["sum"] = 0.0 -global["num"] = 0 function test_init() return global end -function test_add(rows, ans) +function test_add(rows, ans, key) + t = {} + t["sum"] = 0.0 + t["num"] = 0 for i=1, #rows do - ans["sum"] = ans["sum"] + rows[i] * rows[i] + t["sum"] = t["sum"] + rows[i] * rows[i] end - ans["num"] = ans["num"] + #rows + t["num"] = #rows + + + if (ans[key] ~= nil) + then + ans[key]["sum"] = ans[key]["sum"] + t["sum"] + ans[key]["num"] = ans[key]["num"] + t["num"] + else + ans[key] = t + end + return ans; end -function test_finalize(ans) - return ans["sum"]/ans["num"]; +function test_finalize(ans, key) + local ret = 0.0 + + if (ans[key] ~= nil) + then + ret = ans[key]["sum"]/ans[key]["num"] + ans[key]["sum"] = 0.0 + ans[key]["num"] = 0 + else + ret = inf + end + + return ret, ans end