提交 91238e82 编写于 作者: M Minghao Li

refactor(sync): add snapshot2 interface

上级 4033e2dd
...@@ -165,6 +165,7 @@ typedef struct SSyncLogStore { ...@@ -165,6 +165,7 @@ typedef struct SSyncLogStore {
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore); int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index); int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index);
bool (*syncLogExist)(struct SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
......
...@@ -44,6 +44,11 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -44,6 +44,11 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
typedef struct SReaderParam {
SyncIndex start;
SyncIndex end;
} SReaderParam;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -58,11 +58,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode); ...@@ -58,11 +58,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
typedef struct SReaderParam {
SyncIndex start;
SyncIndex end;
} SReaderParam;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -108,67 +108,29 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -108,67 +108,29 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
return ret; return ret;
} }
#if 0 // only start once
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
int32_t ret = 0; SyncAppendEntriesReply* pMsg) {
// get sender
char logBuf[128] = {0}; SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm); ASSERT(pSender != NULL);
syncAppendEntriesReplyLog2(logBuf, pMsg);
SSnapshot snapshot = {
if (pMsg->term < ths->pRaftStore->currentTerm) { .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
ths->pRaftStore->currentTerm); void* pReader = NULL;
return ret; SReaderParam readerParam = {.start = beginIndex, .end = endIndex};
} ths->pFsm->FpSnapshotStartRead(ths->pFsm, &pReader);
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex); ASSERT(pReader != NULL);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex); snapshotSenderStart(pSender, snapshot, pReader);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplyCb error term, receive:%lu current:%lu", pMsg->term,
ths->pRaftStore->currentTerm);
syncNodeLog2(logBuf, ths);
sError("%s", logBuf);
return ret;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
// maybe commit
syncMaybeAdvanceCommitIndex(ths);
} else { } else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); if (pReader != NULL) {
ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
// notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex;
} else {
nextIndex = SYNC_INDEX_BEGIN;
} }
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
} }
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);
return ret;
} }
#endif
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
...@@ -187,6 +149,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -187,6 +149,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
return -1; return -1;
} }
// error term
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128]; char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term); snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term);
...@@ -197,19 +160,29 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -197,19 +160,29 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
if (pMsg->success) { if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] SyncIndex newNextIndex = pMsg->matchIndex + 1;
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); SyncIndex newMatchIndex = pMsg->matchIndex;
if (gRaftDetailLog) { if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) &&
sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) {
} // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex);
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
// maybe commit // maybe commit
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncMaybeAdvanceCommitIndex(ths); syncMaybeAdvanceCommitIndex(ths);
}
} else {
// start snapshot <match+1, old snapshot.end>
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
syncNodeStartSnapshot(ths, newMatchIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pMsg);
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1);
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
} }
} else { } else {
...@@ -218,6 +191,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -218,6 +191,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
// notice! int64, uint64 // notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) { if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex; --nextIndex;
if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex) &&
ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex - 1)) {
// do nothing
} else {
SSyncRaftEntry* pEntry;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry);
ASSERT(code == 0);
syncNodeStartSnapshot(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg);
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL);
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
// update nextIndex to sentryIndex
if (nextIndex <= sentryIndex) {
nextIndex = sentryIndex;
}
}
} else { } else {
nextIndex = SYNC_INDEX_BEGIN; nextIndex = SYNC_INDEX_BEGIN;
} }
......
...@@ -128,38 +128,14 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { ...@@ -128,38 +128,14 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
// next index // next index
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
// if pre-entry not exist, create snapshot
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preLogIndex, &pPreEntry);
if (code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
void* pReader = NULL;
SReaderParam readerParam = {.start = 0, .end = nextIndex};
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot, &readerParam, &pReader);
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
ASSERT(pSender != NULL);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 &&
!snapshotSenderIsStart(pSender)) {
// start snapshot
snapshotSenderStart(pSender, snapshot, pReader);
} else {
// no snapshot
if (pReader != NULL) {
pSyncNode->pFsm->FpSnapshotStopRead(pSyncNode->pFsm, pReader);
}
}
}
}
// pre index, pre term // pre index, pre term
preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) { if (preLogTerm == SYNC_TERM_INVALID) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
ASSERT(pSender != NULL);
ASSERT(!snapshotSenderIsStart(pSender));
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册