diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index aa3c27e537c17a542263ada48aeecb90e41a2be4..21af35e8f768748c5ea7609d70f89fc2d9896f03 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -348,14 +348,13 @@ typedef struct SDelayQueue { uv_timer_t* timer; Heap* heap; uv_loop_t* loop; - void (*free)(void* arg); } SDelayQueue; -int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue); +int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); -void transDestroyDelayQueue(SDelayQueue* queue); +void transDQDestroy(SDelayQueue* queue); -int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); +int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); /* * init global func diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 842093c57906d04b435b567dbce468ff833e0a93..a303d09f24051b7f2afc6ce2808e56926696c6e9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -843,7 +843,7 @@ static SCliThrdObj* createThrdObj() { pThrd->pool = createConnPool(4); - transCreateDelayQueue(pThrd->loop, &pThrd->delayQueue); + transDQCreate(pThrd->loop, &pThrd->delayQueue); pThrd->quit = false; return pThrd; @@ -858,7 +858,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { taosThreadMutexDestroy(&pThrd->msgMtx); transDestroyAsyncPool(pThrd->asyncPool); - transDestroyDelayQueue(pThrd->delayQueue); + transDQDestroy(pThrd->delayQueue); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } @@ -924,7 +924,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; arg->param2 = pThrd; - transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); + transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); cliDestroy((uv_handle_t*)pConn->stream); return -1; @@ -941,7 +941,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { arg->param1 = pMsg; arg->param2 = pThrd; - transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); + transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); addConnToPool(pThrd, pConn); return -1; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 4d049089ada0abfcd8e5cb19aa5528cef6c9daaa..01a20a466ac7419c6b803fa4b171ecd9f5e8daea 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -369,7 +369,7 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { } } -static void transDelayQueueTimeout(uv_timer_t* timer) { +static void transDQTimeout(uv_timer_t* timer) { SDelayQueue* queue = timer->data; tTrace("timer %p timeout", timer); uint64_t timeout = 0; @@ -388,10 +388,10 @@ static void transDelayQueueTimeout(uv_timer_t* timer) { } } while (1); if (timeout != 0) { - uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); + uv_timer_start(queue->timer, transDQTimeout, timeout, 0); } } -int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { +int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); uv_timer_init(loop, timer); @@ -407,7 +407,7 @@ int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { return 0; } -void transDestroyDelayQueue(SDelayQueue* queue) { +void transDQDestroy(SDelayQueue* queue) { taosMemoryFree(queue->timer); while (heapSize(queue->heap) > 0) { @@ -424,19 +424,15 @@ void transDestroyDelayQueue(SDelayQueue* queue) { taosMemoryFree(queue); } -int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { +int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); - task->func = func; task->arg = arg; task->execTime = taosGetTimestampMs() + timeoutMs; tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs); heapInsert(queue->heap, &task->node); - if (heapSize(queue->heap) == 1) { - uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0); - } - + uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); return 0; } #endif