diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 74e64d5292e9232d803762ec528554be0a9ae975..5962949a70def936f00a037967b113e9b1f66f2e 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -63,14 +63,14 @@ static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) { if (buf != NULL) { - ((int8_t *)(*buf))[0] = value ? 1 : 0; + ((int8_t *)(*buf))[0] = (value ? 1 : 0); *buf = POINTER_SHIFT(*buf, sizeof(int8_t)); } return (int32_t)sizeof(int8_t); } static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) { - *value = ((int8_t *)buf)[0] == 0 ? false : true; + *value = ( (((int8_t *)buf)[0] == 0) ? false : true ); return POINTER_SHIFT(buf, sizeof(int8_t)); } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 03a3891a4ce39cef4d87656737ce64415fd83039..192e04c9a38cb69938f51116b65c764e16dcaee0 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1287,7 +1287,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { task->type = UDF_TASK_SETUP; SUdfSetupRequest *req = &task->_setup.req; - memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); + strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT); if (errCode != 0) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index e644ea6172790279f55a0503105e41dae9ed750f..19421072f86e82913a21e0d122ec246dd4b83601 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -226,7 +226,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); code = udf->scalarProcFunc(&input, &output); - + freeUdfDataDataBlock(&input); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); freeUdfColumn(&output); break; @@ -246,6 +246,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { .bufLen= udf->bufSize, .numOfResult = 0}; code = udf->aggProcFunc(&input, &call->interBuf, &outBuf); + freeUdfInterBuf(&call->interBuf); + freeUdfDataDataBlock(&input); subRsp->resultBuf = outBuf; break; @@ -255,6 +257,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { .bufLen= udf->bufSize, .numOfResult = 0}; code = udf->aggFinishFunc(&call->interBuf, &outBuf); + freeUdfInterBuf(&call->interBuf); subRsp->resultBuf = outBuf; break; } @@ -274,6 +277,30 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { encodeUdfResponse(&buf, rsp); uvUdf->output = uv_buf_init(bufBegin, len); + switch (call->callType) { + case TSDB_UDF_CALL_SCALA_PROC: { + tDeleteSSDataBlock(&call->block); + tDeleteSSDataBlock(&subRsp->resultData); + break; + } + case TSDB_UDF_CALL_AGG_INIT: { + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + case TSDB_UDF_CALL_AGG_PROC: { + tDeleteSSDataBlock(&call->block); + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + case TSDB_UDF_CALL_AGG_FIN: { + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + default: + break; + + } + taosMemoryFree(uvUdf->input.base); return; } @@ -348,9 +375,8 @@ void udfdProcessRequest(uv_work_t *req) { void udfdOnWrite(uv_write_t *req, int status) { SUvUdfWork *work = (SUvUdfWork *)req->data; if (status < 0) { - // TODO:log error and process it. + fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); } - fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status)); taosMemoryFree(work->output.base); taosMemoryFree(work); taosMemoryFree(req); @@ -549,6 +575,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); taosCloseFile(&file); strncpy(udf->path, path, strlen(path)); + tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); msgInfo->code = 0; } @@ -800,15 +827,26 @@ static int32_t udfdUvInit() { return 0; } +static void udfdCloseWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + } +} + static int32_t udfdRun() { global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); - fnInfo("start the udfd"); - int code = uv_run(global.loop, UV_RUN_DEFAULT); - fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code); - int codeClose = uv_loop_close(global.loop); - fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); + fnInfo("start udfd event loop"); + uv_run(global.loop, UV_RUN_DEFAULT); + fnInfo("udfd event loop stopped."); + + uv_loop_close(global.loop); + + uv_walk(global.loop, udfdCloseWalkCb, NULL); + uv_run(global.loop, UV_RUN_DEFAULT); + uv_loop_close(global.loop); + uv_mutex_destroy(&global.udfsMutex); taosHashCleanup(global.udfsHash); return 0;