diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index a179c29440eafcc5b1678b91a61256eff7bdbb9b..1f531be1f62b8b209b4907a93bd757fe50a21b1f 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -23,9 +23,95 @@ //TODO: udfd restart when exist or aborts //TODO: network error processing. //TODO: add unit test -void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); -int32_t destructUdfService(); -int32_t constructUdfService(); +//TODO: test libuv queue +typedef void *QUEUE[2]; + +/* Private macros. */ +#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0])) +#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1])) +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + +/* Public macros. */ +#define QUEUE_DATA(ptr, type, field) \ + ((type *) ((char *) (ptr) - offsetof(type, field))) + +/* Important note: mutating the list while QUEUE_FOREACH is + * iterating over its elements results in undefined behavior. + */ +#define QUEUE_FOREACH(q, h) \ + for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q)) + +#define QUEUE_EMPTY(q) \ + ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q)) + +#define QUEUE_HEAD(q) \ + (QUEUE_NEXT(q)) + +#define QUEUE_INIT(q) \ + do { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } \ + while (0) + +#define QUEUE_ADD(h, n) \ + do { \ + QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \ + QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV(h) = QUEUE_PREV(n); \ + QUEUE_PREV_NEXT(h) = (h); \ + } \ + while (0) + +#define QUEUE_SPLIT(h, q, n) \ + do { \ + QUEUE_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(n) = (n); \ + QUEUE_NEXT(n) = (q); \ + QUEUE_PREV(h) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(h) = (h); \ + QUEUE_PREV(q) = (n); \ + } \ + while (0) + +#define QUEUE_MOVE(h, n) \ + do { \ + if (QUEUE_EMPTY(h)) \ + QUEUE_INIT(n); \ + else { \ + QUEUE* q = QUEUE_HEAD(h); \ + QUEUE_SPLIT(h, q, n); \ + } \ + } \ + while (0) + +#define QUEUE_INSERT_HEAD(h, q) \ + do { \ + QUEUE_NEXT(q) = QUEUE_NEXT(h); \ + QUEUE_PREV(q) = (h); \ + QUEUE_NEXT_PREV(q) = (q); \ + QUEUE_NEXT(h) = (q); \ + } \ + while (0) + +#define QUEUE_INSERT_TAIL(h, q) \ + do { \ + QUEUE_NEXT(q) = (h); \ + QUEUE_PREV(q) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(q) = (q); \ + QUEUE_PREV(h) = (q); \ + } \ + while (0) + +#define QUEUE_REMOVE(q) \ + do { \ + QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ + QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ + } \ + while (0) + + enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, @@ -49,8 +135,9 @@ typedef struct SClientUvTaskNode { uv_sem_t taskSem; uv_buf_t rspBuf; - struct SClientUvTaskNode *prev; - struct SClientUvTaskNode *next; + QUEUE recvTaskQueue; + QUEUE procTaskQueue; + QUEUE connTaskQueue; } SClientUvTaskNode; typedef struct SClientUdfTask { @@ -87,7 +174,7 @@ typedef struct SClientConnBuf { typedef struct SClientUvConn { uv_pipe_t *pipe; - SClientUvTaskNode taskQueue; + QUEUE taskQueue; SClientConnBuf readBuf; } SClientUvConn; @@ -104,8 +191,6 @@ uv_async_t gUdfLoopStopAsync; uv_mutex_t gUdfTaskQueueMutex; int64_t gUdfTaskSeqNum = 0; -SArray* gUdfWaitResultTasks = NULL; - enum { UDFC_STATE_INITAL = 0, // initial state UDFC_STATE_STARTNG, // starting after startUdfService @@ -117,69 +202,11 @@ enum { int8_t gUdfcState = UDFC_STATE_INITAL; //double circular linked list -typedef SClientUvTaskNode *SClientUvTaskQueue; -SClientUvTaskNode gUdfQueueNode; -SClientUvTaskQueue gUdfTaskQueue = &gUdfQueueNode; - -//TODO: deal with uv task that has been started and then udfd core dumped - -void udfTaskQueueInit(SClientUvTaskQueue q) { - q->next = q; - q->prev = q; -} - -bool udfTaskQueueIsEmpty(SClientUvTaskQueue q) { - return q == q->next; -} - -void udfTaskQueueInsertTail(SClientUvTaskQueue q, SClientUvTaskNode *e) { - e->next = q; - e->prev = q->prev; - e->prev->next = e; - q->prev = e; -} - -void udfTaskQueueInsertTaskAtHead(SClientUvTaskQueue q, SClientUvTaskNode *e) { - e->next = q->next; - e->prev = q; - q->next->prev = e; - q->next = e; -} - -void udfTaskQueueRemoveTask(SClientUvTaskNode *e) { - e->prev->next = e->next; - e->next->prev = e->prev; -} - -void udfTaskQueueSplit(SClientUvTaskQueue q, SClientUvTaskNode *from, SClientUvTaskQueue n) { - n->prev = q->prev; - n->prev->next = n; - n->next = from; - q->prev = from->prev; - q->prev->next = q; - from->prev = n; -} - -SClientUvTaskNode *udfTaskQueueHeadTask(SClientUvTaskQueue q) { - return q->next; -} -SClientUvTaskNode *udfTaskQueueTailTask(SClientUvTaskQueue q) { - return q->prev; -} - -SClientUvTaskNode *udfTaskQueueNext(SClientUvTaskNode *e) { - return e->next; -} +QUEUE gUdfTaskQueue = {0}; -void udfTaskQueueMove(SClientUvTaskQueue q, SClientUvTaskQueue n) { - if (udfTaskQueueIsEmpty(q)) { - udfTaskQueueInit(n); - } else { - SClientUvTaskNode *h = udfTaskQueueHeadTask(q); - udfTaskQueueSplit(q, h, n); - } -} +//TODO: deal with uv task that has been started and then udfd core dumped +QUEUE gUvProcTaskQueue = {0}; int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { debugPrint("%s", "encoding request"); @@ -471,30 +498,14 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { return 0; } -void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { - debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); - uv_close((uv_handle_t *) req, NULL); - //TODO: restart the udfd process - if (gUdfcState == UDFC_STATE_STOPPING) { - if (term_signal != SIGINT) { - //TODO: log error - } - } - if (gUdfcState == UDFC_STATE_READY) { - gUdfcState = UDFC_STATE_RESTARTING; - //TODO: asynchronous without blocking. how to do it - destructUdfService(); - constructUdfService(); - } - -} - void onUdfcPipeClose(uv_handle_t *handle) { SClientUvConn *conn = handle->data; - if (!udfTaskQueueIsEmpty(&conn->taskQueue)) { - SClientUvTaskNode *task = udfTaskQueueHeadTask(&conn->taskQueue); + if (!QUEUE_EMPTY(&conn->taskQueue)) { + QUEUE* h = QUEUE_HEAD(&conn->taskQueue); + SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); task->errCode = 0; uv_sem_post(&task->taskSem); + QUEUE_REMOVE(&task->procTaskQueue); } taosMemoryFree(conn->readBuf.buf); @@ -599,14 +610,16 @@ void udfcUvHandleRsp(SClientUvConn *conn) { SClientConnBuf *connBuf = &conn->readBuf; int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen int32_t then seqnum - if (udfTaskQueueIsEmpty(&conn->taskQueue)) { + if (QUEUE_EMPTY(&conn->taskQueue)) { //LOG error return; } bool found = false; SClientUvTaskNode *taskFound = NULL; - SClientUvTaskNode *task = udfTaskQueueNext(&conn->taskQueue); - while (task != &conn->taskQueue) { + QUEUE* h = QUEUE_NEXT(&conn->taskQueue); + SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); + + while (h != &conn->taskQueue) { if (task->seqNum == seqNum) { if (found == false) { found = true; @@ -616,13 +629,15 @@ void udfcUvHandleRsp(SClientUvConn *conn) { continue; } } - task = udfTaskQueueNext(task); + h = QUEUE_NEXT(h); + task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); } if (taskFound) { taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len); - udfTaskQueueRemoveTask(taskFound); + QUEUE_REMOVE(&taskFound->connTaskQueue); uv_sem_post(&taskFound->taskSem); + QUEUE_REMOVE(&taskFound->procTaskQueue); } else { //TODO: LOG error } @@ -665,7 +680,7 @@ void onUdfClientWrite(uv_write_t *write, int status) { if (status == 0) { uv_pipe_t *pipe = uvTask->pipe; SClientUvConn *conn = pipe->data; - udfTaskQueueInsertTail(&conn->taskQueue, uvTask); + QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); } else { //TODO Log error; } @@ -683,6 +698,7 @@ void onUdfClientConnect(uv_connect_t *connect, int status) { uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead); taosMemoryFree(connect); uv_sem_post(&uvTask->taskSem); + QUEUE_REMOVE(&uvTask->procTaskQueue); } int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { @@ -726,7 +742,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { debugPrint("%s, %d", "queue uv task", uvTask->type); uv_mutex_lock(&gUdfTaskQueueMutex); - udfTaskQueueInsertTail(gUdfTaskQueue, uvTask); + QUEUE_INSERT_TAIL(&gUdfTaskQueue, &uvTask->recvTaskQueue); uv_mutex_unlock(&gUdfTaskQueueMutex); uv_async_send(&gUdfLoopTaskAync); @@ -750,7 +766,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { conn->readBuf.cap = 0; conn->readBuf.buf = 0; conn->readBuf.total = -1; - udfTaskQueueInit(&conn->taskQueue); + QUEUE_INIT(&conn->taskQueue); pipe->data = conn; @@ -769,7 +785,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { } case UV_TASK_DISCONNECT: { SClientUvConn *conn = uvTask->pipe->data; - udfTaskQueueInsertTail(&conn->taskQueue, uvTask); + QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose); break; } @@ -782,34 +798,33 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { } void udfClientAsyncCb(uv_async_t *async) { - SClientUvTaskNode node; - SClientUvTaskQueue q = &node; - udfTaskQueueInit(q); + QUEUE wq; uv_mutex_lock(&gUdfTaskQueueMutex); - udfTaskQueueMove(gUdfTaskQueue, q); + QUEUE_MOVE(&gUdfTaskQueue, &wq); uv_mutex_unlock(&gUdfTaskQueueMutex); - while (!udfTaskQueueIsEmpty(q)) { - SClientUvTaskNode *task = udfTaskQueueHeadTask(q); - udfTaskQueueRemoveTask(task); + while (!QUEUE_EMPTY(&wq)) { + QUEUE* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); startUvUdfTask(task); + QUEUE_INSERT_TAIL(&gUvProcTaskQueue, &task->procTaskQueue); } } -void udfStopAsyncCb(uv_async_t *async) { - SClientUvTaskNode node; - SClientUvTaskQueue q = &node; - udfTaskQueueInit(q); +void cleanUpUvTasks() { + QUEUE wq; uv_mutex_lock(&gUdfTaskQueueMutex); - udfTaskQueueMove(gUdfTaskQueue, q); + QUEUE_MOVE(&gUdfTaskQueue, &wq); uv_mutex_unlock(&gUdfTaskQueueMutex); - while (!udfTaskQueueIsEmpty(q)) { - SClientUvTaskNode *task = udfTaskQueueHeadTask(q); - udfTaskQueueRemoveTask(task); + while (!QUEUE_EMPTY(&wq)) { + QUEUE* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); if (gUdfcState == UDFC_STATE_STOPPING) { task->errCode = UDFC_CODE_STOPPING; } else if (gUdfcState == UDFC_STATE_RESTARTING) { @@ -819,57 +834,99 @@ void udfStopAsyncCb(uv_async_t *async) { } // TODO: deal with tasks that are waiting result. + while (!QUEUE_EMPTY(&gUvProcTaskQueue)) { + QUEUE* h = QUEUE_HEAD(&gUvProcTaskQueue); + QUEUE_REMOVE(h); + SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); + if (gUdfcState == UDFC_STATE_STOPPING) { + task->errCode = UDFC_CODE_STOPPING; + } else if (gUdfcState == UDFC_STATE_RESTARTING) { + task->errCode = UDFC_CODE_RESTARTING; + } + uv_sem_post(&task->taskSem); + } +} - uv_stop(&gUdfdLoop); +void udfStopAsyncCb(uv_async_t *async) { + cleanUpUvTasks(); + if (gUdfcState == UDFC_STATE_STOPPING) { + uv_stop(&gUdfdLoop); + } } +int32_t startUdfd(); +void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { + debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); + uv_close((uv_handle_t *) req, NULL); + //TODO: restart the udfd process + if (gUdfcState == UDFC_STATE_STOPPING) { + if (term_signal != SIGINT) { + //TODO: log error + } + } + if (gUdfcState == UDFC_STATE_READY) { + gUdfcState = UDFC_STATE_RESTARTING; + //TODO: asynchronous without blocking. how to do it + cleanUpUvTasks(); + startUdfd(); + } +} +int32_t startUdfd() { + //TODO: path + uv_process_options_t options = {0}; + static char path[256] = {0}; + size_t cwdSize; + uv_cwd(path, &cwdSize); + strcat(path, "/udfd"); + char* args[2] = {path, NULL}; + options.args = args; + options.file = path; + options.exit_cb = onUdfdExit; + options.stdio_count = 3; + uv_stdio_container_t child_stdio[3]; + child_stdio[0].flags = UV_IGNORE; + child_stdio[1].flags = UV_INHERIT_FD; + child_stdio[1].data.fd = 1; + child_stdio[2].flags = UV_INHERIT_FD; + child_stdio[2].data.fd = 2; + options.stdio = child_stdio; + //TODO spawn error + int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options); + if (err != 0) { + debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); + } + return err; +} -void startUdfd(void *argsThread) { +void constructUdfService(void *argsThread) { uv_loop_init(&gUdfdLoop); - //TODO: path - uv_process_options_t options; - static char path[256] = {0}; - size_t cwdSize; - uv_cwd(path, &cwdSize); - strcat(path, "./udfd"); - char* args[2] = {path, NULL}; - options.args = args; - options.file = path; - options.exit_cb = onUdfdExit; - - int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options); - if (err != 0) { - debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); - } + //TODO spawn error + startUdfd(); uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); uv_mutex_init(&gUdfTaskQueueMutex); - udfTaskQueueInit(gUdfTaskQueue); - gUdfWaitResultTasks = taosArrayInit(256, sizeof(SClientUvTaskNode*)); + QUEUE_INIT(&gUdfTaskQueue); + QUEUE_INIT(&gUvProcTaskQueue); uv_barrier_wait(&gUdfInitBarrier); //TODO return value of uv_run uv_run(&gUdfdLoop, UV_RUN_DEFAULT); uv_loop_close(&gUdfdLoop); } -int32_t constructUdfService() { - uv_barrier_init(&gUdfInitBarrier, 2); - uv_thread_create(&gUdfLoopThread, startUdfd, 0); - uv_barrier_wait(&gUdfInitBarrier); - return 0; -} int32_t startUdfService() { gUdfcState = UDFC_STATE_STARTNG; - constructUdfService(); - gUdfcState = UDFC_STATE_READY; + uv_barrier_init(&gUdfInitBarrier, 2); + uv_thread_create(&gUdfLoopThread, constructUdfService, 0); + uv_barrier_wait(&gUdfInitBarrier); gUdfcState = UDFC_STATE_READY; return 0; } -int32_t destructUdfService() { +int32_t stopUdfService() { + gUdfcState = UDFC_STATE_STOPPING; uv_barrier_destroy(&gUdfInitBarrier); if (gUdfcState == UDFC_STATE_STOPPING) { uv_process_kill(&gUdfdProcess, SIGINT); @@ -877,13 +934,7 @@ int32_t destructUdfService() { uv_async_send(&gUdfLoopStopAsync); uv_mutex_destroy(&gUdfTaskQueueMutex); uv_thread_join(&gUdfLoopThread); - return 0; -} - -int32_t stopUdfService() { - gUdfcState = UDFC_STATE_STOPPING; - destructUdfService(); - gUdfcState = UDFC_STATUS_FINAL; + return 0; gUdfcState = UDFC_STATUS_FINAL; return 0; } diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index dc88e8cf3e194532a34414b9362711141b0b8a9d..3fdc522ef9e0dbae0f70d2ad5b3689781e7b1eea 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -2,18 +2,16 @@ #include #include -#include "os.h" #include "tudf.h" - void udf1(int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newState, int32_t *newStateSize, SUdfDataBlock *output) { fprintf(stdout, "%s, step:%d\n", "udf function called", step); - char *newStateBuf = taosMemoryMalloc(stateSize); + char *newStateBuf = malloc(stateSize); memcpy(newStateBuf, state, stateSize); *newState = newStateBuf; *newStateSize = stateSize; - char *outputBuf = taosMemoryMalloc(input.size); + char *outputBuf = malloc(input.size); memcpy(outputBuf, input.data, input.size); output->data = outputBuf; output->size = input.size;