未验证 提交 b9e7eb9c 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #15399 from taosdata/szhou/fix/udf

feat: add delay to udf1 to simulate actual processing
...@@ -877,7 +877,7 @@ void udfcUvHandleError(SClientUvConn *conn); ...@@ -877,7 +877,7 @@ void udfcUvHandleError(SClientUvConn *conn);
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
void onUdfcPipeWrite(uv_write_t *write, int status); void onUdfcPipeWrite(uv_write_t *write, int status);
void onUdfcPipeConnect(uv_connect_t *connect, int status); void onUdfcPipeConnect(uv_connect_t *connect, int status);
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask); int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask); int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask); int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
void udfcAsyncTaskCb(uv_async_t *async); void udfcAsyncTaskCb(uv_async_t *async);
...@@ -1376,8 +1376,7 @@ void onUdfcPipeConnect(uv_connect_t *connect, int status) { ...@@ -1376,8 +1376,7 @@ void onUdfcPipeConnect(uv_connect_t *connect, int status) {
uv_sem_post(&uvTask->taskSem); uv_sem_post(&uvTask->taskSem);
} }
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) {
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
uvTask->type = uvTaskType; uvTask->type = uvTaskType;
uvTask->udfc = task->session->udfc; uvTask->udfc = task->session->udfc;
...@@ -1412,7 +1411,6 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN ...@@ -1412,7 +1411,6 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
} }
uv_sem_init(&uvTask->taskSem, 0); uv_sem_init(&uvTask->taskSem, 0);
*pUvTask = uvTask;
return 0; return 0;
} }
...@@ -1615,10 +1613,10 @@ int32_t udfcClose() { ...@@ -1615,10 +1613,10 @@ int32_t udfcClose() {
} }
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
SClientUvTaskNode *uvTask = NULL; SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
udfcCreateUvTask(task, uvTaskType, &uvTask);
fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe); fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);
udfcInitializeUvTask(task, uvTaskType, uvTask);
udfcQueueUvTask(uvTask); udfcQueueUvTask(uvTask);
udfcGetUdfTaskResultFromUvTask(task, uvTask); udfcGetUdfTaskResultFromUvTask(task, uvTask);
if (uvTaskType == UV_TASK_CONNECT) { if (uvTaskType == UV_TASK_CONNECT) {
...@@ -1629,6 +1627,8 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -1629,6 +1627,8 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
taosMemoryFree(uvTask->reqBuf.base); taosMemoryFree(uvTask->reqBuf.base);
uvTask->reqBuf.base = NULL; uvTask->reqBuf.base = NULL;
taosMemoryFree(uvTask); taosMemoryFree(uvTask);
fnDebug("udfc freed uvTask: %p", task);
uvTask = NULL; uvTask = NULL;
return task->errCode; return task->errCode;
} }
......
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h" #include "taosudf.h"
...@@ -35,6 +40,12 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { ...@@ -35,6 +40,12 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
udfColDataSet(resultCol, i, (char *)&luckyNum, false); udfColDataSet(resultCol, i, (char *)&luckyNum, false);
} }
} }
//to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
Sleep(1);
#endif
return 0; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册