From 50f148abac116102f6cd517e8cd337097fa40ba9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Nov 2022 16:04:20 +0800 Subject: [PATCH] conn timeout refactor --- source/libs/transport/src/transCli.c | 52 +++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3af0747bcf..a5955beaca 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,7 +27,6 @@ typedef struct SCliConn { queue wreqQueue; uv_timer_t* timer; // read timer, forbidden - uv_timer_t connTimer; void* hostThrd; @@ -468,8 +467,15 @@ void cliHandleExcept(SCliConn* conn) { void cliConnTimeout(uv_timer_t* handle) { SCliConn* conn = handle->data; + SCliThrd* pThrd = conn->hostThrd; + tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); + uv_timer_stop(handle); + handle->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + + conn->timer = NULL; cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); } void cliReadTimeoutCb(uv_timer_t* handle) { @@ -648,8 +654,14 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - uv_timer_init(pThrd->loop, &conn->connTimer); - conn->connTimer.data = conn; + uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; + if (timer == NULL) { + timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + tDebug("no available timer, create a timer %p", timer); + uv_timer_init(pThrd->loop, timer); + } + timer->data = conn; + conn->timer = timer; conn->connReq.data = conn; transReqQueueInit(&conn->wreqQueue); @@ -831,7 +843,12 @@ _RETURN: void cliConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; - uv_timer_stop(&pConn->connTimer); + SCliThrd* pThrd = pConn->hostThrd; + + uv_timer_stop(pConn->timer); + pConn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &pConn->timer); + pConn->timer = NULL; if (status != 0) { tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); @@ -1021,11 +1038,16 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { if (ret != 0) { tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, uv_err_name(ret)); - uv_timer_stop(&conn->connTimer); + + uv_timer_stop(conn->timer); + conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + conn->timer = NULL; + cliHandleExcept(conn); return; } - uv_timer_start(&conn->connTimer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); + uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s conn %p ready", pTransInst->label, conn); @@ -1148,6 +1170,8 @@ static void* cliWorkThread(void* arg) { pThrd->pid = taosGetSelfPthreadId(); setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); + + tDebug("thread quit-thread:%08" PRId64, pThrd->pid); return NULL; } @@ -1207,7 +1231,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->prepare->data = pThrd; // uv_prepare_start(pThrd->prepare, cliPrepareCb); - int32_t timerSize = 512; + int32_t timerSize = 64; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); @@ -1241,6 +1265,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transDQDestroy(pThrd->delayQueue, destroyCmsg); transDQDestroy(pThrd->timeoutQueue, NULL); + tDebug("thread destroy %" PRId64, pThrd->pid); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i); taosMemoryFree(timer); @@ -1266,7 +1291,18 @@ void cliSendQuit(SCliThrd* thrd) { } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { - uv_read_stop((uv_stream_t*)handle); + if (uv_handle_get_type(handle) == UV_TIMER) { + // SCliConn* pConn = handle->data; + // if (pConn != NULL && pConn->timer != NULL) { + // SCliThrd* pThrd = pConn->hostThrd; + // uv_timer_stop((uv_timer_t*)handle); + // handle->data = NULL; + // taosArrayPush(pThrd->timerList, &pConn->timer); + // pConn->timer = NULL; + // } + } else { + uv_read_stop((uv_stream_t*)handle); + } uv_close(handle, cliDestroy); } } -- GitLab