diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 4f2ed2b0659f8572974028594bb3a9feb1b910a3..c1addd9481f2b29f98fbb36d02a74bae0a627590 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -58,6 +58,7 @@ extern int32_t tsTagFilterResCacheSize; extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; extern int32_t tsTimeToGetAvailableConn; +extern int32_t tsKeepAliveIdle; extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfTaskQueueThreads; extern int32_t tsNumOfMnodeQueryThreads; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f06eeb230fc17e6dbfa651b7437db670f883acec..e64774ba9c68e01b622e427dd20fe021c3ccc503 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -14,8 +14,8 @@ */ #define _DEFAULT_SOURCE -#include "os.h" #include "tglobal.h" +#include "os.h" #include "tconfig.h" #include "tgrant.h" #include "tlog.h" @@ -49,6 +49,8 @@ bool tsPrintAuth = false; int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 10000; int32_t tsTimeToGetAvailableConn = 500000; +int32_t tsKeepAliveIdle = 60; + int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4; @@ -63,7 +65,7 @@ int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; int32_t tsMaxStreamBackendCache = 128; // M -int32_t tsPQSortMemThreshold = 16; // M +int32_t tsPQSortMemThreshold = 16; // M // sync raft int32_t tsElectInterval = 25 * 1000; @@ -119,8 +121,8 @@ int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT bool tsEnableQueryHb = true; -bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true -bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true +bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true +bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true int32_t tsQuerySmaOptimize = 0; int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. bool tsQueryPlannerTrace = false; @@ -372,7 +374,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, CFG_SCOPE_CLIENT) != 0) return -1; + if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, + CFG_SCOPE_CLIENT) != 0) + return -1; if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT) != 0) return -1; @@ -385,7 +389,11 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, CFG_SCOPE_BOTH) != 0) return -1; tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000); - if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH) != 0) + return -1; + + tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000); + if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); @@ -445,7 +453,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT) != 0) return -1; - if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, + CFG_SCOPE_SERVER) != 0) + return -1; if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, CFG_SCOPE_SERVER) != 0) return -1; @@ -461,6 +471,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000); if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsNumOfRpcSessions, 20, 1000000, CFG_SCOPE_BOTH) != 0) return -1; + tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000); + if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH) != 0) return -1; + tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1; @@ -473,7 +486,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER) != 0) + return -1; tsNumOfVnodeFetchThreads = tsNumOfCores / 4; tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); @@ -493,7 +507,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfSnodeStreamThreads = tsNumOfCores / 4; tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4); - if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER) != 0) + return -1; tsNumOfSnodeWriteThreads = tsNumOfCores / 4; tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); @@ -501,14 +516,18 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); - if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH) != 0) + if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, + CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) + return -1; + if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) + return -1; - if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, CFG_SCOPE_SERVER) != 0) + return -1; if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER) != 0) return -1; @@ -538,7 +557,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER) != 0) + if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, + CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, CFG_SCOPE_SERVER) != 0) return -1; @@ -549,7 +569,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) + return -1; if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "keepTimeOffset", tsKeepTimeOffset, 0, 23, CFG_SCOPE_SERVER) != 0) return -1; @@ -604,6 +625,13 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(tsCfg, "keepAliveIdle"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 720000); + pItem->i32 = tsKeepAliveIdle; + pItem->stype = stype; + } + pItem = cfgGetItem(tsCfg, "numOfCommitThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsNumOfCommitThreads = numOfCores / 2; @@ -833,6 +861,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; + + tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32; return 0; } @@ -872,6 +902,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; + tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32; + tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; @@ -902,7 +934,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; - tmqMaxTopicNum= cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; + tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; @@ -1014,7 +1046,7 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { taosSetCoreDump(enableCore); } else if (strcasecmp("enableQueryHb", name) == 0) { tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval; - } else if (strcasecmp("ttlChangeOnWrite", name) == 0) { + } else if (strcasecmp("ttlChangeOnWrite", name) == 0) { tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval; } break; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 3b304e2c770a39db1ae06c220c0d00e9a7f0a239..a6b7a20f7658043033f8360176b5ae1e8cafcd45 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -293,7 +293,7 @@ bool transReadComplete(SConnBuffer* connBuf); int transResetBuffer(SConnBuffer* connBuf); int transDumpFromBuffer(SConnBuffer* connBuf, char** buf); -int transSetConnOption(uv_tcp_t* stream); +int transSetConnOption(uv_tcp_t* stream, int keepalive); void transRefSrvHandle(void* handle); void transUnrefSrvHandle(void* handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 01223a2be96de797c4aaba8d5d6184e2c4116fcb..71379daa50c012cbba39467a24359115ff8feeb8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1202,7 +1202,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliHandleFastFail(conn, -1); return; } - ret = transSetConnOption((uv_tcp_t*)conn->stream); + ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); if (ret != 0) { tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); cliHandleFastFail(conn, -1); @@ -1610,7 +1610,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); pThrd->newConnCount++; - int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, tstrerror(TAOS_SYSTEM_ERROR(errno))); @@ -1624,7 +1624,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliHandleExcept(conn); return; } - ret = transSetConnOption((uv_tcp_t*)conn->stream); + ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); if (ret != 0) { tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); cliHandleExcept(conn); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index b14db9497e4ec58305f96469cc9fc19f17d2b1fb..5e602b1ea2457536572a02d8b786212c75180880 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -203,10 +203,10 @@ bool transReadComplete(SConnBuffer* connBuf) { return (p->left == 0 || p->invalid) ? true : false; } -int transSetConnOption(uv_tcp_t* stream) { +int transSetConnOption(uv_tcp_t* stream, int keepalive) { #if defined(WINDOWS) || defined(DARWIN) #else - uv_tcp_keepalive(stream, 1, 20); + uv_tcp_keepalive(stream, 1, keepalive); #endif return uv_tcp_nodelay(stream, 1); // int ret = uv_tcp_keepalive(stream, 5, 60); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f23e176c79bf5fb8fc87a3383a3ad1ef9f2e3317..a546ee815927eb518979b4343c754d4a729ed846 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -760,7 +760,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; - transSetConnOption((uv_tcp_t*)pConn->pTcp); + // transSetConnOption((uv_tcp_t*)pConn->pTcp); if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd;