diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 77397f490ab1e3a27c289e1c9aad6c51a4161414..9a73348b9311c90ea66174107e6b62707215b415 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -333,11 +333,26 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { if (pNode == NULL) return 0; - // always update version - nodeVersion = pWalHead->version; - sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype); + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) { + if (pWalHead->version != nodeVersion + 1) { + sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", + pNode->vgId, pWalHead->version, nodeVersion); + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } + } else { + // always update version + nodeVersion = pWalHead->version; + sDebug("vgId:%d, update version, replica:%d nodeRole:%d qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, + nodeRole, qtype, pWalHead->version); + } + return 0; + } - if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; + nodeVersion = pWalHead->version; + sDebug("vgId:%d, forward will send, replica:%d nodeRole:%d qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, + nodeRole, qtype, pWalHead->version); // only pkt from RPC or CQ can be forwarded if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; @@ -888,12 +903,14 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { SWalHead * pHead = (SWalHead *)cont; sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); +#if 0 if (pHead->version != nodeVersion + 1) { sError("%s, forward is received, ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64, pPeer->id, pHead->version, nodeVersion); syncRestartConnection(pPeer); return; } +#endif if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { // nodeVersion = pHead->version;