From 607c042a2fffedbc4521a5c8da886cd008cc847b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 22 Jul 2022 15:40:35 +0800 Subject: [PATCH] fix rpc code --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transCli.c | 33 +++++++++++++++-- source/libs/transport/src/transComm.c | 11 ++++-- source/libs/transport/src/transSvr.c | 51 +++++++++++++-------------- 4 files changed, 66 insertions(+), 30 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 3fa6344009..9fd4e483c2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -396,6 +396,7 @@ typedef struct SDelayQueue { int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); +void transDQCancel(SDelayQueue* queue, SDelayTask* task); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9de8c273d9..ddb8f4b9a3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -37,6 +37,7 @@ typedef struct SCliConn { char* ip; uint32_t port; + SDelayTask* task; // debug and log info struct sockaddr_in addr; struct sockaddr_in localAddr; @@ -65,6 +66,7 @@ typedef struct SCliThrd { queue msg; TdThreadMutex msgMtx; SDelayQueue* delayQueue; + SDelayQueue* timeoutQueue; uint64_t nextTimeout; // next timeout void* pTransInst; // @@ -92,6 +94,7 @@ static void* createConnPool(int size); static void* destroyConnPool(void* pool); static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, SCliConn* conn); +static void doCloseIdleConn(void* param); // register timer in each thread to clear expire conn static void cliTimeoutCb(uv_timer_t* handle); @@ -184,7 +187,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pThrd = (SCliThrd*)(exh)->pThrd; \ } \ } while (0) -#define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_PERSIST_TIME(para) (para * 20) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ @@ -506,6 +509,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); assert(h == &conn->q); + + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + return conn; } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -537,6 +544,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { transReleaseExHandle(transGetRefMgt(), handle); return 0; } + static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; @@ -562,7 +570,14 @@ static void addConnToPool(void* pool, SCliConn* conn) { assert(plist != NULL); QUEUE_INIT(&conn->q); QUEUE_PUSH(&plist->conn, &conn->q); + assert(!QUEUE_IS_EMPTY(&plist->conn)); + + STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + arg->param1 = conn; + arg->param2 = thrd; + + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); } static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; @@ -631,6 +646,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; + if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { uv_read_stop(conn->stream); @@ -997,6 +1014,8 @@ static SCliThrd* createThrdObj() { pThrd->pool = createConnPool(4); transDQCreate(pThrd->loop, &pThrd->delayQueue); + transDQCreate(pThrd->loop, &pThrd->timeoutQueue); + pThrd->quit = false; return pThrd; } @@ -1012,6 +1031,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsg); + transDQDestroy(pThrd->timeoutQueue, NULL); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } @@ -1058,6 +1078,9 @@ static void doCloseIdleConn(void* param) { STaskArg* arg = param; SCliConn* conn = arg->param1; SCliThrd* pThrd = arg->param2; + + cliDestroyConn(conn, true); + taosMemoryFree(arg); } static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { @@ -1248,11 +1271,17 @@ int transReleaseCliHandle(void* handle) { if (pThrd == NULL) { return -1; } + STransMsg tmsg = {.info.handle = handle}; - SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); + + SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cmsg->msg = tmsg; cmsg->type = Release; + STraceId* trace = &tmsg.info.traceId; + tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid); + if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { return -1; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 34849df2b2..6ac75a75e1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -480,7 +480,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { SDelayTask* task = container_of(minNode, SDelayTask, node); STaskArg* arg = task->arg; - freeFunc(arg->param1); + if (freeFunc) freeFunc(arg->param1); taosMemoryFree(arg); taosMemoryFree(task); @@ -491,9 +491,16 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { void transDQCancel(SDelayQueue* queue, SDelayTask* task) { uv_timer_stop(queue->timer); - if (heapSize(queue->heap) <= 0) return; + if (heapSize(queue->heap) <= 0) { + taosMemoryFree(task->arg); + taosMemoryFree(task); + return; + } heapRemove(queue->heap, &task->node); + taosMemoryFree(task->arg); + taosMemoryFree(task); + if (heapSize(queue->heap) != 0) { HeapNode* minNode = heapMin(queue->heap); if (minNode != NULL) return; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 3fb947bdba..0512765619 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -149,32 +149,31 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \ - SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - reallocConnRef(conn); \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - if (conn->regArg.init) { \ - tTrace("conn %p release, notify server app", conn); \ - STrans* pTransInst = conn->pTransInst; \ - (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ - memset(&conn->regArg, 0, sizeof(conn->regArg)); \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("conn %p received release request", conn); \ + STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.traceId = head->traceId, .info.ahandle = NULL}; \ + SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + reallocConnRef(conn); \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + if (conn->regArg.init) { \ + tTrace("conn %p release, notify server app", conn); \ + STrans* pTransInst = conn->pTransInst; \ + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ + memset(&conn->regArg, 0, sizeof(conn->regArg)); \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) #define SRV_RELEASE_UV(loop) \ -- GitLab