diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 8f5cd070dca186651d552ce2cd274856a703e119..90baabce2c146c2cf00b9fa89bddf3bfc1844181 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -899,9 +899,11 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { typedef struct SUdfAggRes { int8_t finalResNum; int8_t interResNum; + int32_t interResBufLen; char *finalResBuf; char *interResBuf; } SUdfAggRes; + void onUdfcPipeClose(uv_handle_t *handle); int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask); void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf); @@ -1096,9 +1098,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult releaseUdfFuncHandle(pCtx->udfName); return false; } - udfRes->interResNum = buf.numOfResult; if (buf.bufLen <= session->bufSize) { memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); + udfRes->interResBufLen = buf.bufLen; + udfRes->interResNum = buf.numOfResult; } else { fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); releaseUdfFuncHandle(pCtx->udfName); @@ -1136,7 +1139,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows); - SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; + SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); @@ -1144,17 +1147,17 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { fnError("udfAggProcess error. code: %d", udfCode); newState.numOfResult = 0; } else { - udfRes->interResNum = newState.numOfResult; if (newState.bufLen <= session->bufSize) { memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); + udfRes->interResBufLen = newState.bufLen; + udfRes->interResNum = newState.numOfResult; } else { fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize); udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE; } } - if (newState.numOfResult == 1 || state.numOfResult == 1) { - GET_RES_INFO(pCtx)->numOfRes = 1; - } + + GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum; blockDataDestroy(inputBlock); @@ -1180,7 +1183,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; SUdfInterBuf resultBuf = {0}; - SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; + SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; int32_t udfCallCode = 0; udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf); if (udfCallCode != 0) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 366e890bd7790d86802cc5f01ced6f993476eea4..8db9b386df59b3a1d6ed1d77774a55018680f34e 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -173,6 +173,7 @@ int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, voi // for others, dlopen/dlsym to find function pointers typedef struct SUdfScriptPlugin { int8_t scriptType; + const char* scriptSuffix; char libPath[PATH_MAX]; bool libLoaded; @@ -313,6 +314,7 @@ static void udfdConnectMnodeThreadFunc(void *args); void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; + plugin->scriptSuffix = '.so'; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; plugin->udfInitFunc = udfdCPluginUdfInit; @@ -346,6 +348,7 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; + plugin->scriptSuffix = "py"; //todo: windows support sprintf(plugin->libPath, "%s", "libtaospyudf.so"); plugin->libLoaded = false; @@ -776,7 +779,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - + char* suffix = global.scriptPlugins[udf->scriptType]->scriptSuffix; if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; msgInfo->code = terrno; @@ -786,9 +789,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); + snprintf(path, sizeof(path), "%s%s.%s", tsTempDir, pFuncInfo->name, suffix); #else - snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); + snprintf(path, sizeof(path), "%s/%s.%s", tsTempDir, pFuncInfo->name, suffix); #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) {