From 207fd455fb7f23c53e6890ba0cd2d7b2ef9547fd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 26 Apr 2022 18:17:45 +0800 Subject: [PATCH] feature(rpc): add retry --- include/libs/transport/trpc.h | 8 +- source/libs/transport/inc/transComm.h | 15 ++- source/libs/transport/inc/transportInt.h | 3 +- source/libs/transport/src/trans.c | 13 +-- source/libs/transport/src/transCli.c | 120 +++++++++++++---------- source/libs/transport/src/transComm.c | 11 --- source/libs/transport/src/transSrv.c | 5 +- 7 files changed, 91 insertions(+), 84 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 7b9f68103d..0e7d486eab 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -59,9 +59,13 @@ typedef struct { void * pNode; } SNodeMsg; -typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *); +typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); -typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *); +/// +// // SRpcMsg code +// REDIRECT, +// NOT READY, EpSet +typedef bool (*RpcRfp)(int32_t code); typedef struct SRpcInit { uint16_t localPort; // local port diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5dab6f0a97..aa8a03f3d2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -103,6 +103,9 @@ typedef void* queue[2]; /* Return the structure holding the given element. 
*/ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) +#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit +#define TRANS_RETRY_INTERVAL 5 // ms retry interval + typedef struct { SRpcInfo* pRpc; // associated SRpcInfo SEpSet epSet; // ip list provided by app @@ -137,14 +140,12 @@ typedef struct { int8_t connType; // connection type cli/srv int64_t rid; // refId returned by taosAddRef + int8_t retryCount; STransCtx appCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API - int hThrdIdx; - char* ip; - uint32_t port; - // SEpSet* pSet; // for synchronous API + int hThrdIdx; } STransConnCtx; #pragma pack(push, 1) @@ -215,8 +216,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); bool transCompressMsg(char* msg, int32_t len, int32_t* flen); bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); -void transConnCtxDestroy(STransConnCtx* ctx); - void transFreeMsg(void* msg); // @@ -262,8 +261,8 @@ void transUnrefCliHandle(void* handle); void transReleaseCliHandle(void* handle); void transReleaseSrvHandle(void* handle); -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx); -void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); +void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index eaca9b0fc7..56f38a7a55 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -62,8 +62,7 @@ typedef struct { char ckey[TSDB_PASSWORD_LEN]; // ciphering key void 
(*cfp)(void* parent, SRpcMsg*, SEpSet*); - int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); - int (*retry)(void* parent, SRpcMsg*, SEpSet*); + bool (*retry)(int32_t code); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index fa517d6d61..c0da3f9c1f 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -38,7 +38,6 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; - pRpc->afp = pInit->afp; pRpc->retry = pInit->rfp; if (pInit->connType == TAOS_CONN_SERVER) { @@ -116,19 +115,13 @@ int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg, NULL); + transSendRequest(shandle, pEpSet, pMsg, NULL); } void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg, pCtx); + transSendRequest(shandle, pEpSet, pMsg, pCtx); } void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRecv(shandle, ip, port, pMsg, pRsp); + transSendRecv(shandle, pEpSet, pMsg, pRsp); } void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b43b8a1e0c..1136cfb1d5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1,5 +1,4 @@ -/* - * 
Copyright (c) 2019 TAOS Data, Inc. +/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -97,7 +96,7 @@ static void cliSendCb(uv_write_t* req, int status); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -static void cliAppCb(SCliConn* pConn, STransMsg* pMsg); +static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); static SCliConn* cliCreateConn(SCliThrdObj* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); @@ -227,6 +226,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) +#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) +#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) + static void* cliWorkThread(void* arg); bool cliMaySendCachedMsg(SCliConn* conn) { @@ -311,14 +313,10 @@ void cliHandleResp(SCliConn* conn) { return; } - if (pCtx == NULL || pCtx->pSem == NULL) { - tTrace("%s cli conn %p handle resp", pTransInst->label, conn); - cliAppCb(conn, &transMsg); - //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); - } else { - tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); - memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); - tsem_post(pCtx->pSem); + int ret = cliAppCb(conn, &transMsg, pMsg); + if (ret != 0) { + tTrace("try to send req to next node"); + return; } destroyCmsg(pMsg); @@ -375,17 +373,15 @@ void cliHandleExcept(SCliConn* pConn) { } if (pCtx == NULL || pCtx->pSem == NULL) { - tTrace("%s cli conn %p handle except", pTransInst->label, pConn); if (transMsg.ahandle == NULL) { once = true; continue; } - cliAppCb(pConn, &transMsg); - //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); - } else { - 
tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn); - memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); - tsem_post(pCtx->pSem); + } + int ret = cliAppCb(pConn, &transMsg, pMsg); + if (ret != 0) { + tTrace("try to send req to next node"); + return; } destroyCmsg(pMsg); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); @@ -695,7 +691,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } } else { STransConnCtx* pCtx = pMsg->ctx; - conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); if (conn != NULL) { tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } else { @@ -719,10 +715,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - // tTrace("%s cli conn %p queue msg size %d", ((STrans*)pThrd->pTransInst)->label, conn, 2); - // return; - //} - // transDestroyBuffer(&conn->readBuf); cliSend(conn); } else { conn = cliCreateConn(pThrd); @@ -730,8 +722,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { transQueuePush(&conn->cliMsgs, pMsg); conn->hThrdIdx = pCtx->hThrdIdx; - conn->ip = strdup(pMsg->ctx->ip); - conn->port = pMsg->ctx->port; + conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); + conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); int ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret) { @@ -743,10 +735,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { addr.sin_family = AF_INET; addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip); addr.sin_port = (uint16_t)htons((uint16_t)conn->port); - // uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); - // handle error in callback if fail to connect - tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); - uv_tcp_connect(&conn->connReq, 
(uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + if (ret != 0) { + tTrace("%s cli conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port, + uv_err_name(ret)); + cliHandleExcept(conn); + return; + } } } static void cliAsyncCb(uv_async_t* handle) { @@ -856,12 +852,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { } static void transDestroyConnCtx(STransConnCtx* ctx) { - if (ctx != NULL) { - taosMemoryFree(ctx->ip); - } + // taosMemoryFree(ctx); } -// + void cliSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); @@ -881,17 +875,41 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } -void cliAppCb(SCliConn* pConn, STransMsg* transMsg) { +int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) { - SMEpSet emsg = {0}; - tDeserializeSMEpSet(transMsg->pCont, transMsg->contLen, &emsg); - pTransInst->retry(pTransInst, transMsg, &(emsg.epSet)); + if (pMsg == NULL || pMsg->ctx == NULL) { + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); + pTransInst->cfp(pTransInst->parent, pResp, NULL); + return 0; + } + + STransConnCtx* pCtx = pMsg->ctx; + SEpSet* pEpSet = &pCtx->epSet; + if (pTransInst->retry != NULL && pTransInst->retry(pResp->code) && pCtx->retryCount <= TRANS_RETRY_COUNT_LIMIT) { + pCtx->retryCount += 1; + if (pResp->contLen == 0) { + pEpSet->inUse = (pEpSet->inUse + 1) % pEpSet->numOfEps; + } else { + SMEpSet emsg = {0}; + tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); + pCtx->epSet = emsg.epSet; + } + // release pConn + 
cliHandleReq(pMsg, pThrd); + return -1; + } + + if (pCtx->pSem != NULL) { + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); + memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); + tsem_post(pCtx->pSem); } else { - pTransInst->cfp(pTransInst->parent, transMsg, NULL); + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); + pTransInst->cfp(pTransInst->parent, pResp, pEpSet); } + return 0; } void transCloseClient(void* arg) { @@ -934,18 +952,17 @@ void transReleaseCliHandle(void* handle) { transSendAsync(thrd->asyncPool, &cmsg->q); } -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) { +void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)shandle; - int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); + int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); - pCtx->ahandle = pMsg->ahandle; - pCtx->msgType = pMsg->msgType; - pCtx->ip = strdup(ip); - pCtx->port = port; + pCtx->epSet = *pEpSet; + pCtx->ahandle = pReq->ahandle; + pCtx->msgType = pReq->msgType; pCtx->hThrdIdx = index; if (ctx != NULL) { @@ -955,17 +972,18 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; - cliMsg->msg = *pMsg; + cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pMsg, ip, port, pMsg->ahandle); + tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), + EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } -void 
transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { +void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { @@ -973,10 +991,9 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq } STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + pCtx->epSet = *pEpSet; pCtx->ahandle = pReq->ahandle; pCtx->msgType = pReq->msgType; - pCtx->ip = strdup(ip); - pCtx->port = port; pCtx->hThrdIdx = index; pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); pCtx->pRsp = pRsp; @@ -989,6 +1006,9 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; + tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), + EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); + transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; tsem_wait(pSem); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index ef595fb0ec..eb42029090 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -93,11 +93,6 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) { return false; } -void transConnCtxDestroy(STransConnCtx* ctx) { - taosMemoryFree(ctx->ip); - taosMemoryFree(ctx); -} - void transFreeMsg(void* msg) { if (msg == NULL) { return; @@ -363,10 +358,4 @@ void transQueueDestroy(STransQueue* queue) { transQueueClear(queue); taosArrayDestroy(queue->q); } -// int32_t transGetExHandle() { -// static -//} -// void transThreadOnce() { -// taosThreadOnce(&transModuleInit, ); -//} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c941cace3b..c02cb07101 100644 
--- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -802,7 +802,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, taosThreadOnce(&transModuleInit, uvInitExHandleMgt); transSrvInst++; - // uvOpenExHandleMgt(10000); for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj)); @@ -831,6 +830,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } else { // TODO: clear all other resource later tError("failed to create worker-thread %d", i); + goto End; } } if (false == addHandleToAcceptloop(srv)) { @@ -840,6 +840,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, if (err == 0) { tDebug("success to create accept-thread"); } else { + tError("failed to create accept-thread"); + goto End; // clear all resource later } @@ -1078,6 +1080,7 @@ void transRegisterMsg(const STransMsg* msg) { transSendAsync(pThrd->asyncPool, &srvMsg->q); uvReleaseExHandle(refId); return; + _return1: tTrace("server handle %p failed to send to register brokenlink", exh); rpcFreeCont(msg->pCont); -- GitLab