未验证 提交 a6d540f2 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18725 from taosdata/enh/failFast

enh: add fail-fast
......@@ -72,6 +72,7 @@ typedef struct SRpcMsg {
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcFFfp)(tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit {
......@@ -90,6 +91,9 @@ typedef struct SRpcInit {
int32_t retryMaxInterval; // retry max interval
int64_t retryMaxTimouet;
int32_t failFastThreshold;
int32_t failFastInterval;
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
......@@ -107,6 +111,8 @@ typedef struct SRpcInit {
// destroy client ahandle;
RpcDfp dfp;
// fail fast fp
RpcFFfp ffp;
void *parent;
} SRpcInit;
......
......@@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
return (*msgFp)(pWrapper->pMgmt, pMsg);
}
static bool dmFailFastFp(tmsg_t msgType) {
// add more msg type later
return msgType == TDMT_SYNC_HEARTBEAT;
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1;
......@@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 1000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client");
......
......@@ -57,10 +57,14 @@ typedef struct {
int32_t retryMaxInterval; // retry max interval
int32_t retryMaxTimouet;
int32_t failFastThreshold;
int32_t failFastInterval;
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType);
void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType);
int index;
void* parent;
......
......@@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->retryMaxInterval = pInit->retryMaxInterval;
pRpc->retryMaxTimouet = pInit->retryMaxTimouet;
pRpc->failFastThreshold = pInit->failFastThreshold;
pRpc->failFastInterval = pInit->failFastInterval;
// register callback handle
pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp;
pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) {
......
......@@ -84,6 +84,8 @@ typedef struct SCliThrd {
SHashObj* fqdn2ipCache;
SCvtAddr cvtAddr;
SHashObj* failFastCache;
SCliMsg* stopMsg;
bool quit;
......@@ -96,6 +98,13 @@ typedef struct SCliObj {
SCliThrd** pThreadObj;
} SCliObj;
typedef struct {
int32_t reinit;
int64_t timestamp;
int32_t count;
int32_t threshold;
int64_t interval;
} SFailFastItem;
// conn pool
// add expire timeout and capacity limit
static void* createConnPool(int size);
......@@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) {
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
if (status != 0) {
tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
uv_err_name(status));
cliHandleExcept(pConn);
}
......@@ -863,7 +872,6 @@ _RETURN:
}
void cliConnCb(uv_connect_t* req, int status) {
// impl later
SCliConn* pConn = req->data;
SCliThrd* pThrd = pConn->hostThrd;
......@@ -875,7 +883,32 @@ void cliConnCb(uv_connect_t* req, int status) {
}
if (status != 0) {
tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip,
pConn->port, uv_strerror(status));
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
STrans* pTransInst = pThrd->pTransInst;
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
char* ip = pConn->ip;
uint32_t port = pConn->port;
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
int64_t cTimestamp = taosGetTimestampMs();
if (item != NULL) {
int32_t elapse = cTimestamp - item->timestamp;
if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
item->count++;
} else {
item->count = 1;
item->timestamp = cTimestamp;
}
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem));
}
}
cliHandleExcept(pConn);
return;
}
......@@ -1027,6 +1060,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return;
}
if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
if (item != NULL) {
int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp);
if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) {
STraceId* trace = &(pMsg->msg.info.traceId);
tGTrace("%s, msg %p cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, pMsg,
ip, port, item->count, elapse);
destroyCmsg(pMsg);
return;
}
}
}
bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
if (ignore == true) {
......@@ -1299,6 +1351,8 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false;
return pThrd;
}
......@@ -1325,6 +1379,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosMemoryFree(pThrd);
}
......
......@@ -572,7 +572,7 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) {
offsetInitFinished = true;
} else {
while (!offsetInitFinished)
; // Ensure initialization is completed.
; // Ensure initialization is completed.
}
GetSystemTimeAsFileTime(&f);
......
......@@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
if (!osLogSpaceAvailable()) return;
if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return;
char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE);
char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE);
int32_t len = taosBuildLogHead(buffer, flags);
va_list argpointer;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册