提交 517e587c 编写于 作者: S shenglian zhou

udfd exit and restart processing

上级 708668d0
......@@ -22,6 +22,11 @@ extern "C" {
//======================================================================================
//begin API to taosd and qworker
enum {
UDFC_CODE_STOPPING = -1,
UDFC_CODE_RESTARTING = -2,
};
/**
* start udf dameon service
* @return error code
......
......@@ -22,9 +22,9 @@
//TODO: udfd restart when exist or aborts
//TODO: network error processing.
//TODO: add unit test
//TODO: add lua support
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
int32_t destructUdfService();
int32_t constructUdfService();
enum {
UV_TASK_CONNECT = 0,
UV_TASK_REQ_RSP = 1,
......@@ -103,14 +103,22 @@ uv_async_t gUdfLoopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int64_t gUdfTaskSeqNum = 0;
enum {
UDFC_STATE_INITAL = 0, // initial state
UDFC_STATE_STARTNG, // starting after startUdfService
UDFC_STATE_READY, // started and begin to receive quests
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
UDFC_STATE_STOPPING, // stopping after stopUdfService
UDFC_STATUS_FINAL, // stopped
};
int8_t gUdfcState = UDFC_STATE_INITAL;
//double circular linked list
typedef SClientUvTaskNode *SClientUvTaskQueue;
SClientUvTaskNode gUdfQueueNode;
SClientUvTaskQueue gUdfTaskQueue = &gUdfQueueNode;
//add SClientUvTaskNode task that close conn
//TODO: deal with uv task that has been started and then udfd core dumped
void udfTaskQueueInit(SClientUvTaskQueue q) {
q->next = q;
......@@ -465,6 +473,18 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
uv_close((uv_handle_t *) req, NULL);
//TODO: restart the udfd process
if (gUdfcState == UDFC_STATE_STOPPING) {
if (term_signal != SIGINT) {
//TODO: log error
}
}
if (gUdfcState == UDFC_STATE_READY) {
gUdfcState = UDFC_STATE_RESTARTING;
//TODO: asynchronous without blocking. how to do it
destructUdfService();
constructUdfService();
}
}
void onUdfcPipeClose(uv_handle_t *handle) {
......@@ -602,7 +622,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
udfTaskQueueRemoveTask(taskFound);
uv_sem_post(&taskFound->taskSem);
} else {
//LOG error
//TODO: LOG error
}
connBuf->buf = NULL;
connBuf->total = -1;
......@@ -777,10 +797,32 @@ void udfClientAsyncCb(uv_async_t *async) {
}
void udfStopAsyncCb(uv_async_t *async) {
SClientUvTaskNode node;
SClientUvTaskQueue q = &node;
udfTaskQueueInit(q);
uv_mutex_lock(&gUdfTaskQueueMutex);
udfTaskQueueMove(gUdfTaskQueue, q);
uv_mutex_unlock(&gUdfTaskQueueMutex);
while (!udfTaskQueueIsEmpty(q)) {
SClientUvTaskNode *task = udfTaskQueueHeadTask(q);
udfTaskQueueRemoveTask(task);
if (gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING;
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
task->errCode = UDFC_CODE_RESTARTING;
}
uv_sem_post(&task->taskSem);
}
// TODO: deal with tasks that are waiting result.
uv_stop(&gUdfdLoop);
uv_loop_close(&gUdfdLoop);
}
void startUdfd(void *argsThread) {
uv_loop_init(&gUdfdLoop);
......@@ -805,25 +847,43 @@ void startUdfd(void *argsThread) {
uv_mutex_init(&gUdfTaskQueueMutex);
udfTaskQueueInit(gUdfTaskQueue);
uv_barrier_wait(&gUdfInitBarrier);
//TODO return value of uv_run
uv_run(&gUdfdLoop, UV_RUN_DEFAULT);
uv_loop_close(&gUdfdLoop);
}
int32_t startUdfService() {
int32_t constructUdfService() {
uv_barrier_init(&gUdfInitBarrier, 2);
uv_thread_create(&gUdfLoopThread, startUdfd, 0);
uv_barrier_wait(&gUdfInitBarrier);
return 0;
}
int32_t stopUdfService() {
int32_t startUdfService() {
gUdfcState = UDFC_STATE_STARTNG;
constructUdfService();
gUdfcState = UDFC_STATE_READY;
return 0;
}
int32_t destructUdfService() {
uv_barrier_destroy(&gUdfInitBarrier);
uv_process_kill(&gUdfdProcess, SIGINT);
if (gUdfcState == UDFC_STATE_STOPPING) {
uv_process_kill(&gUdfdProcess, SIGINT);
}
uv_async_send(&gUdfLoopStopAsync);
uv_mutex_destroy(&gUdfTaskQueueMutex);
uv_thread_join(&gUdfLoopThread);
return 0;
}
int32_t stopUdfService() {
gUdfcState = UDFC_STATE_STOPPING;
destructUdfService();
gUdfcState = UDFC_STATUS_FINAL;
return 0;
}
int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
SClientUvTaskNode *uvTask = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册