提交 9af1206c 编写于 作者: dengyihao's avatar dengyihao

refactor(rpc): fefactor retry way

上级 4e7e8339
...@@ -329,6 +329,14 @@ void transQueueClear(STransQueue* queue); ...@@ -329,6 +329,14 @@ void transQueueClear(STransQueue* queue);
*/ */
void transQueueDestroy(STransQueue* queue); void transQueueDestroy(STransQueue* queue);
/*
* delay queue based on uv loop and uv timer, and only used in retry
*/
typedef struct STaskArg {
void* param1;
void* param2;
} STaskArg;
typedef struct SDelayTask { typedef struct SDelayTask {
void (*func)(void* arg); void (*func)(void* arg);
void* arg; void* arg;
......
...@@ -60,10 +60,10 @@ typedef struct SCliThrdObj { ...@@ -60,10 +60,10 @@ typedef struct SCliThrdObj {
// msg queue // msg queue
queue msg; queue msg;
TdThreadMutex msgMtx; TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
void* pTransInst; // void* pTransInst; //
bool quit; bool quit;
} SCliThrdObj; } SCliThrdObj;
typedef struct SCliObj { typedef struct SCliObj {
...@@ -838,12 +838,13 @@ static SCliThrdObj* createThrdObj() { ...@@ -838,12 +838,13 @@ static SCliThrdObj* createThrdObj() {
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer); uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd; pThrd->timer.data = pThrd;
pThrd->pool = createConnPool(4); pThrd->pool = createConnPool(4);
transCreateDelayQueue(pThrd->loop, &pThrd->delayQueue);
pThrd->quit = false; pThrd->quit = false;
return pThrd; return pThrd;
} }
...@@ -851,12 +852,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { ...@@ -851,12 +852,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
if (pThrd == NULL) { if (pThrd == NULL) {
return; return;
} }
taosThreadJoin(pThrd->thread, NULL); taosThreadJoin(pThrd->thread, NULL);
CLI_RELEASE_UV(pThrd->loop); CLI_RELEASE_UV(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx); taosThreadMutexDestroy(&pThrd->msgMtx);
transDestroyAsyncPool(pThrd->asyncPool); transDestroyAsyncPool(pThrd->asyncPool);
uv_timer_stop(&pThrd->timer); transDestroyDelayQueue(pThrd->delayQueue);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
...@@ -885,6 +887,16 @@ int cliRBChoseIdx(STrans* pTransInst) { ...@@ -885,6 +887,16 @@ int cliRBChoseIdx(STrans* pTransInst) {
} }
return index % pTransInst->numOfThreads; return index % pTransInst->numOfThreads;
} }
static void doDelayTask(void* param) {
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
SCliThrdObj* pThrd = arg->param2;
cliHandleReq(pMsg, pThrd);
taosMemoryFree(arg);
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
...@@ -908,7 +920,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -908,7 +920,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pCtx->retryCount < pEpSet->numOfEps) { if (pCtx->retryCount < pEpSet->numOfEps) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
cliHandleReq(pMsg, pThrd);
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
cliDestroy((uv_handle_t*)pConn->stream); cliDestroy((uv_handle_t*)pConn->stream);
return -1; return -1;
} }
...@@ -920,8 +937,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -920,8 +937,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
pCtx->epSet = emsg.epSet; pCtx->epSet = emsg.epSet;
} }
cliHandleReq(pMsg, pThrd); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
// release pConn arg->param1 = pMsg;
arg->param2 = pThrd;
transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
addConnToPool(pThrd, pConn); addConnToPool(pThrd, pConn);
return -1; return -1;
} }
......
...@@ -363,7 +363,7 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { ...@@ -363,7 +363,7 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
SDelayTask* arg1 = container_of(a, SDelayTask, node); SDelayTask* arg1 = container_of(a, SDelayTask, node);
SDelayTask* arg2 = container_of(b, SDelayTask, node); SDelayTask* arg2 = container_of(b, SDelayTask, node);
if (arg1->execTime > arg2->execTime) { if (arg1->execTime > arg2->execTime) {
return -1; return 0;
} else { } else {
return 1; return 1;
} }
...@@ -371,23 +371,25 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { ...@@ -371,23 +371,25 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
static void transDelayQueueTimeout(uv_timer_t* timer) { static void transDelayQueueTimeout(uv_timer_t* timer) {
SDelayQueue* queue = timer->data; SDelayQueue* queue = timer->data;
HeapNode* node = heapMin(queue->heap); tTrace("timer %p timeout", timer);
if (node == NULL) { uint64_t timeout = 0;
// DO NOTHING do {
} HeapNode* minNode = heapMin(queue->heap);
heapRemove(queue->heap, node); if (minNode == NULL) break;
SDelayTask* task = container_of(minNode, SDelayTask, node);
SDelayTask* task = container_of(node, SDelayTask, node); if (task->execTime <= taosGetTimestampMs()) {
task->func(task->arg); heapRemove(queue->heap, minNode);
taosMemoryFree(task); task->func(task->arg);
taosMemoryFree(task);
node = heapMin(queue->heap); timeout = 0;
if (node == NULL) { } else {
return; timeout = task->execTime - taosGetTimestampMs();
break;
}
} while (1);
if (timeout != 0) {
uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0);
} }
task = container_of(node, SDelayTask, node);
uint64_t timeout = task->execTime > uv_now(queue->loop) ? task->execTime - uv_now(queue->loop) : 0;
uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0);
} }
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
...@@ -406,8 +408,18 @@ int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { ...@@ -406,8 +408,18 @@ int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) {
} }
void transDestroyDelayQueue(SDelayQueue* queue) { void transDestroyDelayQueue(SDelayQueue* queue) {
uv_timer_stop(queue->timer);
taosMemoryFree(queue->timer); taosMemoryFree(queue->timer);
while (heapSize(queue->heap) > 0) {
HeapNode* minNode = heapMin(queue->heap);
if (minNode == NULL) {
return;
}
heapRemove(queue->heap, minNode);
SDelayTask* task = container_of(minNode, SDelayTask, node);
taosMemoryFree(task);
}
heapDestroy(queue->heap); heapDestroy(queue->heap);
taosMemoryFree(queue); taosMemoryFree(queue);
} }
...@@ -417,11 +429,11 @@ int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* ...@@ -417,11 +429,11 @@ int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void*
task->func = func; task->func = func;
task->arg = arg; task->arg = arg;
task->execTime = uv_now(queue->loop) + timeoutMs; task->execTime = taosGetTimestampMs() + timeoutMs;
int size = heapSize(queue->heap); tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
heapInsert(queue->heap, &task->node); heapInsert(queue->heap, &task->node);
if (size == 1) { if (heapSize(queue->heap) == 1) {
uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0); uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册