diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index de8acdbec183e18596bb6dc6a63fc8b1b94616f8..c35a56299819c1be3005c928bb9541de6907e408 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -41,7 +41,7 @@ static int32_t tsSyncRefId = -1; static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer); static void syncCheckPeerConnection(void *param, void *tmrId); -static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); +static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); static void syncProcessBrokenLink(void *param); static int32_t syncProcessPeerMsg(void *param, void *buffer); static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); @@ -954,8 +954,12 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { return code; } -static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { - if (pPeer->peerFd < 0 || pPeer->ip == 0) return; +static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { + if (pPeer->peerFd < 0 || pPeer->ip == 0) { + sDebug("%s, failed to send status msg, restart fd:%d", pPeer->id, pPeer->peerFd); + syncRestartConnection(pPeer); + return -1; + } SSyncNode * pNode = pPeer->pSyncNode; SPeersStatus msg; @@ -978,9 +982,11 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], syncStatus[pPeer->sstatus], pPeer->version, ack, tranId, statusType[type], pPeer->peerFd); + return 0; } else { sDebug("%s, failed to send status msg, restart", pPeer->id); syncRestartConnection(pPeer); + return -1; } }