diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 3e2f5967840805cde8b9a37d4e437900965f42e3..ab26cfc1555b6f50a064fb8ab41a91bad62d7b88 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -54,12 +54,13 @@ typedef struct { uint16_t clientPort; SRpcMsg rpcMsg; int32_t rspLen; - void *pRsp; - void *pNode; + void * pRsp; + void * pNode; } SNodeMsg; typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); +typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *); typedef struct SRpcInit { uint16_t localPort; // local port @@ -80,22 +81,25 @@ typedef struct SRpcInit { RpcCfp cfp; // call back to retrieve the client auth info, for server app only - RpcAfp afp;; + RpcAfp afp; + + // user defined retry func + RpcRfp rfp; void *parent; } SRpcInit; typedef struct { - void *val; + void *val; int32_t (*clone)(void *src, void **dst); - void (*freeFunc)(const void *arg); + void (*freeFunc)(const void *arg); } SRpcCtxVal; typedef struct { - int32_t msgType; - void *val; + int32_t msgType; + void * val; int32_t (*clone)(void *src, void **dst); - void (*freeFunc)(const void *arg); + void (*freeFunc)(const void *arg); } SRpcBrokenlinkVal; typedef struct { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index ad50948a021ac663bcc012ff6cbf08f86a354e4e..eaca9b0fc765ddee699db235733d9508dd45dc36 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,13 +63,14 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); + int (*retry)(void* parent, SRpcMsg*, SEpSet*); - int32_t refCount; - void* parent; - void* idPool; // handle to ID pool - void* tmrCtrl; // handle to timer - SHashObj* hash; // handle returned by hash utility - void* tcphandle; // returned handle from TCP initialization + int32_t refCount; + void* parent; + void* idPool; // handle to ID pool + void* tmrCtrl; // handle to timer + SHashObj* hash; // handle returned by hash utility + void* tcphandle; // returned handle from TCP initialization TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 5f9af2eb48715f2f969f6a20b4e53b771f0055ba..fa517d6d61cca34395dd5648a4146a446b9e2e64 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,6 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; + pRpc->retry = pInit->rfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b2d0e7f020af25a93603b4484593040b7a296c78..46eb0404686f4c48a7cba654a23ee3f9000bbe5e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -97,6 +97,8 @@ static void cliSendCb(uv_write_t* req, int status); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); +static void cliAppCb(SCliConn* pConn, STransMsg* pMsg); + static SCliConn* cliCreateConn(SCliThrdObj* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); @@ -311,7 +313,8 @@ void cliHandleResp(SCliConn* conn) { if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); - (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + cliAppCb(conn, &transMsg); + //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else { tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); @@ -377,7 +380,8 @@ void cliHandleExcept(SCliConn* pConn) { once = true; continue; } - (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + cliAppCb(pConn, &transMsg); + //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else { tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn); memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); @@ -877,6 +881,16 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } +void cliAppCb(SCliConn* pConn, STransMsg* transMsg) { + SCliThrdObj* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) { + // impl retry + } else { + (*pTransInst->cfp)(pTransInst->parent, transMsg, NULL); + } +} void transCloseClient(void* arg) { SCliObj* cli = arg;