未验证 提交 f41bd569 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #11988 from taosdata/feature/refator_retry

refactor(rpc): fefactor retry way
...@@ -348,14 +348,13 @@ typedef struct SDelayQueue { ...@@ -348,14 +348,13 @@ typedef struct SDelayQueue {
uv_timer_t* timer; uv_timer_t* timer;
Heap* heap; Heap* heap;
uv_loop_t* loop; uv_loop_t* loop;
void (*free)(void* arg);
} SDelayQueue; } SDelayQueue;
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue); int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDestroyDelayQueue(SDelayQueue* queue); void transDQDestroy(SDelayQueue* queue);
int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
/* /*
* init global func * init global func
......
...@@ -842,7 +842,7 @@ static SCliThrdObj* createThrdObj() { ...@@ -842,7 +842,7 @@ static SCliThrdObj* createThrdObj() {
pThrd->pool = createConnPool(4); pThrd->pool = createConnPool(4);
transCreateDelayQueue(pThrd->loop, &pThrd->delayQueue); transDQCreate(pThrd->loop, &pThrd->delayQueue);
pThrd->quit = false; pThrd->quit = false;
return pThrd; return pThrd;
...@@ -857,7 +857,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { ...@@ -857,7 +857,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
taosThreadMutexDestroy(&pThrd->msgMtx); taosThreadMutexDestroy(&pThrd->msgMtx);
transDestroyAsyncPool(pThrd->asyncPool); transDestroyAsyncPool(pThrd->asyncPool);
transDestroyDelayQueue(pThrd->delayQueue); transDQDestroy(pThrd->delayQueue);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
...@@ -923,14 +923,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -923,14 +923,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
cliDestroy((uv_handle_t*)pConn->stream); cliDestroy((uv_handle_t*)pConn->stream);
return -1; return -1;
} }
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) { } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
if (pResp->contLen == 0) { if (pResp->contLen == 0) {
pEpSet->inUse = (pEpSet->inUse++) % pEpSet->numOfEps; pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
} else { } else {
SMEpSet emsg = {0}; SMEpSet emsg = {0};
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
...@@ -940,7 +940,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -940,7 +940,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
addConnToPool(pThrd, pConn); addConnToPool(pThrd, pConn);
return -1; return -1;
} }
......
...@@ -369,7 +369,7 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { ...@@ -369,7 +369,7 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
} }
} }
static void transDelayQueueTimeout(uv_timer_t* timer) { static void transDQTimeout(uv_timer_t* timer) {
SDelayQueue* queue = timer->data; SDelayQueue* queue = timer->data;
tTrace("timer %p timeout", timer); tTrace("timer %p timeout", timer);
uint64_t timeout = 0; uint64_t timeout = 0;
...@@ -388,10 +388,10 @@ static void transDelayQueueTimeout(uv_timer_t* timer) { ...@@ -388,10 +388,10 @@ static void transDelayQueueTimeout(uv_timer_t* timer) {
} }
} while (1); } while (1);
if (timeout != 0) { if (timeout != 0) {
uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
} }
} }
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(loop, timer); uv_timer_init(loop, timer);
...@@ -407,7 +407,7 @@ int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { ...@@ -407,7 +407,7 @@ int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) {
return 0; return 0;
} }
void transDestroyDelayQueue(SDelayQueue* queue) { void transDQDestroy(SDelayQueue* queue) {
taosMemoryFree(queue->timer); taosMemoryFree(queue->timer);
while (heapSize(queue->heap) > 0) { while (heapSize(queue->heap) > 0) {
...@@ -424,19 +424,15 @@ void transDestroyDelayQueue(SDelayQueue* queue) { ...@@ -424,19 +424,15 @@ void transDestroyDelayQueue(SDelayQueue* queue) {
taosMemoryFree(queue); taosMemoryFree(queue);
} }
int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
task->func = func; task->func = func;
task->arg = arg; task->arg = arg;
task->execTime = taosGetTimestampMs() + timeoutMs; task->execTime = taosGetTimestampMs() + timeoutMs;
tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs); tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
heapInsert(queue->heap, &task->node); heapInsert(queue->heap, &task->node);
if (heapSize(queue->heap) == 1) { uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0);
}
return 0; return 0;
} }
#endif #endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册