diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 5a0c0e0777290856c101cd3b88c75792da606232..e92afc22221a4f647bf6ee5a2a4e5c089e856bfd 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -49,6 +49,7 @@ extern int32_t tsTagFilterResCacheSize; // queue & threads extern int32_t tsNumOfRpcThreads; +extern int32_t tsNumOfRpcSessions; extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfTaskQueueThreads; extern int32_t tsNumOfMnodeQueryThreads; @@ -86,9 +87,9 @@ extern int32_t tsTelemInterval; extern char tsTelemServer[]; extern uint16_t tsTelemPort; extern bool tsEnableCrashReport; -extern char* tsTelemUri; -extern char* tsClientCrashReportUri; -extern char* tsSvrCrashReportUri; +extern char *tsTelemUri; +extern char *tsClientCrashReportUri; +extern char *tsSvrCrashReportUri; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing @@ -159,7 +160,7 @@ extern int32_t tsUptimeInterval; extern int32_t tsRpcRetryLimit; extern int32_t tsRpcRetryInterval; -extern bool tsDisableStream; +extern bool tsDisableStream; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1e5291b7cb422b8ce9dba51626d9585a8be66764..e3f08e912ab3bf955f5e1905439a317c4464e214 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -41,6 +41,7 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; +int32_t tsNumOfRpcSessions = 2000; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4; @@ -54,7 +55,6 @@ int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; - // sync raft int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; @@ -392,6 +392,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; + tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); + if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1; + tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1; @@ -504,6 +507,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(tsCfg, "numOfRpcSessions"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsNumOfRpcSessions = 2000; + tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); + pItem->i32 = tsNumOfRpcSessions; + pItem->stype = stype; + } + pItem = cfgGetItem(tsCfg, "numOfCommitThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsNumOfCommitThreads = numOfCores / 2; @@ -721,6 +732,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; + tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; @@ -771,7 +783,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } - + tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; GRANT_CFG_GET; @@ -980,6 +992,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32; } else if (strcasecmp("numOfRpcThreads", name) == 0) { tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; + } else if (strcasecmp("numOfRpcSessions", name) == 0) { + tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; } else if (strcasecmp("numOfCommitThreads", name) == 0) { tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; } else if (strcasecmp("numOfMnodeReadThreads", name) == 0) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 7f9a261cf27083c922dca5af9b66f206e2a2d0f6..3a1ca161a994ebd65a0471a13f49837eaeab48d4 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -284,14 +284,14 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; - int32_t connLimitNum = 10000 / (tsNumOfRpcThreads * 3); - connLimitNum = TMAX(connLimitNum, 100); + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); + connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; - rpcInit.batchSize = 16 * 1024; + rpcInit.batchSize = 8 * 1024; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2c862ed45b368b963608f43fa624aabd00d6cc8f..7e1aeafaad8bb25a88ba248de727deda7932f4df 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2285,7 +2285,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } - if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) { + /*if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) { char key[TSDB_FQDN_LEN + 64] = {0}; char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet); uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet); @@ -2297,7 +2297,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_MAX_SESSIONS; } - } + }*/ TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());