diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d761813db1b5cf10058ada050e2647c184dd5e1c..87f753e6aa48061528ba0df6b8c280d284532801 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -72,6 +72,7 @@ typedef struct SRpcMsg { typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); +typedef bool (*RpcFFfp)(tmsg_t msgType); typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { @@ -90,6 +91,9 @@ typedef struct SRpcInit { int32_t retryMaxInterval; // retry max interval int64_t retryMaxTimouet; + int32_t failFastThreshold; + int32_t failFastInterval; + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not @@ -107,6 +111,8 @@ typedef struct SRpcInit { // destroy client ahandle; RpcDfp dfp; + // fail fast fp + RpcFFfp ffp; void *parent; } SRpcInit; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 12aba130d5c61c1eb92c8ab3e404b2a37624a738..04c12abcf995622f8985313ddf65bc2d5cc381c0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { return (*msgFp)(pWrapper->pMgmt, pMsg); } +static bool dmFailFastFp(tmsg_t msgType) { + // add more msg type later + return msgType == TDMT_SYNC_HEARTBEAT; +} + static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; @@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + rpcInit.failFastInterval = 1000; // interval threshold(ms) + rpcInit.failFastThreshold = 3; // failed threshold + rpcInit.ffp = dmFailFastFp; + pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { dError("failed to init dnode rpc client"); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 833937aa41ecdcd4045241105ce34836baf11e59..57aba67b1d6a970ef77ae1ae0bea95b7eb36e98f 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -57,10 +57,14 @@ typedef struct { int32_t retryMaxInterval; // retry max interval int32_t retryMaxTimouet; + int32_t failFastThreshold; + int32_t failFastInterval; + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); void (*destroyFp)(void* ahandle); + bool (*failFastFp)(tmsg_t msgType); int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c6a5cfdc953d775215d7bb29f3e10e87bfa854df..0eac12f7c5d834d2070c8982227a92de82045ddd 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->retryMaxInterval = pInit->retryMaxInterval; pRpc->retryMaxTimouet = pInit->retryMaxTimouet; + pRpc->failFastThreshold = pInit->failFastThreshold; + pRpc->failFastInterval = pInit->failFastInterval; + // register callback handle pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; + pRpc->failFastFp = pInit->ffp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7339d487d1e9ede8f96be49aebd5889d5f3f4ddb..187e17fc30348eb0be73ea12798ac34dde3a86a2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -84,6 +84,8 @@ typedef struct SCliThrd { SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; + SHashObj* failFastCache; + SCliMsg* stopMsg; bool quit; @@ -96,6 +98,13 @@ typedef struct SCliObj { SCliThrd** pThreadObj; } SCliObj; +typedef struct { + int32_t reinit; + int64_t timestamp; + int32_t count; + int32_t threshold; + int64_t interval; +} SFailFastItem; // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); @@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) { int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); if (status != 0) { - tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), + tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), uv_err_name(status)); cliHandleExcept(pConn); } @@ -863,7 +872,6 @@ _RETURN: } void cliConnCb(uv_connect_t* req, int status) { - // impl later SCliConn* pConn = req->data; SCliThrd* pThrd = pConn->hostThrd; @@ -875,7 +883,32 @@ void cliConnCb(uv_connect_t* req, int status) { } if (status != 0) { - tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); + tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip, + pConn->port, uv_strerror(status)); + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + STrans* pTransInst = pThrd->pTransInst; + if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && + (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { + char* ip = pConn->ip; + uint32_t port = pConn->port; + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + int64_t cTimestamp = taosGetTimestampMs(); + if (item != NULL) { + int32_t elapse = cTimestamp - item->timestamp; + if (elapse >= 0 && elapse <= pTransInst->failFastInterval) { + item->count++; + } else { + item->count = 1; + item->timestamp = cTimestamp; + } + } else { + SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; + taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem)); + } + } cliHandleExcept(pConn); return; } @@ -1027,6 +1060,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + if (item != NULL) { + int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp); + if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) { + STraceId* trace = &(pMsg->msg.info.traceId); + tGTrace("%s, msg %p cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, pMsg, + ip, port, item->count, elapse); + destroyCmsg(pMsg); + return; + } + } + } + bool ignore = false; SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); if (ignore == true) { @@ -1299,6 +1351,8 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->quit = false; return pThrd; } @@ -1325,6 +1379,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); + taosHashCleanup(pThrd->failFastCache); taosMemoryFree(pThrd); } diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c index 68dfba14e95a0bb34af7c60793f6af8bda234a7f..cd4324a5928e11497d3f98276a8cff800e369395 100644 --- a/source/os/src/osTime.c +++ b/source/os/src/osTime.c @@ -572,7 +572,7 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) { offsetInitFinished = true; } else { while (!offsetInitFinished) - ; // Ensure initialization is completed. + ; // Ensure initialization is completed. } GetSystemTimeAsFileTime(&f); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index fc9d90c9857ac1be20722ec22134dbb3da94b3b8..be1db74f1a6385e4fa09325a9b42ecb225485121 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons if (!osLogSpaceAvailable()) return; if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return; - char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); + char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); int32_t len = taosBuildLogHead(buffer, flags); va_list argpointer;