diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 70977bba871dd109d8e3d7a9b747df2e5435fa58..839194da94e5a184ab11b446077e334f085d68b5 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -124,6 +124,7 @@ void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); #ifdef __cplusplus } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index dfce01dd6356f19da8dce1b8de9c2eb9e9ca42e4..f15315fe6055127f13b15849f897d8edda5a381b 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -58,7 +58,12 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } - if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { + if (connectRsp.dnodeNum == 1) { + SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + SEpSet dstEpSet = connectRsp.epSet; + rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, + dstEpSet.eps[dstEpSet.inUse].fqdn); + } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); } @@ -126,9 +131,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (usedbRsp.vgVersion >= 0) { uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; - int32_t code1 = catalogGetHandle(clusterId, &pCatalog); + int32_t code1 = catalogGetHandle(clusterId, &pCatalog); if (code1 != TSDB_CODE_SUCCESS) { - tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1)); + tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, + tstrerror(code1)); } else { catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); } @@ -158,7 +164,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); taosMemoryFreeClear(output.dbVgroup); - tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr()); + tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr()); } else if (output.dbVgroup) { struct SCatalog* pCatalog = NULL; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 18a85865df1da125a25815a1030ab448bd2c6c01..91a721d80f243a857487001f8bd5fa7df4e0e7c1 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -104,6 +104,13 @@ typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; +/*convet from fqdn to ip */ +typedef struct SCvtAddr { + char ip[TSDB_FQDN_LEN]; + char fqdn[TSDB_FQDN_LEN]; + bool cvt; +} SCvtAddr; + typedef struct { SEpSet epSet; // ip list provided by app void* ahandle; // handle provided by app @@ -115,6 +122,7 @@ typedef struct { STransCtx appCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API + SCvtAddr cvtAddr; int hThrdIdx; } STransConnCtx; @@ -155,7 +163,7 @@ typedef struct { #pragma pack(pop) -typedef enum { Normal, Quit, Release, Register } STransMsgType; +typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) @@ -231,6 +239,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransM void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); +void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 018d5f98c0907789c30769bf811538af98e94039..84b0156e3697996a81f7743940b04d73c20d0a05 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -154,6 +154,11 @@ void rpcReleaseHandle(void* handle, int8_t type) { (*transReleaseHandle[type])(handle); } +void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { + // later + transSetDefaultAddr(thandle, ip, fqdn); +} + int32_t rpcInit() { // impl later return 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0313ea583220a72ad94ba5e833385089763551f5..f0af0d99c9e2097c54b62b9c7d8609db271e1037 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -63,7 +63,10 @@ typedef struct SCliThrdObj { SDelayQueue* delayQueue; uint64_t nextTimeout; // next timeout void* pTransInst; // - bool quit; + + SCvtAddr cvtAddr; + + bool quit; } SCliThrdObj; typedef struct SCliObj { @@ -103,6 +106,7 @@ static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle o static void cliDestroy(uv_handle_t* handle); static void cliSend(SCliConn* pConn); +void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); /* * set TCP connection timeout per-socket level */ @@ -116,7 +120,9 @@ static void cliHandleExcept(SCliConn* conn); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease}; +static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, + NULL, cliHandleUpdate}; static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); @@ -697,6 +703,12 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { transUnrefCliHandle(conn); } } +static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { + STransConnCtx* pCtx = pMsg->ctx; + + pThrd->cvtAddr = pCtx->cvtAddr; + destroyCmsg(pMsg); +} SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; @@ -716,7 +728,17 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } return conn; } - +void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { + if (pCvtAddr->cvt == false) { + return; + } + for (int i = 0; i < pEpSet->numOfEps && pEpSet->numOfEps == 1; i++) { + if (strncmp(pEpSet->eps[i].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) { + memset(pEpSet->eps[i].fqdn, 0, TSDB_FQDN_LEN); + memcpy(pEpSet->eps[i].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN); + } + } +} void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; @@ -726,6 +748,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; + cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); + SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { conn->hThrdIdx = pCtx->hThrdIdx; @@ -855,7 +879,6 @@ static SCliThrdObj* createThrdObj() { pThrd->timer.data = pThrd; pThrd->pool = createConnPool(4); - transDQCreate(pThrd->loop, &pThrd->delayQueue); pThrd->quit = false; @@ -1088,4 +1111,32 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM taosMemoryFree(pSem); } +/* + * + **/ +void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) { + STrans* pTransInst = ahandle; + + SCvtAddr cvtAddr = {0}; + if (ip != NULL && fqdn != NULL) { + memcpy(cvtAddr.ip, ip, strlen(ip)); + memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn)); + cvtAddr.cvt = true; + } + for (int i = 0; i < pTransInst->numOfThreads; i++) { + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + pCtx->hThrdIdx = i; + pCtx->cvtAddr = cvtAddr; + + SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + cliMsg->ctx = pCtx; + cliMsg->type = Update; + + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; + tDebug("update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread); + + tsem_t* pSem = pCtx->pSem; + transSendAsync(thrd->asyncPool, &(cliMsg->q)); + } +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 09d47960d482400af7c38adaae6143f01f46f2be..12cfff093c02ae9d2997378d3b4e3b55685bd548 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -146,7 +146,7 @@ static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, - uvHandleRegister}; + uvHandleRegister, NULL}; static int32_t exHandlesMgt;