diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e92afc22221a4f647bf6ee5a2a4e5c089e856bfd..26bd6fa163ca312a521c9d4bc332b4729e4fc434 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -50,6 +50,7 @@ extern int32_t tsTagFilterResCacheSize; // queue & threads extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; +extern int32_t tsTimeToGetAvailableConn; extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfTaskQueueThreads; extern int32_t tsNumOfMnodeQueryThreads; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 0cc0ab64eff68ff00e25213eb8ed57ba04916988..c73e5c127af03cdc07e47c1ca95dd775a38599f2 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -114,7 +114,7 @@ typedef struct SRpcInit { int32_t connLimitNum; int32_t connLimitLock; - + int32_t timeToGetConn; int8_t supportBatch; // 0: no batch, 1. batch int32_t batchSize; void *parent; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 6122a1d4657c90d5c826b7ea6907bdc4a4a1e70a..53fe2c7ff33bea6e2d09f2285447d90c40cac7a7 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -159,6 +159,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c85e761c0b1c92669efbe71a704ddd44dad6826b..e8751e5b1d9992258fb1348787f0180765978b8b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2012,6 +2012,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 727663ba65583a7ac6f2d83206edf2753a4a576f..adf2c246c4a534064ff80cbadc8868546b77ebc8 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -41,8 +41,8 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; -int32_t tsNumOfRpcSessions = 2000; -int32_t tsTimeToGetAvailableConn = 1000; +int32_t tsNumOfRpcSessions = 10000; +int32_t tsTimeToGetAvailableConn = 10000; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 3a1ca161a994ebd65a0471a13f49837eaeab48d4..0245847b20eb63665f6fbb19644a488137afb4d2 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -292,6 +292,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 1f3c98ad7283c3f1594a67499d6fad8c0f886f47..8ea0064d4485c031a49b39bad70fd41e0aec7a85 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -64,11 +64,11 @@ typedef struct { void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); - int32_t connLimitNum; - int8_t connLimitLock; // 0: no lock. 1. lock - int8_t supportBatch; // 0: no batch, 1: support batch - int32_t batchSize; - + int32_t connLimitNum; + int8_t connLimitLock; // 0: no lock. 1. lock + int8_t supportBatch; // 0: no batch, 1: support batch + int32_t batchSize; + int32_t timeToGetConn; int index; void* parent; void* tcphandle; // returned handle from TCP initialization diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f5f3b52f50788e84131d8a8101ce7864e7fbdbaa..35b48fea6b153d9b2c96ab2f82775d6bee3fefbb 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -90,7 +90,7 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); } - + pRpc->timeToGetConn = pInit->timeToGetConn; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 128d9b81623fd64ef0bb0408940c6963de165b8e..d002366a12c664562763f15522a201702ffe0069 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1422,7 +1422,7 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs arg->param1 = pMsg; arg->param2 = pThrd; - pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, 200); + pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); return -1; diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 52ce37b22c7666e5c068937e5b073b6ffb0c530c..1a6ac3489dad818feac855625d5cc502875973e0 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -21,7 +21,7 @@ static void shellWorkAsClient() { SRpcInit rpcInit = {0}; SEpSet epSet = {.inUse = 0, .numOfEps = 1}; SRpcMsg rpcRsp = {0}; - void * clientRpc = NULL; + void *clientRpc = NULL; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass); @@ -31,6 +31,7 @@ static void shellWorkAsClient() { rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.user = "_dnd"; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) {