提交 27ebbdad 编写于 作者: dengyihao's avatar dengyihao

add rpc retry config

上级 11b20e14
......@@ -135,6 +135,9 @@ extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval;
extern int32_t tsRpcRetryLimit;
extern int32_t tsRpcRetryInterval;
//#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
......@@ -76,12 +76,14 @@ typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port
char *label; // for debug purpose
int32_t numOfThreads; // number of threads to handle connections
int32_t sessions; // number of sessions allowed
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
uint16_t localPort; // local port
char *label; // for debug purpose
int32_t numOfThreads; // number of threads to handle connections
int32_t sessions; // number of sessions allowed
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
int32_t retryLimit; // retry limit
int32_t retryInterval; // retry interval ms
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
......@@ -146,6 +146,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.dfp = destroyAhandle;
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
......@@ -1971,6 +1971,8 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.user = "_dnd";
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) {
......@@ -163,10 +163,12 @@ int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds
int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
char tsUdfdLdLibPath[512] = "";
int32_t tsRpcRetryLimit = 100;
int32_t tsRpcRetryInterval = 15;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
......@@ -297,6 +299,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 10000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 10000, 0) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
......@@ -422,6 +426,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 10000, 0) != 0) return -1;
return 0;
......@@ -634,6 +642,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32;
tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32;
return 0;
......@@ -708,6 +719,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32;
tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32;
return 0;
......@@ -258,6 +258,8 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
......@@ -94,11 +94,11 @@ typedef void* queue[2];
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
//#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
//#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86
......@@ -47,8 +47,10 @@ typedef struct {
char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
int32_t retryLimit; // retry limit
int32_t retryInterval; // retry interval ms
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType);
......@@ -48,6 +48,8 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->compressSize = pInit->compressSize;
pRpc->encryption = pInit->encryption;
pRpc->retryLimit = pInit->retryLimit;
pRpc->retryInterval = pInit->retryInterval;
// register callback handle
pRpc->cfp = pInit->cfp;
......@@ -1288,6 +1288,7 @@ static void doCloseIdleConn(void* param) {
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
STraceId* trace = &pMsg->msg.info.traceId;
......@@ -1299,7 +1300,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
transDQSched(pThrd->delayQueue, doDelayTask, arg, pTransInst->retryInterval);
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
......@@ -1351,7 +1352,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pMsg->sent = 0;
pCtx->retryCnt += 1;
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3);
if (pCtx->retryCnt < pCtx->retryLimit) {
......@@ -1360,7 +1361,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
return -1;
} else {
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit);
if (pCtx->retryCnt < pCtx->retryLimit) {
if (pResp->contLen == 0) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册