From 6f004e4c7a759b9fc4d3ecfab8725aca10e87ea5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 14 Oct 2020 22:27:19 +0800 Subject: [PATCH] TD-1652 --- src/sync/src/syncMain.c | 10 ++++++++++ src/vnode/src/vnodeWrite.c | 13 ++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index f5f1ff5853..ef635e6efc 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -311,6 +311,16 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { if (pNode == NULL) return 0; + if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { + sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, + pWalHead->version, nodeVersion); + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } + return TSDB_CODE_SYN_INVALID_VERSION; + } + // always update version nodeVersion = pWalHead->version; sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index b1f4539a6c..70b08b9669 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -77,8 +77,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { } // assign version - pVnode->version++; - pHead->version = pVnode->version; + pHead->version = pVnode->version + 1; if (pVnode->delay) usleep(pVnode->delay * 1000); } else { // from wal or forward @@ -86,16 +85,16 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { if (pHead->version <= pVnode->version) return 0; } - pVnode->version = pHead->version; + // forward to peers, even it is WAL/FWD, it shall be called to update version in sync + int32_t syncCode = 0; + syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); + if (syncCode < 0) return syncCode; // write into WAL code = walWrite(pVnode->wal, pHead); if (code < 0) return code; - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync - int32_t syncCode = 0; - syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); - if (syncCode < 0) return syncCode; + pVnode->version = pHead->version; // write data locally code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); -- GitLab