diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index f5f1ff5853463e5724dd7bf2684cb97e2cf61263..ef635e6efc1ca5f071c64dbe00920c3987837494 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -311,6 +311,16 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { if (pNode == NULL) return 0; + if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { + sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, + pWalHead->version, nodeVersion); + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } + return TSDB_CODE_SYN_INVALID_VERSION; + } + // always update version nodeVersion = pWalHead->version; sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index b1f4539a6c5d35c31fed78a95ee794619c5a3df3..70b08b9669d0a66f3f06596305d10091e7c1d36a 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -77,8 +77,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { } // assign version - pVnode->version++; - pHead->version = pVnode->version; + pHead->version = pVnode->version + 1; if (pVnode->delay) usleep(pVnode->delay * 1000); } else { // from wal or forward @@ -86,16 +85,16 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { if (pHead->version <= pVnode->version) return 0; } - pVnode->version = pHead->version; + // forward to peers, even it is WAL/FWD, it shall be called to update version in sync + int32_t syncCode = 0; + syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); + if (syncCode < 0) return syncCode; // write into WAL code = walWrite(pVnode->wal, pHead); if (code < 0) return code; - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync - int32_t syncCode = 0; - syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); - if (syncCode < 0) return syncCode; + pVnode->version = pHead->version; // write data locally code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);