From f22d07319f36669143489e5d5e0d20e0a21d25c0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 15 Feb 2023 20:06:04 +0800 Subject: [PATCH] handle too many session --- include/libs/transport/trpc.h | 3 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 3 +- source/libs/transport/inc/transportInt.h | 4 ++- source/libs/transport/src/trans.c | 3 +- source/libs/transport/src/transCli.c | 32 +++++++++++++------ 5 files changed, 32 insertions(+), 13 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index ff68b72fc2..5787f41772 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -112,7 +112,8 @@ typedef struct SRpcInit { // fail fast fp RpcFFfp ffp; - int32_t connLimit; + int32_t connLimitNum; + int32_t connLimitLock; void *parent; } SRpcInit; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index bd08eda954..d23e67b195 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -284,7 +284,8 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; - rpcInit.connLimit = 3000; + rpcInit.connLimitNum = 3000; + rpcInit.connLimitLock = 1; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 92477bb514..1fe32955b9 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -64,7 +64,9 @@ typedef struct { void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); - int32_t connLimit; + int32_t connLimitNum; + int8_t connLimitLock; // 0: no lock. 1. lock + int index; void* parent; void* tcphandle; // returned handle from TCP initialization diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 61ca9743b3..6eec54b370 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -67,7 +67,8 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; pRpc->failFastFp = pInit->ffp; - pRpc->connLimit = pInit->connLimit; + pRpc->connLimitNum = pInit->connLimitNum; + pRpc->connLimitLock = pInit->connLimitLock; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4d7b6b5b2b..a1b4766e80 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -85,7 +85,7 @@ typedef struct SCliThrd { SCvtAddr cvtAddr; SHashObj* failFastCache; - SHashObj* connLimit; + SHashObj* connLimitCache; SCliMsg* stopMsg; @@ -750,9 +750,9 @@ static void cliDestroy(uv_handle_t* handle) { transReqQueueClear(&conn->wreqQueue); transDestroyBuffer(&conn->readBuf); - int32_t* oVal = taosHashGet(pThrd->connLimit, conn->ip, strlen(conn->ip)); + int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1; - taosHashPut(pThrd->connLimit, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal)); + taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal)); taosMemoryFree(conn); } @@ -930,9 +930,9 @@ void cliConnCb(uv_connect_t* req, int status) { return; } - int32_t* oVal = taosHashGet(pThrd->connLimit, pConn->ip, strlen(pConn->ip)); + int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip)); int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1; - taosHashPut(pThrd->connLimit, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal)); + taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal)); struct sockaddr peername, sockname; int addrlen = sizeof(peername); @@ -1080,10 +1080,10 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, SCliMsg* pMsg) { char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - int32_t* val = taosHashGet(pThrd->connLimit, key, strlen(key)); + int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key)); if (val == NULL) return 0; - if (*val >= pTransInst->connLimit) { + if (*val >= pTransInst->connLimitNum) { return -1; } return 0; @@ -1441,7 +1441,8 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pThrd->connLimit = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, + pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK); pThrd->quit = false; return pThrd; @@ -1470,7 +1471,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); - taosHashCleanup(pThrd->connLimit); + taosHashCleanup(pThrd->connLimitCache); taosMemoryFree(pThrd); } @@ -1894,6 +1895,19 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } + if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) { + char key[TSDB_FQDN_LEN + 64] = {0}; + char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet); + uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet); + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key)); + if (val != NULL && *val >= pTransInst->connLimitNum) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_BROKEN_LINK; + } + } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); -- GitLab