提交 4be19649 编写于 作者: dengyihao's avatar dengyihao

add fail-fast

上级 17c6d303
...@@ -72,6 +72,7 @@ typedef struct SRpcMsg { ...@@ -72,6 +72,7 @@ typedef struct SRpcMsg {
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcFFfp)(tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle); typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit { typedef struct SRpcInit {
...@@ -90,6 +91,9 @@ typedef struct SRpcInit { ...@@ -90,6 +91,9 @@ typedef struct SRpcInit {
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval
int64_t retryMaxTimouet; int64_t retryMaxTimouet;
int32_t failFastThreshold;
int32_t failFastInterval;
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not int8_t encryption; // encrypt or not
...@@ -107,6 +111,8 @@ typedef struct SRpcInit { ...@@ -107,6 +111,8 @@ typedef struct SRpcInit {
// destroy client ahandle; // destroy client ahandle;
RpcDfp dfp; RpcDfp dfp;
// fail fast fp
RpcFFfp ffp;
void *parent; void *parent;
} SRpcInit; } SRpcInit;
......
...@@ -57,10 +57,14 @@ typedef struct { ...@@ -57,10 +57,14 @@ typedef struct {
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval
int32_t retryMaxTimouet; int32_t retryMaxTimouet;
int32_t failFastThreshold;
int32_t failFastInterval;
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType); bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType);
void (*destroyFp)(void* ahandle); void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType);
int index; int index;
void* parent; void* parent;
......
...@@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->retryMaxInterval = pInit->retryMaxInterval; pRpc->retryMaxInterval = pInit->retryMaxInterval;
pRpc->retryMaxTimouet = pInit->retryMaxTimouet; pRpc->retryMaxTimouet = pInit->retryMaxTimouet;
pRpc->failFastThreshold = pInit->failFastThreshold;
pRpc->failFastInterval = pInit->failFastInterval;
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp; pRpc->retry = pInit->rfp;
pRpc->startTimer = pInit->tfp; pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp; pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) { if (pRpc->numOfThreads <= 0) {
......
...@@ -84,6 +84,8 @@ typedef struct SCliThrd { ...@@ -84,6 +84,8 @@ typedef struct SCliThrd {
SHashObj* fqdn2ipCache; SHashObj* fqdn2ipCache;
SCvtAddr cvtAddr; SCvtAddr cvtAddr;
SHashObj* failFastCache;
SCliMsg* stopMsg; SCliMsg* stopMsg;
bool quit; bool quit;
...@@ -96,6 +98,13 @@ typedef struct SCliObj { ...@@ -96,6 +98,13 @@ typedef struct SCliObj {
SCliThrd** pThreadObj; SCliThrd** pThreadObj;
} SCliObj; } SCliObj;
typedef struct {
int32_t reinit;
int64_t timestamp;
int32_t count;
int32_t threshold;
int64_t interval;
} SFailFastItem;
// conn pool // conn pool
// add expire timeout and capacity limit // add expire timeout and capacity limit
static void* createConnPool(int size); static void* createConnPool(int size);
...@@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) { ...@@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) {
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
if (status != 0) { if (status != 0) {
tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
uv_err_name(status)); uv_err_name(status));
cliHandleExcept(pConn); cliHandleExcept(pConn);
} }
...@@ -863,7 +872,6 @@ _RETURN: ...@@ -863,7 +872,6 @@ _RETURN:
} }
void cliConnCb(uv_connect_t* req, int status) { void cliConnCb(uv_connect_t* req, int status) {
// impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
...@@ -875,7 +883,29 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -875,7 +883,29 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
if (status != 0) { if (status != 0) {
tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip,
pConn->port, uv_strerror(status));
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) {
char* ip = pConn->ip;
uint32_t port = pConn->port;
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
int64_t cTimestamp = taosGetTimestampMs();
if (item != NULL) {
if (cTimestamp - item->timestamp < ((STrans*)pThrd->pTransInst)->failFastInterval) {
item->count++;
} else {
item->count = 1;
item->timestamp = cTimestamp;
}
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem));
}
}
cliHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
...@@ -1027,6 +1057,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1027,6 +1057,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return; return;
} }
if (REQUEST_NO_RESP(&pMsg->msg)) {
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
if (item != NULL) {
int32_t elapse = taosGetTimestampMs() - item->timestamp;
if (item->count >= pTransInst->failFastThreshold && elapse <= pTransInst->failFastInterval) {
STraceId* trace = &(pMsg->msg.info.traceId);
tGTrace("%s, msg %p cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, pMsg,
ip, port, item->count, elapse);
destroyCmsg(pMsg);
return;
}
}
}
bool ignore = false; bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
if (ignore == true) { if (ignore == true) {
...@@ -1299,6 +1348,8 @@ static SCliThrd* createThrdObj(void* trans) { ...@@ -1299,6 +1348,8 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false; pThrd->quit = false;
return pThrd; return pThrd;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册