未验证 提交 1af6c539 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #6516 from taosdata/fix/TD-4672

[TD-4672]<fix>: remove peer fqdn IP caching
...@@ -132,6 +132,7 @@ void * syncRestoreData(void *param); ...@@ -132,6 +132,7 @@ void * syncRestoreData(void *param);
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void syncRestartConnection(SSyncPeer *pPeer); void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode); void syncBroadcastStatus(SSyncNode *pNode);
uint32_t syncResolvePeerFqdn(SSyncPeer *pPeer);
SSyncPeer *syncAcquirePeer(int64_t rid); SSyncPeer *syncAcquirePeer(int64_t rid);
void syncReleasePeer(SSyncPeer *pPeer); void syncReleasePeer(SSyncPeer *pPeer);
......
...@@ -559,7 +559,8 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { ...@@ -559,7 +559,8 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
static void syncRemovePeer(SSyncPeer *pPeer) { static void syncRemovePeer(SSyncPeer *pPeer) {
sInfo("%s, it is removed", pPeer->id); sInfo("%s, it is removed", pPeer->id);
pPeer->ip = 0; //pPeer->ip = 0;
pPeer->fqdn[0] = '\0';
syncClosePeerConn(pPeer); syncClosePeerConn(pPeer);
//taosRemoveRef(tsPeerRefId, pPeer->rid); //taosRemoveRef(tsPeerRefId, pPeer->rid);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
...@@ -585,20 +586,31 @@ static void syncStopCheckPeerConn(SSyncPeer *pPeer) { ...@@ -585,20 +586,31 @@ static void syncStopCheckPeerConn(SSyncPeer *pPeer) {
sDebug("%s, stop check peer connection", pPeer->id); sDebug("%s, stop check peer connection", pPeer->id);
} }
uint32_t syncResolvePeerFqdn(SSyncPeer *pPeer) {
uint32_t ip = taosGetIpv4FromFqdn(pPeer->fqdn);
if (ip == 0xFFFFFFFF) {
sError("failed to resolve peer fqdn:%s since %s", pPeer->fqdn, strerror(errno));
terrno = TSDB_CODE_RPC_FQDN_ERROR;
return 0;
}
return ip;
}
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpv4FromFqdn(pInfo->nodeFqdn); /*uint32_t ip = taosGetIpv4FromFqdn(pInfo->nodeFqdn);
if (ip == 0xFFFFFFFF) { if (ip == 0xFFFFFFFF) {
sError("failed to add peer, can resolve fqdn:%s since %s", pInfo->nodeFqdn, strerror(errno)); sError("failed to add peer, can resolve fqdn:%s since %s", pInfo->nodeFqdn, strerror(errno));
terrno = TSDB_CODE_RPC_FQDN_ERROR; terrno = TSDB_CODE_RPC_FQDN_ERROR;
return NULL; return NULL;
} }
*/
SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer)); SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL; if (pPeer == NULL) return NULL;
pPeer->nodeId = pInfo->nodeId; pPeer->nodeId = pInfo->nodeId;
tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn)); tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn));
pPeer->ip = ip; //pPeer->ip = ip;
pPeer->port = pInfo->nodePort; pPeer->port = pInfo->nodePort;
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0; pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, nodeId:%d", pNode->vgId, pPeer->nodeId); snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, nodeId:%d", pNode->vgId, pPeer->nodeId);
...@@ -864,7 +876,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) { ...@@ -864,7 +876,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
} }
void syncRestartConnection(SSyncPeer *pPeer) { void syncRestartConnection(SSyncPeer *pPeer) {
if (pPeer->ip == 0) return; if (pPeer->fqdn[0] == '\0') return;
if (syncAcquirePeer(pPeer->rid) == NULL) return; if (syncAcquirePeer(pPeer->rid) == NULL) return;
...@@ -878,7 +890,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { ...@@ -878,7 +890,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
sInfo("%s, sync-req is received", pPeer->id); sInfo("%s, sync-req is received", pPeer->id);
if (pPeer->ip == 0) return; if (pPeer->fqdn[0] == '\0') return;
if (nodeRole != TAOS_SYNC_ROLE_MASTER) { if (nodeRole != TAOS_SYNC_ROLE_MASTER) {
sError("%s, I am not master anymore", pPeer->id); sError("%s, I am not master anymore", pPeer->id);
...@@ -1090,7 +1102,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { ...@@ -1090,7 +1102,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
} }
static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
if (pPeer->peerFd < 0 || pPeer->ip == 0) { if (pPeer->peerFd < 0 || pPeer->fqdn[0] == '\0') {
sDebug("%s, failed to send status msg, restart fd:%d", pPeer->id, pPeer->peerFd); sDebug("%s, failed to send status msg, restart fd:%d", pPeer->id, pPeer->peerFd);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
return -1; return -1;
...@@ -1135,7 +1147,13 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1135,7 +1147,13 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
return; return;
} }
SOCKET connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); uint32_t ip = syncResolvePeerFqdn(pPeer);
if (!ip) {
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
return;
}
SOCKET connFd = taosOpenTcpClientSocket(ip, pPeer->port, 0);
if (connFd <= 0) { if (connFd <= 0) {
sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
......
...@@ -422,6 +422,12 @@ void *syncRetrieveData(void *param) { ...@@ -422,6 +422,12 @@ void *syncRetrieveData(void *param) {
return NULL; return NULL;
} }
uint32_t ip = syncResolvePeerFqdn(pPeer);
if (!ip) {
syncReleasePeer(pPeer);
return NULL;
}
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
...@@ -430,7 +436,7 @@ void *syncRetrieveData(void *param) { ...@@ -430,7 +436,7 @@ void *syncRetrieveData(void *param) {
if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, pPeer->numOfRetrieves); if (pNode->notifyFlowCtrlFp) (*pNode->notifyFlowCtrlFp)(pNode->vgId, pPeer->numOfRetrieves);
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); pPeer->syncFd = taosOpenTcpClientSocket(ip, pPeer->port, 0);
if (pPeer->syncFd < 0) { if (pPeer->syncFd < 0) {
sError("%s, failed to open socket to sync", pPeer->id); sError("%s, failed to open socket to sync", pPeer->id);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册