提交 511a4c3a 编写于 作者: dengyihao's avatar dengyihao

update default epset

上级 ad162a34
...@@ -125,7 +125,7 @@ void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); ...@@ -125,7 +125,7 @@ void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSetDefaultEpSet(void *thandle, const SEpSet *dst); void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -59,8 +59,10 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -59,8 +59,10 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
} }
if (connectRsp.dnodeNum == 1) { if (connectRsp.dnodeNum == 1) {
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
rpcSetDefaultEpSet(pTscObj->pAppInfo->pTransporter, &epset); SEpSet dstEpSet = connectRsp.epSet;
rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
dstEpSet.eps[dstEpSet.inUse].fqdn);
} else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
} }
...@@ -129,9 +131,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -129,9 +131,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (usedbRsp.vgVersion >= 0) { if (usedbRsp.vgVersion >= 0) {
uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
int32_t code1 = catalogGetHandle(clusterId, &pCatalog); int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) { if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1)); tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
tstrerror(code1));
} else { } else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
} }
...@@ -161,7 +164,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -161,7 +164,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
taosMemoryFreeClear(output.dbVgroup); taosMemoryFreeClear(output.dbVgroup);
tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr()); tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup) { } else if (output.dbVgroup) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
......
...@@ -104,6 +104,13 @@ typedef SRpcCtxVal STransCtxVal; ...@@ -104,6 +104,13 @@ typedef SRpcCtxVal STransCtxVal;
typedef SRpcInfo STrans; typedef SRpcInfo STrans;
typedef SRpcConnInfo STransHandleInfo; typedef SRpcConnInfo STransHandleInfo;
/*convet from fqdn to ip */
typedef struct SCvtAddr {
char ip[TSDB_FQDN_LEN];
char fqdn[TSDB_FQDN_LEN];
bool cvt;
} SCvtAddr;
typedef struct { typedef struct {
SEpSet epSet; // ip list provided by app SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app void* ahandle; // handle provided by app
...@@ -115,6 +122,7 @@ typedef struct { ...@@ -115,6 +122,7 @@ typedef struct {
STransCtx appCtx; // STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API tsem_t* pSem; // for synchronous API
SCvtAddr cvtAddr;
int hThrdIdx; int hThrdIdx;
} STransConnCtx; } STransConnCtx;
...@@ -231,7 +239,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransM ...@@ -231,7 +239,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransM
void transSendResponse(const STransMsg* msg); void transSendResponse(const STransMsg* msg);
void transRegisterMsg(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg);
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
void transSetDefaultEpSet(void* shandle, const SEpSet* dst); void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
......
...@@ -148,9 +148,10 @@ void rpcReleaseHandle(void* handle, int8_t type) { ...@@ -148,9 +148,10 @@ void rpcReleaseHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*transReleaseHandle[type])(handle); (*transReleaseHandle[type])(handle);
} }
void rpcSetDefaultEpSet(void* thandle, const SEpSet* dst) {
void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
// later // later
transSetDefaultEpSet(thandle, dst); transSetDefaultAddr(thandle, ip, fqdn);
} }
int32_t rpcInit() { int32_t rpcInit() {
......
...@@ -64,8 +64,7 @@ typedef struct SCliThrdObj { ...@@ -64,8 +64,7 @@ typedef struct SCliThrdObj {
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
void* pTransInst; // void* pTransInst; //
bool useDefaultEpSet; SCvtAddr cvtAddr;
SEpSet defaultEpSet;
bool quit; bool quit;
} SCliThrdObj; } SCliThrdObj;
...@@ -107,6 +106,7 @@ static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle o ...@@ -107,6 +106,7 @@ static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle o
static void cliDestroy(uv_handle_t* handle); static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn); static void cliSend(SCliConn* pConn);
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
/* /*
* set TCP connection timeout per-socket level * set TCP connection timeout per-socket level
*/ */
...@@ -692,10 +692,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -692,10 +692,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
pThrd->useDefaultEpSet = true; pThrd->cvtAddr = pCtx->cvtAddr;
pThrd->defaultEpSet = pCtx->epSet;
// tsem_post(pCtx->pSem);
destroyCmsg(pMsg); destroyCmsg(pMsg);
} }
...@@ -717,7 +714,17 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -717,7 +714,17 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
} }
return conn; return conn;
} }
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
if (pCvtAddr->cvt == false) {
return;
}
for (int i = 0; i < pEpSet->numOfEps && pEpSet->numOfEps == 1; i++) {
if (strncmp(pEpSet->eps[i].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
memset(pEpSet->eps[i].fqdn, 0, TSDB_FQDN_LEN);
memcpy(pEpSet->eps[i].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
}
}
}
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
...@@ -727,9 +734,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -727,9 +734,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
if (pThrd->useDefaultEpSet) { cliMayCvtFqdnToIp(&pCtx->epSet, &pCtx->cvtAddr);
pCtx->epSet = pThrd->defaultEpSet;
}
SCliConn* conn = cliGetConn(pMsg, pThrd); SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
...@@ -860,7 +865,6 @@ static SCliThrdObj* createThrdObj() { ...@@ -860,7 +865,6 @@ static SCliThrdObj* createThrdObj() {
pThrd->timer.data = pThrd; pThrd->timer.data = pThrd;
pThrd->pool = createConnPool(4); pThrd->pool = createConnPool(4);
pThrd->useDefaultEpSet = false;
transDQCreate(pThrd->loop, &pThrd->delayQueue); transDQCreate(pThrd->loop, &pThrd->delayQueue);
pThrd->quit = false; pThrd->quit = false;
...@@ -1086,28 +1090,32 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1086,28 +1090,32 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
taosMemoryFree(pSem); taosMemoryFree(pSem);
} }
void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) { /*
*
**/
void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
STrans* pTransInst = ahandle; STrans* pTransInst = ahandle;
SCvtAddr cvtAddr = {0};
if (ip != NULL && fqdn != NULL) {
memcpy(cvtAddr.ip, ip, strlen(ip));
memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn));
cvtAddr.cvt = true;
}
for (int i = 0; i < pTransInst->numOfThreads; i++) { for (int i = 0; i < pTransInst->numOfThreads; i++) {
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->hThrdIdx = i; pCtx->hThrdIdx = i;
pCtx->epSet = *dst; pCtx->cvtAddr = cvtAddr;
// pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
// tsem_init(pCtx->pSem, 0, 0);
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
cliMsg->type = Update; cliMsg->type = Update;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("send update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread); tDebug("update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread);
tsem_t* pSem = pCtx->pSem; tsem_t* pSem = pCtx->pSem;
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
// tsem_wait(pSem);
// tsem_destroy(pSem);
// taosMemoryFree(pSem);
} }
} }
#endif #endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册