From 511a4c3a4f3c450bee67b5409f440e6441ec9b6e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 26 May 2022 11:29:31 +0800 Subject: [PATCH] update default epset --- include/libs/transport/trpc.h | 2 +- source/client/src/clientMsgHandler.c | 13 +++++--- source/libs/transport/inc/transComm.h | 10 +++++- source/libs/transport/src/trans.c | 5 +-- source/libs/transport/src/transCli.c | 48 ++++++++++++++++----------- 5 files changed, 49 insertions(+), 29 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 752a0adc5b..02cc78fa81 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -125,7 +125,7 @@ void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -void rpcSetDefaultEpSet(void *thandle, const SEpSet *dst); +void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); #ifdef __cplusplus } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 33d0d9feb4..f15315fe60 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -59,8 +59,10 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { } if (connectRsp.dnodeNum == 1) { - SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - rpcSetDefaultEpSet(pTscObj->pAppInfo->pTransporter, &epset); + SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + SEpSet dstEpSet = connectRsp.epSet; + rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, + dstEpSet.eps[dstEpSet.inUse].fqdn); } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); } @@ -129,9 +131,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (usedbRsp.vgVersion >= 0) { uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; - int32_t code1 = catalogGetHandle(clusterId, &pCatalog); + int32_t code1 = catalogGetHandle(clusterId, &pCatalog); if (code1 != TSDB_CODE_SUCCESS) { - tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1)); + tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, + tstrerror(code1)); } else { catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); } @@ -161,7 +164,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); taosMemoryFreeClear(output.dbVgroup); - tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr()); + tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr()); } else if (output.dbVgroup) { struct SCatalog* pCatalog = NULL; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 654bfa7158..7a4c44fe2e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -104,6 +104,13 @@ typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; +/*convet from fqdn to ip */ +typedef struct SCvtAddr { + char ip[TSDB_FQDN_LEN]; + char fqdn[TSDB_FQDN_LEN]; + bool cvt; +} SCvtAddr; + typedef struct { SEpSet epSet; // ip list provided by app void* ahandle; // handle provided by app @@ -115,6 +122,7 @@ typedef struct { STransCtx appCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API + SCvtAddr cvtAddr; int hThrdIdx; } STransConnCtx; @@ -231,7 +239,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransM void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); -void transSetDefaultEpSet(void* shandle, const SEpSet* dst); +void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 2e47eb493a..d8b2ca8e0c 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -148,9 +148,10 @@ void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); } -void rpcSetDefaultEpSet(void* thandle, const SEpSet* dst) { + +void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { // later - transSetDefaultEpSet(thandle, dst); + transSetDefaultAddr(thandle, ip, fqdn); } int32_t rpcInit() { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3be37ff4cc..3abaa625f6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -64,8 +64,7 @@ typedef struct SCliThrdObj { uint64_t nextTimeout; // next timeout void* pTransInst; // - bool useDefaultEpSet; - SEpSet defaultEpSet; + SCvtAddr cvtAddr; bool quit; } SCliThrdObj; @@ -107,6 +106,7 @@ static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle o static void cliDestroy(uv_handle_t* handle); static void cliSend(SCliConn* pConn); +void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); /* * set TCP connection timeout per-socket level */ @@ -692,10 +692,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { STransConnCtx* pCtx = pMsg->ctx; - pThrd->useDefaultEpSet = true; - pThrd->defaultEpSet = pCtx->epSet; - - // tsem_post(pCtx->pSem); + pThrd->cvtAddr = pCtx->cvtAddr; destroyCmsg(pMsg); } @@ -717,7 +714,17 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } return conn; } - +void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { + if (pCvtAddr->cvt == false) { + return; + } + for (int i = 0; i < pEpSet->numOfEps && pEpSet->numOfEps == 1; i++) { + if (strncmp(pEpSet->eps[i].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) { + memset(pEpSet->eps[i].fqdn, 0, TSDB_FQDN_LEN); + memcpy(pEpSet->eps[i].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN); + } + } +} void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; @@ -727,9 +734,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; - if (pThrd->useDefaultEpSet) { - pCtx->epSet = pThrd->defaultEpSet; - } + cliMayCvtFqdnToIp(&pCtx->epSet, &pCtx->cvtAddr); SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { @@ -860,7 +865,6 @@ static SCliThrdObj* createThrdObj() { pThrd->timer.data = pThrd; pThrd->pool = createConnPool(4); - pThrd->useDefaultEpSet = false; transDQCreate(pThrd->loop, &pThrd->delayQueue); pThrd->quit = false; @@ -1086,28 +1090,32 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM taosMemoryFree(pSem); } -void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) { +/* + * + **/ +void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) { STrans* pTransInst = ahandle; + + SCvtAddr cvtAddr = {0}; + if (ip != NULL && fqdn != NULL) { + memcpy(cvtAddr.ip, ip, strlen(ip)); + memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn)); + cvtAddr.cvt = true; + } for (int i = 0; i < pTransInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->hThrdIdx = i; - pCtx->epSet = *dst; - // pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); - // tsem_init(pCtx->pSem, 0, 0); + pCtx->cvtAddr = cvtAddr; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->type = Update; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; - tDebug("send update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread); + tDebug("update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread); tsem_t* pSem = pCtx->pSem; transSendAsync(thrd->asyncPool, &(cliMsg->q)); - // tsem_wait(pSem); - - // tsem_destroy(pSem); - // taosMemoryFree(pSem); } } #endif -- GitLab