提交 6f004e4c 编写于 作者: S Shengliang Guan

TD-1652

上级 8623f145
......@@ -311,6 +311,16 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
if (pNode == NULL) return 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
pWalHead->version, nodeVersion);
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer);
}
return TSDB_CODE_SYN_INVALID_VERSION;
}
// always update version
nodeVersion = pWalHead->version;
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
......
......@@ -77,8 +77,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
}
// assign version
pVnode->version++;
pHead->version = pVnode->version;
pHead->version = pVnode->version + 1;
if (pVnode->delay) usleep(pVnode->delay * 1000);
} else { // from wal or forward
......@@ -86,16 +85,16 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
if (pHead->version <= pVnode->version) return 0;
}
pVnode->version = pHead->version;
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
if (syncCode < 0) return syncCode;
// write into WAL
code = walWrite(pVnode->wal, pHead);
if (code < 0) return code;
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
if (syncCode < 0) return syncCode;
pVnode->version = pHead->version;
// write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册