diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 87f753e6aa48061528ba0df6b8c280d284532801..de3c2a9f52f7f7743ae58f57c0bc53d1cc0d2991 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -77,14 +77,12 @@ typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; - uint16_t localPort; // local port - char *label; // for debug purpose - int32_t numOfThreads; // number of threads to handle connections - int32_t sessions; // number of sessions allowed - int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int32_t idleTime; // milliseconds, 0 means idle timer is disabled - int32_t retryLimit; // retry limit - int32_t retryInterval; // retry interval ms + uint16_t localPort; // local port + char *label; // for debug purpose + int32_t numOfThreads; // number of threads to handle connections + int32_t sessions; // number of sessions allowed + int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS + int32_t idleTime; // milliseconds, 0 means idle timer is disabled int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c694a6c1a04c5f91b66b89e6b33b57f224cd6ab9..2cb337fc4ce6c8432b80e6d87a3824b78d299cb2 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -146,8 +146,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.compressSize = tsCompressMsgSize; rpcInit.dfp = destroyAhandle; - rpcInit.retryLimit = tsRpcRetryLimit; - rpcInit.retryInterval = tsRpcRetryInterval; + rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryMaxInterval = tsRedirectMaxPeriod; @@ -232,7 +231,7 @@ void destroyTscObj(void *pObj) { pTscObj->pAppInfo->numOfConns); // In any cases, we should not free app inst here. Or an race condition rises. - /*int64_t connNum = */atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + /*int64_t connNum = */ atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFree(pTscObj); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e26512d065b97fb5d38ca9d01f48131cbed8513a..807c75287314d40c0d114272bf729f05f424c33c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -190,8 +190,8 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, (*pRequest)->body.param = param; STscObj* pTscObj = (*pRequest)->pTscObj; - int32_t err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, - sizeof((*pRequest)->self)); + int32_t err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, + sizeof((*pRequest)->self)); if (err) { tscError("%" PRId64 " failed to add to request container, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); @@ -1646,7 +1646,8 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) { static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) { char* p = (char*)pResultInfo->pData; - // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column + // length | int32_t len = getVersion1BlockMetaSize(p, numOfCols); int32_t* colLength = (int32_t*)(p + len); len += sizeof(int32_t) * numOfCols; @@ -1972,8 +1973,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.compressSize = tsCompressMsgSize; rpcInit.user = "_dnd"; - rpcInit.retryLimit = tsRpcRetryLimit; - rpcInit.retryInterval = tsRpcRetryInterval; clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { @@ -2295,14 +2294,14 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly); tsem_wait(¶m->sem); - SRequestObj *pRequest = NULL; + SRequestObj* pRequest = NULL; if (param->pRequest != NULL) { param->pRequest->syncQuery = true; pRequest = param->pRequest; } else { taosMemoryFree(param); } - + return pRequest; } @@ -2318,13 +2317,13 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid); tsem_wait(¶m->sem); - SRequestObj *pRequest = NULL; + SRequestObj* pRequest = NULL; if (param->pRequest != NULL) { param->pRequest->syncQuery = true; pRequest = param->pRequest; } else { taosMemoryFree(param); } - + return pRequest; } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 9d02f06636ffc62aa63324fad9de71d945677296..f57d59fb41381ecf87ffc56f01e8ee929b7da231 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -180,8 +180,6 @@ int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdLdLibPath[512] = ""; -int32_t tsRpcRetryLimit = 100; -int32_t tsRpcRetryInterval = 15; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); @@ -203,7 +201,9 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg); #endif -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { @@ -313,8 +313,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; - if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 2; @@ -457,9 +455,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; - GRANT_CFG_ADD; return 0; } @@ -674,8 +669,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval; tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; - tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32; - tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; return 0; } @@ -738,10 +731,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; - tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32; - tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32; - tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32; - tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; @@ -761,9 +750,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } - - tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32; - tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32; GRANT_CFG_GET; return 0; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 4fa09a46b7faaee64a6b38fb187608300f241744..5e1dcc6353734947706e91697e6882c695e8a508 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -145,7 +145,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pMsg == NULL) goto _OVER; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, pRpc->contLen); + dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, + pRpc->contLen); code = dmProcessNodeMsg(pWrapper, pMsg); @@ -258,8 +259,6 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.rfp = rpcRfp; rpcInit.compressSize = tsCompressMsgSize; - rpcInit.retryLimit = tsRpcRetryLimit; - rpcInit.retryInterval = tsRpcRetryInterval; rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryMaxInterval = tsRedirectMaxPeriod; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 9d8fad6fea65b08030ae5d507beb8e8f20c82805..eb6091d60583ecbd07fbb202870d97dce4577d91 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -31,7 +31,7 @@ extern "C" { #define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_TASK_NUMBER 10000 -#define QW_DEFAULT_SCH_TASK_NUMBER 3000 +#define QW_DEFAULT_SCH_TASK_NUMBER 500 #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_SCH_TIMEOUT_MSEC 180000 diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 57aba67b1d6a970ef77ae1ae0bea95b7eb36e98f..2db4a72795566d3a52d55650d275fe9c65c54ae5 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -47,10 +47,8 @@ typedef struct { char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID - int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size - int8_t encryption; // encrypt or not - int32_t retryLimit; // retry limit - int32_t retryInterval; // retry interval ms + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size + int8_t encryption; // encrypt or not int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index e026d98091f2a014f1553a510f01f73d7c0da40d..55c3c61b05ebb0728858b8151f5fc33425fc7a65 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -52,8 +52,6 @@ void* rpcOpen(const SRpcInit* pInit) { } pRpc->encryption = pInit->encryption; - pRpc->retryLimit = pInit->retryLimit; - pRpc->retryInterval = pInit->retryInterval; pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryStepFactor = pInit->retryStepFactor;