提交 143a2e85 编写于 作者: B Benguang Zhao

feat: start snapshot replication to rollback in recovery mode

上级 68f8ebac
...@@ -188,7 +188,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -188,7 +188,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE; goto _IGNORE;
} }
if (pMsg->prevLogIndex + 1 != pEntry->index) { if (pMsg->prevLogIndex + 1 != pEntry->index || pEntry->term < 0) {
sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64
", prevLogTerm:%" PRId64, ", prevLogTerm:%" PRId64,
ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm); ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm);
......
...@@ -85,7 +85,6 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -85,7 +85,6 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr); sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
return -1; return -1;
} }
ASSERT(pMgr != NULL);
(void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); (void)syncLogReplMgrProcessReply(pMgr, ths, pMsg);
} }
return 0; return 0;
......
...@@ -90,7 +90,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S ...@@ -90,7 +90,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
return prevLogTerm; return prevLogTerm;
} }
if (pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
ASSERT(timeMs != 0 && "no log entry found"); ASSERT(timeMs != 0 && "no log entry found");
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
...@@ -277,7 +277,11 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -277,7 +277,11 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
" %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex); pBuf->endIndex);
ret = 0; SyncTerm term = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index + 1);
ASSERT(pEntry->term >= 0);
if (term == pEntry->term) {
ret = 0;
}
goto _out; goto _out;
} }
...@@ -655,7 +659,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -655,7 +659,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
if (pMsg->matchIndex == pMsg->lastSendIndex) { if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->restored = true; pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64 sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64
", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
...@@ -664,6 +668,17 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -664,6 +668,17 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
return 0; return 0;
} }
if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) {
sError("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64, pNode->vgId,
host, port, pMsg->matchIndex, pMsg->lastSendIndex);
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
return -1;
}
sInfo("vgId:%d, snapshot replication to rollback. peer: %s:%d", pNode->vgId, host, port);
return 0;
}
(void)syncLogReplMgrReset(pMgr); (void)syncLogReplMgrReset(pMgr);
} }
...@@ -681,7 +696,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -681,7 +696,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
return -1; return -1;
} }
sInfo("vgId:%d, snapshot replication to peer %s:%d started", pNode->vgId, host, port); sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册