提交 46d4bf90 编写于 作者: dengyihao's avatar dengyihao

refactor rpc code

上级 67d99c9c
...@@ -393,9 +393,9 @@ typedef struct SDelayQueue { ...@@ -393,9 +393,9 @@ typedef struct SDelayQueue {
uv_loop_t* loop; uv_loop_t* loop;
} SDelayQueue; } SDelayQueue;
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b); bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
/* /*
......
...@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) { ...@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) {
cliHandleReq(pMsg, pThrd); cliHandleReq(pMsg, pThrd);
} }
static void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
SCliThrd* pThrd = arg->param2;
}
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
...@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { ...@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
} }
} }
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
if ((pResp == NULL || pResp->info.hasEpSet == 0)) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
return false; return false;
} }
...@@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
*/ */
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code; int32_t code = pResp->code;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
if (retry) { if (retry) {
pMsg->sent = 0; pMsg->sent = 0;
pCtx->retryCnt += 1; pCtx->retryCnt += 1;
...@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryCnt < pCtx->retryLimit) { if (pCtx->retryCnt < pCtx->retryLimit) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(&pCtx->epSet);
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd); cliSchedMsgToNextNode(pMsg, pThrd);
return -1; return -1;
} }
...@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId* trace = &pResp->info.traceId; STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryToExtractEpSet(pResp, &pCtx->epSet); bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) { if (hasEpSet) {
char tbuf[256] = {0}; char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf); EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
......
...@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { ...@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
heapDestroy(queue->heap); heapDestroy(queue->heap);
taosMemoryFree(queue); taosMemoryFree(queue);
} }
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
uv_timer_stop(queue->timer);
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { if (heapSize(queue->heap) <= 0) return;
heapRemove(queue->heap, &task->node);
if (heapSize(queue->heap) != 0) {
HeapNode* minNode = heapMin(queue->heap);
if (minNode != NULL) return;
uint64_t now = taosGetTimestampMs();
SDelayTask* task = container_of(minNode, SDelayTask, node);
uint64_t timeout = now > task->execTime ? now - task->execTime : 0;
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
}
}
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
uint64_t now = taosGetTimestampMs(); uint64_t now = taosGetTimestampMs();
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
task->func = func; task->func = func;
...@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ ...@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs); tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
heapInsert(queue->heap, &task->node); heapInsert(queue->heap, &task->node);
uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
return 0; return task;
} }
void transPrintEpSet(SEpSet* pEpSet) { void transPrintEpSet(SEpSet* pEpSet) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册