From cca2bcdb280275be2daede2ef139cd8a31e26080 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 13:09:08 +0800 Subject: [PATCH] enh: rpc set default epset --- include/libs/transport/trpc.h | 1 + source/libs/transport/inc/transComm.h | 7 ++-- source/libs/transport/src/trans.c | 4 +++ source/libs/transport/src/transCli.c | 47 +++++++++++++++++++++++++-- 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 754a203471..752a0adc5b 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -125,6 +125,7 @@ void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSetDefaultEpSet(void *thandle, const SEpSet *dst); #ifdef __cplusplus } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 30f799f39e..654bfa7158 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -95,8 +95,8 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 15 // ms retry interval -#define TRANS_CONN_TIMEOUT 3 // connect timeout +#define TRANS_RETRY_INTERVAL 15 // ms retry interval +#define TRANS_CONN_TIMEOUT 3 // connect timeout typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; @@ -155,7 +155,7 @@ typedef struct { #pragma pack(pop) -typedef enum { Normal, Quit, Release, Register } STransMsgType; +typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) @@ -231,6 +231,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransM void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); +void transSetDefaultEpSet(void* shandle, const SEpSet* dst); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 9e71c87fa5..2e47eb493a 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -148,6 +148,10 @@ void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); } +void rpcSetDefaultEpSet(void* thandle, const SEpSet* dst) { + // later + transSetDefaultEpSet(thandle, dst); +} int32_t rpcInit() { // impl later diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 92c5e9faf7..9d43265b80 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -63,7 +63,11 @@ typedef struct SCliThrdObj { SDelayQueue* delayQueue; uint64_t nextTimeout; // next timeout void* pTransInst; // - bool quit; + + bool useDefaultEpSet; + SEpSet defaultEpSet; + + bool quit; } SCliThrdObj; typedef struct SCliObj { @@ -116,7 +120,9 @@ static void cliHandleExcept(SCliConn* conn); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease}; +static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, + cliHandleUpdate}; static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); @@ -683,6 +689,15 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { transUnrefCliHandle(conn); } } +static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { + STransConnCtx* pCtx = pMsg->ctx; + + pThrd->useDefaultEpSet = true; + pThrd->defaultEpSet = pCtx->epSet; + + tsem_post(pCtx->pSem); + destroyCmsg(pMsg); +} SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; @@ -712,6 +727,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; + if (pThrd->useDefaultEpSet) { + pCtx->epSet = pThrd->defaultEpSet; + } + SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { conn->hThrdIdx = pCtx->hThrdIdx; @@ -1067,4 +1086,28 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM taosMemoryFree(pSem); } +void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) { + STrans* pTransInst = ahandle; + for (int i = 0; i < pTransInst->numOfThreads; i++) { + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + pCtx->hThrdIdx = i; + pCtx->epSet = *dst; + pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); + tsem_init(pCtx->pSem, 0, 0); + + SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + cliMsg->ctx = pCtx; + cliMsg->type = Update; + + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; + tDebug("send update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread); + + tsem_t* pSem = pCtx->pSem; + transSendAsync(thrd->asyncPool, &(cliMsg->q)); + + tsem_wait(pSem); + tsem_destroy(pSem); + taosMemoryFree(pSem); + } +} #endif -- GitLab