diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dc539ac15ef5a42432916a5e0e38026fb8674b79..bd08eda9541173efac8cd9464e39de3b799eab9e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -280,11 +280,11 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; - rpcInit.failFastInterval = 1000; // interval threshold(ms) + rpcInit.failFastInterval = 5000; // interval threshold(ms) rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; - rpcInit.connLimit = 7500; + rpcInit.connLimit = 3000; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfbc8a5af20dc2ccb8a0566789908380b720c92a..4d7b6b5b2b216e66062e97ee716dbbf0b3cbb89e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -40,9 +40,8 @@ typedef struct SCliConn { bool broken; // link broken or not ConnStatus status; // - int64_t refId; - char* ip; - uint32_t port; + int64_t refId; + char* ip; SDelayTask* task; @@ -86,6 +85,7 @@ typedef struct SCliThrd { SCvtAddr cvtAddr; SHashObj* failFastCache; + SHashObj* connLimit; SCliMsg* stopMsg; @@ -570,10 +570,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->status = ConnInPool; if (conn->list == NULL) { - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); - conn->list = taosHashGet((SHashObj*)pool, key, strlen(key)); + conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip)); } else { tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); } @@ -751,6 +749,11 @@ static void cliDestroy(uv_handle_t* handle) { tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); transDestroyBuffer(&conn->readBuf); + + int32_t* oVal = taosHashGet(pThrd->connLimit, conn->ip, strlen(conn->ip)); + int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1; + taosHashPut(pThrd->connLimit, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal)); + taosMemoryFree(conn); } static bool cliHandleNoResp(SCliConn* conn) { @@ -892,8 +895,8 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); STraceId* trace = &pMsg->msg.info.traceId; - tGError("%s msg %s failed to send, conn %p failed to connect to %s:%d, reason: %s", CONN_GET_INST_LABEL(pConn), - pMsg ? TMSG_INFO(pMsg->msg.msgType) : 0, pConn, pConn->ip, pConn->port, uv_strerror(status)); + tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), + pMsg ? TMSG_INFO(pMsg->msg.msgType) : 0, pConn, pConn->ip, uv_strerror(status)); uv_timer_stop(pConn->timer); pConn->timer->data = NULL; taosArrayPush(pThrd->timerList, &pConn->timer); @@ -901,12 +904,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - char* ip = pConn->ip; - uint32_t port = pConn->port; - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip)); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { int32_t elapse = cTimestamp - item->timestamp; @@ -918,7 +916,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem)); + taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem)); } } cliHandleExcept(pConn); @@ -931,9 +929,13 @@ void cliConnCb(uv_connect_t* req, int status) { cliHandleFastFail(pConn, status); return; } - struct sockaddr peername, sockname; - int addrlen = sizeof(peername); + int32_t* oVal = taosHashGet(pThrd->connLimit, pConn->ip, strlen(pConn->ip)); + int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1; + taosHashPut(pThrd->connLimit, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal)); + + struct sockaddr peername, sockname; + int addrlen = sizeof(peername); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); transSockInfo2Str(&peername, pConn->dst); @@ -1068,6 +1070,24 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { return; } +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, SCliMsg* pMsg) { + STrans* pTransInst = pThrd->pTransInst; + + STransConnCtx* pCtx = pMsg->ctx; + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + int32_t* val = taosHashGet(pThrd->connLimit, key, strlen(key)); + if (val == NULL) return 0; + + if (*val >= pTransInst->connLimit) { + return -1; + } + return 0; +} void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; @@ -1091,7 +1111,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { if (item != NULL) { int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp); if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) { - STraceId* trace = &(pMsg->msg.info.traceId); tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse); destroyCmsg(pMsg); @@ -1113,6 +1132,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, pMsg)) { + tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), + tstrerror(TSDB_CODE_RPC_MAX_SESSIONS)); + destroyCmsg(pMsg); + return; + } + if (conn != NULL) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); @@ -1126,10 +1152,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); - conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + conn->ip = strdup(key); - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); + uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, EPSET_GET_INUSE_IP(&pCtx->epSet)); if (ipaddr == 0xffffffff) { uv_timer_stop(conn->timer); conn->timer->data = NULL; @@ -1143,9 +1173,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = ipaddr; - addr.sin_port = (uint16_t)htons((uint16_t)conn->port); + addr.sin_port = (uint16_t)htons(port); - tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); + tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1199,7 +1229,6 @@ static void cliAsyncCb(uv_async_t* handle) { if (count >= 2) { tTrace("cli process batch size:%d", count); } - // if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb); if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); } @@ -1412,6 +1441,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->connLimit = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->quit = false; return pThrd; @@ -1440,6 +1470,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); + taosHashCleanup(pThrd->connLimit); taosMemoryFree(pThrd); } @@ -1864,13 +1895,6 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran return TSDB_CODE_RPC_BROKEN_LINK; } - // read only - if (pTransInst->connLimit != 0 && atomic_load_32(&pThrd->connCount) >= pTransInst->connLimit) { - transFreeMsg(pReq->pCont); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_MAX_SESSIONS; - } - TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); @@ -1912,13 +1936,6 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } - // not limit sync req - // read only - // if (pTransInst->connLimit != 0 && atomic_load_32(&pThrd->connCount) >= pTransInst->connLimit) { - // transFreeMsg(pReq->pCont); - // transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - // return TSDB_CODE_RPC_MAX_SESSIONS; - //} tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_init(sem, 0, 0);