diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index dae8c99abae609420af28b7b81e2d9b241f3b564..afacb50d3545a5def9e56f9d669b26fd0ea59f37 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -877,7 +877,7 @@ void udfcUvHandleError(SClientUvConn *conn); void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); void onUdfcPipeWrite(uv_write_t *write, int status); void onUdfcPipeConnect(uv_connect_t *connect, int status); -int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask); +int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask); int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask); int32_t udfcStartUvTask(SClientUvTaskNode *uvTask); void udfcAsyncTaskCb(uv_async_t *async); @@ -1376,8 +1376,7 @@ void onUdfcPipeConnect(uv_connect_t *connect, int status) { uv_sem_post(&uvTask->taskSem); } -int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { - SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); +int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) { uvTask->type = uvTaskType; uvTask->udfc = task->session->udfc; @@ -1412,7 +1411,6 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN } uv_sem_init(&uvTask->taskSem, 0); - *pUvTask = uvTask; return 0; } @@ -1615,10 +1613,10 @@ int32_t udfcClose() { } int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { - SClientUvTaskNode *uvTask = NULL; - - udfcCreateUvTask(task, uvTaskType, &uvTask); + SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe); + + udfcInitializeUvTask(task, uvTaskType, uvTask); udfcQueueUvTask(uvTask); udfcGetUdfTaskResultFromUvTask(task, uvTask); if (uvTaskType == UV_TASK_CONNECT) { @@ -1629,6 +1627,8 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { taosMemoryFree(uvTask->reqBuf.base); uvTask->reqBuf.base = NULL; taosMemoryFree(uvTask); + fnDebug("udfc freed uvTask: %p", task); + uvTask = NULL; return task->errCode; } diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index dfbae357efd71c646bff5060d06ee984c05fae63..5be18af553498bf2b05607b69223c302d3826fae 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -1,7 +1,12 @@ #include #include #include - +#ifdef LINUX +#include +#endif +#ifdef WINDOWS +#include +#endif #include "taosudf.h" @@ -35,6 +40,12 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { udfColDataSet(resultCol, i, (char *)&luckyNum, false); } } - + //to simulate actual processing delay by udf +#ifdef LINUX + usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second) +#endif +#ifdef WINDOWS + Sleep(1); +#endif return 0; } \ No newline at end of file