提交 2e640e8e 编写于 作者: B Benguang Zhao

feat: start snapshot in recovery mode of sync repl mgr with lastMatchTerm in reply msg

上级 f68e41a4
......@@ -105,7 +105,7 @@ typedef struct SyncAppendEntriesReply {
SRaftId destId;
// private data
SyncTerm term;
SyncTerm privateTerm;
SyncTerm lastMatchTerm;
bool success;
SyncIndex matchIndex;
SyncIndex lastSendIndex;
......
......@@ -103,7 +103,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex);
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode);
......
......@@ -206,7 +206,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
accepted = true;
_SEND_RESPONSE:
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm);
bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
if (accepted && matched) {
pReply->success = true;
......
......@@ -2170,7 +2170,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
}
// proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);
sDebug("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
......
......@@ -76,17 +76,17 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
if (prevIndex == -1) return 0;
if (index - 1 > pBuf->matchIndex) {
if (prevIndex > pBuf->matchIndex) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
ASSERT(index - 1 == prevIndex);
if (index - 1 >= pBuf->startIndex) {
pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
if (prevIndex >= pBuf->startIndex) {
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL && "no log entry found");
prevLogTerm = pBuf->entries[(index + pBuf->size) % pBuf->size].prevLogTerm;
prevLogTerm = pEntry->term;
return prevLogTerm;
}
......@@ -354,7 +354,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
return 0;
}
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm) {
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
......@@ -419,6 +419,9 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
_out:
pBuf->matchIndex = matchIndex;
if (pMatchTerm) {
*pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term;
}
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return matchIndex;
......@@ -615,16 +618,16 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
ASSERT(pMgr->restored == false);
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
syncUtilU642Addr(destId.addr, host, sizeof(host), &port);
if (pMgr->endIndex == 0) {
ASSERT(pMgr->startIndex == 0);
ASSERT(pMgr->matchIndex == 0);
if (pMsg->matchIndex < 0) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
sInfo("vgId:%d, sync log repl mgr of peer %s:%d (%" PRIx64 ") restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
......@@ -638,9 +641,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
if (pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
sInfo("vgId:%d, sync log repl mgr of peer %s:%d (%" PRIx64 ") restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
......@@ -648,23 +651,38 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
(void)syncLogReplMgrReset(pMgr);
}
// check existence of WAl log
// check last match term
SyncTerm term = -1;
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
if (pMsg->matchIndex + 1 < firstVer) {
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for dest: 0x%016" PRIx64, pNode->vgId, destId.addr);
SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1);
if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
}
return 0;
}
ASSERT(index + 1 >= firstVer);
if (term == pMsg->lastMatchTerm) {
index = index + 1;
ASSERT(index <= pNode->pLogBuf->matchIndex);
} else {
ASSERT(index > firstVer);
}
return 0;
}
// send match index
SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
bool barrier = false;
SyncTerm term = -1;
// attempt to replicate the raft log at index
bool barrier = false;
ASSERT(index >= 0);
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, &destId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, destId.addr);
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", peer %s:%d", pNode->vgId,
terrstr(), index, host, port);
return -1;
}
......
......@@ -373,7 +373,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
sNTrace(pSyncNode,
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64
", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
host, port, pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
}
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
......@@ -384,7 +384,7 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
sNTrace(pSyncNode,
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64
", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
host, port, pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
}
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册