From 4be196491e6671e383187bc703a826cbc87f9bc2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 5 Dec 2022 22:39:18 +0800 Subject: [PATCH] add fail-fast --- include/libs/transport/trpc.h | 6 +++ source/libs/transport/inc/transportInt.h | 4 ++ source/libs/transport/src/trans.c | 4 ++ source/libs/transport/src/transCli.c | 57 ++++++++++++++++++++++-- 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d761813db1..87f753e6aa 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -72,6 +72,7 @@ typedef struct SRpcMsg { typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); +typedef bool (*RpcFFfp)(tmsg_t msgType); typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { @@ -90,6 +91,9 @@ typedef struct SRpcInit { int32_t retryMaxInterval; // retry max interval int64_t retryMaxTimouet; + int32_t failFastThreshold; + int32_t failFastInterval; + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not @@ -107,6 +111,8 @@ typedef struct SRpcInit { // destroy client ahandle; RpcDfp dfp; + // fail fast fp + RpcFFfp ffp; void *parent; } SRpcInit; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 833937aa41..57aba67b1d 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -57,10 +57,14 @@ typedef struct { int32_t retryMaxInterval; // retry max interval int32_t retryMaxTimouet; + int32_t failFastThreshold; + int32_t failFastInterval; + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); void (*destroyFp)(void* ahandle); + bool (*failFastFp)(tmsg_t msgType); int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c6a5cfdc95..0eac12f7c5 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->retryMaxInterval = pInit->retryMaxInterval; pRpc->retryMaxTimouet = pInit->retryMaxTimouet; + pRpc->failFastThreshold = pInit->failFastThreshold; + pRpc->failFastInterval = pInit->failFastInterval; + // register callback handle pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; + pRpc->failFastFp = pInit->ffp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7339d487d1..3e9b27dc67 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -84,6 +84,8 @@ typedef struct SCliThrd { SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; + SHashObj* failFastCache; + SCliMsg* stopMsg; bool quit; @@ -96,6 +98,13 @@ typedef struct SCliObj { SCliThrd** pThreadObj; } SCliObj; +typedef struct { + int32_t reinit; + int64_t timestamp; + int32_t count; + int32_t threshold; + int64_t interval; +} SFailFastItem; // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); @@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) { int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); if (status != 0) { - tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), + tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), uv_err_name(status)); cliHandleExcept(pConn); } @@ -863,7 +872,6 @@ _RETURN: } void cliConnCb(uv_connect_t* req, int status) { - // impl later SCliConn* pConn = req->data; SCliThrd* pThrd = pConn->hostThrd; @@ -875,7 +883,29 @@ void cliConnCb(uv_connect_t* req, int status) { } if (status != 0) { - tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); + tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip, + pConn->port, uv_strerror(status)); + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + if (REQUEST_NO_RESP(&pMsg->msg)) { + char* ip = pConn->ip; + uint32_t port = pConn->port; + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + int64_t cTimestamp = taosGetTimestampMs(); + if (item != NULL) { + if (cTimestamp - item->timestamp < ((STrans*)pThrd->pTransInst)->failFastInterval) { + item->count++; + } else { + item->count = 1; + item->timestamp = cTimestamp; + } + } else { + SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; + taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem)); + } + } cliHandleExcept(pConn); return; } @@ -1027,6 +1057,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + if (REQUEST_NO_RESP(&pMsg->msg)) { + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + if (item != NULL) { + int32_t elapse = taosGetTimestampMs() - item->timestamp; + if (item->count >= pTransInst->failFastThreshold && elapse <= pTransInst->failFastInterval) { + STraceId* trace = &(pMsg->msg.info.traceId); + tGTrace("%s, msg %p cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, pMsg, + ip, port, item->count, elapse); + destroyCmsg(pMsg); + return; + } + } + } + bool ignore = false; SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); if (ignore == true) { @@ -1299,6 +1348,8 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->quit = false; return pThrd; } -- GitLab