提交 2caaae39 编写于 作者: dengyihao's avatar dengyihao

add keep alive parameter

上级 cfe319ba
......@@ -58,6 +58,7 @@ extern int32_t tsTagFilterResCacheSize;
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;
extern int32_t tsTimeToGetAvailableConn;
extern int32_t tsKeepAliveIdle;
extern int32_t tsNumOfCommitThreads;
extern int32_t tsNumOfTaskQueueThreads;
extern int32_t tsNumOfMnodeQueryThreads;
......
......@@ -14,8 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "os.h"
#include "tconfig.h"
#include "tgrant.h"
#include "tlog.h"
......@@ -49,6 +49,8 @@ bool tsPrintAuth = false;
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 10000;
int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4;
int32_t tsNumOfMnodeQueryThreads = 4;
......@@ -63,7 +65,7 @@ int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeStreamThreads = 4;
int32_t tsNumOfSnodeWriteThreads = 1;
int32_t tsMaxStreamBackendCache = 128; // M
int32_t tsPQSortMemThreshold = 16; // M
int32_t tsPQSortMemThreshold = 16; // M
// sync raft
int32_t tsElectInterval = 25 * 1000;
......@@ -119,8 +121,8 @@ int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = true;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
bool tsQueryPlannerTrace = false;
......@@ -372,7 +374,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX,
CFG_SCOPE_CLIENT) != 0)
return -1;
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT) != 0) return -1;
......@@ -385,7 +389,11 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, CFG_SCOPE_BOTH) != 0) return -1;
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000);
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH) != 0)
return -1;
tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000);
if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
......@@ -445,7 +453,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000,
CFG_SCOPE_SERVER) != 0)
return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, CFG_SCOPE_SERVER) != 0) return -1;
......@@ -461,6 +471,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsNumOfRpcSessions, 20, 1000000, CFG_SCOPE_BOTH) != 0) return -1;
tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000);
if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH) != 0) return -1;
tsNumOfCommitThreads = tsNumOfCores / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1;
......@@ -473,7 +486,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER) != 0)
return -1;
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
......@@ -493,7 +507,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSnodeStreamThreads = tsNumOfCores / 4;
tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER) != 0)
return -1;
tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
......@@ -501,14 +516,18 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH) != 0)
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX,
CFG_SCOPE_BOTH) != 0)
return -1;
if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0)
return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0)
return -1;
if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, CFG_SCOPE_SERVER) != 0)
return -1;
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER) != 0) return -1;
......@@ -538,7 +557,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER) != 0)
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX,
CFG_SCOPE_SERVER) != 0)
return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, CFG_SCOPE_SERVER) != 0) return -1;
......@@ -549,7 +569,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0)
return -1;
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "keepTimeOffset", tsKeepTimeOffset, 0, 23, CFG_SCOPE_SERVER) != 0) return -1;
......@@ -604,6 +625,13 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "keepAliveIdle");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 720000);
pItem->i32 = tsKeepAliveIdle;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfCommitThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfCommitThreads = numOfCores / 2;
......@@ -833,6 +861,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32;
return 0;
}
......@@ -872,6 +902,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
......@@ -902,7 +934,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
tmqMaxTopicNum= cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
......@@ -1014,7 +1046,7 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
taosSetCoreDump(enableCore);
} else if (strcasecmp("enableQueryHb", name) == 0) {
tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval;
} else if (strcasecmp("ttlChangeOnWrite", name) == 0) {
} else if (strcasecmp("ttlChangeOnWrite", name) == 0) {
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
}
break;
......
......@@ -293,7 +293,7 @@ bool transReadComplete(SConnBuffer* connBuf);
int transResetBuffer(SConnBuffer* connBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf);
int transSetConnOption(uv_tcp_t* stream);
int transSetConnOption(uv_tcp_t* stream, int keepalive);
void transRefSrvHandle(void* handle);
void transUnrefSrvHandle(void* handle);
......
......@@ -1202,7 +1202,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
cliHandleFastFail(conn, -1);
return;
}
ret = transSetConnOption((uv_tcp_t*)conn->stream);
ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
if (ret != 0) {
tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleFastFail(conn, -1);
......@@ -1610,7 +1610,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr);
pThrd->newConnCount++;
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd == -1) {
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
......@@ -1624,7 +1624,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
cliHandleExcept(conn);
return;
}
ret = transSetConnOption((uv_tcp_t*)conn->stream);
ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle);
if (ret != 0) {
tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleExcept(conn);
......
......@@ -203,10 +203,10 @@ bool transReadComplete(SConnBuffer* connBuf) {
return (p->left == 0 || p->invalid) ? true : false;
}
int transSetConnOption(uv_tcp_t* stream) {
int transSetConnOption(uv_tcp_t* stream, int keepalive) {
#if defined(WINDOWS) || defined(DARWIN)
#else
uv_tcp_keepalive(stream, 1, 20);
uv_tcp_keepalive(stream, 1, keepalive);
#endif
return uv_tcp_nodelay(stream, 1);
// int ret = uv_tcp_keepalive(stream, 5, 60);
......
......@@ -760,7 +760,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp->data = pConn;
transSetConnOption((uv_tcp_t*)pConn->pTcp);
// transSetConnOption((uv_tcp_t*)pConn->pTcp);
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册