diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index ec6dfcbc823426d4c535b540188e9e53a656b14b..411e706fb11a7052f5ed3ad162bd3201e212e552 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -132,6 +132,7 @@ void * syncRestoreData(void *param); int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); void syncRestartConnection(SSyncPeer *pPeer); void syncBroadcastStatus(SSyncNode *pNode); +uint32_t syncResolvePeerFqdn(SSyncPeer *pPeer); SSyncPeer *syncAcquirePeer(int64_t rid); void syncReleasePeer(SSyncPeer *pPeer); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 8ce37cbf78378f9e1a70e3483c6ea5ff15d898de..7abbd60b6c570d60886a1e78698f892fe779db04 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -559,7 +559,8 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { static void syncRemovePeer(SSyncPeer *pPeer) { sInfo("%s, it is removed", pPeer->id); - pPeer->ip = 0; + //pPeer->ip = 0; + pPeer->fqdn[0] = '\0'; syncClosePeerConn(pPeer); //taosRemoveRef(tsPeerRefId, pPeer->rid); syncReleasePeer(pPeer); @@ -585,20 +586,31 @@ static void syncStopCheckPeerConn(SSyncPeer *pPeer) { sDebug("%s, stop check peer connection", pPeer->id); } +uint32_t syncResolvePeerFqdn(SSyncPeer *pPeer) { + uint32_t ip = taosGetIpv4FromFqdn(pPeer->fqdn); + if (ip == 0xFFFFFFFF) { + sError("failed to resolve peer fqdn:%s since %s", pPeer->fqdn, strerror(errno)); + terrno = TSDB_CODE_RPC_FQDN_ERROR; + return 0; + } + + return ip; +} + static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { - uint32_t ip = taosGetIpv4FromFqdn(pInfo->nodeFqdn); + /*uint32_t ip = taosGetIpv4FromFqdn(pInfo->nodeFqdn); if (ip == 0xFFFFFFFF) { sError("failed to add peer, can resolve fqdn:%s since %s", pInfo->nodeFqdn, strerror(errno)); terrno = TSDB_CODE_RPC_FQDN_ERROR; return NULL; } - + */ SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer)); if (pPeer == NULL) return NULL; pPeer->nodeId = pInfo->nodeId; tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn)); - pPeer->ip = ip; + //pPeer->ip = ip; pPeer->port = pInfo->nodePort; pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0; snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, nodeId:%d", pNode->vgId, pPeer->nodeId); @@ -864,7 +876,8 @@ static void syncRestartPeer(SSyncPeer *pPeer) { } void syncRestartConnection(SSyncPeer *pPeer) { - if (pPeer->ip == 0) return; + //if (pPeer->ip == 0) return; + if (pPeer->fqdn[0] == '\0') return; if (syncAcquirePeer(pPeer->rid) == NULL) return; @@ -878,7 +891,8 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; sInfo("%s, sync-req is received", pPeer->id); - if (pPeer->ip == 0) return; + //if (pPeer->ip == 0) return; + if (!syncResolvePeerFqdn(pPeer)) return; if (nodeRole != TAOS_SYNC_ROLE_MASTER) { sError("%s, I am not master anymore", pPeer->id); @@ -1090,7 +1104,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { } static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { - if (pPeer->peerFd < 0 || pPeer->ip == 0) { + if (pPeer->peerFd < 0/* || pPeer->ip == 0*/) { sDebug("%s, failed to send status msg, restart fd:%d", pPeer->id, pPeer->peerFd); syncRestartConnection(pPeer); return -1; @@ -1135,7 +1149,10 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { return; } - SOCKET connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); + uint32_t ip = syncResolvePeerFqdn(pPeer); + if (!ip) return; + + SOCKET connFd = taosOpenTcpClientSocket(ip, pPeer->port, 0); if (connFd <= 0) { sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 505ba68c41f2c2373dba3322b6ed31ba0ac853f1..c86ab8549974712658ad3d381c4141427c000762 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -422,6 +422,12 @@ void *syncRetrieveData(void *param) { return NULL; } + uint32_t ip = syncResolvePeerFqdn(pPeer); + if (!ip) { + syncReleasePeer(pPeer); + return NULL; + } + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); @@ -430,7 +436,7 @@ void *syncRetrieveData(void *param) { if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, pPeer->numOfRetrieves); - pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); + pPeer->syncFd = taosOpenTcpClientSocket(ip, pPeer->port, 0); if (pPeer->syncFd < 0) { sError("%s, failed to open socket to sync", pPeer->id); } else {