From 9af1206cbdb84a20a83b3944a3856c5840e05cd2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 28 Apr 2022 11:56:00 +0800 Subject: [PATCH] refactor(rpc): fefactor retry way --- source/libs/transport/inc/transComm.h | 8 ++++ source/libs/transport/src/transCli.c | 38 ++++++++++++++----- source/libs/transport/src/transComm.c | 54 ++++++++++++++++----------- 3 files changed, 70 insertions(+), 30 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index db6b3daf98..aa3c27e537 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -329,6 +329,14 @@ void transQueueClear(STransQueue* queue); */ void transQueueDestroy(STransQueue* queue); +/* + * delay queue based on uv loop and uv timer, and only used in retry + */ +typedef struct STaskArg { + void* param1; + void* param2; +} STaskArg; + typedef struct SDelayTask { void (*func)(void* arg); void* arg; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 20406763de..842093c579 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -60,10 +60,10 @@ typedef struct SCliThrdObj { // msg queue queue msg; TdThreadMutex msgMtx; - - uint64_t nextTimeout; // next timeout - void* pTransInst; // - bool quit; + SDelayQueue* delayQueue; + uint64_t nextTimeout; // next timeout + void* pTransInst; // + bool quit; } SCliThrdObj; typedef struct SCliObj { @@ -838,12 +838,13 @@ static SCliThrdObj* createThrdObj() { uv_loop_init(pThrd->loop); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); - uv_timer_init(pThrd->loop, &pThrd->timer); pThrd->timer.data = pThrd; pThrd->pool = createConnPool(4); + transCreateDelayQueue(pThrd->loop, &pThrd->delayQueue); + pThrd->quit = false; return pThrd; } @@ -851,12 +852,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } + taosThreadJoin(pThrd->thread, NULL); CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); transDestroyAsyncPool(pThrd->asyncPool); - uv_timer_stop(&pThrd->timer); + transDestroyDelayQueue(pThrd->delayQueue); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } @@ -885,6 +887,16 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } +static void doDelayTask(void* param) { + STaskArg* arg = param; + + SCliMsg* pMsg = arg->param1; + SCliThrdObj* pThrd = arg->param2; + + cliHandleReq(pMsg, pThrd); + + taosMemoryFree(arg); +} int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -908,7 +920,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pCtx->retryCount < pEpSet->numOfEps) { pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; - cliHandleReq(pMsg, pThrd); + + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = pMsg; + arg->param2 = pThrd; + transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); + cliDestroy((uv_handle_t*)pConn->stream); return -1; } @@ -920,8 +937,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); pCtx->epSet = emsg.epSet; } - cliHandleReq(pMsg, pThrd); - // release pConn + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = pMsg; + arg->param2 = pThrd; + + transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); addConnToPool(pThrd, pConn); return -1; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 00816eb709..4d049089ad 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -363,7 +363,7 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { SDelayTask* arg1 = container_of(a, SDelayTask, node); SDelayTask* arg2 = container_of(b, SDelayTask, node); if (arg1->execTime > arg2->execTime) { - return -1; + return 0; } else { return 1; } @@ -371,23 +371,25 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { static void transDelayQueueTimeout(uv_timer_t* timer) { SDelayQueue* queue = timer->data; - HeapNode* node = heapMin(queue->heap); - if (node == NULL) { - // DO NOTHING - } - heapRemove(queue->heap, node); - - SDelayTask* task = container_of(node, SDelayTask, node); - task->func(task->arg); - taosMemoryFree(task); - - node = heapMin(queue->heap); - if (node == NULL) { - return; + tTrace("timer %p timeout", timer); + uint64_t timeout = 0; + do { + HeapNode* minNode = heapMin(queue->heap); + if (minNode == NULL) break; + SDelayTask* task = container_of(minNode, SDelayTask, node); + if (task->execTime <= taosGetTimestampMs()) { + heapRemove(queue->heap, minNode); + task->func(task->arg); + taosMemoryFree(task); + timeout = 0; + } else { + timeout = task->execTime - taosGetTimestampMs(); + break; + } + } while (1); + if (timeout != 0) { + uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); } - task = container_of(node, SDelayTask, node); - uint64_t timeout = task->execTime > uv_now(queue->loop) ? task->execTime - uv_now(queue->loop) : 0; - uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); } int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); @@ -406,8 +408,18 @@ int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { } void transDestroyDelayQueue(SDelayQueue* queue) { - uv_timer_stop(queue->timer); taosMemoryFree(queue->timer); + + while (heapSize(queue->heap) > 0) { + HeapNode* minNode = heapMin(queue->heap); + if (minNode == NULL) { + return; + } + heapRemove(queue->heap, minNode); + + SDelayTask* task = container_of(minNode, SDelayTask, node); + taosMemoryFree(task); + } heapDestroy(queue->heap); taosMemoryFree(queue); } @@ -417,11 +429,11 @@ int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* task->func = func; task->arg = arg; - task->execTime = uv_now(queue->loop) + timeoutMs; + task->execTime = taosGetTimestampMs() + timeoutMs; - int size = heapSize(queue->heap); + tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs); heapInsert(queue->heap, &task->node); - if (size == 1) { + if (heapSize(queue->heap) == 1) { uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0); } -- GitLab