未验证 提交 16bc8cb5 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20038 from taosdata/enh/changeOptPara

enh: opt transport opt
...@@ -49,6 +49,7 @@ extern int32_t tsTagFilterResCacheSize; ...@@ -49,6 +49,7 @@ extern int32_t tsTagFilterResCacheSize;
// queue & threads // queue & threads
extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;
extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfCommitThreads;
extern int32_t tsNumOfTaskQueueThreads; extern int32_t tsNumOfTaskQueueThreads;
extern int32_t tsNumOfMnodeQueryThreads; extern int32_t tsNumOfMnodeQueryThreads;
...@@ -86,9 +87,9 @@ extern int32_t tsTelemInterval; ...@@ -86,9 +87,9 @@ extern int32_t tsTelemInterval;
extern char tsTelemServer[]; extern char tsTelemServer[];
extern uint16_t tsTelemPort; extern uint16_t tsTelemPort;
extern bool tsEnableCrashReport; extern bool tsEnableCrashReport;
extern char* tsTelemUri; extern char *tsTelemUri;
extern char* tsClientCrashReportUri; extern char *tsClientCrashReportUri;
extern char* tsSvrCrashReportUri; extern char *tsSvrCrashReportUri;
// query buffer management // query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
...@@ -159,7 +160,7 @@ extern int32_t tsUptimeInterval; ...@@ -159,7 +160,7 @@ extern int32_t tsUptimeInterval;
extern int32_t tsRpcRetryLimit; extern int32_t tsRpcRetryLimit;
extern int32_t tsRpcRetryInterval; extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream; extern bool tsDisableStream;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
...@@ -41,6 +41,7 @@ bool tsPrintAuth = false; ...@@ -41,6 +41,7 @@ bool tsPrintAuth = false;
// queue & threads // queue & threads
int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 2000;
int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfTaskQueueThreads = 4;
int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4;
...@@ -54,7 +55,6 @@ int32_t tsNumOfQnodeQueryThreads = 4; ...@@ -54,7 +55,6 @@ int32_t tsNumOfQnodeQueryThreads = 4;
int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeStreamThreads = 4;
int32_t tsNumOfSnodeWriteThreads = 1; int32_t tsNumOfSnodeWriteThreads = 1;
// sync raft // sync raft
int32_t tsElectInterval = 25 * 1000; int32_t tsElectInterval = 25 * 1000;
int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatInterval = 1000;
...@@ -392,6 +392,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -392,6 +392,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = tsNumOfCores / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
...@@ -504,6 +507,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { ...@@ -504,6 +507,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
pItem = cfgGetItem(tsCfg, "numOfRpcSessions");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfRpcSessions = 2000;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
pItem->i32 = tsNumOfRpcSessions;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfCommitThreads"); pItem = cfgGetItem(tsCfg, "numOfCommitThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfCommitThreads = numOfCores / 2; tsNumOfCommitThreads = numOfCores / 2;
...@@ -721,6 +732,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -721,6 +732,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
...@@ -771,7 +783,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -771,7 +783,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
} }
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
GRANT_CFG_GET; GRANT_CFG_GET;
...@@ -980,6 +992,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -980,6 +992,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32; tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
} else if (strcasecmp("numOfRpcThreads", name) == 0) { } else if (strcasecmp("numOfRpcThreads", name) == 0) {
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
} else if (strcasecmp("numOfRpcSessions", name) == 0) {
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
} else if (strcasecmp("numOfCommitThreads", name) == 0) { } else if (strcasecmp("numOfCommitThreads", name) == 0) {
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
} else if (strcasecmp("numOfMnodeReadThreads", name) == 0) { } else if (strcasecmp("numOfMnodeReadThreads", name) == 0) {
......
...@@ -284,14 +284,14 @@ int32_t dmInitClient(SDnode *pDnode) { ...@@ -284,14 +284,14 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp; rpcInit.ffp = dmFailFastFp;
int32_t connLimitNum = 10000 / (tsNumOfRpcThreads * 3); int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 100); connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1; rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcInit.batchSize = 16 * 1024; rpcInit.batchSize = 8 * 1024;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {
......
...@@ -2285,7 +2285,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran ...@@ -2285,7 +2285,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK; return TSDB_CODE_RPC_BROKEN_LINK;
} }
if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) { /*if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) {
char key[TSDB_FQDN_LEN + 64] = {0}; char key[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet); char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet);
uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet); uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet);
...@@ -2297,7 +2297,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran ...@@ -2297,7 +2297,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_MAX_SESSIONS; return TSDB_CODE_RPC_MAX_SESSIONS;
} }
} }*/
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册