提交 ce9585a3 编写于 作者: S shenglian zhou

before integration test success

上级 44bc9731
......@@ -23,9 +23,95 @@
//TODO: udfd restart when exist or aborts
//TODO: network error processing.
//TODO: add unit test
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
int32_t destructUdfService();
int32_t constructUdfService();
//TODO: test libuv queue
typedef void *QUEUE[2];
/* Private macros. */
#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Public macros. */
#define QUEUE_DATA(ptr, type, field) \
((type *) ((char *) (ptr) - offsetof(type, field)))
/* Important note: mutating the list while QUEUE_FOREACH is
* iterating over its elements results in undefined behavior.
*/
#define QUEUE_FOREACH(q, h) \
for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
#define QUEUE_EMPTY(q) \
((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))
#define QUEUE_HEAD(q) \
(QUEUE_NEXT(q))
#define QUEUE_INIT(q) \
do { \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
} \
while (0)
#define QUEUE_ADD(h, n) \
do { \
QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV(h) = QUEUE_PREV(n); \
QUEUE_PREV_NEXT(h) = (h); \
} \
while (0)
#define QUEUE_SPLIT(h, q, n) \
do { \
QUEUE_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(n) = (n); \
QUEUE_NEXT(n) = (q); \
QUEUE_PREV(h) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(h) = (h); \
QUEUE_PREV(q) = (n); \
} \
while (0)
#define QUEUE_MOVE(h, n) \
do { \
if (QUEUE_EMPTY(h)) \
QUEUE_INIT(n); \
else { \
QUEUE* q = QUEUE_HEAD(h); \
QUEUE_SPLIT(h, q, n); \
} \
} \
while (0)
#define QUEUE_INSERT_HEAD(h, q) \
do { \
QUEUE_NEXT(q) = QUEUE_NEXT(h); \
QUEUE_PREV(q) = (h); \
QUEUE_NEXT_PREV(q) = (q); \
QUEUE_NEXT(h) = (q); \
} \
while (0)
#define QUEUE_INSERT_TAIL(h, q) \
do { \
QUEUE_NEXT(q) = (h); \
QUEUE_PREV(q) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(q) = (q); \
QUEUE_PREV(h) = (q); \
} \
while (0)
#define QUEUE_REMOVE(q) \
do { \
QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
} \
while (0)
enum {
UV_TASK_CONNECT = 0,
UV_TASK_REQ_RSP = 1,
......@@ -49,8 +135,9 @@ typedef struct SClientUvTaskNode {
uv_sem_t taskSem;
uv_buf_t rspBuf;
struct SClientUvTaskNode *prev;
struct SClientUvTaskNode *next;
QUEUE recvTaskQueue;
QUEUE procTaskQueue;
QUEUE connTaskQueue;
} SClientUvTaskNode;
typedef struct SClientUdfTask {
......@@ -87,7 +174,7 @@ typedef struct SClientConnBuf {
typedef struct SClientUvConn {
uv_pipe_t *pipe;
SClientUvTaskNode taskQueue;
QUEUE taskQueue;
SClientConnBuf readBuf;
} SClientUvConn;
......@@ -104,8 +191,6 @@ uv_async_t gUdfLoopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int64_t gUdfTaskSeqNum = 0;
SArray* gUdfWaitResultTasks = NULL;
enum {
UDFC_STATE_INITAL = 0, // initial state
UDFC_STATE_STARTNG, // starting after startUdfService
......@@ -117,69 +202,11 @@ enum {
int8_t gUdfcState = UDFC_STATE_INITAL;
//double circular linked list
typedef SClientUvTaskNode *SClientUvTaskQueue;
SClientUvTaskNode gUdfQueueNode;
SClientUvTaskQueue gUdfTaskQueue = &gUdfQueueNode;
//TODO: deal with uv task that has been started and then udfd core dumped
void udfTaskQueueInit(SClientUvTaskQueue q) {
q->next = q;
q->prev = q;
}
bool udfTaskQueueIsEmpty(SClientUvTaskQueue q) {
return q == q->next;
}
void udfTaskQueueInsertTail(SClientUvTaskQueue q, SClientUvTaskNode *e) {
e->next = q;
e->prev = q->prev;
e->prev->next = e;
q->prev = e;
}
void udfTaskQueueInsertTaskAtHead(SClientUvTaskQueue q, SClientUvTaskNode *e) {
e->next = q->next;
e->prev = q;
q->next->prev = e;
q->next = e;
}
void udfTaskQueueRemoveTask(SClientUvTaskNode *e) {
e->prev->next = e->next;
e->next->prev = e->prev;
}
void udfTaskQueueSplit(SClientUvTaskQueue q, SClientUvTaskNode *from, SClientUvTaskQueue n) {
n->prev = q->prev;
n->prev->next = n;
n->next = from;
q->prev = from->prev;
q->prev->next = q;
from->prev = n;
}
SClientUvTaskNode *udfTaskQueueHeadTask(SClientUvTaskQueue q) {
return q->next;
}
SClientUvTaskNode *udfTaskQueueTailTask(SClientUvTaskQueue q) {
return q->prev;
}
QUEUE gUdfTaskQueue = {0};
SClientUvTaskNode *udfTaskQueueNext(SClientUvTaskNode *e) {
return e->next;
}
void udfTaskQueueMove(SClientUvTaskQueue q, SClientUvTaskQueue n) {
if (udfTaskQueueIsEmpty(q)) {
udfTaskQueueInit(n);
} else {
SClientUvTaskNode *h = udfTaskQueueHeadTask(q);
udfTaskQueueSplit(q, h, n);
}
}
//TODO: deal with uv task that has been started and then udfd core dumped
QUEUE gUvProcTaskQueue = {0};
int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
debugPrint("%s", "encoding request");
......@@ -471,30 +498,14 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
return 0;
}
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
uv_close((uv_handle_t *) req, NULL);
//TODO: restart the udfd process
if (gUdfcState == UDFC_STATE_STOPPING) {
if (term_signal != SIGINT) {
//TODO: log error
}
}
if (gUdfcState == UDFC_STATE_READY) {
gUdfcState = UDFC_STATE_RESTARTING;
//TODO: asynchronous without blocking. how to do it
destructUdfService();
constructUdfService();
}
}
void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data;
if (!udfTaskQueueIsEmpty(&conn->taskQueue)) {
SClientUvTaskNode *task = udfTaskQueueHeadTask(&conn->taskQueue);
if (!QUEUE_EMPTY(&conn->taskQueue)) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = 0;
uv_sem_post(&task->taskSem);
QUEUE_REMOVE(&task->procTaskQueue);
}
taosMemoryFree(conn->readBuf.buf);
......@@ -599,14 +610,16 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
SClientConnBuf *connBuf = &conn->readBuf;
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen int32_t then seqnum
if (udfTaskQueueIsEmpty(&conn->taskQueue)) {
if (QUEUE_EMPTY(&conn->taskQueue)) {
//LOG error
return;
}
bool found = false;
SClientUvTaskNode *taskFound = NULL;
SClientUvTaskNode *task = udfTaskQueueNext(&conn->taskQueue);
while (task != &conn->taskQueue) {
QUEUE* h = QUEUE_NEXT(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
while (h != &conn->taskQueue) {
if (task->seqNum == seqNum) {
if (found == false) {
found = true;
......@@ -616,13 +629,15 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
continue;
}
}
task = udfTaskQueueNext(task);
h = QUEUE_NEXT(h);
task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
}
if (taskFound) {
taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
udfTaskQueueRemoveTask(taskFound);
QUEUE_REMOVE(&taskFound->connTaskQueue);
uv_sem_post(&taskFound->taskSem);
QUEUE_REMOVE(&taskFound->procTaskQueue);
} else {
//TODO: LOG error
}
......@@ -665,7 +680,7 @@ void onUdfClientWrite(uv_write_t *write, int status) {
if (status == 0) {
uv_pipe_t *pipe = uvTask->pipe;
SClientUvConn *conn = pipe->data;
udfTaskQueueInsertTail(&conn->taskQueue, uvTask);
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
} else {
//TODO Log error;
}
......@@ -683,6 +698,7 @@ void onUdfClientConnect(uv_connect_t *connect, int status) {
uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead);
taosMemoryFree(connect);
uv_sem_post(&uvTask->taskSem);
QUEUE_REMOVE(&uvTask->procTaskQueue);
}
int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
......@@ -726,7 +742,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, %d", "queue uv task", uvTask->type);
uv_mutex_lock(&gUdfTaskQueueMutex);
udfTaskQueueInsertTail(gUdfTaskQueue, uvTask);
QUEUE_INSERT_TAIL(&gUdfTaskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&gUdfTaskQueueMutex);
uv_async_send(&gUdfLoopTaskAync);
......@@ -750,7 +766,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
conn->readBuf.cap = 0;
conn->readBuf.buf = 0;
conn->readBuf.total = -1;
udfTaskQueueInit(&conn->taskQueue);
QUEUE_INIT(&conn->taskQueue);
pipe->data = conn;
......@@ -769,7 +785,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
}
case UV_TASK_DISCONNECT: {
SClientUvConn *conn = uvTask->pipe->data;
udfTaskQueueInsertTail(&conn->taskQueue, uvTask);
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose);
break;
}
......@@ -782,34 +798,33 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
}
void udfClientAsyncCb(uv_async_t *async) {
SClientUvTaskNode node;
SClientUvTaskQueue q = &node;
udfTaskQueueInit(q);
QUEUE wq;
uv_mutex_lock(&gUdfTaskQueueMutex);
udfTaskQueueMove(gUdfTaskQueue, q);
QUEUE_MOVE(&gUdfTaskQueue, &wq);
uv_mutex_unlock(&gUdfTaskQueueMutex);
while (!udfTaskQueueIsEmpty(q)) {
SClientUvTaskNode *task = udfTaskQueueHeadTask(q);
udfTaskQueueRemoveTask(task);
while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
startUvUdfTask(task);
QUEUE_INSERT_TAIL(&gUvProcTaskQueue, &task->procTaskQueue);
}
}
void udfStopAsyncCb(uv_async_t *async) {
SClientUvTaskNode node;
SClientUvTaskQueue q = &node;
udfTaskQueueInit(q);
void cleanUpUvTasks() {
QUEUE wq;
uv_mutex_lock(&gUdfTaskQueueMutex);
udfTaskQueueMove(gUdfTaskQueue, q);
QUEUE_MOVE(&gUdfTaskQueue, &wq);
uv_mutex_unlock(&gUdfTaskQueueMutex);
while (!udfTaskQueueIsEmpty(q)) {
SClientUvTaskNode *task = udfTaskQueueHeadTask(q);
udfTaskQueueRemoveTask(task);
while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
if (gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING;
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
......@@ -819,57 +834,99 @@ void udfStopAsyncCb(uv_async_t *async) {
}
// TODO: deal with tasks that are waiting result.
while (!QUEUE_EMPTY(&gUvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&gUvProcTaskQueue);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
if (gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING;
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
task->errCode = UDFC_CODE_RESTARTING;
}
uv_sem_post(&task->taskSem);
}
}
void udfStopAsyncCb(uv_async_t *async) {
cleanUpUvTasks();
if (gUdfcState == UDFC_STATE_STOPPING) {
uv_stop(&gUdfdLoop);
}
}
int32_t startUdfd();
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
uv_close((uv_handle_t *) req, NULL);
//TODO: restart the udfd process
if (gUdfcState == UDFC_STATE_STOPPING) {
if (term_signal != SIGINT) {
//TODO: log error
}
}
if (gUdfcState == UDFC_STATE_READY) {
gUdfcState = UDFC_STATE_RESTARTING;
//TODO: asynchronous without blocking. how to do it
cleanUpUvTasks();
startUdfd();
}
}
void startUdfd(void *argsThread) {
uv_loop_init(&gUdfdLoop);
int32_t startUdfd() {
//TODO: path
uv_process_options_t options;
uv_process_options_t options = {0};
static char path[256] = {0};
size_t cwdSize;
uv_cwd(path, &cwdSize);
strcat(path, "./udfd");
strcat(path, "/udfd");
char* args[2] = {path, NULL};
options.args = args;
options.file = path;
options.exit_cb = onUdfdExit;
options.stdio_count = 3;
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_IGNORE;
child_stdio[1].flags = UV_INHERIT_FD;
child_stdio[1].data.fd = 1;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio = child_stdio;
//TODO spawn error
int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
if (err != 0) {
debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
return err;
}
void constructUdfService(void *argsThread) {
uv_loop_init(&gUdfdLoop);
//TODO spawn error
startUdfd();
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
uv_mutex_init(&gUdfTaskQueueMutex);
udfTaskQueueInit(gUdfTaskQueue);
gUdfWaitResultTasks = taosArrayInit(256, sizeof(SClientUvTaskNode*));
QUEUE_INIT(&gUdfTaskQueue);
QUEUE_INIT(&gUvProcTaskQueue);
uv_barrier_wait(&gUdfInitBarrier);
//TODO return value of uv_run
uv_run(&gUdfdLoop, UV_RUN_DEFAULT);
uv_loop_close(&gUdfdLoop);
}
int32_t constructUdfService() {
uv_barrier_init(&gUdfInitBarrier, 2);
uv_thread_create(&gUdfLoopThread, startUdfd, 0);
uv_barrier_wait(&gUdfInitBarrier);
return 0;
}
int32_t startUdfService() {
gUdfcState = UDFC_STATE_STARTNG;
constructUdfService();
gUdfcState = UDFC_STATE_READY;
uv_barrier_init(&gUdfInitBarrier, 2);
uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
uv_barrier_wait(&gUdfInitBarrier); gUdfcState = UDFC_STATE_READY;
return 0;
}
int32_t destructUdfService() {
int32_t stopUdfService() {
gUdfcState = UDFC_STATE_STOPPING;
uv_barrier_destroy(&gUdfInitBarrier);
if (gUdfcState == UDFC_STATE_STOPPING) {
uv_process_kill(&gUdfdProcess, SIGINT);
......@@ -877,13 +934,7 @@ int32_t destructUdfService() {
uv_async_send(&gUdfLoopStopAsync);
uv_mutex_destroy(&gUdfTaskQueueMutex);
uv_thread_join(&gUdfLoopThread);
return 0;
}
int32_t stopUdfService() {
gUdfcState = UDFC_STATE_STOPPING;
destructUdfService();
gUdfcState = UDFC_STATUS_FINAL;
return 0; gUdfcState = UDFC_STATUS_FINAL;
return 0;
}
......
......@@ -2,18 +2,16 @@
#include <stdlib.h>
#include <stdio.h>
#include "os.h"
#include "tudf.h"
void udf1(int8_t step, char *state, int32_t stateSize, SUdfDataBlock input,
char **newState, int32_t *newStateSize, SUdfDataBlock *output) {
fprintf(stdout, "%s, step:%d\n", "udf function called", step);
char *newStateBuf = taosMemoryMalloc(stateSize);
char *newStateBuf = malloc(stateSize);
memcpy(newStateBuf, state, stateSize);
*newState = newStateBuf;
*newStateSize = stateSize;
char *outputBuf = taosMemoryMalloc(input.size);
char *outputBuf = malloc(input.size);
memcpy(outputBuf, input.data, input.size);
output->data = outputBuf;
output->size = input.size;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册